java消息分发机制_RabbitMQ消息队列:任务分发机制

  • Post author:
  • Post category:java


我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题。在实际的应用场景中,这是远远不够的。从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法。

有时Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的多个工作线程来完成,队列中的任务将会被工作线程共享执行,这样的概念在web应用这非常有用。接下来我们分布讲解。

应用场景就是RabbitMQ Server会将queue的Message分发给不同的Consumer以处理计算密集型的任务:

87462a3ba3423cd4eac524dd97519b64.png

1. 准备

实际应用Consumer可能做的是计算密集型的工作,那就不能简单的字符串了。在现实应用中,Consumer有可能做的是一个图片的resize,或者是pdf文件的渲染或者内容提取。但是作为Demo,还是用字符串模拟吧:通过字符串中的.的数量来决定计算的复杂度,每个.都会消耗1s,即sleep(1)。

发送端:

package com.zhy.rabbitMq._02_workqueue;

import java.io.IOException;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class NewTask{

//队列名称

private final static String QUEUE_NAME = “queue2”;

public static void main(String[] args) throws IOException{

//创建连接和频道

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

//声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//发送10条消息,依次在消息后面附加1-10个点

for (int i = 0; i < 10; i++){

String dots = “”;

for (int j = 0; j <= i; j++){

dots += “.”;

}

String message = “helloworld” + dots+dots.length();

channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes());

System.out.println(” [x] Sent ‘” + message + “‘”);

}

//关闭频道和资源

channel.close();

connection.close();

}

}

接收端:

package com.zhy.rabbitMq._02_workqueue;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

public class Work{

//队列名称

private final static String QUEUE_NAME = “workqueue”;

<



版权声明:本文为weixin_31569663原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。