## Preface

The ingress-controller component I have been working on recently runs as a stateful service. Each replica parses the ingress events it receives and synchronizes them to the SLB. If multiple replicas ran at the same time, they would inevitably conflict with one another when accessing the SLB, so at any given moment only one replica may actually be doing the work. At the same time, we still need a multi-replica deployment to guarantee high availability.

To solve this, the component follows the approach of kube-scheduler, kube-controller-manager, and similar components: it uses the leader-election mechanism in client-go/tools/leaderelection to ensure that only the leader is in the working state, periodically re-electing the leader or renewing its lease. When the leader dies, a new leader is elected from the remaining nodes so that the component keeps running normally.
This article uses the ingress-controller component as an example to show how to use leaderelection, and then digs into how it is implemented.
## Usage

1. First, create the leaderElectionClient:

```go
config, err := buildConfig(conf.KubeConfigFile)
if err != nil {
	klog.Fatal(err)
}
leaderElectionClient := kubernetes.NewForConfigOrDie(config)

// buildConfig loads the given kubeconfig file if one is provided,
// otherwise it falls back to the in-cluster configuration.
func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}
```
2. Create an event recorder to record the events produced by the election (the original snippet referenced an undefined `kubeClient`; the `leaderElectionClient` from step 1 is used here):

```go
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
	Interface: leaderElectionClient.CoreV1().Events(conf.Namespace),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
	Component: "ksyun-ingress-controller",
})
```
3. Define the work to be done once elected leader; this is the body of the run function:

```go
ctl := controller.NewKSyunIngressController(conf)
run := func(ctx context.Context) {
	// ctl.Start() blocks forever; reaching the panic would indicate a bug.
	ctl.Start()
	panic("unreachable")
}
```
4. Set up the node identity, the resource lock, and the context:

```go
id, err := os.Hostname()
if err != nil {
	klog.Fatalf("error getting hostname: %+v", err)
}

rl := resourcelock.EndpointsLock{
	EndpointsMeta: metav1.ObjectMeta{
		Namespace: "kube-system",
		Name:      "ksyun-ingress-controller",
	},
	Client: leaderElectionClient.CoreV1(),
	LockConfig: resourcelock.ResourceLockConfig{
		Identity:      id,
		EventRecorder: recorder,
	},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
```
5. Start the leader-election loop; the node that becomes leader executes run:

```go
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	Lock:          &rl,
	LeaseDuration: DefaultLeaseDuration,
	RenewDeadline: DefaultRenewDeadline,
	RetryPeriod:   DefaultRetryPeriod,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) {
			run(ctx)
		},
		OnStoppedLeading: func() {
			klog.Fatalf("leaderelection lost")
		},
	},
})
```
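The three durations are not defined in the snippet above. As a sketch, the values below match the kube-controller-manager defaults; the exact constants this component uses are an assumption on my part. Note that client-go validates that LeaseDuration is greater than RenewDeadline, and that RenewDeadline is greater than JitterFactor times RetryPeriod.

```go
// Assumed values, matching the kube-controller-manager defaults.
const (
	// DefaultLeaseDuration: how long non-leader candidates wait after the
	// last observed renewal before they try to take over the lock.
	DefaultLeaseDuration = 15 * time.Second
	// DefaultRenewDeadline: how long the acting leader keeps retrying to
	// renew the lease before giving up leadership.
	DefaultRenewDeadline = 10 * time.Second
	// DefaultRetryPeriod: the wait between individual acquire/renew attempts.
	DefaultRetryPeriod = 2 * time.Second
)
```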
## How it works

The package implements a distributed lock on top of a Kubernetes configmap, endpoints, or lease resource. The node that acquires the lock becomes the leader and renews it periodically. The other replicas keep trying to grab the lock; when an attempt fails, they wait for the next iteration and try again. When the leader dies and the lease expires, one of the other nodes becomes the new leader.
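As an aside, newer client-go versions recommend the lease resource for this purpose. If this component switched away from the endpoints lock, the construction would look roughly like the following sketch, reusing the id and recorder from the Usage section:

```go
lock := &resourcelock.LeaseLock{
	LeaseMeta: metav1.ObjectMeta{
		Namespace: "kube-system",
		Name:      "ksyun-ingress-controller",
	},
	// Lease objects live in the coordination.k8s.io API group,
	// so the lock needs the CoordinationV1 client.
	Client: leaderElectionClient.CoordinationV1(),
	LockConfig: resourcelock.ResourceLockConfig{
		Identity:      id,
		EventRecorder: recorder,
	},
}
```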
LeaderElectionConfig.Lock supports three kinds of resource locks: configmap, endpoints, and lease. The package also provides a multilock, which combines two of them as a primary and a secondary; this is mainly useful for migrating a component from one lock type to another. The dispatch can be seen in interface.go:
```go
switch lockType {
case EndpointsResourceLock:
	return endpointsLock, nil
case ConfigMapsResourceLock:
	return configmapLock, nil
case LeasesResourceLock:
	return leaseLock, nil
case EndpointsLeasesResourceLock:
	return &MultiLock{
		Primary:   endpointsLock,
		Secondary: leaseLock,
	}, nil
case ConfigMapsLeasesResourceLock:
	return &MultiLock{
		Primary:   configmapLock,
		Secondary: leaseLock,
	}, nil
default:
	return nil, fmt.Errorf("Invalid lock-type %s", lockType)
}
```
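Instead of constructing an EndpointsLock literal by hand as in step 4, the lock can also be obtained through this switch by calling resourcelock.New. A sketch, assuming the same namespace, name, and identity as above:

```go
lock, err := resourcelock.New(
	resourcelock.EndpointsResourceLock,    // lockType: one of the cases above
	"kube-system",                         // namespace of the lock object
	"ksyun-ingress-controller",            // name of the lock object
	leaderElectionClient.CoreV1(),         // used by endpoints/configmap locks
	leaderElectionClient.CoordinationV1(), // used by lease locks
	resourcelock.ResourceLockConfig{
		Identity:      id,
		EventRecorder: recorder,
	},
)
if err != nil {
	klog.Fatalf("error creating resource lock: %v", err)
}
```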
This component uses the endpoints resource lock, so the current election state can be inspected by looking at the Endpoints object's YAML.
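The election record is stored in the control-plane.alpha.kubernetes.io/leader annotation. For example (the holder identity and timestamps below are hypothetical; yours will differ):

```
$ kubectl -n kube-system get endpoints ksyun-ingress-controller -o yaml
apiVersion: v1
kind: Endpoints
metadata:
  name: ksyun-ingress-controller
  namespace: kube-system
  annotations:
    control-plane.alpha.kubernetes.io/leader: >-
      {"holderIdentity":"node-1","leaseDurationSeconds":15,
       "acquireTime":"2020-01-01T00:00:00Z",
       "renewTime":"2020-01-01T00:05:00Z","leaderTransitions":3}
```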
Let's focus on part 5 of the previous section. The entry point, leaderelection.RunOrDie(), lives in client-go/tools/leaderelection/leaderelection.go:
```go
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}

// Run first acquires the lock, then runs OnStartedLeading in a goroutine
// while renewing the lease; OnStoppedLeading fires when Run returns.
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() {
		le.config.Callbacks.OnStoppedLeading()
	}()
	if !le.acquire(ctx) {
		// The context was cancelled before we ever became leader.
		return
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}
```
Both acquire and renew loop using the wait package from k8s.io/apimachinery/pkg/util/wait. In acquire, as soon as one iteration succeeds the loop exits and execution moves on; otherwise it keeps retrying until the node drops out of the contest. renew is the renewal loop and works the other way around: a successful iteration keeps the loop going, renewing the lease again and again, while a failed iteration exits the loop and gives up leadership so that a new leader can be elected. Internally, both call tryAcquireOrRenew().
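To make the acquire side concrete, here is a simplified sketch of its loop; the real implementation in leaderelection.go also reports leadership transitions and logs, but the structure below is the essential part:

```go
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	// JitterUntil runs the function roughly every RetryPeriod (with jitter)
	// until ctx.Done() is closed.
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew()
		if !succeeded {
			// The lock is held by someone else; try again next period.
			return
		}
		// We acquired the lock: cancel the context to stop this loop.
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
```

tryAcquireOrRenew() is where the actual work against the lock object happens: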
```go
func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. Fetch the existing record; if there is none yet, create it
	//    and become leader immediately.
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 2. If the record changed since we last saw it, update our
	//    observation and reset the observation time.
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedRawRecord = oldLeaderElectionRawRecord
		le.observedTime = le.clock.Now()
	}

	// 3. Someone else holds the lock and the lease has not yet expired:
	//    we cannot take over.
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 4. We are taking or keeping the lock: preserve AcquireTime and the
	//    transition count on renewal, bump the count on takeover.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// 5. Write the updated record back to the lock object.
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}
```
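For completeness, the renew side looks roughly like the following sketch (again with logging and transition reporting omitted): the outer loop keeps running as long as renewals succeed, and a renewal that cannot complete within RenewDeadline cancels the context and ends leadership.

```go
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		// Each renewal attempt must finish within RenewDeadline.
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			return le.tryAcquireOrRenew(), nil
		}, timeoutCtx.Done())
		if err == nil {
			// Renewed successfully; wait.Until calls us again after RetryPeriod.
			return
		}
		// RenewDeadline expired without a successful renewal: cancel the
		// context, which stops the wait.Until loop and makes Run return,
		// triggering OnStoppedLeading.
		cancel()
	}, le.config.RetryPeriod, ctx.Done())
}
```

In our component, OnStoppedLeading simply calls klog.Fatalf, so a replica that loses the lease exits and is restarted by its controller; the next election then picks a new leader, which is exactly the behavior we want for the ingress-controller.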