消费者代码主要包含以下几方面:
01.创建factory
02.创建连接
03.创建channel
04.创建消费者
05.回收资源
消费者代码:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace rabbitMQConsum
{
public static class SimpleConsum
{
private const string QUEUE_NAME = "queue_demo";
private const string IP_ADDRESS = "127.0.0.1";
private const int PORT = 5672;//RabbitMQ 服务端默认端口5672;
private const string USER_NAME = "guest";
private const string PASSWORD = "guest";
public static void Consumer()
{
try
{
//01.创建factory
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = USER_NAME;
factory.Password = PASSWORD;
//02.创建连接
IConnection con = factory.CreateConnection();
//03.创建channel
IModel channel = con.CreateModel();
//创建一个持久的、非排他的、非自动删除的队列
channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
//队列最大接收未被ack的消息的个数
channel.BasicQos(64, 1000, true);
//04.创建消费者-监听方式
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
Run(body);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(QUEUE_NAME, false, consumer);
//05.回收资源
channel.Close();
con.Close();
}
catch (Exception ex)
{
throw;
}
}
private static void Run(byte[] body)
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
}
}
简单异常处理后代码:
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace rabbitMQConsum
{
public static class SimpleConsum
{
private const string QUEUE_NAME = "queue_demo";
private const string IP_ADDRESS = "127.0.0.1";
private const int PORT = 5672;//RabbitMQ 服务端默认端口5672;
private const string USER_NAME = "guest";
private const string PASSWORD = "guest";
public static void Consumer()
{
IConnection con = null;
IModel channel = null;
try
{
//01.创建factory
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = USER_NAME;
factory.Password = PASSWORD;
//02.创建连接
con = factory.CreateConnection();
//03.创建channel
channel = con.CreateModel();
//创建一个持久的、非排他的、非自动删除的队列
channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
//队列最大接收未被ack的消息的个数
channel.BasicQos(64, 1000, true);
//04.创建消费者-监听方式
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
Run(body);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(QUEUE_NAME, false, consumer);
}
catch (IOException ioE)
{
throw;
}
catch (SocketException socketEx)//RabbitMQ 用TCP协议,这里除了socket异常
{
throw;
}
catch (Exception ex)
{
throw;
}
finally
{
//05.关闭资源
if (channel != null)
channel.Close();
if (con != null)
con.Close();
}
}
private static void Run(byte[] body)
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
}
}
版权声明:本文为wby90原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。