Redis消息队列三种方案

  • Post author:
  • Post category:其他


什么是消息队列:

消息(Message)是指在应用间传输的数据,消息可以包括简答的文本字符串,也可以有嵌入对象等,消息队列(Message Queue)是一种应用间的通信方式,用来监视消息是否发送成功,确保消息传出,并基于数据通信来进行分布式系统的集成。

消息队列主流中间件:

当前使用较多的消息队列有

RabbitMQ



RocketMQ



ActiveMQ



Kafka



ZeroMQ



MetaMQ

等,而部分

数据库



Redis



MySQL

以及

phxsql

,如果硬搞的话,其实也可实现消息队列的功能。

Redis消息队列适用范围:

适用于简单的业务场景,Redis是特别轻量级的消息队列。不需要用到RabbitMQ和Kafka的

消息队列基础:

三个角色:

生产者、消费者、消息处理中心

消息队列的异步处理模式:

消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。也就是实现松耦合。

List数据类型的消息队列

原理:

List列表是一种可以按照顺序进出,并插入排序的数据结构,可以当做

异步队列

处理,将需要处理的消息序列化成字符串塞进List,另一个线程从这个列表中轮询数据进行处理。

List基本操作:


1、LPUSH key value1 [value2]


与  RPUSH key value1 [value2]

将一个或多个值插入到列表头部

127.0.0.1:6379> LPUSH list1 “foo”           #返回值为列表长度

(integer) 1

127.0.0.1:6379> LPUSH list1 “bar”

(integer) 2

127.0.0.1:6379> LRANGE list1 0 -1          #查询列表值

1) “bar”

2) “foo”

redis 127.0.0.1:6379> RPUSH mylist “hello”         #执行 RPUSH 操作后,列表的长度。

(integer) 1

redis 127.0.0.1:6379> RPUSH mylist “foo”

(integer) 2

redis 127.0.0.1:6379> LRANGE mylist 0 -1

1) “hello”

2) “foo”


2、LPOP key

移出并获取列表的第一个元素


RPOP key

移除列表的最后一个元素,返回值为移除的元素。

redis 127.0.0.1:6379> RPUSH list1 “foo”

(integer) 1

redis 127.0.0.1:6379> RPUSH list1 “bar”

(integer) 2

redis 127.0.0.1:6379> LPOP list1           #移除尾元素

“foo”

redis> RPUSH mylist “one”                 #RPUSH插入元素

(integer) 1

redis> RPUSH mylist “two”

(integer) 2

redis> RPUSH mylist “three”

(integer) 3

redis> RPOP mylist                               #RPOP移除尾元素

“three”

redis> LRANGE mylist 0 -1

1) “one”

2) “two”


3、LRANGE key start stop

获取列表指定范围内的元素


4、LINDEX key index

通过索引获取列表中的元素

redis 127.0.0.1:6379> LINDEX mylist -1       #下标为指定索引值的元素。

“World”

redis 127.0.0.1:6379> LINDEX mylist 3        # index不在 mylist 的区间范围内

(nil)


5、 LLEN key

获取列表长度

redis 127.0.0.1:6379> LLEN list1

(integer) 2                  #返回列表长度


6、LREM key count value

移除指定元素

redis> LREM mylist -2 “hello”          #删除指定元素

(integer) 2


#count > 0 : 从表头开始向表尾搜索,移除与 VALUE 相同的元素,数量为 COUNT 。

#count < 0 : 从表尾开始向表头搜索,移除与 VALUE 相同的元素,数量为 COUNT 的绝对值。

#count = 0 : 移除表中所有与 VALUE 相等的值。

list实现消息队列:

使用几个push和pop实现:

127.0.0.1:6379> lpush mylist a a b c d e
(integer) 6
127.0.0.1:6379> rpop mylist
"a"
127.0.0.1:6379> rpop mylist
"a"
127.0.0.1:6379> rpop mylist
"b"
127.0.0.1:6379> 

List缺陷

即时消费问题:

使用List作为消息队列可能会造成,如果消费者想要及时的处理消息数据,就要在程序中不断进行遍历,不断地进行pop命令,这会给程序造成一定的性能损失。

解决办法:

所以,Redis 还提供了

BLPOP



BRPOP

这种阻塞式读取的命令(带 B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式就节省了不必要的 CPU 开销。

127.0.0.1:6379> lpush yourlist a b c d
(integer) 4
127.0.0.1:6379> blpop yourlist 10
1) "yourlist"
2) "d"
127.0.0.1:6379> blpop yourlist 10
1) "yourlist"
2) "c"
127.0.0.1:6379> blpop yourlist 10
1) "yourlist"
2) "b"
127.0.0.1:6379> blpop yourlist 10
1) "yourlist"
2) "a"
127.0.0.1:6379> blpop yourlist 10
(nil)
(10.02s)

BLPOP
BLPOP key [key …] timeout 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止

消息处理机制(ACK机制)

问题描述:

List队列中的消息一旦发出去,就会从原队列中删除,但如果消费者因为否中网络原因,或者数据崩溃了,导致消费者没有接收到这条消息,此时就会丢失消息,这种问题就是缺少了消息确认机制

解决办法:

1、阻塞的从List中发送一条消息的同时,将这条消息复制到另一个队列中,当做备份消息。

2、在业务流程安全结束后,再删除该备份的队列元素,向队列发送受到消息状态,完成消息确认机制。

127.0.0.1:6379> rpush myqueue one
(integer) 1
127.0.0.1:6379> rpush myqueue two
(integer) 2
127.0.0.1:6379> rpush myqueue three
(integer) 3
127.0.0.1:6379> rpoplpush myqueue queuebak
"three"
127.0.0.1:6379> lrange myqueue 0 -1
1) "one"
2) "two"
127.0.0.1:6379> lrange queuebak 0 -1
1) "three"
BRPOPLPUSH BRPOPLPUSH source destination timeout 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
RPOPLPUSH RPOPLPUSH source destinationb 命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作:将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素 RPOPLPUSH list01 list02


发布订阅模式实现消息队列

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。


消息发布者

,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)


消息订阅者

,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。

发布订阅原理

Redis 客户端可以订阅任意数量的频道。

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

实例演示:


发布-订阅需要打开两个redis-cli客户端,首先先创建订阅频道

# SUBSCRIBEchannel1…订阅给定的一个或多个频道的信息。

redis 127.0.0.1:6379> SUBSCRIBE mychannel

Reading messages… (press Ctrl-C to quit)

1) “subscribe”                  #显示模式,subscribe是订阅

2) “mychannel”                #显示订阅的频道

3) (integer) 1                   #订阅成功返回1

1) “message”

2) “mychannel”

3) “a”


第二个redis-cli客户端,在同一个频道 mychannel发布消息


PUBLISH channel message

将信息发送到指定的频道。

redis 127.0.0.1:6379> PUBLISH mychannel “send the message to mtchannel”

(integer) 1             #发送消息成功返回1


此时第一个客户端,也就是订阅者客户端会显示发送的消息

# SUBSCRIBEchannel1…订阅给定的一个或多个频道的信息。

redis 127.0.0.1:6379> SUBSCRIBE mychannel

Reading messages… (press Ctrl-C to quit)

1) “subscribe”

2) “mychannel”

3) (integer) 1

1) “message”#显示模式,此处是消息

2) “mychannel”#显示订阅者频道

3) “send the message to mtchannel “#显示消息内容

订阅发布使用场景

该模式主要用于用户编辑某个模块需要清除缓存,需要在编辑完某模块后发布到频道,在订阅该频道实现该模块清除缓存。

订阅发布其他基本操作


1、PUBSUB subcommand [argument [argument …]]

查看订阅与发布系统状态

redis 127.0.0.1:6379> PUBSUB CHANNELS

(empty list or set)#若此时有值,则返回活跃频道的列表


2、 UNSUBSCRIBE [channel [channel …]]

指退订给定的频道。

redis 127.0.0.1:6379> UNSUBSCRIBE mychannel

1) “unsubscribe”

2) ” send the message to mtchannel ”

3) (integer) 0

模式匹配订阅

Redis 的Pub/Sub实现支持模式匹配。

如果需要

订阅test.name   test.addr    test.age等test频道

,可以直接订阅全风格的模式如:

PSUBCSCRIBE test.*


插入消息:

127.0.0.1:6379> PUBLISH test.name haha

(integer) 1


取消该订阅:

PUNSUBSCRIBE test.*

发布订阅MQ缺点

Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

Stream数据类型

原理:

Redis 5.0 版本新增了一个更强大的数据结构——

Stream

。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失,像一个仅追加内容的消息链表,将所有加入的消息串起来,且消息持久化。

增删改查消息操作例子:

127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3
"1609404470049-0"  ## 生成的消息 ID,有两部分组成,毫秒时间戳-该毫秒内产生的第1条消息

# 消息ID 必须要比上个 ID 大
127.0.0.1:6379> xadd mystream 123 f4 v4  
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

# 自定义ID
127.0.0.1:6379> xadd mystream 1609404470049-1 f4 v4
"1609404470049-1"

# -表示最小值 , + 表示最大值,也可以指定最大消息ID,或最小消息ID,配合 -、+ 使用
127.0.0.1:6379> xrange mystream - +
1) 1) "1609404470049-0"
   2) 1) "f1"
      2) "v1"
      3) "f2"
      4) "v2"
      5) "f3"
      6) "v3"
2) 1) "1609404470049-1"
   2) 1) "f4"
      2) "v4"

127.0.0.1:6379> xdel mystream 1609404470049-1
(integer) 1
127.0.0.1:6379> xlen mystream
(integer) 1
# 删除整个 stream
127.0.0.1:6379> del mystream
(integer) 1

XADD
添加消息到末尾,保证有序,可以自动生成唯一ID

XADD key ID field value [field value …]

XDEL 删除消息 XDEL key ID [ID …]
XLEN 获取流包含的元素数量,即消息长度 XLEN key
XRANGE 获取消息列表,会自动过滤已经删除的消息 XRANGE key start end [COUNT count]

阻塞或非阻塞形式获取消息列表:

# 从ID是0-0的开始读前2条
127.0.0.1:6379> xread count 2 streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1609405178536-0"
         2) 1) "f5"
            2) "v5"
      2) 1) "1609405198676-0"
         2) 1) "f1"
            2) "v1"
            3) "f2"
            4) "v2"

# 阻塞的从尾部读取流,开启新的客户端xadd后发现这里就读到了,block 0 表示永久阻塞
127.0.0.1:6379> xread block 0 streams mystream $
1) 1) "mystream"
   2) 1) 1) "1609408791503-0"
         2) 1) "f6"
            2) "v6"
(42.37s)

#$这个特殊的 ID 意思是 XREAD 应该使用流 mystream 已经存储的最大 ID 作为最后一个 ID。

创建消费者组:

在某些问题中,我们希望让不同的消费者从同一流中向许多不同客户端提供不同消费集,因此就需要创建消费者组。

消费者组基本命令:

Stream 不像 Kafak 那样有分区的概念,如果想实现类似分区的功能,就要在客户端使用一定的策略将消息写到不同的 Stream。


  • xgroup create

    :创建消费者组

  • xgreadgroup

    :读取消费组中的消息

  • xack

    :ack 掉指定消息
# 创建消费者组的时候必须指定 ID, ID 为 0 表示从头开始消费,为 $ 表示只消费新的消息,也可以自己指定
127.0.0.1:6379> xgroup create mystream mygroup $
OK

# 查看流和消费者组的相关信息,可以查看流、也可以单独查看流下的某个组的信息
127.0.0.1:6379> xinfo stream mystream
 1) "length"
 2) (integer) 4  # 共 4 个消息
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1609408943089-0"
 9) "groups"
10) (integer) 1  # 一个消费组
11) "first-entry" # 第一个消息
12) 1) "1609405178536-0"
    2) 1) "f5"
       2) "v5"
13) "last-entry"  # 最后一个消息
14) 1) "1609408943089-0"
    2) 1) "f6"
       2) "v6"

按照消费组进行消费

方法:

1、使用xreadgroup指令进行消费组组内消费,它也可以阻塞等待新消息

2、当读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构中

3、客户端处理完毕后使用

xack

指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

#  消费组 mygroup1 中的 消费者 c1 从 mystream 中 消费组数据
# > 号表示从当前消费组的 last_delivered_id 后面开始读
# 每当消费者读取一条消息,last_delivered_id 变量就会前进
#last_delivered_id :每个消费组会有个游标 last_delivered_id 在数组之上往前移动,表示当前消费#组已经消费到哪条消息了
127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1609727806627-0"
         2) 1) "f1"
            2) "v1"
            3) "f2"
            4) "v2"
            5) "f3"
            6) "v3"
127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1609727818650-0"
         2) 1) "f4"
            2) "v4"
# 已经没有消息可读了            
127.0.0.1:6379> xreadgroup group mygroup1 c1 count 2 streams mystream >
(nil)

# 还可以阻塞式的消费
127.0.0.1:6379> xreadgroup group mygroup1 c2 block 0 streams mystream >
µ1) 1) "mystream"
   2) 1) 1) "1609728270632-0"
         2) 1) "f5"
            2) "v5"
(89.36s)

# 观察消费组信息
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup1"
   3) "consumers"
   4) (integer) 2  # 2个消费者
   5) "pending"
   6) (integer) 3   # 共 3 条正在处理的信息还没有 ack
   7) "last-delivered-id"
   8) "1609728270632-0"
   
127.0.0.1:6379> xack mystream mygroup1 1609727806627-0  # ack掉指定消息
(integer) 1

XREAD
以阻塞或非阻塞方式获取消息列表 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] id [id …]

XGROUP CREATE
创建消费者组 XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

XREADGROUP GROUP
读取消费者组中的消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
XACK 将消息标记为”已处理” XACK key group ID [ID …]



版权声明:本文为weixin_45986788原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。