配置文件

etcd配置文件位于/etc/etcd/etcd.conf,该配置文件一共有5个section

名称作用
member本节点的配置,包括监听服务端口、心跳时间等
cluster集群配置,包括集群状态、集群名称以及本节点广播地址
proxy用于网络自动发现服务
security安全配置
logging日志功能组件

具体配置采集可以见另一个文章《Etcd集群配置和使用》

初次看到配置文件,都会有一个疑问,为什么在members已经设置了监听服务地址,为什么在cluster还要再次设置一次广播地址呢?

原因:etcd主要的通信协议主要是http协议,对于http协议中所周知它是B/S结构,而非C/S结构,只能一端主动给另一端发消息而反过来则不可。所以对于集群来说,双方必须都要知道对方具体监听地址。

服务监听

我们都知道,建立socket服务端一共有5个基本步骤(C语言):
1、创建socket套接字
2、bind地址及端口
3、listen监听服务4
4、accept接收客户端连接
5、启动新线程为客户端服务。
正所谓万变不离其宗,到了etcd中(etcd使用默认golang http模块)也是这些步骤,只不过是被封装了一下(语法糖)

启动流程见《Etcd源码分析:启动篇》

listener

当进入 /etcd.go里面的StartEtcd()函数的时候

      //为peer创建listener,socket三部曲只到了第二个步骤    if e.Peers, err = startPeerListeners(cfg); err != nil {        return e, err    }      //为client创建listener,socket三部曲只到了第二个步骤    if e.sctxs, err = startClientListeners(cfg); err != nil {        return e, err    }

在创建了listener之后,开始创建EtcdServer

      //创建EtcdServer并且创建raftNode并运行raftNode    if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {        return e, err    }    // buffer channel so goroutines on closed connections won't wait forever    e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))······    e.Server.Start()    if err = e.servePeers(); err != nil {        return e, err    }    if err = e.serveClients(); err != nil {        return e, err    }    if err = e.serveMetrics(); err != nil {        return e, err    }    serving = true

Listener有两个分别为:peer listener和client listener,两者大同小异,这里拿peer listener做为分析对象。

func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {······    peers = make([]*peerListener, len(cfg.LPUrls))······    for i, u := range cfg.LPUrls {   //循环遍历多个peer url        if u.Scheme == "http" {            if !cfg.PeerTLSInfo.Empty() {                plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())            }            if cfg.PeerTLSInfo.ClientCertAuth {                plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())            }        }        // 构造peerListener对象 监听2380 作为服务端模式        peers[i] = &peerListener{close: func(context.Context) error { return nil }}                //调用接口,创建listener对象,返回来之后,socket套接字已经完成listener监听流程        peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)        if err != nil {            return nil, err        }        // once serve, overwrite with 'http.Server.Shutdown'        peers[i].close = func(context.Context) error {            return peers[i].Listener.Close()        }        plog.Info("listening for peers on ", u.String())    }    return peers, nil}

下面调用关系为

startPeerListeners()  [ /etcd.go]  -> rafthttp.NewListener()  [rafthttp/util.go]    -> transport.NewTimeoutListener()  [pkg/transport/timeout_listener.go]        -> newListener()  [pkg/transport/listener.go]            -> net.Listen()  [golang net库函数]

服务监听

服务端socket需要调用Accept方法,我们来看一下serve方法。方法serve大致内容为:将每个服务放到gorouting中,也就是启动一个协程来监听服务。

先看看servePeers()

func (e *Etcd) servePeers() (err error) {    ph := etcdhttp.NewPeerHandler(e.Server)    var peerTLScfg *tls.Config    if !e.cfg.PeerTLSInfo.Empty() {        if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {            return err        }    }    for _, p := range e.Peers {        gs := v3rpc.Server(e.Server, peerTLScfg)        m := cmux.New(p.Listener)        go gs.Serve(m.Match(cmux.HTTP2()))        srv := &http.Server{            Handler:     grpcHandlerFunc(gs, ph),            ReadTimeout: 5 * time.Minute,            ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error        }        go srv.Serve(m.Match(cmux.Any()))        p.serve = func() error { return m.Serve() }        p.close = func(ctx context.Context) error {            // gracefully shutdown http.Server            // close open listeners, idle connections            // until context cancel or time-out            stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})            return nil        }    }    // start peer servers in a goroutine    for _, pl := range e.Peers {        go func(l *peerListener) {            e.errHandler(l.serve())        }(pl)    }    return nil}

1、生成http.hander 用于处理peer请求;
2、在for循环里面,起一些goroutine,调用Server()函数来接受Listener传入的连接。

我们来看看NewPeerHandler()

func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {    mh := &peerMembersHandler{        cluster: cluster,    } //将url和业务层handler注册到servemux中,也就是每一个url请求都会有其对应的handler进行处理     //初始化一个Serve Multiplexer结构      mux := http.NewServeMux()    mux.HandleFunc("/", http.NotFound)    mux.Handle(rafthttp.RaftPrefix, raftHandler)    mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)     mux.Handle(peerMembersPrefix, mh)  //处理请求/members handler是mh,即peerMembersHandler    if leaseHandler != nil {        mux.Handle(leasehttp.LeasePrefix, leaseHandler)        mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)    }    mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))    return mux}

应用层业务逻辑需要自己注册url和handler,这样才能保证每个http request都能够被处理。而每个handler都必须要实现对应接口ServeHTTP,例如peerMembersHandler,实现的ServeHTTP接口是用于返回集群成员列表

那么此处只是完成注册,那么在什么地方会调用此处handler?

答案是在ServeHTTP()里面

收藏 打印