配置文件
etcd配置文件位于/etc/etcd/etcd.conf,该配置文件一共有5个section
| 名称 | 作用 |
|---|---|
| member | 本节点的配置,包括监听服务端口、心跳时间等 |
| cluster | 集群配置,包括集群状态、集群名称以及本节点广播地址 |
| proxy | 用于网络自动发现服务 |
| security | 安全配置 |
| logging | 日志功能组件 |
具体配置采集可以见另一个文章《Etcd集群配置和使用》
初次看到配置文件,都会有一个疑问,为什么在members已经设置了监听服务地址,为什么在cluster还要再次设置一次广播地址呢?
原因:etcd主要的通信协议主要是http协议,对于http协议中所周知它是B/S结构,而非C/S结构,只能一端主动给另一端发消息而反过来则不可。所以对于集群来说,双方必须都要知道对方具体监听地址。
服务监听
我们都知道,建立socket服务端一共有5个基本步骤(C语言):
1、创建socket套接字
2、bind地址及端口
3、listen监听服务4
4、accept接收客户端连接
5、启动新线程为客户端服务。
正所谓万变不离其宗,到了etcd中(etcd使用默认golang http模块)也是这些步骤,只不过是被封装了一下(语法糖)
启动流程见《Etcd源码分析:启动篇》
listener
当进入 /etcd.go里面的StartEtcd()函数的时候
//为peer创建listener,socket三部曲只到了第二个步骤 if e.Peers, err = startPeerListeners(cfg); err != nil { return e, err } //为client创建listener,socket三部曲只到了第二个步骤 if e.sctxs, err = startClientListeners(cfg); err != nil { return e, err }在创建了listener之后,开始创建EtcdServer
//创建EtcdServer并且创建raftNode并运行raftNode if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { return e, err } // buffer channel so goroutines on closed connections won't wait forever e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))······ e.Server.Start() if err = e.servePeers(); err != nil { return e, err } if err = e.serveClients(); err != nil { return e, err } if err = e.serveMetrics(); err != nil { return e, err } serving = trueListener有两个分别为:peer listener和client listener,两者大同小异,这里拿peer listener做为分析对象。
func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {······ peers = make([]*peerListener, len(cfg.LPUrls))······ for i, u := range cfg.LPUrls { //循环遍历多个peer url if u.Scheme == "http" { if !cfg.PeerTLSInfo.Empty() { plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) } if cfg.PeerTLSInfo.ClientCertAuth { plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) } } // 构造peerListener对象 监听2380 作为服务端模式 peers[i] = &peerListener{close: func(context.Context) error { return nil }} //调用接口,创建listener对象,返回来之后,socket套接字已经完成listener监听流程 peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo) if err != nil { return nil, err } // once serve, overwrite with 'http.Server.Shutdown' peers[i].close = func(context.Context) error { return peers[i].Listener.Close() } plog.Info("listening for peers on ", u.String()) } return peers, nil}下面调用关系为
startPeerListeners() [ /etcd.go] -> rafthttp.NewListener() [rafthttp/util.go] -> transport.NewTimeoutListener() [pkg/transport/timeout_listener.go] -> newListener() [pkg/transport/listener.go] -> net.Listen() [golang net库函数]服务监听
服务端socket需要调用Accept方法,我们来看一下serve方法。方法serve大致内容为:将每个服务放到gorouting中,也就是启动一个协程来监听服务。
先看看servePeers()
func (e *Etcd) servePeers() (err error) { ph := etcdhttp.NewPeerHandler(e.Server) var peerTLScfg *tls.Config if !e.cfg.PeerTLSInfo.Empty() { if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil { return err } } for _, p := range e.Peers { gs := v3rpc.Server(e.Server, peerTLScfg) m := cmux.New(p.Listener) go gs.Serve(m.Match(cmux.HTTP2())) srv := &http.Server{ Handler: grpcHandlerFunc(gs, ph), ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error } go srv.Serve(m.Match(cmux.Any())) p.serve = func() error { return m.Serve() } p.close = func(ctx context.Context) error { // gracefully shutdown http.Server // close open listeners, idle connections // until context cancel or time-out stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv}) return nil } } // start peer servers in a goroutine for _, pl := range e.Peers { go func(l *peerListener) { e.errHandler(l.serve()) }(pl) } return nil}1、生成http.hander 用于处理peer请求;
2、在for循环里面,起一些goroutine,调用Server()函数来接受Listener传入的连接。
我们来看看NewPeerHandler()
func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler { mh := &peerMembersHandler{ cluster: cluster, } //将url和业务层handler注册到servemux中,也就是每一个url请求都会有其对应的handler进行处理 //初始化一个Serve Multiplexer结构 mux := http.NewServeMux() mux.HandleFunc("/", http.NotFound) mux.Handle(rafthttp.RaftPrefix, raftHandler) mux.Handle(rafthttp.RaftPrefix+"/", raftHandler) mux.Handle(peerMembersPrefix, mh) //处理请求/members handler是mh,即peerMembersHandler if leaseHandler != nil { mux.Handle(leasehttp.LeasePrefix, leaseHandler) mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler) } mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion)) return mux}应用层业务逻辑需要自己注册url和handler,这样才能保证每个http request都能够被处理。而每个handler都必须要实现对应接口ServeHTTP,例如peerMembersHandler,实现的ServeHTTP接口是用于返回集群成员列表
那么此处只是完成注册,那么在什么地方会调用此处handler?
答案是在ServeHTTP()里面
继续阅读与本文标签相同的文章
专访杨开振:程序员除了敲代码还能做什么?
MySQL 查询命令
-
Fast.ai:指标是一把双刃剑,用不好就成了垃圾食品自助站
2026-05-27栏目: 教程
-
【机器学习实战】理解Scikit-Learn中分类性能度量指标
2026-05-27栏目: 教程
-
AI未来如何应对数据饥荒?联邦学习的崛起
2026-05-27栏目: 教程
-
别人家的孩子系列!13岁中学生改良马斯克“超级高铁”方案,可为列车提速一倍
2026-05-27栏目: 教程
-
Node.js服务端开发教程 (二):新的软件交付方式
2026-05-27栏目: 教程
