1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) { var filtered []*v1.Node failedPredicateMap := FailedPredicateMap{}
if len(g.predicates) == 0 { filtered = nodes } else { allNodes := int32(g.cache.NodeTree().NumNodes) numNodesToFind := g.numFeasibleNodesToFind(allNodes)
filtered = make([]*v1.Node, numNodesToFind) errs := errors.MessageCountMap{} var ( predicateResultLock sync.Mutex filteredLen int32 equivClass *equivalence.Class )
ctx, cancel := context.WithCancel(context.Background())
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
if g.equivalenceCache != nil { equivClass = equivalence.NewClass(pod) }
checkNode := func(i int) { var nodeCache *equivalence.NodeCache nodeName := g.cache.NodeTree().Next() if g.equivalenceCache != nil { nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName) } fits, failedPredicates, err := podFitsOnNode( pod, meta, g.cachedNodeInfoMap[nodeName], g.predicates, g.cache, nodeCache, g.schedulingQueue, g.alwaysCheckAllPredicates, equivClass, ) if err != nil { predicateResultLock.Lock() errs[err.Error()]++ predicateResultLock.Unlock() return } if fits { length := atomic.AddInt32(&filteredLen, 1) if length > numNodesToFind { cancel() atomic.AddInt32(&filteredLen, -1) } else { filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node() } } else { predicateResultLock.Lock() failedPredicateMap[nodeName] = failedPredicates predicateResultLock.Unlock() } }
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
filtered = filtered[:filteredLen] if len(errs) > 0 { return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs) } }
if len(filtered) > 0 && len(g.extenders) != 0 { for _, extender := range g.extenders { if !extender.IsInterested(pod) { continue } filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap) if err != nil { if extender.IsIgnorable() { glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", extender, err) continue } else { return []*v1.Node{}, FailedPredicateMap{}, err } }
for failedNodeName, failedMsg := range failedMap { if _, found := failedPredicateMap[failedNodeName]; !found { failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{} } failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg)) } filtered = filteredList if len(filtered) == 0 { break } } } return filtered, failedPredicateMap, nil }
|