EricJJ's Blog

写代码

nsq/nsqlookupd 分析之二

2017-10-10

  1. lookup_protocol_v1.go
  2. client_v1.go
  3. options.go
  4. http.go
  5. tcp.go

接着上文的分析, 来看看 lookup_protocol_v1 协议的实现

lookup_protocol_v1.go

package nsqlookupd

import (
  "bufio"
  "encoding/binary"
  "encoding/json"
  "fmt"
  "io"
  "log"
  "net"
  "os"
  "strings"
  "sync/atomic"
  "time"

  "github.com/nsqio/nsq/internal/protocol"
  "github.com/nsqio/nsq/internal/version"
)

// LookupProtocolV1 implements the "  V1" TCP protocol of nsqlookupd (it is the
// protocol selected by tcpServer.Handle for that magic). It only carries the
// shared Context, through which the handlers reach the nsqlookupd instance:
// its logger (logf), its registration DB and its options.
type LookupProtocolV1 struct {
  ctx *Context
}

// IOLoop serves one lookup-protocol client until its connection ends.
//
// Each newline-terminated line is trimmed, split on single spaces into a
// command and its arguments, and dispatched through Exec. A non-nil response
// is written back with protocol.SendResponse. When a command fails, the error
// text is sent to the client; the loop ends if that send fails or if the error
// is a *protocol.FatalClientErr, and continues otherwise.
//
// On exit the connection is closed and, if the client had IDENTIFYed, every
// registration held by its peer id is removed from the DB. The returned error
// is the one that ended the loop (for a plain disconnect this is whatever
// bufio.Reader.ReadString returned, e.g. io.EOF).
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
	var err error
	var line string

	client := NewClientV1(conn)
	reader := bufio.NewReader(client)
	for {
		line, err = reader.ReadString('\n')
		if err != nil {
			break
		}

		line = strings.TrimSpace(line)
		params := strings.Split(line, " ")

		var response []byte
		response, err = p.Exec(client, reader, params)
		if err != nil {
			// Append the wrapped parent error (if any) to the log line. The
			// comma-ok assertion matters: an error that does not implement
			// protocol.ChildErr must not panic this connection's goroutine.
			ctx := ""
			if childErr, ok := err.(protocol.ChildErr); ok {
				if parentErr := childErr.Parent(); parentErr != nil {
					ctx = " - " + parentErr.Error()
				}
			}
			p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

			// Report the error text back to the client.
			_, sendErr := protocol.SendResponse(client, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
				break
			}

			// Fatal client errors terminate the connection; others do not.
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		if response != nil {
			_, err = protocol.SendResponse(client, response)
			if err != nil {
				break
			}
		}
	}

	conn.Close()
	p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client)
	// The client is gone: drop every registration its peer still holds.
	if client.peerInfo != nil {
		registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id)
		for _, r := range registrations {
			if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
				p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
					client, r.Category, r.Key, r.SubKey)
			}
		}
	}
	return err
}

// Exec dispatches one parsed command line to its handler.
//
// params[0] is the command name and the rest are its arguments. PING receives
// the full params slice; IDENTIFY, REGISTER and UNREGISTER receive only the
// arguments (params[1:]) plus reader, from which IDENTIFY reads its body.
// Any other command name yields a fatal E_INVALID client error.
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	command, args := params[0], params[1:]

	switch command {
	case "PING":
		return p.PING(client, params)
	case "IDENTIFY":
		return p.IDENTIFY(client, reader, args)
	case "REGISTER":
		return p.REGISTER(client, reader, args)
	case "UNREGISTER":
		return p.UNREGISTER(client, reader, args)
	default:
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", command))
	}
}

// getTopicChan validates the arguments of REGISTER/UNREGISTER and returns
// (topic, channel, nil).
//
// params[0] is the required topic name; params[1], when present, is the
// channel name (further params are ignored). An empty channel means "topic
// only". command only prefixes the error messages. Failures are fatal client
// errors: E_INVALID when params is empty, E_BAD_TOPIC / E_BAD_CHANNEL when a
// name fails protocol validation.
func getTopicChan(command string, params []string) (string, string, error) {
	if len(params) < 1 {
		return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command))
	}

	topicName, channelName := params[0], ""
	if len(params) > 1 {
		channelName = params[1]
	}

	switch {
	case !protocol.IsValidTopicName(topicName):
		return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName))
	case channelName != "" && !protocol.IsValidChannelName(channelName):
		return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName))
	}

	return topicName, channelName, nil
}

// REGISTER handles "REGISTER <topic> [<channel>]" for an IDENTIFYed client.
//
// If a channel is given, the peer is added as a producer of the
// ("channel", topic, channel) registration; in every case it is then added as
// a producer of ("topic", topic, ""). A log line is written whenever
// DB.AddProducer reports true. reader is unused. Returns "OK" on success; the
// reply carries no data, it just keeps every command handler's signature uniform.
func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	if client.peerInfo == nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
	}

	topic, channel, err := getTopicChan("REGISTER", params)
	if err != nil {
		return nil, err
	}

	// Channel registration (if any) first, then the topic registration.
	keys := make([]Registration, 0, 2)
	if channel != "" {
		keys = append(keys, Registration{"channel", topic, channel})
	}
	keys = append(keys, Registration{"topic", topic, ""})

	for _, key := range keys {
		if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) {
			p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s",
				client, key.Category, key.Key, key.SubKey)
		}
	}

	return []byte("OK"), nil
}

// UNREGISTER handles "UNREGISTER <topic> [<channel>]" for an IDENTIFYed client.
//
// With a channel: the peer is removed as a producer of ("channel", topic,
// channel). If no producers are left and the channel name ends in
// "#ephemeral", the channel registration itself is deleted as well.
//
// Without a channel (a topic unregistration): the peer is first removed from
// every channel registration under the topic. Normally none should remain, so
// each such removal is logged as a warning. The peer is then removed from the
// ("topic", topic, "") registration.
//
// reader is unused. Returns "OK" on success.
func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	if client.peerInfo == nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
	}

	topic, channel, err := getTopicChan("UNREGISTER", params)
	if err != nil {
		return nil, err
	}

	db := p.ctx.nsqlookupd.DB
	peerID := client.peerInfo.id

	if channel == "" {
		for _, r := range db.FindRegistrations("channel", topic, "*") {
			removed, _ := db.RemoveProducer(r, peerID)
			if !removed {
				continue
			}
			p.ctx.nsqlookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",
				client, "channel", topic, r.SubKey)
		}

		topicKey := Registration{"topic", topic, ""}
		if removed, _ := db.RemoveProducer(topicKey, peerID); removed {
			p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
				client, "topic", topic, "")
		}
		return []byte("OK"), nil
	}

	channelKey := Registration{"channel", topic, channel}
	removed, left := db.RemoveProducer(channelKey, peerID)
	if removed {
		p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
			client, "channel", topic, channel)
	}
	if left == 0 && strings.HasSuffix(channel, "#ephemeral") {
		db.RemoveRegistration(channelKey)
	}
	return []byte("OK"), nil
}

// IDENTIFY handles the IDENTIFY command. It may be sent only once per
// connection and must precede REGISTER/UNREGISTER.
//
// Body format (read from reader after the command line): a 4-byte big-endian
// signed length, then that many bytes of JSON decoded into a PeerInfo. The
// peer's id and RemoteAddress are both set to the connection's remote address.
// BroadcastAddress, TCPPort, HTTPPort and Version must be non-empty/non-zero.
//
// On success the peer is added as a producer of ("client", "", "") and the
// reply is a JSON object describing this nsqlookupd (tcp_port, http_port,
// version, broadcast_address, hostname). If that object cannot be marshaled,
// "OK" is returned instead. Every validation failure is a fatal client error.
// params is unused.
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	// Upper bound for the IDENTIFY body. The length prefix comes straight from
	// an unauthenticated peer; unbounded, one client could force a 2 GiB
	// allocation. The body is a small JSON object, so 1 MiB is ample.
	const maxIdentifyBodySize = 1024 * 1024

	var err error

	if client.peerInfo != nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY again")
	}

	var bodyLen int32
	err = binary.Read(reader, binary.BigEndian, &bodyLen)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")
	}
	// A negative length would make make() panic at run time. That panic would
	// take down the whole process, so reject it (and oversized bodies) as a bad body.
	if bodyLen < 0 || bodyLen > maxIdentifyBodySize {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY invalid body size")
	}

	body := make([]byte, bodyLen)
	_, err = io.ReadFull(reader, body)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body")
	}

	// The peer is identified by the connection's remote address.
	peerInfo := PeerInfo{id: client.RemoteAddr().String()}
	err = json.Unmarshal(body, &peerInfo)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
	}

	// Always use the real connection address, overwriting anything the JSON
	// body may have set.
	peerInfo.RemoteAddress = client.RemoteAddr().String()

	if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")
	}

	// lastUpdate is read and written atomically elsewhere (PING, the HTTP
	// debug handler), so it is initialized atomically here too.
	atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())

	p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",
		client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)

	client.peerInfo = &peerInfo
	if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
		p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
	}

	// Describe this nsqlookupd back to the peer.
	data := make(map[string]interface{})
	data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port
	data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port
	data["version"] = version.Binary
	hostname, err := os.Hostname()
	if err != nil {
		// NOTE(review): log.Fatalf terminates the entire nsqlookupd process
		// from inside a single client's request; consider logging via logf and
		// continuing (or failing only this client) instead.
		log.Fatalf("ERROR: unable to get hostname %s", err)
	}
	data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress
	data["hostname"] = hostname

	response, err := json.Marshal(data)
	if err != nil {
		p.ctx.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data)
		return []byte("OK"), nil
	}
	return response, nil
}

// PING handles the PING command and always answers "OK".
//
// For an IDENTIFYed client it logs how long ago the peer's lastUpdate was,
// then sets lastUpdate to now. Both accesses are atomic because the same field
// is read concurrently elsewhere, e.g. by the HTTP debug handler. Clients
// that have not IDENTIFYed get the reply with no bookkeeping. params is unused.
func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) {
	info := client.peerInfo
	if info == nil {
		return []byte("OK"), nil
	}

	previous := time.Unix(0, atomic.LoadInt64(&info.lastUpdate))
	now := time.Now()
	p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s)", info.id,
		now.Sub(previous))
	atomic.StoreInt64(&info.lastUpdate, now.UnixNano())
	return []byte("OK"), nil
}

在上面的代码中用到了 NewClientV1, 它简单到没什么可说的: ClientV1 直接嵌入 net.Conn, 因此自动拥有连接的全部方法(读写、RemoteAddr 等), 我觉得这正是 Go 语言的精髓

client_v1.go

package nsqlookupd

import (
  "net"
)

// ClientV1 is one connected lookup-protocol client. It embeds the net.Conn,
// so it can be used directly as the connection (reading, writing,
// RemoteAddr, ...). NewClientV1 leaves peerInfo nil; IDENTIFY fills it in.
type ClientV1 struct {
  net.Conn
  peerInfo *PeerInfo
}

// NewClientV1 wraps conn in a ClientV1 that has no peer information yet.
func NewClientV1(conn net.Conn) *ClientV1 {
	c := new(ClientV1)
	c.Conn = conn
	return c
}

// String identifies the client by its remote address (used with %s in log lines).
func (c *ClientV1) String() string {
	addr := c.Conn.RemoteAddr()
	return addr.String()
}

默认配置选项 Options

options.go

// Options holds the nsqlookupd configuration. The `flag:"..."` struct tags
// name the configuration flag for each field (presumably consumed by the
// option loader elsewhere; confirm in the caller). See NewOptions for defaults.
type Options struct {
  // Logging.
  LogLevel  string `flag:"log-level"`
  LogPrefix string `flag:"log-prefix"`
  Verbose   bool   `flag:"verbose"` // for backwards compatibility
  Logger    Logger
  logLevel  lg.LogLevel // private, not really an option

  // Addresses. BroadcastAddress is what this nsqlookupd reports about itself
  // (returned as "broadcast_address" in the IDENTIFY response).
  TCPAddress       string `flag:"tcp-address"`
  HTTPAddress      string `flag:"http-address"`
  BroadcastAddress string `flag:"broadcast-address"`

  // Producer liveness windows: passed to Producers.FilterByActive and
  // Producer.IsTombstoned by the HTTP lookup and nodes handlers.
  InactiveProducerTimeout time.Duration `flag:"inactive-producer-timeout"`
  TombstoneLifetime       time.Duration `flag:"tombstone-lifetime"`
}

// NewOptions returns an Options populated with the nsqlookupd defaults:
//   - "[nsqlookupd] " log prefix at level "info";
//   - TCP on 0.0.0.0:4160 and HTTP on 0.0.0.0:4161;
//   - this machine's hostname as the broadcast address;
//   - a 300s (5m) inactive-producer timeout and a 45s tombstone lifetime.
//
// If the hostname cannot be determined, it terminates the process via log.Fatal.
func NewOptions() *Options {
	hostname, err := os.Hostname()
	if err != nil {
		log.Fatal(err)
	}

	opts := &Options{
		LogLevel:  "info",
		LogPrefix: "[nsqlookupd] ",

		TCPAddress:       "0.0.0.0:4160",
		HTTPAddress:      "0.0.0.0:4161",
		BroadcastAddress: hostname,

		InactiveProducerTimeout: 5 * time.Minute,
		TombstoneLifetime:       45 * time.Second,
	}
	return opts
}

代码中比较玄妙的地方也就是 atomic.StoreInt64 和 atomic.LoadInt64: 用原子操作读写 lastUpdate, 替换了 RLock, 性能得到了提升; 然而 Go 语言并没有很多可以炫技的地方, 充满了浓浓的实用风格, 深得我心;

http.go

package nsqlookupd

import (
  "fmt"
  "net/http"
  "net/http/pprof"
  "sync/atomic"

  "github.com/julienschmidt/httprouter"
  "github.com/nsqio/nsq/internal/http_api"
  "github.com/nsqio/nsq/internal/protocol"
  "github.com/nsqio/nsq/internal/version"
)

// httpServer serves the nsqlookupd HTTP API. It holds the shared Context and
// the router to which ServeHTTP hands every request.
type httpServer struct {
  ctx    *Context
  router http.Handler
}

// newHTTPServer builds the nsqlookupd HTTP API server.
//
// http_api.Decorate wraps every API handler with two decorators:
//   - a logging decorator (http_api.Log);
//   - a response-format decorator: http_api.PlainText for /ping, http_api.V1
//     for all other routes.
//
// Panics, unknown paths and disallowed methods go to the http_api Log*Handler
// helpers. With HandleMethodNotAllowed enabled, httprouter sends a request
// whose path exists only under another method to MethodNotAllowed instead of
// NotFound. Go's pprof handlers are mounted under /debug/pprof.
func newHTTPServer(ctx *Context) *httpServer {
	logDecorator := http_api.Log(ctx.nsqlookupd.logf)

	router := httprouter.New()
	router.HandleMethodNotAllowed = true
	router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
	router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
	router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)

	s := &httpServer{ctx: ctx, router: router}

	// API routes, registered in the order listed.
	apiRoutes := []struct {
		method string
		path   string
		handle httprouter.Handle
	}{
		{"GET", "/ping", http_api.Decorate(s.pingHandler, logDecorator, http_api.PlainText)},
		{"GET", "/info", http_api.Decorate(s.doInfo, logDecorator, http_api.V1)},

		// v1 negotiate
		{"GET", "/debug", http_api.Decorate(s.doDebug, logDecorator, http_api.V1)},
		{"GET", "/lookup", http_api.Decorate(s.doLookup, logDecorator, http_api.V1)},
		{"GET", "/topics", http_api.Decorate(s.doTopics, logDecorator, http_api.V1)},
		{"GET", "/channels", http_api.Decorate(s.doChannels, logDecorator, http_api.V1)},
		{"GET", "/nodes", http_api.Decorate(s.doNodes, logDecorator, http_api.V1)},

		// only v1
		{"POST", "/topic/create", http_api.Decorate(s.doCreateTopic, logDecorator, http_api.V1)},
		{"POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, logDecorator, http_api.V1)},
		{"POST", "/channel/create", http_api.Decorate(s.doCreateChannel, logDecorator, http_api.V1)},
		{"POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, logDecorator, http_api.V1)},
		{"POST", "/topic/tombstone", http_api.Decorate(s.doTombstoneTopicProducer, logDecorator, http_api.V1)},
	}
	for _, route := range apiRoutes {
		router.Handle(route.method, route.path, route.handle)
	}

	// Go runtime profiling endpoints.
	router.HandlerFunc("GET", "/debug/pprof", pprof.Index)
	router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
	router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
	router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
	router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
	router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
	router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
	router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
	router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))

	return s
}

// ServeHTTP implements http.Handler by delegating every request to the router
// configured in newHTTPServer.
func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	router := s.router
	router.ServeHTTP(w, req)
}

// pingHandler serves GET /ping by always answering with the string "OK".
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	const pong = "OK"
	return pong, nil
}

// doInfo serves GET /info. It reports the build version of this nsqlookupd as
// {"version": version.Binary}.
func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	info := struct {
		Version string `json:"version"`
	}{version.Binary}
	return info, nil
}

// doTopics serves GET /topics. It returns {"topics": [...]}: the Keys of
// every ("topic", *, "") registration.
func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	registrations := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "")
	result := map[string]interface{}{"topics": registrations.Keys()}
	return result, nil
}

// doChannels serves GET /channels with a "topic" request parameter.
//
// It returns {"channels": [...]}: the channel names (registration SubKeys)
// registered under that topic. An unparsable request yields 400
// INVALID_REQUEST and a missing topic yields 400 MISSING_ARG_TOPIC. The topic
// name itself is not validated.
func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	reqParams, err := http_api.NewReqParams(req)
	if err != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}
	topicName, err := reqParams.Get("topic")
	if err != nil {
		return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
	}

	registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
	return map[string]interface{}{
		"channels": registrations.SubKeys(),
	}, nil
}

// doLookup serves GET /lookup with a "topic" request parameter.
//
// If the topic has no ("topic", topic, "") registration, it answers 404
// TOPIC_NOT_FOUND. Otherwise it returns:
//   - "channels": the channel names registered under the topic;
//   - "producers": the PeerInfo of the topic's producers that pass
//     FilterByActive(InactiveProducerTimeout, TombstoneLifetime).
//
// Errors: 400 INVALID_REQUEST, 400 MISSING_ARG_TOPIC.
func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	reqParams, err := http_api.NewReqParams(req)
	if err != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}
	topicName, err := reqParams.Get("topic")
	if err != nil {
		return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
	}

	db := s.ctx.nsqlookupd.DB
	if len(db.FindRegistrations("topic", topicName, "")) == 0 {
		return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
	}

	channels := db.FindRegistrations("channel", topicName, "*").SubKeys()
	activeProducers := db.FindProducers("topic", topicName, "").FilterByActive(
		s.ctx.nsqlookupd.opts.InactiveProducerTimeout,
		s.ctx.nsqlookupd.opts.TombstoneLifetime)

	return map[string]interface{}{
		"channels":  channels,
		"producers": activeProducers.PeerInfo(),
	}, nil
}

// doCreateTopic serves POST /topic/create with a "topic" request parameter.
// It adds a ("topic", topic, "") registration with no producers and returns
// an empty result.
//
// Errors: 400 INVALID_REQUEST, 400 MISSING_ARG_TOPIC, and 400
// INVALID_ARG_TOPIC when the name fails protocol.IsValidTopicName.
func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	reqParams, err := http_api.NewReqParams(req)
	if err != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}

	topicName, err := reqParams.Get("topic")
	switch {
	case err != nil:
		return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
	case !protocol.IsValidTopicName(topicName):
		return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
	}

	s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
	s.ctx.nsqlookupd.DB.AddRegistration(Registration{"topic", topicName, ""})
	return nil, nil
}

// doDeleteTopic serves POST /topic/delete with a "topic" request parameter.
// It removes every channel registration under the topic, then the topic's own
// registration, logging each removal. The topic name is not validated.
//
// Errors: 400 INVALID_REQUEST, 400 MISSING_ARG_TOPIC.
func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	reqParams, err := http_api.NewReqParams(req)
	if err != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}
	topicName, err := reqParams.Get("topic")
	if err != nil {
		return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
	}

	db := s.ctx.nsqlookupd.DB

	// Channels first...
	for _, r := range db.FindRegistrations("channel", topicName, "*") {
		s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", r.SubKey, topicName)
		db.RemoveRegistration(r)
	}

	// ...then the topic registration itself.
	for _, r := range db.FindRegistrations("topic", topicName, "") {
		s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicName)
		db.RemoveRegistration(r)
	}

	return nil, nil
}

// doTombstoneTopicProducer serves POST /topic/tombstone with "topic" and
// "node" request parameters.
//
// Every producer of the topic whose "<BroadcastAddress>:<HTTPPort>" equals
// node is marked via Producer.Tombstone. The node string is matched against
// the HTTP port, not the TCP port. A node that matches no producer is silently
// ignored.
//
// Errors: 400 INVALID_REQUEST, 400 MISSING_ARG_TOPIC, 400 MISSING_ARG_NODE.
func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	reqParams, err := http_api.NewReqParams(req)
	if err != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}

	topicName, err := reqParams.Get("topic")
	if err != nil {
		return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
	}

	node, err := reqParams.Get("node")
	if err != nil {
		return nil, http_api.Err{400, "MISSING_ARG_NODE"}
	}

	s.ctx.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of topic(%s)", node, topicName)
	producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "")
	for _, p := range producers {
		thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.peerInfo.HTTPPort)
		if thisNode == node {
			p.Tombstone()
		}
	}

	return nil, nil
}

// doCreateChannel serves POST /channel/create with topic and channel request
// parameters (parsed and checked by http_api.GetTopicChannelArgs). It adds the
// ("channel", topic, channel) registration and then the ("topic", topic, "")
// registration. Any argument error is returned as 400 with that error's text;
// an unparsable request yields 400 INVALID_REQUEST.
func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	reqParams, err := http_api.NewReqParams(req)
	if err != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}
	topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
	if err != nil {
		return nil, http_api.Err{400, err.Error()}
	}

	db := s.ctx.nsqlookupd.DB

	s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channelName, topicName)
	db.AddRegistration(Registration{"channel", topicName, channelName})

	s.ctx.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
	db.AddRegistration(Registration{"topic", topicName, ""})

	return nil, nil
}

// doDeleteChannel serves POST /channel/delete with topic and channel request
// parameters (parsed by http_api.GetTopicChannelArgs). It removes the matching
// channel registration(s). If none exist, it answers 404 CHANNEL_NOT_FOUND.
// Argument errors are returned as 400 with the error's text; an unparsable
// request yields 400 INVALID_REQUEST.
func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	reqParams, err := http_api.NewReqParams(req)
	if err != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}
	topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
	if err != nil {
		return nil, http_api.Err{400, err.Error()}
	}

	db := s.ctx.nsqlookupd.DB
	matches := db.FindRegistrations("channel", topicName, channelName)
	if len(matches) == 0 {
		return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
	}

	s.ctx.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", channelName, topicName)
	for _, r := range matches {
		db.RemoveRegistration(r)
	}
	return nil, nil
}

// node is one entry of the GET /nodes response.
//
// doNodes copies the identity fields from the peer's PeerInfo. Topics lists
// the topics the peer has registered. Tombstones is index-aligned with Topics
// and records, for each topic, whether the peer's producer entry is
// tombstoned. Field order determines the JSON output order.
type node struct {
  RemoteAddress    string   `json:"remote_address"`
  Hostname         string   `json:"hostname"`
  BroadcastAddress string   `json:"broadcast_address"`
  TCPPort          int      `json:"tcp_port"`
  HTTPPort         int      `json:"http_port"`
  Version          string   `json:"version"`
  Tombstones       []bool   `json:"tombstones"`
  Topics           []string `json:"topics"`
}

// doNodes serves GET /nodes. It returns {"producers": [...]} with one node
// per IDENTIFYed peer, i.e. each producer of the ("client", "", "")
// registration that passes FilterByActive(InactiveProducerTimeout, 0).
//
// For each peer it lists the topics it has registered. Index-aligned with
// those topics, it records IsTombstoned(TombstoneLifetime) of the peer's own
// producer entry for each topic.
func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	db := s.ctx.nsqlookupd.DB

	active := db.FindProducers("client", "", "").FilterByActive(
		s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0)
	nodes := make([]*node, len(active))
	for i, producer := range active {
		info := producer.peerInfo
		topics := db.LookupRegistrations(info.id).Filter("topic", "*", "").Keys()

		// Find this peer's producer entry under each topic to report its
		// tombstone state.
		tombstones := make([]bool, len(topics))
		for j, topic := range topics {
			for _, tp := range db.FindProducers("topic", topic, "") {
				if tp.peerInfo == info {
					tombstones[j] = tp.IsTombstoned(s.ctx.nsqlookupd.opts.TombstoneLifetime)
				}
			}
		}

		nodes[i] = &node{
			RemoteAddress:    info.RemoteAddress,
			Hostname:         info.Hostname,
			BroadcastAddress: info.BroadcastAddress,
			TCPPort:          info.TCPPort,
			HTTPPort:         info.HTTPPort,
			Version:          info.Version,
			Tombstones:       tombstones,
			Topics:           topics,
		}
	}

	return map[string]interface{}{"producers": nodes}, nil
}

// doDebug serves GET /debug with a dump of the whole registration DB. The
// result is keyed by "category:key:subkey", and each value lists the
// producers of that registration. It holds the DB read lock while walking
// registrationMap, and reads lastUpdate atomically as the other accessors do.
func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	db := s.ctx.nsqlookupd.DB
	db.RLock()
	defer db.RUnlock()

	dump := make(map[string][]map[string]interface{})
	for reg, producers := range db.registrationMap {
		key := reg.Category + ":" + reg.Key + ":" + reg.SubKey
		for _, producer := range producers {
			info := producer.peerInfo
			dump[key] = append(dump[key], map[string]interface{}{
				"id":                info.id,
				"hostname":          info.Hostname,
				"broadcast_address": info.BroadcastAddress,
				"tcp_port":          info.TCPPort,
				"http_port":         info.HTTPPort,
				"version":           info.Version,
				"last_update":       atomic.LoadInt64(&info.lastUpdate),
				"tombstoned":        producer.tombstoned,
				"tombstoned_at":     producer.tombstonedAt.UnixNano(),
			})
		}
	}

	return dump, nil
}

tcp.go

package nsqlookupd

import (
  "io"
  "net"

  "github.com/nsqio/nsq/internal/protocol"
)

// tcpServer handles nsqlookupd's TCP connections. Its ctx is passed on to the
// protocol implementation chosen in Handle.
type tcpServer struct {
  ctx *Context
}

// Handle serves one accepted TCP connection and owns it until it is closed.
//
// The client must first send a 4-byte protocol magic:
//   - "  V1" (two spaces, then "V1") selects LookupProtocolV1. Its IOLoop then
//     serves the connection and closes it when it returns.
//   - Any other magic gets an E_BAD_PROTOCOL response, and the connection is
//     closed.
//
// If the magic cannot be read (e.g. the client disconnects first), the
// connection is closed too, so it is not leaked.
func (p *tcpServer) Handle(clientConn net.Conn) {
	p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	if err != nil {
		p.ctx.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
		clientConn.Close()
		return
	}
	protocolMagic := string(buf)

	p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V1":
		prot = &LookupProtocolV1{ctx: p.ctx}
	default:
		// Best effort: the connection is closed right after, so a failed
		// write is not reported.
		protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	if err := prot.IOLoop(clientConn); err != nil {
		p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
	}
}

看到最后, 发现根源还在 nsq/internal/protocol 部分, 之后要着重分析一下; 至此, nsqlookupd 已经分析完毕。
大概是以下步骤:

  1. 启动 tcp 服务, 负责处理 PING, IDENTIFY, REGISTER, UNREGISTER 等协议指令;
  2. 启动 http 服务, 负责 api 部分, 包括 topic/channel 的增删查询以及节点信息(PeerInfo)的查看等等
This article was last updated some time ago, and the information described in it may have changed.