EricJJ' Blog

写代码

  1. 1. 概览
  2. 2. 模块
    1. 2.1. nsqlookupd
    2. 2.2. nsqd
    3. 2.3. nsqadmin
  3. 3. 源码分析
    1. 3.1. nsqlookupd.go
    2. 3.2. registrationdb.go

概览

NSQ是一个实时分布式消息平台, 旨在大规模运行, 每天处理数十亿条消息, 被许多互联网公司所使用;

其中 nsqd 是一个守护进程, 负责接收, 排队, 投递消息给客户端;
它可以独立运行, 不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels, 以便大家能找到);
它在 2 个 TCP 端口监听, 一个给客户端, 另一个是 HTTP API; 同时, 它也能在第三个端口监听 HTTPS

模块

nsq大概分nsqd nsqlookupd nsqadmin三个部分

nsqlookupd

nsqlookupd 是守护进程负责管理拓扑信息; 客户端通过查询nsqlookupd 来发现指定话题(topic)的生产者, 并且 nsqd 节点广播话题(topic)和通道(channel)信息, 具有以下功能

  • 唯一性, 在一个nsq服务中只有一个nsqlookupd服务, 当然也可以在集群中部署多个nsqlookupd, 但它们之间是没有关联的
  • 去中心化, 即使nsqlookupd崩溃, 也会不影响正在运行的nsqd服务
  • 充当nsqdnaqadmin信息交互的中间件
  • 提供一个http查询服务, 给客户端定时更新nsqd的地址目录

nsqd

nsqd 是一个守护进程, 负责接收, 排队, 投递消息给客户端

  • 对订阅了同一个topic, 同一个channel的消费者使用负载均衡策略(不是轮询)
  • 只要channel存在, 即使没有该channel的消费者, 也会将生产者的message缓存到队列中(注意消息的过期处理)
  • 保证队列中的message至少会被消费一次, 即使nsqd退出, 也会将队列中的消息暂存磁盘上(结束进程等意外情况除外)
  • 限定内存占用, 能够配置nsqd中每个channel队列在内存中缓存的message数量, 一旦超出, message将被缓存到磁盘中
  • topic, channel一旦建立, 将会一直存在, 要及时在管理台或者用代码清除无效的topicchannel, 避免资源的浪费

nsqadmin

是一套 WEB UI, 用来汇集集群的实时统计, 并执行不同的管理任务

官方图片

源码分析

本文以及后面的分析都是基于 1.0.0 版本代码, 为了增加可读性, 我把注释放在了函数外, 基本都覆盖到, 本文中就不啰嗦讲如何使用了, 查阅文档即可

nsqlookupd.go

package nsqlookupd

// 锁
// 配置选项
// tcpListener 如上文所说 tcp http 端口监听
// httpListener
// waitGroup 线程同步
// 数据库
type NSQLookupd struct {
  sync.RWMutex
  opts         *Options
  tcpListener  net.Listener
  httpListener net.Listener
  waitGroup    util.WaitGroupWrapper
  DB           *RegistrationDB
}

// 如果没有指定 Logger, 就new一个
// new NSQLookupd, 待会看一下 `NewRegistrationDB` 做了什么事情
// 解析 log level
func New(opts *Options) *NSQLookupd {
  if opts.Logger == nil {
    opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
  }
  n := &NSQLookupd{
    opts: opts,
    DB:   NewRegistrationDB(),
  }

  var err error
  opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
  if err != nil {
    n.logf(LOG_FATAL, "%s", err)
    os.Exit(1)
  }

  n.logf(LOG_INFO, version.String("nsqlookupd"))
  return n
}

// 创建 context, 其实 ctx 就是 NSQLookupd, 不明白为什么多此一举, 想要引入原生的 Context struct?
// 创建 tcpListener, 这里用到了锁, 说明该场景有并发
// 根据 ctx 创建 tcpServer
// waitGroup 线程同步后, 创建 TCPServer
// 重复以上步骤,创建 HTTPServer
func (l *NSQLookupd) Main() {
  ctx := &Context{l}

  tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
  if err != nil {
    l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
    os.Exit(1)
  }
  l.Lock()
  l.tcpListener = tcpListener
  l.Unlock()
  tcpServer := &tcpServer{ctx: ctx}
  l.waitGroup.Wrap(func() {
    protocol.TCPServer(tcpListener, tcpServer, l.logf)
  })

  httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
  if err != nil {
    l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
    os.Exit(1)
  }
  l.Lock()
  l.httpListener = httpListener
  l.Unlock()
  httpServer := newHTTPServer(ctx)
  l.waitGroup.Wrap(func() {
    http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
  })
}

// 获取 TCP 地址, 继续锁, 说明地址可能会修改
func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr {
  l.RLock()
  defer l.RUnlock()
  return l.tcpListener.Addr().(*net.TCPAddr)
}

// 获取 HTTP 地址
func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
  l.RLock()
  defer l.RUnlock()
  return l.httpListener.Addr().(*net.TCPAddr)
}

// 关闭 tcpListener httpListener, 等待线程同步后结束
func (l *NSQLookupd) Exit() {
  if l.tcpListener != nil {
    l.tcpListener.Close()
  }

  if l.httpListener != nil {
    l.httpListener.Close()
  }
  l.waitGroup.Wait()

OK, 至此 nsqlookupd.go 已经分析完毕, 如果想知道以上代码如何单独使用, 可以看测试nsqlookupd_test.go呀 😂, 在以上代码中, 我们看到了 db 部分, 接下来看看

registrationdb.go

package nsqlookupd

// 锁
// 以 Registration 为 key 储存 Producers, 即生产者
type RegistrationDB struct {
  sync.RWMutex
  registrationMap map[Registration]Producers
}

type Registration struct {
  Category string
  Key      string
  SubKey   string
}
type Registrations []Registration

// *节点信息*
// 上次更新时间
// 标识符
// 地址
// 主机名
// 广播地址
// tcp 地址
// http 地址
// 版本号
type PeerInfo struct {
  lastUpdate       int64
  id               string
  RemoteAddress    string `json:"remote_address"`
  Hostname         string `json:"hostname"`
  BroadcastAddress string `json:"broadcast_address"`
  TCPPort          int    `json:"tcp_port"`
  HTTPPort         int    `json:"http_port"`
  Version          string `json:"version"`
}

// *生产者*
// 节点信息
// 是否删除
// 删除时间
type Producer struct {
  peerInfo     *PeerInfo
  tombstoned   bool
  tombstonedAt time.Time
}

type Producers []*Producer

// 转换为字符串
func (p *Producer) String() string {
  return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
}

// 删除
func (p *Producer) Tombstone() {
  p.tombstoned = true
  p.tombstonedAt = time.Now()
}

// 是否删除
func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
  return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
}

// 创建 RegistrationDB
func NewRegistrationDB() *RegistrationDB {
  return &RegistrationDB{
    registrationMap: make(map[Registration]Producers),
  }
}

// 增加一个注册表 key
func (r *RegistrationDB) AddRegistration(k Registration) {
  r.Lock()
  defer r.Unlock()
  _, ok := r.registrationMap[k]
  if !ok {
    r.registrationMap[k] = Producers{}
  }
}

// 添加一个 producer 到 registration
// 取出 producers, 并遍历,
// 如果不存在, 就添加进去
// 如果存在, 返回 false
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
  r.Lock()
  defer r.Unlock()
  producers := r.registrationMap[k]
  found := false
  for _, producer := range producers {
    if producer.peerInfo.id == p.peerInfo.id {
      found = true
    }
  }
  if found == false {
    r.registrationMap[k] = append(producers, p)
  }
  return !found
}

// 根据 id 从 registration 中删除 producer
// 如果不存在, 返回 false
// 创建一个新的 Producers, 遍历原来的 Producers,
// 如果 id 不相同就添加进去, 即删除成功 简单粗暴 哈哈哈哈哈哈
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
  r.Lock()
  defer r.Unlock()
  producers, ok := r.registrationMap[k]
  if !ok {
    return false, 0
  }
  removed := false
  cleaned := Producers{}
  for _, producer := range producers {
    if producer.peerInfo.id != id {
      cleaned = append(cleaned, producer)
    } else {
      removed = true
    }
  }
  // Note: this leaves keys in the DB even if they have empty lists
  r.registrationMap[k] = cleaned
  return removed, len(cleaned)
}

// 删除一个 registration
func (r *RegistrationDB) RemoveRegistration(k Registration) {
  r.Lock()
  defer r.Unlock()
  delete(r.registrationMap, k)
}

// 需要过滤
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
  return key == "*" || subkey == "*"
}

// 根据 category, key, subkey 查找 Registrations
// 如果 key == '*' 或者 subkey == '*', 则只查找一个
// 否则 遍历 registrationMap, 返回所有条件符合的 registration
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
  r.RLock()
  defer r.RUnlock()
  if !r.needFilter(key, subkey) {
    k := Registration{category, key, subkey}
    if _, ok := r.registrationMap[k]; ok {
      return Registrations{k}
    }
    return Registrations{}
  }
  results := Registrations{}
  for k := range r.registrationMap {
    if !k.IsMatch(category, key, subkey) {
      continue
    }
    results = append(results, k)
  }
  return results
}

// 根据 category, key, subkey 查找 Producers
// 同上 没什么好说的, 多了个根据 id 去重, 略啰嗦
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
  r.RLock()
  defer r.RUnlock()
  if !r.needFilter(key, subkey) {
    k := Registration{category, key, subkey}
    return r.registrationMap[k]
  }

  results := Producers{}
  for k, producers := range r.registrationMap {
    if !k.IsMatch(category, key, subkey) {
      continue
    }
    for _, producer := range producers {
      found := false
      for _, p := range results {
        if producer.peerInfo.id == p.peerInfo.id {
          found = true
        }
      }
      if found == false {
        results = append(results, producer)
      }
    }
  }
  return results
}

// 根据 id 查找 Registrations
// 依然遍历 没什么好说的
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
  r.RLock()
  defer r.RUnlock()
  results := Registrations{}
  for k, producers := range r.registrationMap {
    for _, p := range producers {
      if p.peerInfo.id == id {
        results = append(results, k)
        break
      }
    }
  }
  return results
}

// 是否匹配
func (k Registration) IsMatch(category string, key string, subkey string) bool {
  if category != k.Category {
    return false
  }
  if key != "*" && k.Key != key {
    return false
  }
  if subkey != "*" && k.SubKey != subkey {
    return false
  }
  return true
}

// 过滤
func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
  output := Registrations{}
  for _, k := range rr {
    if k.IsMatch(category, key, subkey) {
      output = append(output, k)
    }
  }
  return output
}

// keys
func (rr Registrations) Keys() []string {
  keys := make([]string, len(rr))
  for i, k := range rr {
    keys[i] = k.Key
  }
  return keys
}

// subkeys
func (rr Registrations) SubKeys() []string {
  subkeys := make([]string, len(rr))
  for i, k := range rr {
    subkeys[i] = k.SubKey
  }
  return subkeys
}

// 根据时间过滤
func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
  now := time.Now()
  results := Producers{}
  for _, p := range pp {
    cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
    if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
      continue
    }
    results = append(results, p)
  }
  return results
}

// 节点信息
func (pp Producers) PeerInfo() []*PeerInfo {
  results := []*PeerInfo{}
  for _, p := range pp {
    results = append(results, p.peerInfo)
  }
  return results
}

好了, 可以看出 RegistrationDBmap 结构包含了所有节点信息; 名为db, 实则最多算个cache罢了 2333333; 印证了上文中的 客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者 ;

好了, 第一篇暂时结束

This article was last updated on days ago, and the information described in the article may have changed.