raftexample是etcd提供的示例,演示了etcd raft共识算法库的使用。 raftexample最终实现了一个提供REST API的分布式键值存储服务。
本文将对raftexample的代码进行阅读和分析,希望能够帮助读者更好地理解如何使用etcd raft库以及raft库的实现逻辑。
raftexample的架构非常简单,主要文件如下:
写入请求通过 HTTP PUT 请求到达 httpapi 模块的 ServeHTTP 方法。
curl -L http://127.0.0.1:12380/key -XPUT -d value
通过switch匹配到HTTP请求方法后,进入PUT方法处理流程:
通过raft算法库提供的Propose方法将proposal提交给raft算法库。
提案的内容可以是添加新的键值对、更新已有的键值对等
// httpapi.go v, err := io.ReadAll(r.Body) if err != nil { log.Printf("Failed to read on PUT (%v)\n", err) http.Error(w, "Failed on PUT", http.StatusBadRequest) return } h.store.Propose(key, string(v)) w.WriteHeader(http.StatusNoContent)
接下来我们看看kvstore模块的Propose方法,看看提案是如何构造和处理的。
在Propose方法中,我们首先使用gob对要写入的键值对进行编码,然后将编码后的内容传递给proposeC,proposeC是负责将kvstore模块构建的proposal传输到raft模块的通道。
// kvstore.go func (s *kvstore) Propose(k string, v string) { var buf strings.Builder if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil { log.Fatal(err) } s.proposeC由kvstore构造并传递给proposeC的proposal由raft模块中的serveChannels方法接收和处理。
在确认proposeC没有被关闭后,raft模块使用raft算法库提供的Propose方法将proposal提交给raft算法库进行处理。
// raft.go select { case prop, ok :=提案提交后,遵循raft算法流程。提案最终将转发到领导节点(如果当前节点不是领导节点,并且您允许追随者转发提案,由 DisableProposalForwarding 配置控制)。 Leader 会将提案作为日志条目添加到其 raft 日志中,并与其他 follower 节点同步。被视为已提交后,将应用到状态机并将结果返回给用户。
但是,由于etcd raft库本身不处理节点之间的通信、追加到raft日志、应用到状态机等,所以raft库只准备这些操作所需的数据。实际操作必须由我们来执行。
因此,我们需要从raft库接收这些数据,并根据其类型进行相应的处理。 Ready方法返回一个只读通道,通过该通道我们可以接收需要处理的数据。
需要注意的是,接收到的数据包括多个字段,例如要应用的快照、要附加到raft日志的日志条目、要通过网络传输的消息等。
继续我们的写请求示例(Leader节点),收到相应数据后,我们需要持久保存快照、HardState和Entries,以处理服务器崩溃引起的问题(例如,一个follower为多个候选人投票)。 HardState 和 Entries 共同构成了本文中提到的所有服务器上的持久状态。持久保存它们后,我们可以应用快照并追加到 raft 日志中。
由于我们当前是leader节点,raft库会返回MsgApp类型的消息给我们(对应论文中的AppendEntries RPC)。我们需要将这些消息发送到跟随者节点。这里,我们使用etcd提供的rafthttp进行节点通信,并使用Send方法将消息发送给follower节点。
// raft.go case rd :=接下来,我们使用publishEntries方法将提交的raft日志条目应用到状态机。如前所述,在 raftexample 中,kvstore 模块充当状态机。在publishEntries方法中,我们将需要应用到状态机的日志条目传递给commitC。与之前的proposeC类似,commitC负责将raft模块认为已提交的日志条目传输到kvstore模块,以应用到状态机。
// raft.go rc.commitC在kvstore模块的readCommits方法中,从commitC读取的消息被gob解码以检索原始键值对,然后将其存储在kvstore模块内的map结构中。
// kvstore.go for commit := range commitC { ... for _, data := range commit.data { var dataKv kv dec := gob.NewDecoder(bytes.NewBufferString(data)) if err := dec.Decode(&dataKv); err != nil { log.Fatalf("raftexample: could not decode message (%v)", err) } s.mu.Lock() s.kvStore[dataKv.Key] = dataKv.Val s.mu.Unlock() } close(commit.applyDoneC) }回到raft模块,我们使用Advance方法通知raft库我们已经处理完从Ready通道读取的数据,准备处理下一批数据。
之前,在leader节点上,我们使用Send方法向follower节点发送MsgApp类型的消息。 follower节点的rafthttp监听相应的端口,接收请求并返回响应。无论是follower节点收到的请求,还是leader节点收到的响应,都会通过Step方法提交到raft库处理。
raftNode实现了rafthttp中的Raft接口,调用Raft接口的Process方法来处理接收到的请求内容(如MsgApp消息)。
// raft.go func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error { return rc.node.Step(ctx, m) }上面描述了raftexample中一个写请求的完整处理流程。
概括
本文的内容到此结束。通过概述raftexample的结构以及详细说明一个写请求的处理流程,希望能够帮助您更好地理解如何使用etcd raft库构建自己的分布式KV存储服务。
如果有任何错误或问题,请随时评论或直接给我留言。谢谢。
参考
https://github.com/etcd-io/etcd/tree/main/contrib/raftexample
https://github.com/etcd-io/raft
https://raft.github.io/raft.pdf
免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。
Copyright© 2022 湘ICP备2022001581号-3