什么是Apache Pulsar
Pulsar是一个支持多租户的、高性能的服务与服务之间消息通讯的解决方案,最初由雅虎开发,现在由Apache软件基金会管理。
Pulsar在Yahoo的生产环境运行了三年多,助力Yahoo的主要应用,如Yahoo Mail、Yahoo Finance、Yahoo Sports、Flickr、Gemini广告平台和Yahoo分布式键值存储系统Sherpa。
Kafka不够好,智联招聘基于Pulsar打造企业级事件中心。
Pulsar的主要特性如下:
- Pulsar实例原生支持多集群,能无缝的基于地理位置进行跨集群备份
- 非常低的消息发布和端到端的延迟
- 无缝扩展到超过百万个topic
- 支持Java,Go,Pytho和C++的客户端
- Topic支持多种订阅模式: 独占(exclusive), 共享(shared)和灾备(failover)
- 通过Apache BookKeeper提供的持久化消息存储机制保证消息的送达
- serverless的轻量级计算框架Pulsar Functions提供了原生的流数据处理
- serverless的连接器框架Pulsar IO构建于 Pulsar Functions之上,能够轻松的将数据从Pulsar中移入和移出
- 当数据老化时,分层存储将数据从热存储卸载到冷存储(如S3和GCS)
apache pulsar结构
客户端不依赖于zookeeper,不能直接连接bookeeper,提供kafka api兼容,方便从kafka 迁移代码到pulsar
数据保存在bookeeper中,每一个broker负责多个partition,broker是无状态的,当broker down时数据不会迁移
第一个partition是一个distribute log,该log拆分成多个segment分布在多个bookies上,segment可以按大小时间来形成新的
如果集群加入新的broker,pulsar会自动将新的segement写入该broker,并且pulsar可以做到机架,区域感知
如果某个segment失效,可以failover到其他的replica segment
订阅模式
Exclusive模式
Failover模式
Shared模式
Key_shared模式
多topic订阅
当consumer订阅pulsar的topic时,它默认指定订阅了一个topic,例如:persistent://public/default/my-topic。 从Pulsar的1.23.0-incubating的版本开始,Pulsar消费者可以同时订阅多个topic。 你可以用以下两种方式定义topic的列表:
通过最基础的正则表达式(regex),例如 persistent://public/default/finance-.*
通过明确指定的topic列表
通过正则订阅多主题时,所有的主题必须在同一个namespace。
当订阅多主题时,Pulsar客户端会自动调用Pulsar的API来发现匹配表达式或者列表的所有topic,然后全部订阅。 如果此时有暂不存在的topic,那么一旦这些topic被创建,conusmer会自动订阅。
不能保证顺序性
当消费者订阅多topic时,Pulsar所提供对单一topic订阅的顺序保证,就hold不住了。 如果你在使用Pulsar的时候,遇到必须保证顺序的需求,强烈建议不要使用此特性。
Partitioned topics
通常一个topic仅被一个broker服务,这限制了topic的最大吞吐量。 分区topic是特殊的topic类型,他可以被多个broker处理,这让topic有更高的吞吐量。
其实在背后,分区的topic通过N个内部topic实现,N是分区的数量。 当向分区的topic发送消息,每条消息被路由到其中一个broker。 Pulsar自动处理跨broker的分区分布。
Topic1有5个分区(P0到P4),分布在3个broker上。因为分区数量多于broker数量,其中有两个broker每个处理两个分区,第三个broker则只处理一个。(再次强调,分区的分布是Pulsar自动处理的)。
这个topic的消息被广播给两个consumer,路由模式决定哪个broker处理哪个partition,订阅模式决定哪条消息发送到哪个consumer。
大多数境况下,路由和订阅模式可以分开制定。通常来讲,吞吐能力的要求,决定了分区/路 的方式。订阅模式则应该由应用来做决定。
分区topic和普通topic,对于订阅模式如何工作,没有任何不同。分区只是决定了从生产者生产消息到消费者处理及确认消息过程中发生的事情。
分区topic需要通过admin API显式创建,创建topic时可以指定分区数。
路由模式
发布到分区主题时,必须指定路由模式。路由模式决定每个消息应该发布到哪个分区,即哪个内部主题。三种路由模式如下:
-
RoundRobinPartition:如果没有key,所有的消息通过轮询方式被路由到不同的分区,以达到最大吞吐量。请注意round-robin并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。
如果为message指定了key,分区的producer会把key做hash,然后分配消息到指定的分区。 这是默认的模式。 -
SinglePartition:如果没有key被提供,producer将会随机选择一个分区,把所有的消息发往该分区。
如果为message指定了key,分区的producer会把key做hash,然后分配消息到指定的分区。 -
CustomPartition:使用客制化消息路由实现,可以决定特定的消息进入指定的分区。 用户可以创建客制化的路由模式,通过使用
Java client ,实现MessageRouter接口。
消息顺序
消息的顺序与消息路由模式和消息的key有关。通常,用户需要对每个key分区的消息保证顺序。
当使用 SinglePartition或者RoundRobinPartition模式时,如果消息有key,消息将会被路由到匹配的分区,这是基于ProducerBuilder 中HashingScheme 指定的散列shema。
顺序保证有两种方式:
- 按key分区:所有拥有相同key的消息有序,并且会被发送至相同的partition。使用SinglePartition或RoundRobinPartition模式,每条消息都需要有key。
- 按producer:来自于相同producer的消息有序,路由策略为SinglePartition, 且每条消息都没有key。
非持久topic
默认情况下,Pulsar保存所有没有确认的消息到多个BookKeeper的bookies中(存储节点)。持久topic的消息数据可以在broker重启或者订阅者出问题的情况下存活下来。 因此,持久性topic上的消息数据可以在 broker 重启和订阅者故障转移之后继续存在。
但是,Pulsar还支持非持久性topic,这些topic的消息从不持久化存储到磁盘,只存在于内存中。 Pulsar也提供了非持久topic。非持久topic的消息不会被保存在硬盘上,只存活于内存中。当使用非持久topic分发时,关掉Pulsar的broker或者关闭订阅者,此topic( non-persistent))上所有的瞬时消息都会丢失,意味着客户端可能会遇到消息缺失。
非持久性topic具有这种形式的名称(注意名称中的 non-persistent):
non-persistent://tenant/namespace/topic
非持久topic中,broker会立即发布消息给所有连接的订阅者,而不会在BookKeeper中存储。 如果有一个订阅者断开连接,broker将无法重发这些瞬时消息,订阅者将永远也不能收到这些消息了。 去掉持久化存储的步骤,在某些情况下,使得非持久topic的消息比持久topic稍微变快。但是同时,Pulsar的一些核心优势也丧失掉了。
非持久topic,消息数据仅存活在内存。 如果broker挂掉或者因其他情况不能从内存取到,你的消息数据就可能丢失。 只有在真的确信你的使用场景符合,并且你可以忍受时,才可去使用非持久topic。
默认非持久topic在broker上是开启的。 你可以通过broker的配置关闭。 你可以通过使用pulsar-admin-topics接口管理非持久topic。
非持久消息通常比持久消息更快,因为broker无须持久化消息,当消息被分发给所有订阅者时,会立即发送ack给producer。 非持久topic让producer有更低的发布延迟。
消息保留和到期
Pulsar broker默认如下:
- 立即删除所有已经被cunsumer确认过的的消息
- 以消息backlog的形式,持久保存所有的未被确认消息
Pulsar有两个特性,让你可以覆盖上面的默认行为:
- 消息存留让你可以保存consumer确认过的消息
- 消息过期让你可以给未被确认的消息设置存活时长(TTL) 所有消息保留和到期都在namespace级别进行管理。
重复数据消除
当消息被Pulsar持久化多于一次的时候,消息就会重复。 消息去重是Pulsar可选的特性,阻止不必要的消息重复,每条消息仅处理一次,即使消息被接收多次。
消息去重被关闭。 Producer发布消息1到一个topic,消息到达broker后,被持久化到BookKeeper。 然后producer又发送了消息1(可能因为某些重试逻辑),然后消息被接收后又持久化在BookKeeper,这意味着消息重复发生了。
在第二个场景中,producer发送了消息1,消息被broker接收然后持久化,和第一个场景是一样的。 当producer再次发送消息时,broker知道已经收到个消息1,所以不会再持久化消息1。
消息重复数据消除是在namespace级别处理的。