nsq/nsqlookupd 分析之一

概览

NSQ是一个实时分布式消息平台, 旨在大规模运行, 每天处理数十亿条消息, 被许多互联网公司所使用;

其中 nsqd 是一个守护进程, 负责接收, 排队, 投递消息给客户端;
它可以独立运行, 不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels, 以便大家能找到);
它在 2 个 TCP 端口监听, 一个给客户端, 另一个是 HTTP API; 同时, 它也能在第三个端口监听 HTTPS

模块

nsq大概分nsqd nsqlookupd nsqadmin三个部分

nsqlookupd

nsqlookupd 是守护进程负责管理拓扑信息; 客户端通过查询nsqlookupd 来发现指定话题(topic)的生产者, 并且 nsqd 节点广播话题(topic)和通道(channel)信息, 具有以下功能

  • 唯一性, 在一个nsq服务中只有一个nsqlookupd服务, 当然也可以在集群中部署多个nsqlookupd, 但它们之间是没有关联的
  • 去中心化, 即使nsqlookupd崩溃, 也会不影响正在运行的nsqd服务
  • 充当nsqdnaqadmin信息交互的中间件
  • 提供一个http查询服务, 给客户端定时更新nsqd的地址目录

nsqd

nsqd 是一个守护进程, 负责接收, 排队, 投递消息给客户端

  • 对订阅了同一个topic, 同一个channel的消费者使用负载均衡策略(不是轮询)
  • 只要channel存在, 即使没有该channel的消费者, 也会将生产者的message缓存到队列中(注意消息的过期处理)
  • 保证队列中的message至少会被消费一次, 即使nsqd退出, 也会将队列中的消息暂存磁盘上(结束进程等意外情况除外)
  • 限定内存占用, 能够配置nsqd中每个channel队列在内存中缓存的message数量, 一旦超出, message将被缓存到磁盘中
  • topic, channel一旦建立, 将会一直存在, 要及时在管理台或者用代码清除无效的topicchannel, 避免资源的浪费

nsqadmin

是一套 WEB UI, 用来汇集集群的实时统计, 并执行不同的管理任务

官方图片

源码分析

本文以及后面的分析都是基于 1.0.0 版本代码, 为了增加可读性, 我把注释放在了函数外, 基本都覆盖到, 本文中就不啰嗦讲如何使用了, 查阅文档即可

nsqlookupd.go

package nsqlookupd

// 锁
// 配置选项
// tcpListener 如上文所说 tcp http 端口监听
// httpListener
// waitGroup 线程同步
// 数据库
type NSQLookupd struct {
  sync.RWMutex
  opts         *Options
  tcpListener  net.Listener
  httpListener net.Listener
  waitGroup    util.WaitGroupWrapper
  DB           *RegistrationDB
}

// 如果没有指定 Logger, 就new一个
// new NSQLookupd, 待会看一下 `NewRegistrationDB` 做了什么事情
// 解析 log level
func New(opts *Options) *NSQLookupd {
  if opts.Logger == nil {
    opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
  }
  n := &NSQLookupd{
    opts: opts,
    DB:   NewRegistrationDB(),
  }

  var err error
  opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
  if err != nil {
    n.logf(LOG_FATAL, "%s", err)
    os.Exit(1)
  }

  n.logf(LOG_INFO, version.String("nsqlookupd"))
  return n
}

// 创建 context, 其实 ctx 就是 NSQLookupd, 不明白为什么多此一举, 想要引入原生的 Context struct?
// 创建 tcpListener, 这里用到了锁, 说明该场景有并发
// 根据 ctx 创建 tcpServer
// waitGroup 线程同步后, 创建 TCPServer
// 重复以上步骤,创建 HTTPServer
func (l *NSQLookupd) Main() {
  ctx := &Context{l}

  tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
  if err != nil {
    l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
    os.Exit(1)
  }
  l.Lock()
  l.tcpListener = tcpListener
  l.Unlock()
  tcpServer := &tcpServer{ctx: ctx}
  l.waitGroup.Wrap(func() {
    protocol.TCPServer(tcpListener, tcpServer, l.logf)
  })

  httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
  if err != nil {
    l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
    os.Exit(1)
  }
  l.Lock()
  l.httpListener = httpListener
  l.Unlock()
  httpServer := newHTTPServer(ctx)
  l.waitGroup.Wrap(func() {
    http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
  })
}

// 获取 TCP 地址, 继续锁, 说明地址可能会修改
func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr {
  l.RLock()
  defer l.RUnlock()
  return l.tcpListener.Addr().(*net.TCPAddr)
}

// 获取 HTTP 地址
func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
  l.RLock()
  defer l.RUnlock()
  return l.httpListener.Addr().(*net.TCPAddr)
}

// 关闭 tcpListener httpListener, 等待线程同步后结束
func (l *NSQLookupd) Exit() {
  if l.tcpListener != nil {
    l.tcpListener.Close()
  }

  if l.httpListener != nil {
    l.httpListener.Close()
  }
  l.waitGroup.Wait()

OK, 至此 nsqlookupd.go 已经分析完毕, 如果想知道以上代码如何单独使用, 可以看测试nsqlookupd_test.go呀 😂, 在以上代码中, 我们看到了 db 部分, 接下来看看

registrationdb.go

package nsqlookupd

// 锁
// 以 Registration 为 key 储存 Producers, 即生产者
type RegistrationDB struct {
  sync.RWMutex
  registrationMap map[Registration]Producers
}

type Registration struct {
  Category string
  Key      string
  SubKey   string
}
type Registrations []Registration

// *节点信息*
// 上次更新时间
// 标识符
// 地址
// 主机名
// 广播地址
// tcp 地址
// http 地址
// 版本号
type PeerInfo struct {
  lastUpdate       int64
  id               string
  RemoteAddress    string `json:"remote_address"`
  Hostname         string `json:"hostname"`
  BroadcastAddress string `json:"broadcast_address"`
  TCPPort          int    `json:"tcp_port"`
  HTTPPort         int    `json:"http_port"`
  Version          string `json:"version"`
}

// *生产者*
// 节点信息
// 是否删除
// 删除时间
type Producer struct {
  peerInfo     *PeerInfo
  tombstoned   bool
  tombstonedAt time.Time
}

type Producers []*Producer

// 转换为字符串
func (p *Producer) String() string {
  return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
}

// 删除
func (p *Producer) Tombstone() {
  p.tombstoned = true
  p.tombstonedAt = time.Now()
}

// 是否删除
func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
  return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
}

// 创建 RegistrationDB
func NewRegistrationDB() *RegistrationDB {
  return &RegistrationDB{
    registrationMap: make(map[Registration]Producers),
  }
}

// 增加一个注册表 key
func (r *RegistrationDB) AddRegistration(k Registration) {
  r.Lock()
  defer r.Unlock()
  _, ok := r.registrationMap[k]
  if !ok {
    r.registrationMap[k] = Producers{}
  }
}

// 添加一个 producer 到 registration
// 取出 producers, 并遍历,
// 如果不存在, 就添加进去
// 如果存在, 返回 false
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
  r.Lock()
  defer r.Unlock()
  producers := r.registrationMap[k]
  found := false
  for _, producer := range producers {
    if producer.peerInfo.id == p.peerInfo.id {
      found = true
    }
  }
  if found == false {
    r.registrationMap[k] = append(producers, p)
  }
  return !found
}

// 根据 id 从 registration 中删除 producer
// 如果不存在, 返回 false
// 创建一个新的 Producers, 遍历原来的 Producers,
// 如果 id 不相同就添加进去, 即删除成功 简单粗暴 哈哈哈哈哈哈
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
  r.Lock()
  defer r.Unlock()
  producers, ok := r.registrationMap[k]
  if !ok {
    return false, 0
  }
  removed := false
  cleaned := Producers{}
  for _, producer := range producers {
    if producer.peerInfo.id != id {
      cleaned = append(cleaned, producer)
    } else {
      removed = true
    }
  }
  // Note: this leaves keys in the DB even if they have empty lists
  r.registrationMap[k] = cleaned
  return removed, len(cleaned)
}

// 删除一个 registration
func (r *RegistrationDB) RemoveRegistration(k Registration) {
  r.Lock()
  defer r.Unlock()
  delete(r.registrationMap, k)
}

// 需要过滤
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
  return key == "*" || subkey == "*"
}

// 根据 category, key, subkey 查找 Registrations
// 如果 key == '*' 或者 subkey == '*', 则只查找一个
// 否则 遍历 registrationMap, 返回所有条件符合的 registration
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
  r.RLock()
  defer r.RUnlock()
  if !r.needFilter(key, subkey) {
    k := Registration{category, key, subkey}
    if _, ok := r.registrationMap[k]; ok {
      return Registrations{k}
    }
    return Registrations{}
  }
  results := Registrations{}
  for k := range r.registrationMap {
    if !k.IsMatch(category, key, subkey) {
      continue
    }
    results = append(results, k)
  }
  return results
}

// 根据 category, key, subkey 查找 Producers
// 同上 没什么好说的, 多了个根据 id 去重, 略啰嗦
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
  r.RLock()
  defer r.RUnlock()
  if !r.needFilter(key, subkey) {
    k := Registration{category, key, subkey}
    return r.registrationMap[k]
  }

  results := Producers{}
  for k, producers := range r.registrationMap {
    if !k.IsMatch(category, key, subkey) {
      continue
    }
    for _, producer := range producers {
      found := false
      for _, p := range results {
        if producer.peerInfo.id == p.peerInfo.id {
          found = true
        }
      }
      if found == false {
        results = append(results, producer)
      }
    }
  }
  return results
}

// 根据 id 查找 Registrations
// 依然遍历 没什么好说的
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
  r.RLock()
  defer r.RUnlock()
  results := Registrations{}
  for k, producers := range r.registrationMap {
    for _, p := range producers {
      if p.peerInfo.id == id {
        results = append(results, k)
        break
      }
    }
  }
  return results
}

// 是否匹配
func (k Registration) IsMatch(category string, key string, subkey string) bool {
  if category != k.Category {
    return false
  }
  if key != "*" && k.Key != key {
    return false
  }
  if subkey != "*" && k.SubKey != subkey {
    return false
  }
  return true
}

// 过滤
func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
  output := Registrations{}
  for _, k := range rr {
    if k.IsMatch(category, key, subkey) {
      output = append(output, k)
    }
  }
  return output
}

// keys
func (rr Registrations) Keys() []string {
  keys := make([]string, len(rr))
  for i, k := range rr {
    keys[i] = k.Key
  }
  return keys
}

// subkeys
func (rr Registrations) SubKeys() []string {
  subkeys := make([]string, len(rr))
  for i, k := range rr {
    subkeys[i] = k.SubKey
  }
  return subkeys
}

// 根据时间过滤
func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
  now := time.Now()
  results := Producers{}
  for _, p := range pp {
    cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
    if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
      continue
    }
    results = append(results, p)
  }
  return results
}

// 节点信息
func (pp Producers) PeerInfo() []*PeerInfo {
  results := []*PeerInfo{}
  for _, p := range pp {
    results = append(results, p.peerInfo)
  }
  return results
}

好了, 可以看出 RegistrationDBmap 结构包含了所有节点信息; 名为db, 实则最多算个cache罢了 2333333; 印证了上文中的 客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者 ;

好了, 第一篇暂时结束