nsq详解

  • Post author:
  • Post category:其他




写在前面

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用 Go 和 Python 库。如果读者有兴趣构建自己的客户端的话,还可以参考官方提供的协议规范。

基于上述特性,对于一个Go语言开发人员,非常有必要从源码角度深入理解nsq设计原理。接下来会陆续更新。

在阅读时建议配合源码观看(源码版本: v1.2.0)



新建Topic



url path: /topic/create



实现入口函数:doCreateTopic->getTopicFromQuery



一、解析请求参数,获取topic名称


二、通过IsValidTopicName判断topicName是否有效(匹配0~9、英文字母大小写、下划线,且ephemeral只能出现一次)


三、然后就是执行GetTopic,开始具体创建topic操作

1、检查该topicName是否已经存在,已经存在则直接返回Topic对象指针,这里需要着重注意,如下图:

图一


这里读了两次topicMap,是针对热读和不常写进行了优化,先使用读锁,允许并发的读,如果topic存在则直接返回,然后因为读锁释放了,可能存在写入的情况,所以在第一次获取对应topicName对象不存在的情况下,需要再用写锁读一次


2、设置删除topic的回调函数

3、接下来是重点函数NewTopic,用于新建topic对象,最后具体详解

4、把新建的topic对象指针保存在topicMap对象中

5、通过isLoading判断是否在加载metadata,即服务是否在启动中,如果为1,则直接返回topic对象指针,不进行topic启动逻辑(步骤6之后的逻辑),等到加载完毕之后topic会启动

6、如果开启了lookupd服务,则获取topic对应的channelNames,如果是临时channel则略过,否则调用GetChannel创建通道(此处是把lookupd上的channel在nsqd中创建出来)

7、调用Start(),开始接收消息,通过给startChan通道赋值,开启消息接收逻辑(接收消息的逻辑在nsqd启动代码中)



NewTopic逻辑详解:

1、新建Topic对象,并赋初始值,如果设置的内存队列长度大于0,则初始化memoryMsgChan;

2、如果是临时topic,那么返回dummyBackendQueue对象指针,没什么好说的

3、如果不是临时topic,新建diskQueue对象,,同时检索metaData,把metaData文件中的数据读到内存(当前的读、写文件的序号和对应的读写位移,还有深度), 同时启动ioLoop,等待文件读写操作,包括定时同步文件数据到磁盘、写文件、读文件删除文件操作

4、启动messagePump处理消息,这个和ioLoop一样非常重要,贯穿了整个逻辑

5、t.ctx.nsqd.Notify(t):把topic信息保存下来,同时会发给lookupd(nsqd和lookupd有个tcp长连接,用于告诉lookupd一些topic、channel等消息,这样一来客户端也能从lookupd知道当前nsqd信息,有哪些topic和channel等)



版权声明:本文为weixin_40693431原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。