nsq源码解析

最近刚刚把go的语法看完,寻思着看些开源项目,值得学习的go语言开源项目,beego的作者asta谢推荐nsq,这对于我们认识channel、分布式开发都有很大的帮助。nsq其实就是一个分布式消息中间件,鼎鼎大名的Kafka就是用java实现的一个MQ。

nsq环境部署

看源码前,首先还是要先会用,然后对整个项目有个大体的了解。
nsq文档,看这个文档就没问题了,最简单的方式是使用docker来部署。
nsq docker部署,文档上直接就有docker-compose,真是十分贴心。不过里面有处错误,会导致consumer.ConnectToNSQLookupd时出错。

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --broadcast-address=127.0.0.1    //文档里少了broadcast
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"
      - "4151:4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    ports:
      - "4171:4171"

这个问题的解答:I am using the nsq client go-nsq to produce and consume messages, the messages can be consumed by connecting to nsqd directly, but cannot be consumed by connecting to nsqlookupd

nsq流程图

网上有很多nsq流程图,都是直接从nsq文档上clone下来的:
nsq_deliver
不过这个拓扑图还是简单了点。

nsqd

nsqd流程

nsqd的入口函数是nsqd.go的main(),简化后就是:

func (n *NSQD) Main() {
    ...
    
    tcpServer := &tcpServer{ctx: ctx}
	n.waitGroup.Wrap(func() {
		protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
	})
    
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)    //有https则也会启动httpsServer
	n.waitGroup.Wrap(func() {
		http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
	})
    
    n.waitGroup.Wrap(func() { n.queueScanLoop() })
    
	n.waitGroup.Wrap(func() { n.lookupLoop() })
    
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(func() { n.statsdLoop() })
	}
    
    ...
}  

func (w *WaitGroupWrapper) Wrap(cb func()) {
	w.Add(1)
	go func() {
		cb()
		w.Done()
	}()
}


主要就是启动了这5个goroutine

  • tcp
//protocol_v2.go
func (p *protocolV2) IOLoop(conn net.Conn) error {
    /*运行messagePump的负责处理数据,把数据给客户端clientV2推送给客户端,
    主要就是推给sub订阅的client*/
    go p.messagePump(client, messagePumpStartedChan)
    
    for {
         param := p.read(client)      //从client读取命令(client可能是生产者也可能是消费者)
         response := p.exec(param)    //执行命令(FIN/RDY/REQ/PUB/SUB等)
         p.send(response)             //发送response给client
    }
}

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    /*channel有很多client,client有对应的channel。
    因为是共用channel,memoryMsgChan/backendMsgChan共用的。所以不同的client订阅同一 
    个topic-channel,所有的client共享message。也就是说每个client都有可能获得msg,从而
    实现负载均衡。
    每次一个订阅者只能订阅一个topic-channel,然后这个channel就赋给了subChannel,而  
    subEventChan被置为nil,之后在发送SUB命令也没有用了。*/
    case subChannel = <-subEventChan:	
			subEventChan = nil
}

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    //SUB需要topicName和channelName
    topicName := string(params[1])
    channelName := string(params[2])
    
    
    //每个channel会对应多个client
    topic := p.ctx.nsqd.GetTopic(topicName)
	channel = topic.GetChannel(channelName)
	channel.AddClient(client.ID, client)
    
    //更新message pump,每次有SUB命令过来,channel会进SubEventChan
	client.SubEventChan <- channel
}

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    //投递message
    topic := p.ctx.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	err = topic.PutMessage(msg)
}

func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) {
    /*是消费者给的回复,如果第二个字段duration<=0,则该msg重新请求,不然加入到deferr里, 
    也就是失败了的回复*/
    client.Channel.RequeueMessage(client.ID, *id, timeoutDuration)
    client.RequeuedMessage()
}

  • http/https
    代码主要在http.go里,这个代码就很容易了,就是router.Handle来处理各种http请求,主要是为nsqadmin提供服务
  • queueScanLoop
    注释里提到了It copies Redis's probabilistic expiration algorithm

参考《Redis设计与实现》9.6 Redis的过期键删除策略,结合了两种策略:

惰性删除。每次客户端对某个key读写时,会检查它是否过期,如果过期,就把它删掉。
定期删除。定期删除并不会遍历整个DB,它会在规定时间内,分多次遍历服务器中各个DB,从数据库的expires字典中随机检查一部分键的过期时间,如果过期,则删除。

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
			if c.processInFlightQueue(now) {
				dirty = true
			}
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
			return
		}
	}
}

resizePool可以动态调整工作Channel池的大小,这个算法有兴趣的可以自己看。
主要的目的就是定期对一部分channel的InFlightQueue和DeferredQueue两个队列做清理

  • lookupLoop
//lookpup.go

//默认从配置文件里读出NSQLookupd的tcp地址建立连接
for _, host := range n.getOpts().NSQLookupdTCPAddresses {
				if in(host, lookupAddrs) {
					continue
				}
				n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
				lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
					connectCallback(n, hostname, syncTopicChan))
				lookupPeer.Command(nil) // start the connection
				lookupPeers = append(lookupPeers, lookupPeer)
				lookupAddrs = append(lookupAddrs, host)
}

//每当有新的channel,topic,则会遍历lookupPeers,一个个通知到
case val := <-n.notifyChan:
			var cmd *nsq.Command
			var branch string

			switch val.(type) {
			case *Channel:
				// notify all nsqlookupds that a new channel exists, or that it's removed
				branch = "channel"
				channel := val.(*Channel)
				if channel.Exiting() == true {
					cmd = nsq.UnRegister(channel.topicName, channel.name)
				} else {
					cmd = nsq.Register(channel.topicName, channel.name)
				}
			case *Topic:
				// notify all nsqlookupds that a new topic exists, or that it's removed
				branch = "topic"
				topic := val.(*Topic)
				if topic.Exiting() == true {
					cmd = nsq.UnRegister(topic.name, "")
				} else {
					cmd = nsq.Register(topic.name, "")
				}
			}

			for _, lookupPeer := range lookupPeers {
				n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
				_, err := lookupPeer.Command(cmd)
				if err != nil {
					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
				}
			}

  • statsdLoop
    代码在statsd.go里,算是可选项,用于获取nsqd的一些状态数据。比如每个channel的DeferredCount数(需要重发message数量)

topic

//topic.go
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    ...
    t.waitGroup.Wrap(func() { t.messagePump() })
    //对照lookup.go,会通知所以的nsqlookupds
	t.ctx.nsqd.Notify(t)
    ...
}

func (t *Topic) messagePump() {
    ...
    //循环
    for {
		select {
        /*从memoryMsgChan读取到的消息是*Message类型,而从backendChan取到的消息是byte 
        数组的。memoryMsgChan的容量大小可以在配置文件里设置,超过memoryMsgChan的  
        message则会持久化存储起来,之后可以从backendChan里取出*/
		case msg = <-memoryMsgChan:
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
        /*Channel需要更新时,会从channelMap中取出每个channel,构成channel的数组以便后 
        续进行消息的投递。*/
		case <-t.channelUpdateChan:
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case pause := <-t.pauseChan:
			if pause || len(chans) == 0 {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
			goto exit
		}

        /*随后是将消息投到每个channel中,首先先对消息进行复制操作,这里有个优化,对于第一 
        次循环,直接使用原消息进行发送以减少复制对象的开销,此后的循环将对消息进行复制。   
        之后就是对消息进行投递。因为是for循环,这也就说明一个message,一个topic下所有   
        的channel都会投递。*/
		for i, channel := range chans {
			chanMsg := msg
			if i 
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}
    ...
}

channel

//channel.go

//投递失败队列
deferredMessages map[MessageID]*pqueue.Item
//普通的slice,没有特别的地方,但是通过heap.push也实现了堆的功能
deferredPQ       pqueue.PriorityQueue
//正在投递中队列
inFlightMessages map[MessageID]*Message
 //小堆,父节点比子节点小,根节点是最小的。主要每次插入的不一定是pri最大的,所以单纯数组实现效率低。链表实现会好点。或者基于数组用堆实现
inFlightPQ       inFlightPqueue

//memoryMsgChan数量可以设置,超过数量了则writeMessageToBackend,和topic很像
func (c *Channel) put(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, c.backend)
		bufferPoolPut(b)
		c.ctx.nsqd.SetHealth(err)
		if err != nil {
			c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil
}

主要都是对message的各种操作,比如投递成功或者失败,定时清除,重新投递等。需要结合上面的protocol_v2.go来看。

go-diskqueue

diskQueue是backendQueue接口的一个实现。backendQueue的作用是在实现在内存go channel缓冲区满的情况下对消息的处理的对象。

//diskqueue.go
type Interface interface {
	Put([]byte) error
	ReadChan() chan []byte // this is expected to be an *unbuffered* channel
	Close() error
	Delete() error
	Depth() int64
	Empty() error
}

/*消息读取对外暴露的是一个go channel,而数据的最终来源是ioLoop中调用的readOne函数。
readOne函数逻辑跟writeOne类似,只是把写操作换成了读操作,唯一差异较大的地方是  
d.nextReadPos和d.nextReadFileNum这两个变量的使用。
在写操作时,如果写入成功,则可以直接将写入位置和写入文件更新。但是对于读操作来说,  
由于读取的目的是为了向客户端投递,因此无法保证一定能投递成功。因此需要使用next开头  
的两个变量来保存成功后需要读的位置,如果投递没有成功,则继续使用当前的读取位置将再  
一次尝试将消息投递给客户端。*/

func (d *diskQueue) ioLoop() {
    ...
	for {
		select {
		case r <- dataRead:
			count++
			// moveForward sets needSync flag if a file is removed
			d.moveForward()
		case <-d.emptyChan:
			d.emptyResponseChan <- d.deleteAllFiles()
			count = 0
		case dataWrite := <-d.writeChan:
			count++
			d.writeResponseChan <- d.writeOne(dataWrite)
		case <-syncTicker.C:
			if count == 0 {
				// avoid sync when there's no activity
				continue
			}
			d.needSync = true
		case <-d.exitChan:
			goto exit
		}
	}
    ...
}

func (d *diskQueue) moveForward() {
	oldReadFileNum := d.readFileNum
	d.readFileNum = d.nextReadFileNum
    d.readPos = d.nextReadPos
    ...
}

/*binary.Read/Write是用来存data的长度的,之后的data加在len数据后面,msgSize必须是定长的,即为int32
这种方式可以更好的判断,之后make固定长度的slcie来接受数据,性能更好*/
func (d *diskQueue) readOne() ([]byte, error) {
    ...
    err = binary.Read(d.reader, binary.BigEndian, &msgSize)
    ...
}
func (d *diskQueue) writeOne(data []byte) error {
    ...
    dataLen := int32(len(data))

	if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
		return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
	}

	d.writeBuf.Reset()
	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
    ...
}

nsqlookupd

nsqlookupd的代码比较简单,首先看下main函数:

//nsqlookupd.go
func (l *NSQLookupd) Main() {
    ...
	l.waitGroup.Wrap(func() {
		protocol.TCPServer(tcpListener, tcpServer, l.logf)
	})

	...
	l.waitGroup.Wrap(func() {
		http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
	})
    ...
}

主要就是开启tcpListener和httpListener,分别对应http.go,tcp.go。

http

http.go里的代码很简单,主要要注意router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1)),consumer就是通过这个请求,根据topicName来获得发布这个topoc的所有nsqd的地址,从而可以连接nsqd。

func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	reqParams, err := http_api.NewReqParams(req)
	if err != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}

	topicName, err := reqParams.Get("topic")
	if err != nil {
		return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
	}

	registration := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
	if len(registration) == 0 {
		return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
	}

	channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
	producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "")
	producers = producers.FilterByActive(s.ctx.nsqlookupd.opts.InactiveProducerTimeout,
		s.ctx.nsqlookupd.opts.TombstoneLifetime)
	return map[string]interface{}{
		"channels":  channels,
		"producers": producers.PeerInfo(),
	}, nil
}

tcp

tcp.go主要就是prot.IOLoop(clientConn)。对应的代码在lookup_protocol_v1.go里。这个都是和nsqd直连的,根据nsqd的发来的各个命令,好做出映射。
map[Registration]Producers就能很方便的实现,Registration对应Producer数组,可以理解为nsqd,可以直接把nsqd数组发给consumer,由consumer决定连接哪个nsqd。

//registration_db.go
type RegistrationDB struct {
	sync.RWMutex
	registrationMap map[Registration]Producers
}

type Registration struct {
	Category string    //"topic"或者channel
	Key      string    //topicName,为*则是通配
	SubKey   string    //channelName,为*则是通配
}

直觉上可能会觉得Registration不需要那么复杂,但是做成这样非常便于统计。http.go里提供了很多用于统计的api,就用到了registration_db.go里的很多方法。

//对应的四种命令
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	switch params[0] {
	case "PING":
		return p.PING(client, params)
	case "IDENTIFY":
		return p.IDENTIFY(client, reader, params[1:])
	case "REGISTER":
		return p.REGISTER(client, reader, params[1:])
	case "UNREGISTER":
		return p.UNREGISTER(client, reader, params[1:])
	}
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

最主要的三个命令:

  • IDENTIFY
    identify主要是loopup根据nsqd传过来的信息生成peer
  • REGISTER
    注意中间没有else,如果channel,topic都存在,Registration会调用两次,所以两种都会add进去
func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    ...
	topic, channel, err := getTopicChan("REGISTER", params)

    ...
	if channel != "" {
		key := Registration{"channel", topic, channel}
	}
	key := Registration{"topic", topic, ""}
    ...
}
  • UNREGISTER
    这边是ifelse,只会删掉一个,因为同一个topic下有很多channel,有channel相当于只删channel,没有channel会删topic,并且根据通配符遍历topic下所有的channel并且删除,非常合理。
func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    ...
	topic, channel, err := getTopicChan("UNREGISTER", params)
    ...
	if channel != "" {
		key := Registration{"channel", topic, channel}
		removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id)
		if left == 0 && strings.HasSuffix(channel, "#ephemeral") {
			p.ctx.nsqlookupd.DB.RemoveRegistration(key)
		}
	} else {
		registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*")
		for _, r := range registrations {
			if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
				p.ctx.nsqlookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",
					client, "channel", topic, r.SubKey)
			}
		}

		key := Registration{"topic", topic, ""}
		if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed {
			p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
				client, "topic", topic, "")
		}
	}
    ...
}

go-nsq

这个主要是给consumer用的,核心代码当然在consumer.go里,最主要的两个方法:

  • ConnectToNSQLookupd
  • ConnectToNSQD

这边就不分析源码了,consumer可以ConnectToNSQD,也可以ConnectToNSQLookupd。不过推荐做法是ConnectToNSQLookupd。这个方法会先http请求nsqlookupd获取对应topic/channel的所有nsqd地址,然后随机选一个,再ConnectToNSQD。

感想

花了几天时间看完nsq源码,正如beego所说,对于channel的认识更上了一层。源码里有很多关于channel的神奇用法。当然除了channel外,里面还有很多小Tip。举几个最简单的例子:

channel用法

// apps/nsqd.go
func main() {
	prg := &program{}
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		log.Fatal(err)
	}
}

func Run(service Service, sig ...os.Signal) error {
	env := environment{}
	if err := service.Init(env); err != nil {
		return err
	}

	if err := service.Start(); err != nil {
		return err
	}
	if len(sig) == 0 {
		sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
	}

	signalChan := make(chan os.Signal, 1)
	signalNotify(signalChan, sig...)
	<-signalChan

	return service.Stop()
}

这是nsqd启动时一定会调用的Run函数,可以钩住系统的syscall.SIGINT和syscall.SIGTERM信号,用来阻塞主goroutine防止退出。只有接受到了系统发送来的信号,才会退出。

在比如代码中大量用到了exitChan:

select {
    case t.channelUpdateChan <- 1:
    case <-t.exitChan:
    }

此处使用了select,并同时监听了exitChan。如果此时exitChan收到信号则可以正常退出select,避免channelUpdateChan导致阻塞。很多地方都用到了这种机制。

lock

 func (n *NSQD) GetTopic(topicName string) *Topic {
    ...
    n.Lock()

    t, ok := n.topicMap[topicName]
    t.Lock()
    n.Unlock()
    ...
}

在调用完nsqd的变量后转而进行topic操作,这时候会使用topic的小粒度的锁,释放了nsqd全局的大粒度锁,可以保证线程安全的同时减少了效率上的损失。这种锁的用法在源码里很多地方都用到了。

binary

源码中大量用了自己的传输协议,印象最深的就是对于二进制数据的读写,效率很高。类似这种写法:
binary.Read(&writeBuf, binary.BigEndian, &readMsgSize)
binary.Write(&writeBuf, binary.BigEndian, msgSize)
readBuf := make([]byte, msgSize)
binary.Read/Write是用来存data的长度的,之后的data加在len数据后面,msgSize必须是定长的,即为int32,之后可以用固定长度slcie来接受数据,性能更好

nsq的学习价值真的很高,远远不止那么多,还要慢慢消化。

作者:levi
comments powered by Disqus