C#操作RocketMQ用NewLife.RocketMQ发布消息,消费消息

  • Post author:
  • Post category:其他



发布消息

using NewLife.Log;
using NewLife.Messaging;
using NewLife.RocketMQ;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1_RocketMq
{
    class Program
    {
        

        //押金订单
        static string json2 =
"			{					" +
"			OrderNo:\"DFGDFGIAL456827689\"," +
"			OrderTime:\"2020-08-31 02:55:35\"," +
"          ShopNo:\"DSLFJHLJFIEINCXLKFG\",       " +
"          ShopName:\"五大花园店泡脚牛肉米线\",           " +
"			ReceivingArea:\"北京(BJ)北京(010)\"," +
"			Address:\"北京市海淀区知春路65号中国卫星通信大厦B座23层\"," +
"			Revicer:\"李世民\"," +
"			Phone:\"18756542695\"," +
"			Total:\"\"," +
"          ShipFee:\"3.50\",     " +
"   TotalDeposit:\"200.00\",     " +
"   DailyRent:\"50\",     " +
"			PointsDiscount:\"\"," +
"			RealPay:\"\"," +
"          Remark:\"9-1-01测试订单,不需要加肉\",  " +
"			DataItems:[" +
"			\"肥牛 x2¥59.00\"," +
"			\"大白菜 x1¥59.00\",         " +
"			\"白萝卜 x3¥59.00\",         " +
"			\"千层肚 x5¥59.00\",         " +
"			\"黄喉 x6¥59.00\",               " +
"			\"菌花 x6¥59.00\",               " +
"			\"腰片 x6¥59.00\",               " +
"			\"虾滑 x6¥59.00\",               " +
"			\"肥牛 x2¥59.00\",               " +
"			\"大白菜 x1¥59.00\",         " +
"			\"白萝卜 x3¥59.00\",         " +
"			\"千层肚 x5¥59.00\",         " +
"			\"黄喉 x6¥59.00\",               " +
"			\"菌花 x6¥59.00\",               " +
"			\"腰片 x6¥59.00\",               " +
"			\"虾滑 x6¥59.00\",               " +
"			\"肥牛 x2¥59.00\",               " +
"			\"大白菜 x1¥59.00\",         " +
"			\"白萝卜 x3¥59.00\",         " +
"			\"千层肚 x5¥59.00\",         " +
"			\"黄喉 x6¥59.00\",               " +
"			\"菌花 x6¥59.00\",               " +
"			\"腰片 x6¥59.00\",               " +
"			\"虾滑 x6¥59.00\",               " +
"			\"肥牛 x2¥59.00\",               " +
"			\"大白菜 x1¥59.00\",         " +
"			\"白萝卜 x3¥59.00\",         " +
"			\"千层肚 x5¥59.00\",         " +
"			\"黄喉 x6¥59.00\",               " +
"			\"菌花 x6¥59.00\",               " +
"			\"腰片 x6¥59.00\",               " +
"			\"虾滑 x6¥59.00\",				" +
"			] }";


        static void Main(string[] args)
        {
            var mq = new Producer
            {
                Topic = "DESKTOP-ACRFQSI",
                NameServerAddress = "192.168.1.194:9876",
                //Log = XTrace.Log,
            };
            mq.Start();

            //发送消息方式一,可以设置key
            NewLife.RocketMQ.Protocol.Message message = new NewLife.RocketMQ.Protocol.Message()
            {
                BodyString = json2,          
                Keys ="key002",
                Tags = "TagC",
                Flag = 0,
                WaitStoreMsgOK = true
            };
            var sr = mq.Publish(message);

            //发送消息方式二
            //var sr = mq.Publish(json2, "TagA");
            string log = $"发送成功的消息,内容>{json2},MsgId={sr.MsgId},BrokerName= {sr.Queue.BrokerName} ,QueueId={sr.Queue.QueueId},QueueOffset= {sr.QueueOffset}";
            Console.WriteLine(log);
            // 阿里云发送消息不能过快,否则报错“服务不可用”
            LogHelpter.AddLog(log);

            Console.WriteLine("完成");
            mq.Dispose();
            Console.ReadLine();


        }
    }
}


消费消息

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp_Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("消息接收测试");
            //测试消费消息
            var consumer = new NewLife.RocketMQ.Consumer
            {
                Topic = "DESKTOP-ACRFQSI",
                Group = "CID_ONSAPI_OWNER",
                NameServerAddress = "192.168.1.194:9876",
                //设置每次接收消息只拉取一条信息
                BatchSize = 1,
                //FromLastOffset = true,
                //SkipOverStoredMsgCount = 0,
                //BatchSize = 20,
                //Log = NewLife.Log.XTrace.Log,
            };
            consumer.OnConsume = (q, ms) =>
            {
                string mInfo= $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
                Console.WriteLine(mInfo);
                foreach (var item in ms.ToList())
                {
                    string msg = $"消息:msgId={item.MsgId},key={item.Keys},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.Body.ToStr()}";
                    Console.WriteLine(msg);
                }
             //   return false;//通知消息队:不消费消息
               return true;		//通知消息队:消费了消息
            };
      
            consumer.Start();
            Console.ReadLine();
        }
    }
}

停止接收RocketMq消息,

          consumer.OnConsume = null;
          consumer.Dispose();

重新启动接收参考:

  
     static Func<MessageQueue, MessageExt[], bool> reviceMsgFunc= (q, ms) =>
   {
			//     string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
			//	Console.WriteLine(mInfo);
			//     LogHelpter.AddLog("********收到消息结构>" + mInfo);
					foreach (var item in ms.ToList())
					{
						string msg = $"消息msgId={item.MsgId},key={item.Keys},QueueOffset={item.QueueOffset},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.Body.ToStr()}";
						Console.WriteLine(msg);
						LogHelpter.AddLog("收到消息>" + msg);
					}
					//   return false;//不消费
					return true;//消费了消息
	};
  
  //启动
 Task.Run(() =>
 {
     System.Threading.Thread.Sleep(50 * 1000);
     try
     {
         consumer = new NewLife.RocketMQ.Consumer
         {
             Topic = "topic_order_list_print",
             Group = "group_order_print",
             NameServerAddress = "127.0.0.1:9876",
             //设置每次接收消息只拉取一条信息
             BatchSize = 1,
             //FromLastOffset = true,
             //SkipOverStoredMsgCount = 0,
             //BatchSize = 20,
             //Log = NewLife.Log.XTrace.Log,
         };
         consumer.OnConsume = reviceMsgFunc;
         consumer.Start();
         //consumer.StartSchedule();
         string log = "再次启动接收消息,成功" + DateTime.Now.ToFullString();
         Console.WriteLine(log);
         LogHelpter.AddLog(log);
         Console.ReadLine();
     }
     catch (Exception ex)
     {
         Console.WriteLine("再次启动接收消息出错" + ex.Message);
         LogHelpter.AddLog("再次启动接收消息出错," + ex.Message);
     }
 });



版权声明:本文为u011511086原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。