nsq/nsqlookupd 分析之二

上文分析, 来看看 lookup_protocol_v1 协议的实现

lookup_protocol_v1.go

package nsqlookupd

import (
  "bufio"
  "encoding/binary"
  "encoding/json"
  "fmt"
  "io"
  "log"
  "net"
  "os"
  "strings"
  "sync/atomic"
  "time"

  "github.com/nsqio/nsq/internal/protocol"
  "github.com/nsqio/nsq/internal/version"
)

// LookupProtocolV1 carries the shared Context that every V1 command handler
// reaches through (logging, the registration DB, options).
type LookupProtocolV1 struct {
  ctx *Context
}

// IOLoop serves a single client connection: it reads newline-terminated
// commands, dispatches each one through Exec and writes the result back.
//
// The loop ends on a read error, on a failed write, or when a command returns
// a *protocol.FatalClientErr. On exit the connection is closed and, if the
// client had identified itself, every producer registered under its peer id
// is removed from the registration DB. The error that ended the loop is
// returned.
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
  var err error
  var line string

  client := NewClientV1(conn)
  reader := bufio.NewReader(client)
  for {
    line, err = reader.ReadString('\n')
    if err != nil {
      break
    }

    // A command line is the command name followed by space-separated params.
    line = strings.TrimSpace(line)
    params := strings.Split(line, " ")

    var response []byte
    response, err = p.Exec(client, reader, params)
    if err != nil {
      // Append the parent error, when there is one, to the log line. The
      // assertion is checked so that an error which does not implement
      // protocol.ChildErr cannot panic the connection goroutine.
      ctx := ""
      if childErr, ok := err.(protocol.ChildErr); ok {
        if parentErr := childErr.Parent(); parentErr != nil {
          ctx = " - " + parentErr.Error()
        }
      }
      p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

      // Report the failure to the client using the error's text.
      _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
      if sendErr != nil {
        p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }

      // A fatal client error ends the session; any other error keeps reading.
      if _, ok := err.(*protocol.FatalClientErr); ok {
        break
      }
      continue
    }

    // Commands may succeed without producing a response body.
    if response != nil {
      _, err = protocol.SendResponse(client, response)
      if err != nil {
        break
      }
    }
  }

  conn.Close()
  p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client)
  // The session is over, so drop every producer entry this peer registered.
  if client.peerInfo != nil {
    registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id)
    for _, r := range registrations {
      if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
        p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
          client, r.Category, r.Key, r.SubKey)
      }
    }
  }
  return err
}

// Exec dispatches one parsed command line to its handler. params[0] is the
// command name; PING receives the full params slice while the other commands
// receive only the arguments after the name. An unknown command yields a
// fatal E_INVALID client error.
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
  name, args := params[0], params[1:]
  switch name {
  case "PING":
    return p.PING(client, params)
  case "IDENTIFY":
    return p.IDENTIFY(client, reader, args)
  case "REGISTER":
    return p.REGISTER(client, reader, args)
  case "UNREGISTER":
    return p.UNREGISTER(client, reader, args)
  default:
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", name))
  }
}

// getTopicChan extracts the topic name (params[0]) and the optional channel
// name (params[1]) of a command. It returns a fatal client error when params
// is empty, when the topic name is invalid, or when a non-empty channel name
// is invalid. command is only used to prefix the error messages.
func getTopicChan(command string, params []string) (string, string, error) {
  if len(params) < 1 {
    return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command))
  }

  topicName, channelName := params[0], ""
  if len(params) > 1 {
    channelName = params[1]
  }

  switch {
  case !protocol.IsValidTopicName(topicName):
    return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName))
  case channelName != "" && !protocol.IsValidChannelName(channelName):
    return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName))
  }

  return topicName, channelName, nil
}

// REGISTER adds the identified client as a producer of the given topic and,
// when a channel is supplied, of that channel as well (channel first, then
// topic). It fails with a fatal E_INVALID error if the client has not sent
// IDENTIFY yet, and with the getTopicChan error if the names are invalid.
// On success the reply is always "OK", matching the other command handlers.
func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
  if client.peerInfo == nil {
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
  }

  topic, channel, err := getTopicChan("REGISTER", params)
  if err != nil {
    return nil, err
  }

  // register adds this peer under one registration key and logs the addition
  // only when AddProducer reports that something was actually added.
  register := func(category, key, subkey string) {
    if p.ctx.nsqlookupd.DB.AddProducer(Registration{category, key, subkey}, &Producer{peerInfo: client.peerInfo}) {
      p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s",
        client, category, key, subkey)
    }
  }

  if channel != "" {
    register("channel", topic, channel)
  }
  register("topic", topic, "")

  return []byte("OK"), nil
}

// UNREGISTER removes the identified client as a producer. With a channel it
// removes only that channel entry (and drops the registration itself when it
// is an "#ephemeral" channel with no producers left). Without a channel it
// removes the client from every channel of the topic and from the topic.
// It fails with a fatal E_INVALID error if the client has not sent IDENTIFY.
func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
  if client.peerInfo == nil {
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
  }

  topic, channel, err := getTopicChan("UNREGISTER", params)
  if err != nil {
    return nil, err
  }

  if channel != "" {
    channelKey := Registration{"channel", topic, channel}
    removed, remaining := p.ctx.nsqlookupd.DB.RemoveProducer(channelKey, client.peerInfo.id)
    if removed {
      p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
        client, "channel", topic, channel)
    }
    // Ephemeral channels disappear together with their last producer.
    if remaining == 0 && strings.HasSuffix(channel, "#ephemeral") {
      p.ctx.nsqlookupd.DB.RemoveRegistration(channelKey)
    }
    return []byte("OK"), nil
  }

  // No channel was given, so this is a topic unregistration. Channel entries
  // are removed first; that is not expected to find anything, hence the
  // warning whenever one is actually removed.
  for _, r := range p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") {
    removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id)
    if removed {
      p.ctx.nsqlookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",
        client, "channel", topic, r.SubKey)
    }
  }

  topicKey := Registration{"topic", topic, ""}
  removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(topicKey, client.peerInfo.id)
  if removed {
    p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
      client, "topic", topic, "")
  }

  return []byte("OK"), nil
}

// IDENTIFY reads the client's identification body from reader, records it as
// the client's peer info, registers the client in the DB under the "client"
// category and replies with a JSON description of this nsqlookupd.
//
// Wire format: a big-endian int32 body length followed by that many bytes of
// JSON describing the peer.
//
// It returns a fatal client error when the client has already identified,
// when the length or body cannot be read, when the length is negative, when
// the JSON cannot be decoded, or when BroadcastAddress, TCPPort, HTTPPort or
// Version is missing. If the response cannot be marshaled it falls back to
// the plain "OK" reply.
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
  var err error

  if client.peerInfo != nil {
    return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
  }

  var bodyLen int32
  err = binary.Read(reader, binary.BigEndian, &bodyLen)
  if err != nil {
    return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")
  }

  // The length comes straight from the client; a negative value would make
  // the allocation below panic, so reject it as a bad body instead.
  if bodyLen < 0 {
    return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", fmt.Sprintf("IDENTIFY invalid body size %d", bodyLen))
  }

  body := make([]byte, bodyLen)
  _, err = io.ReadFull(reader, body)
  if err != nil {
    return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body")
  }

  // The peer id is the connection's remote address; the remaining fields are
  // filled in from the JSON body.
  peerInfo := PeerInfo{id: client.RemoteAddr().String()}
  err = json.Unmarshal(body, &peerInfo)
  if err != nil {
    return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
  }

  // Set after decoding, so the stored remote address is always the one
  // observed on the connection.
  peerInfo.RemoteAddress = client.RemoteAddr().String()

  // Require all mandatory fields.
  if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {
    return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")
  }

  // Atomic store (not an increment): lastUpdate is also accessed atomically
  // by PING and by the HTTP debug handler.
  atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())

  p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",
    client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)

  client.peerInfo = &peerInfo
  if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
    p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
  }

  // Build the response describing this nsqlookupd instance.
  data := make(map[string]interface{})
  data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port
  data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port
  data["version"] = version.Binary
  hostname, err := os.Hostname()
  if err != nil {
    log.Fatalf("ERROR: unable to get hostname %s", err)
  }
  data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress
  data["hostname"] = hostname

  response, err := json.Marshal(data)
  if err != nil {
    p.ctx.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data)
    return []byte("OK"), nil
  }
  return response, nil
}

// PING always replies "OK". For a client that has identified it also logs the
// time elapsed since the peer's previous update and atomically stores the
// current time as the new lastUpdate.
func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) {
  peer := client.peerInfo
  if peer == nil {
    return []byte("OK"), nil
  }

  // lastUpdate holds nanoseconds since the Unix epoch.
  previous := time.Unix(0, atomic.LoadInt64(&peer.lastUpdate))
  now := time.Now()
  p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s)", peer.id,
    now.Sub(previous))
  atomic.StoreInt64(&peer.lastUpdate, now.UnixNano())

  return []byte("OK"), nil
}

在上面的代码中, 用到了 NewClientV1, 简单到没话说, 我觉得此乃 go 语言精髓

client_v1.go

package nsqlookupd

import (
  "net"
)

// ClientV1 is one connected V1 protocol client. It embeds the connection, so
// it can be used directly as a net.Conn.
type ClientV1 struct {
  net.Conn
  // peerInfo stays nil until the client has sent a successful IDENTIFY.
  peerInfo *PeerInfo
}

// NewClientV1 wraps conn in a ClientV1 whose peerInfo is still nil.
func NewClientV1(conn net.Conn) *ClientV1 {
  client := new(ClientV1)
  client.Conn = conn
  return client
}

// String returns the client's remote address, which is what log lines that
// format a client with %s print.
func (c *ClientV1) String() string {
  remote := c.RemoteAddr()
  return remote.String()
}

默认配置选项 Options

options.go

// Options holds the nsqlookupd configuration. The `flag` struct tags name the
// matching command-line flag (presumably consumed by an options resolver
// outside this file — TODO confirm).
type Options struct {
  // Logging.
  LogLevel  string `flag:"log-level"`
  LogPrefix string `flag:"log-prefix"`
  Verbose   bool   `flag:"verbose"` // for backwards compatibility
  Logger    Logger
  logLevel  lg.LogLevel // private, not really an option

  // Listen addresses, and the address reported to clients in the IDENTIFY
  // response.
  TCPAddress       string `flag:"tcp-address"`
  HTTPAddress      string `flag:"http-address"`
  BroadcastAddress string `flag:"broadcast-address"`

  // Durations passed to Producers.FilterByActive and Producer.IsTombstoned
  // by the HTTP handlers.
  InactiveProducerTimeout time.Duration `flag:"inactive-producer-timeout"`
  TombstoneLifetime       time.Duration `flag:"tombstone-lifetime"`
}

// NewOptions returns the default configuration. The broadcast address
// defaults to the OS hostname; the process exits via log.Fatal if the
// hostname cannot be determined.
func NewOptions() *Options {
  hostname, err := os.Hostname()
  if err != nil {
    log.Fatal(err)
  }

  opts := new(Options)

  // Logging defaults.
  opts.LogPrefix = "[nsqlookupd] "
  opts.LogLevel = "info"

  // Default TCP and HTTP listen addresses.
  opts.TCPAddress = "0.0.0.0:4160"
  opts.HTTPAddress = "0.0.0.0:4161"
  opts.BroadcastAddress = hostname

  // Defaults for producer inactivity and tombstone lifetime.
  opts.InactiveProducerTimeout = 300 * time.Second
  opts.TombstoneLifetime = 45 * time.Second

  return opts
}

代码中比较玄妙的地方也就是 atomic.StoreInt64 / atomic.LoadInt64, 用原子操作替换了 RLock, 性能得到了提升; 然而 Go 语言并没有很多可以炫技的地方, 充满了浓浓的实用风格, 深得我心;

http.go

package nsqlookupd

import (
  "fmt"
  "net/http"
  "net/http/pprof"
  "sync/atomic"

  "github.com/julienschmidt/httprouter"
  "github.com/nsqio/nsq/internal/http_api"
  "github.com/nsqio/nsq/internal/protocol"
  "github.com/nsqio/nsq/internal/version"
)

// httpServer bundles the shared Context with the router that serves the
// nsqlookupd HTTP API.
type httpServer struct {
  ctx    *Context
  router http.Handler
}

// newHTTPServer builds the HTTP API server: it configures an httprouter with
// logging handlers for panics, unknown routes and disallowed methods, then
// registers the API routes and the pprof debug endpoints.
func newHTTPServer(ctx *Context) *httpServer {
  logged := http_api.Log(ctx.nsqlookupd.logf)

  r := httprouter.New()
  // Enable httprouter's method-not-allowed handling so that the
  // MethodNotAllowed handler set below is used.
  r.HandleMethodNotAllowed = true
  r.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
  r.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
  r.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)

  srv := &httpServer{ctx: ctx, router: r}

  // Health and version endpoints.
  r.Handle("GET", "/ping", http_api.Decorate(srv.pingHandler, logged, http_api.PlainText))
  r.Handle("GET", "/info", http_api.Decorate(srv.doInfo, logged, http_api.V1))

  // v1 negotiate
  r.Handle("GET", "/debug", http_api.Decorate(srv.doDebug, logged, http_api.V1))
  r.Handle("GET", "/lookup", http_api.Decorate(srv.doLookup, logged, http_api.V1))
  r.Handle("GET", "/topics", http_api.Decorate(srv.doTopics, logged, http_api.V1))
  r.Handle("GET", "/channels", http_api.Decorate(srv.doChannels, logged, http_api.V1))
  r.Handle("GET", "/nodes", http_api.Decorate(srv.doNodes, logged, http_api.V1))

  // only v1
  r.Handle("POST", "/topic/create", http_api.Decorate(srv.doCreateTopic, logged, http_api.V1))
  r.Handle("POST", "/topic/delete", http_api.Decorate(srv.doDeleteTopic, logged, http_api.V1))
  r.Handle("POST", "/channel/create", http_api.Decorate(srv.doCreateChannel, logged, http_api.V1))
  r.Handle("POST", "/channel/delete", http_api.Decorate(srv.doDeleteChannel, logged, http_api.V1))
  r.Handle("POST", "/topic/tombstone", http_api.Decorate(srv.doTombstoneTopicProducer, logged, http_api.V1))

  // debug (net/http/pprof)
  r.HandlerFunc("GET", "/debug/pprof", pprof.Index)
  r.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
  r.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
  r.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
  r.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
  r.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
  r.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
  r.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
  r.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))

  return srv
}

// ServeHTTP makes httpServer an http.Handler by delegating every request to
// the configured router.
func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  s.router.ServeHTTP(w, req)
}

// pingHandler is the GET /ping handler; it unconditionally answers "OK".
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  return "OK", nil
}

// doInfo is the GET /info handler; it reports the binary version as
// {"version": ...}.
func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  info := struct {
    Version string `json:"version"`
  }{version.Binary}
  return info, nil
}

// doTopics is the GET /topics handler; it returns the keys of every "topic"
// registration under "topics".
func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  registrations := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "")
  response := map[string]interface{}{
    "topics": registrations.Keys(),
  }
  return response, nil
}

// doChannels is the GET /channels handler; it returns, under "channels", the
// subkeys of every "channel" registration of the requested topic. It answers
// 400 INVALID_REQUEST when the request parameters cannot be parsed and
// 400 MISSING_ARG_TOPIC when the "topic" argument cannot be read.
func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  args, err := http_api.NewReqParams(req)
  if err != nil {
    return nil, http_api.Err{400, "INVALID_REQUEST"}
  }

  topic, err := args.Get("topic")
  if err != nil {
    return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  }

  registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*")
  response := map[string]interface{}{
    "channels": registrations.SubKeys(),
  }
  return response, nil
}

// doLookup is the GET /lookup handler; for the requested topic it returns the
// topic's channel names and the peer info of its producers, after filtering
// the producers with FilterByActive. It answers 400 for unparsable or missing
// arguments and 404 TOPIC_NOT_FOUND when the topic has no registration.
func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  args, err := http_api.NewReqParams(req)
  if err != nil {
    return nil, http_api.Err{400, "INVALID_REQUEST"}
  }

  topic, err := args.Get("topic")
  if err != nil {
    return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  }

  // The topic must be registered before anything else is looked up.
  if found := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topic, ""); len(found) == 0 {
    return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
  }

  channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*").SubKeys()
  active := s.ctx.nsqlookupd.DB.FindProducers("topic", topic, "").FilterByActive(
    s.ctx.nsqlookupd.opts.InactiveProducerTimeout,
    s.ctx.nsqlookupd.opts.TombstoneLifetime)

  return map[string]interface{}{
    "channels":  channels,
    "producers": active.PeerInfo(),
  }, nil
}

// doCreateTopic is the POST /topic/create handler; it adds a "topic"
// registration for the requested name. It answers 400 for unparsable
// parameters, a missing "topic" argument, or an invalid topic name.
func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  args, err := http_api.NewReqParams(req)
  if err != nil {
    return nil, http_api.Err{400, "INVALID_REQUEST"}
  }

  topic, err := args.Get("topic")
  if err != nil {
    return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  }
  if !protocol.IsValidTopicName(topic) {
    return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
  }

  s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topic)
  s.ctx.nsqlookupd.DB.AddRegistration(Registration{"topic", topic, ""})

  return nil, nil
}

// doDeleteTopic is the POST /topic/delete handler; it removes every "channel"
// registration of the topic and then the "topic" registration itself. It
// answers 400 for unparsable parameters or a missing "topic" argument.
func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  args, err := http_api.NewReqParams(req)
  if err != nil {
    return nil, http_api.Err{400, "INVALID_REQUEST"}
  }

  topic, err := args.Get("topic")
  if err != nil {
    return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  }

  // Channels go first, then the topic registration.
  channelRegs := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*")
  for _, reg := range channelRegs {
    s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", reg.SubKey, topic)
    s.ctx.nsqlookupd.DB.RemoveRegistration(reg)
  }

  topicRegs := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topic, "")
  for _, reg := range topicRegs {
    s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topic)
    s.ctx.nsqlookupd.DB.RemoveRegistration(reg)
  }

  return nil, nil
}

// doTombstoneTopicProducer is the POST /topic/tombstone handler; it calls
// Tombstone on every producer of the topic whose
// "BroadcastAddress:HTTPPort" equals the "node" argument. It answers 400 for
// unparsable parameters or a missing "topic" / "node" argument.
func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  reqParams, err := http_api.NewReqParams(req)
  if err != nil {
    return nil, http_api.Err{400, "INVALID_REQUEST"}
  }

  topicName, err := reqParams.Get("topic")
  if err != nil {
    return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  }

  node, err := reqParams.Get("node")
  if err != nil {
    return nil, http_api.Err{400, "MISSING_ARG_NODE"}
  }

  // The format string had been mangled into "[email protected]%s" by e-mail
  // obfuscation; the intended text is "producer@<node>".
  s.ctx.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of topic(%s)", node, topicName)
  producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "")
  for _, p := range producers {
    // node is matched against the producer's broadcast address and HTTP port.
    thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.peerInfo.HTTPPort)
    if thisNode == node {
      p.Tombstone()
    }
  }

  return nil, nil
}

// doCreateChannel is the POST /channel/create handler; it adds the "channel"
// registration and then the "topic" registration it belongs to. It answers
// 400 for unparsable parameters or when the topic/channel arguments are
// rejected by http_api.GetTopicChannelArgs.
func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  args, err := http_api.NewReqParams(req)
  if err != nil {
    return nil, http_api.Err{400, "INVALID_REQUEST"}
  }

  topic, channel, err := http_api.GetTopicChannelArgs(args)
  if err != nil {
    return nil, http_api.Err{400, err.Error()}
  }

  s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channel, topic)
  s.ctx.nsqlookupd.DB.AddRegistration(Registration{"channel", topic, channel})

  s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topic)
  s.ctx.nsqlookupd.DB.AddRegistration(Registration{"topic", topic, ""})

  return nil, nil
}

// doDeleteChannel is the POST /channel/delete handler; it removes every
// registration matching the topic/channel pair. It answers 400 for
// unparsable or rejected arguments and 404 CHANNEL_NOT_FOUND when nothing
// matches.
func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  args, err := http_api.NewReqParams(req)
  if err != nil {
    return nil, http_api.Err{400, "INVALID_REQUEST"}
  }

  topic, channel, err := http_api.GetTopicChannelArgs(args)
  if err != nil {
    return nil, http_api.Err{400, err.Error()}
  }

  matches := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, channel)
  if len(matches) == 0 {
    return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
  }

  s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", channel, topic)
  for _, reg := range matches {
    s.ctx.nsqlookupd.DB.RemoveRegistration(reg)
  }

  return nil, nil
}

// node is the JSON shape of one entry in the doNodes response. Its address,
// port and version fields are copied from a producer's PeerInfo.
type node struct {
  RemoteAddress    string   `json:"remote_address"`
  Hostname         string   `json:"hostname"`
  BroadcastAddress string   `json:"broadcast_address"`
  TCPPort          int      `json:"tcp_port"`
  HTTPPort         int      `json:"http_port"`
  Version          string   `json:"version"`
  // Tombstones[i] is the tombstone state of this node for Topics[i].
  Tombstones       []bool   `json:"tombstones"`
  Topics           []string `json:"topics"`
}

// doNodes is the GET /nodes handler; it converts every producer in the
// "client" category (after FilterByActive) into a node, listing the topics
// the peer is registered for and, per topic, whether it is tombstoned.
func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  // Producers are registered under three categories: channel, topic and
  // client. The client category holds one entry per identified peer.
  producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive(
    s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0)

  nodes := make([]*node, 0, len(producers))
  for _, p := range producers {
    peer := p.peerInfo
    topics := s.ctx.nsqlookupd.DB.LookupRegistrations(peer.id).Filter("topic", "*", "").Keys()

    // For each topic, find the topic producer backed by this same peer and
    // take its tombstone state.
    tombstones := make([]bool, len(topics))
    for j := range topics {
      for _, tp := range s.ctx.nsqlookupd.DB.FindProducers("topic", topics[j], "") {
        if tp.peerInfo == peer {
          tombstones[j] = tp.IsTombstoned(s.ctx.nsqlookupd.opts.TombstoneLifetime)
        }
      }
    }

    nodes = append(nodes, &node{
      RemoteAddress:    peer.RemoteAddress,
      Hostname:         peer.Hostname,
      BroadcastAddress: peer.BroadcastAddress,
      TCPPort:          peer.TCPPort,
      HTTPPort:         peer.HTTPPort,
      Version:          peer.Version,
      Tombstones:       tombstones,
      Topics:           topics,
    })
  }

  return map[string]interface{}{
    "producers": nodes,
  }, nil
}

// doDebug is the GET /debug handler; while holding the DB read lock it dumps
// the registration map as "category:key:subkey" -> list of producer details.
func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  s.ctx.nsqlookupd.DB.RLock()
  defer s.ctx.nsqlookupd.DB.RUnlock()

  dump := make(map[string][]map[string]interface{})
  for reg, producers := range s.ctx.nsqlookupd.DB.registrationMap {
    key := reg.Category + ":" + reg.Key + ":" + reg.SubKey
    for _, producer := range producers {
      peer := producer.peerInfo
      dump[key] = append(dump[key], map[string]interface{}{
        "id":                peer.id,
        "hostname":          peer.Hostname,
        "broadcast_address": peer.BroadcastAddress,
        "tcp_port":          peer.TCPPort,
        "http_port":         peer.HTTPPort,
        "version":           peer.Version,
        "last_update":       atomic.LoadInt64(&peer.lastUpdate),
        "tombstoned":        producer.tombstoned,
        "tombstoned_at":     producer.tombstonedAt.UnixNano(),
      })
    }
  }

  return dump, nil
}

tcp.go

package nsqlookupd

import (
  "io"
  "net"

  "github.com/nsqio/nsq/internal/protocol"
)

// tcpServer handles incoming TCP client connections using the shared Context.
type tcpServer struct {
  ctx *Context
}

// Handle serves one accepted TCP connection. The client must open with a
// 4-byte protocol magic; "  V1" selects LookupProtocolV1, whose IOLoop then
// runs until the session ends. Any other magic is answered with
// E_BAD_PROTOCOL. The connection is closed on every exit path.
func (p *tcpServer) Handle(clientConn net.Conn) {
  p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

  // The protocol requires the client to start with a 4-byte magic string.
  buf := make([]byte, 4)
  _, err := io.ReadFull(clientConn, buf)
  if err != nil {
    p.ctx.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
    // Close here as well: the other exit paths close the connection, and
    // returning without doing so would leak it.
    clientConn.Close()
    return
  }
  protocolMagic := string(buf)

  p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
    clientConn.RemoteAddr(), protocolMagic)

  var prot protocol.Protocol
  switch protocolMagic {
  case "  V1":
    prot = &LookupProtocolV1{ctx: p.ctx}
  default:
    // Best effort: the connection is closed right after, so a failed write
    // is deliberately ignored.
    protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  // IOLoop closes the connection itself before returning.
  err = prot.IOLoop(clientConn)
  if err != nil {
    p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
  }
}

看到最后, 发现根源还在 nsq/internal/protocol 部分, 后面要着重分析一下; 至此 nsqlookupd 已经分析完毕.
大概是以下步骤

  1. 启动 tcp, 负责执行 ping, register ... 等系统性任务,
  2. 启动 http 服务, 负责 api 部分, 包括增加删除节点信息(PeerInfo)等等