在Go框架下使用Etcd选主demo

在Go语言里,etcd的clientv3/concurrency包提供了选举接口concurrency.NewElection,可以直接调用。我把选举过程打印出来观察,发现其内部逻辑和常见的ZooKeeper选主(基于临时顺序节点)思路基本一致。
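首先开启一个会话(Session,底层是一个带TTL的租约lease)。调用election.Campaign时,会在指定前缀下写入一个以租约ID(十六进制)为后缀的key用于排队,然后按key的创建版本号(CreateRevision)比较,判断自己是不是最早创建的那个。如果是,则成为Leader。会话会在后台通过KeepAlive自动为租约续期,因此只要进程和网络正常,Leader身份就会一直保持,不需要在TTL内手动重新建立会话或再次调用Campaign。一旦续期中断超过TTL,租约过期,key被etcd删除,Leader身份随之失效(此时session.Done()返回的channel会被关闭)。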
如果自己的key不是最早创建的,election.Campaign会阻塞,形成排队状态,此时的状态是Follower。Campaign内部(waitDeletes)会watch排在自己前面、CreateRevision比自己小的候选人key。前面的key被删除后再继续检查,直到自己成为最早的key,当选Leader并从Campaign返回。Follower的会话同样由KeepAlive自动续期。如果续期中断超过TTL,租约过期,排队用的key会被etcd自动删除,相当于退出排队、给别人让出机会。之后需要新建会话,再次调用election.Campaign重新排队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
"time"
)

// Election settings shared by every worker.
const (
	// prefix is the etcd key prefix under which candidates queue for leadership.
	prefix = "/root/elect"
	// prop is the value each candidate passes to Election.Campaign.
	prop = "local"
)

// main starts three election workers (IDs 0-2) and then loops forever.
// Every five seconds it takes the next reported leader ID from the leader
// channel, tells that worker to quit, and restarts it one second later so it
// rejoins the election queue. A background goroutine prints a clock line once
// per second to make the log easier to follow.
func main() {
	var chans [3]chan bool
	for i := range chans {
		chans[i] = make(chan bool, 1)
	}
	leader := make(chan int, 10)

	go func() {
		for sec := 1; ; sec++ {
			<-time.After(1 * time.Second)
			fmt.Printf("========第%d秒========\n", sec)
		}
	}()

	for id := range chans {
		go job(id, chans[id], leader)
	}

	for {
		time.Sleep(5 * time.Second)
		winner := <-leader
		chans[winner] <- true
		time.Sleep(1 * time.Second)
		go job(winner, chans[winner], leader)
	}
}

// job runs one election worker. It connects to the local etcd endpoint, starts
// the campaign loop and the per-second cron loop in their own goroutines, and
// shuts both down once a value arrives on quitC.
//
// Shutdown waits (bounded) for the campaign goroutine before the deferred
// cli.Close runs. Closing the client immediately raced with campaign's
// session Close. The lease revoke could be cancelled ("context canceled" in the
// sample log), so the old leader key presumably survived until its TTL ran out,
// delaying failover to the next candidate.
//
// NOTE(review): leaderFlag is written by campaign and read by doCrontab from
// different goroutines without synchronization (a data race).
func job(workerId int, quitC chan bool, leader chan int) {
	// Upper bound on waiting for campaign to release its session; a little
	// longer than the 5s session TTL that campaign uses.
	const campaignExitWait = 6 * time.Second

	endpoints := []string{"127.0.0.1:2379"}
	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	quitA := make(chan int, 1)
	quitB := make(chan int, 1)
	campaignDone := make(chan struct{})
	cronDone := make(chan struct{})
	leaderFlag := false

	go func() {
		defer close(campaignDone)
		campaign(cli, prefix, prop, workerId, leader, &leaderFlag, quitA)
	}()
	go func() {
		defer close(cronDone)
		doCrontab(workerId, &leaderFlag, quitB)
	}()

	<-quitC
	fmt.Printf("worker:%d 退出\n", workerId)
	quitA <- 1
	quitB <- 1

	// Let campaign close its session (revoking the lease) while the client is
	// still open; bounded so a stuck campaign cannot keep the client open forever.
	select {
	case <-campaignDone:
	case <-time.After(campaignExitWait):
	}
	<-cronDone
}

// campaign keeps this worker in the leader election under the key prefix
// election, passing prop as the candidate value to Election.Campaign.
//
// Each round waits one second, opens a new session with a 5s TTL and blocks in
// Campaign until this worker is elected. It then sets *leaderFlag, sends
// workerId on leader, and waits for one of two events:
//   - the session is done (leadership lost): *leaderFlag is cleared and a new
//     round starts;
//   - a value arrives on quitA: the session is closed and campaign returns.
//
// A value on quitA also interrupts a round that is still queued (follower)
// inside Campaign. Every exit path releases the round's session.
//
// NOTE(review): *leaderFlag is read by doCrontab in another goroutine without
// synchronization (a data race); fixing it needs a shared atomic or mutex.
func campaign(cli *clientv3.Client, election string, prop string, workerId int, leader chan int, leaderFlag *bool, quitA chan int) {
	defer fmt.Printf("worker:%d 退出选举\n", workerId)

	// ctx is cancelled as soon as a value arrives on quitA, so a candidate that
	// is still blocked inside Campaign can quit too.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		select {
		case <-quitA:
			cancel()
		case <-ctx.Done():
		}
	}()

	// closeSession revokes the session's lease and reports any error.
	closeSession := func(s *concurrency.Session) {
		if err := s.Close(); err != nil {
			fmt.Println(err)
		}
	}

	for {
		// Wait before every round, not only the first. The original ranged over
		// a one-shot Timer, which yields a single value, so every later round
		// (retry, re-election, quit) blocked forever.
		select {
		case <-ctx.Done():
			return
		case <-time.After(1 * time.Second):
		}

		s, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
		if err != nil {
			fmt.Println(err)
			continue
		}
		e := concurrency.NewElection(s, election)

		fmt.Printf("worker:%d 参加选举\n", workerId)
		if err = e.Campaign(ctx, prop); err != nil {
			// Release this round's lease instead of leaking the session.
			closeSession(s)
			if ctx.Err() != nil {
				return // interrupted by quitA
			}
			fmt.Println(err)
			continue
		}
		fmt.Printf("worker:%d 选举:成功,Key:%s \n", workerId, e.Key())
		*leaderFlag = true
		leader <- workerId

		select {
		case <-s.Done():
			*leaderFlag = false
			fmt.Println("选举:超时")
			// The lease is no longer being refreshed; Orphan releases the
			// session locally without a revoke round-trip.
			s.Orphan()
		case <-ctx.Done():
			closeSession(s)
			return
		}
	}
}

// doCrontab simulates a per-worker cron job. Once per second it increments a
// tick counter. When *leaderFlag is true it prints that it ran the task
// (with the counter); otherwise it prints that the worker is a follower.
// It returns once a value arrives on quitB.
//
// NOTE(review): *leaderFlag is written by another goroutine without
// synchronization, so this read is a data race.
func doCrontab(workerId int, leaderFlag *bool, quitB chan int) {
	ticker := time.NewTicker(1 * time.Second)
	// Stop the ticker on return; it was previously never stopped.
	defer ticker.Stop()

	cronCnt := 0
	for {
		select {
		case <-ticker.C:
			cronCnt++
			if *leaderFlag {
				fmt.Printf("worker:%d 每1s执行定时任务: %d\n", workerId, cronCnt)
			} else {
				fmt.Printf("worker:%d Follow\n", workerId)
			}
		case <-quitB:
			fmt.Printf("worker:%d 结束定时任务\n", workerId)
			return
		}
	}
}

output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
worker:1 Follow
worker:2 Follow
worker:0 Follow
========第1秒========
worker:1 参加选举
worker:0 参加选举
worker:2 参加选举
worker:0 Follow
worker:2 Follow
========第2秒========
worker:1 Follow
worker:1 选举:成功,Key:/root/elect/694d7395306acfcf
worker:1 每1s执行定时任务: 3
worker:0 Follow
worker:2 Follow
========第3秒========
worker:0 Follow
worker:2 Follow
worker:1 每1s执行定时任务: 4
========第4秒========
worker:0 Follow
worker:1 退出
worker:1 结束定时任务
worker:2 Follow
========第5秒========
{"level":"warn","ts":"2020-07-30T00:27:26.601+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-0ba38cd9-c8be-4d17-9262-b06184f128e2/127.0.0.1:2379","attempt":0,"error":"rpc error: code = Canceled desc = context canceled"}
worker:0 Follow
worker:2 Follow
========第6秒========
worker:0 Follow
worker:2 Follow
worker:1 Follow
worker:1 参加选举
========第7秒========
worker:2 Follow
worker:0 Follow
worker:1 Follow
========第8秒========
worker:0 选举:成功,Key:/root/elect/694d7395306acfd1
worker:2 Follow
worker:0 每1s执行定时任务: 9
worker:1 Follow
========第9秒========
worker:0 每1s执行定时任务: 10
worker:2 Follow
worker:1 Follow
========第10秒========
worker:0 每1s执行定时任务: 11
worker:2 Follow
worker:1 Follow
worker:0 退出
worker:0 结束定时任务
{"level":"warn","ts":"2020-07-30T00:27:32.603+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-fe9fba67-e756-46a6-9471-992770da575a/127.0.0.1:2379","attempt":0,"error":"rpc error: code = Canceled desc = context canceled"}
========第11秒========
worker:2 Follow
worker:1 Follow
========第12秒========
worker:2 Follow
worker:1 Follow
worker:0 Follow
worker:0 参加选举
========第13秒========
worker:2 Follow
worker:1 Follow
worker:0 Follow
========第14秒========
worker:2 选举:成功,Key:/root/elect/694d7395306acfd2
worker:1 Follow
worker:2 每1s执行定时任务: 15
worker:0 Follow
========第15秒========
worker:2 每1s执行定时任务: 16
worker:1 Follow
worker:0 Follow
========第16秒========
worker:2 每1s执行定时任务: 17
worker:1 Follow
worker:0 Follow
worker:2 退出
worker:2 结束定时任务
{"level":"warn","ts":"2020-07-30T00:27:38.606+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-b1022d31-efc4-4c89-bb86-4d919e2a9ff6/127.0.0.1:2379","attempt":0,"error":"rpc error: code = Canceled desc = context canceled"}
========第17秒========
worker:1 Follow
worker:0 Follow
========第18秒========
worker:1 Follow
worker:0 Follow
worker:2 Follow
worker:2 参加选举
========第19秒========
worker:1 Follow
worker:0 Follow
worker:2 Follow
========第20秒========
worker:1 选举:成功,Key:/root/elect/694d7395306acfdf
worker:1 每1s执行定时任务: 15
worker:0 Follow
worker:2 Follow
========第21秒========
worker:1 每1s执行定时任务: 16
worker:0 Follow
worker:2 Follow
========第22秒========
worker:1 每1s执行定时任务: 17
worker:0 Follow
worker:1 退出
worker:1 结束定时任务
worker:2 Follow
{"level":"warn","ts":"2020-07-30T00:27:44.609+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-57e503b2-8212-4eea-b92b-d2f0bca6f281/127.0.0.1:2379","attempt":0,"error":"rpc error: code = Canceled desc = context canceled"}
========第23秒========
worker:0 Follow
worker:2 Follow
========第24秒========
worker:0 Follow
worker:2 Follow
worker:1 Follow
worker:1 参加选举
========第25秒========
worker:0 Follow
worker:2 Follow
worker:1 Follow
========第26秒========
worker:0 选举:成功,Key:/root/elect/694d7395306acfe5
worker:0 每1s执行定时任务: 15
worker:2 Follow
worker:1 Follow
========第27秒========
worker:0 每1s执行定时任务: 16
worker:2 Follow
worker:1 Follow
========第28秒========
worker:0 每1s执行定时任务: 17
worker:2 Follow
worker:1 Follow
worker:0 退出
worker:0 结束定时任务
{"level":"warn","ts":"2020-07-30T00:27:50.616+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-c04606c3-6323-4351-a40c-0b283bdb43a5/127.0.0.1:2379","attempt":0,"error":"rpc error: code = Canceled desc = context canceled"}
========第29秒========
worker:2 Follow
worker:1 Follow
========第30秒========
worker:2 Follow
worker:1 Follow
worker:0 参加选举
worker:0 Follow
========第31秒========
worker:2 Follow
worker:1 Follow
worker:0 Follow
========第32秒========
worker:2 选举:成功,Key:/root/elect/694d7395306acfeb
worker:2 每1s执行定时任务: 15
worker:1 Follow
worker:0 Follow
========第33秒========
worker:2 每1s执行定时任务: 16
worker:1 Follow
worker:0 Follow
========第34秒========
worker:2 每1s执行定时任务: 17
worker:1 Follow
worker:0 Follow
worker:2 退出
worker:2 结束定时任务
{"level":"warn","ts":"2020-07-30T00:27:56.621+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-c91918eb-7478-4d18-bef1-39fd076e3cf1/127.0.0.1:2379","attempt":0,"error":"rpc error: code = Canceled desc = context canceled"}
========第35秒========
worker:1 Follow
worker:0 Follow
========第36秒========
worker:1 Follow
worker:0 Follow
worker:2 Follow
worker:2 参加选举
========第37秒========
worker:1 Follow
worker:2 Follow
worker:0 Follow
========第38秒========
worker:1 选举:成功,Key:/root/elect/694d7395306acff1
worker:1 每1s执行定时任务: 15
worker:0 Follow
worker:2 Follow
========第39秒========
worker:1 每1s执行定时任务: 16
worker:0 Follow
worker:2 Follow
========第40秒========
worker:1 每1s执行定时任务: 17
worker:1 退出
worker:0 Follow
worker:2 Follow
worker:1 结束定时任务
{"level":"warn","ts":"2020-07-30T00:28:02.623+0800","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"endpoint://client-0e8ec9ff-5258-4cc9-9314-29e1febb9b16/127.0.0.1:2379","attempt":0,"error":"rpc error: code = Canceled desc = context canceled"}
========第41秒========
worker:0 Follow
worker:2 Follow
========第42秒========
worker:2 Follow
worker:0 Follow
worker:1 参加选举
worker:1 Follow
========第43秒========
worker:0 Follow
worker:2 Follow
worker:1 Follow
========第44秒========
worker:0 选举:成功,Key:/root/elect/694d7395306acff7
worker:0 每1s执行定时任务: 15
worker:2 Follow
worker:1 Follow
========第45秒========
worker:2 Follow
worker:0 每1s执行定时任务: 16
worker:1 Follow
========第46秒========
worker:0 每1s执行定时任务: 17
worker:2 Follow
worker:0 退出
worker:1 Follow
worker:0 结束定时任务
参考资料

Raft算法及etcd/raft的实现思路借鉴
http://atomicer.cn/2019/05/17/etcd-raft%E4%B8%ADraft%E7%AE%97%E6%B3%95%E7%9A%84%E5%AE%9E%E7%8E%B0%E6%80%9D%E8%B7%AF%E5%8F%82%E8%80%83/