消息队列(MSMQ)

  • Post author:
  • Post category:其他


System.Messaging命名空间包含的类可以用windows操作系统的Message Queuing功能读写信息。消息传送功能可以在断开连接的环境下作用。

在同步编程中,调用一个方法时,调用者必须等待该方法执行完毕,在异步编程中,调用线程可以启动并行运行的方法,异步编程可以通过委托或者支持异步方法的类库,或线程来实现。在同步和异步编程中,客户机和服务器必须同时运行。

MSMQ是异步进行的,但因为客户机(发送者)不等待服务器(接收者)读取发送给它的数据,MSMQ可以在断开连接的环境下进行,在传送数据时,接收者可以离线,在以后的某个时刻,接收者上线时,就会接受到数据,而无需发送应用程序的干预。

MSMQ可以在断快连接的时候使用,假定在一个电子商务站点上,服务器在某些时刻的订单事物负载时满的,但在晚上,其负载很低,一个解决方案是购买一台更快的服务器或在系统中添加更多的服务器,以处理高峰时的订单,还有一种成本较低的解决方案:把事务从高负载的时段移动到低负载的时候,即削峰平谷。在这种方案中,订单发送到消息队列中,接收端按对数据库系统有利的速度读取订单。现在,系统的负载被降低了,处理事务的服务器就可以比升级数据库系统便宜。

一,MSMQ特性
MSMQ是windows操作系统的一部分,这个服务的主要功能如下:
1,消息可以在断开连接的环境下发送,不需要同时运行发送和接收应用程序。
2,使用快速模式,消息可以非常快地发送,在快递模式下,消息存储在内存中。
3,对于恢复机制,消息可以使用有保证的交付方式发送,可恢复的消息存储在文件中,在服务器重启时发送它们。
4,消息队列用访问控制表来保护,确定了哪些用户可以发送或几首队列中的消息,消息还可以加密,避免网络嗅探器读取其中的数据。
5,消息在发送时可以指定优先级,更快地处理高优秀级的项。
6,MSMQ支持多播消息的发送
7,在事务消息队列中,消息可以参与DTC事务。
二,安装MSMQ
在windows xp中,使用“添加或删除程序”,就会在windows组件显示一个单独的部分,在其中可以选择MSMQ选项,在MSMQ选项中,可以选择下面的组件:
Common:Common子组件是MSMQ基本功能所必须的
Active Directory Integration:有了它,消息队列名酒会写入Active Directory,利用这个选项可以使用Active DirectoryIntegration查找队列,用Windows用户和组保护队列。
MSMQ Http Support:它允许使用HTTP协议发送和接收消息。
Triggers:利用Triggers可以在接收新消息时实例化应用程序。
在Message Queuing安装时,必须启动Message Queuing服务,这个服务读消息,与其他Message Queuing服务器通信,把消息路由到网络上。
三,MSMQ结构
通过Message Queuing,可以从消息队列中读写消息。
(1)消息
消息会发送到消息队列中,消息包含消息体和消息标题,消息体重可以放置任意消息。消息体中可以放置任意消息。
消息除了有消息标题和消息体外,还包含发送者,超时配置,事务ID或优先级
消息队列有几种类型的消息:
一般消息:由应用程序发送
确认消息:报告一般消息的状态,确认消息发送到管理队列中,报告一般消息的发送是否成功。
响应消息:当最初的发送者需要某种回应时,由接收应用程序发送。
报告消息:由MSMQ系统生成,测试消息和路由跟踪消息属于此类。
消息可以有优先级,定义消息从队列中读取出来的顺序,从队列中读取的消息就是优先级最高的那个消息。
消息有两种递送模式:快递模式和可恢复模式。
快递消息的传送速度非常快,因为消息只使用内存来存储,可恢复消息在路由的每一个阶段都要存储在文件中,直到消息传送到目的地为止。这样,即使计算机重启或网络失败,消息的传送也能得到保证。
事务消息是可恢复消息的一种特殊版本,在事务消息传送过程中,可以确保消息只到达目的地一次,且按照它们发送的顺序到达目的地,优先级不能在事务消息中使用。
(2)消息队列
消息队列是一个消息库,存储在磁盘上的消息位于/system32/msmq/storage目录。
四,MSMQ编程实现
(1)创建消息队列
消息队列还可以用MessageQueue类的Create()方法编程创建,在Create()方法中,必须传送新队列的路径,路径包括队列所在主机的名称和队列的名称。例如:要在本地主机上创建队列MyNewPublicQueue,为了创建私有队列,路径名必须包括Private$,例如/Private$/MyNewPrivateQueue.
调用Create()方法之后,就可以修改队列的属性,使用Label属性,把队列的标签设置为Demo Queue,示例程序把队列的路径和格式名称写到控制台上,格式名称用UUID自动创建,UUID可用于访问队列,且无需服务器名:
static void Main(string[] args)
{
using(MessageQueue queue=MessageQueue.Create(@”./MyNewPublicQueue”))
{
queue.Label=”Demo Queue”;
Console.WriteLine(“Queue created:”);
Console.WriteLine(“Path:{0}”,quequ.Path);
console.WriteLine(“FormatName:{0}”,queue.FormatName);
}
}
注意:
创建队列时需要管理权限,通常不能让应用程序的用户拥有管理权限,这就是队列通常用安装程序创建的原因,还可以使用MessageQueueInstaller类创建消息队列。
(2)查找队列
路径名和格式名可以用于标识队列,要查找队列,必须区分公共队列和私有队列。公共队列在Active Directory中发布,对于这些队列,无需知道他们所在的系统,私有队列只有在已知队列所在的系统名时才能找到。
在Active Directory域中收索队列的标签,类别或格式名,就可以找到公共队列,还可以获得机器上的所有队列,MessageQueue类的静态方法GetPublicQueuesByLabel()、GetPublicQueuesByCategory()和GetPublicQueuesByMachine()可以收索队列,GetPublicQueues()方法返回包含域中所有公共队列的数组。
static void Main(string[] args)
{
foreach(MessageQueue queue in MessageQueue.GetPublicQueues())
{
Console.WriteLine(queue.Path);
}
}
GetPublicQueues()方法是重载的,它的一个重载版本允许传送MessageQueueCriteria类的一个实例。利用这个类可以收索在某个时刻之前或之后创建或修改队列,还可以查找队列的类别,标签或机器名。
私有队列可以使用静态方法GetPrivateQueuesByMachine()收索,这个方法返回指定系统中的所有私有队列。
(3)打开已知的队列
如果队列名已知,就不需要收索它,使用路径或格式名就可以打开队列,路径或格式名在MessageQueue类的构造函数中设置。
路径指定了打开队列需要的机器名和队列名,下面的代码示例打开了本地主机上的队列MyPublicQueue,为了确定队列是否存在,使用了静态方法MessageQueue.Exists();
static void Main(string[] args)
{
if(MessageQueue.Exists(@”./MyPublicQueue”))
{
MessageQueue queue=new MessageQueue(@”./MyPublicQueue”);
}
else
{
Console.WriteLine(“Queue ./MyPublicQueue not existing”);
}
}

根据队列的类型,在打开队列时需要不同的标识符,如下是指定类型的队列名语法。
公共队列   MachineName/QueueName
私有队列   MachineName/Private$/QueueName
日志队列   MachineName/QueueName/Journal$
机器日志队列   MachineName/Journal$
机器死信队列   MachineName/DeadLetter$
机器事务死信队列 MachineName/XactDeadLetter$
在使用路径名打开公共队列时,需要传送机器名,如果机器名未知,则可以使用格式名替,私有队列的路径名只能在本地系统上使用,必须使用格式名远程访问私有队列。
除了路径名之外,还可以使用格式名打开队列,格式名用于在Active Directory中收索队列,获得队列所在的主机,在断开连接的环境下,消息不能发送到队列中,消息不能发送到队列中,此时就需要使用格式名:
MessageQueue queue=new MessageQueue(@”FormatName:PUBLIC=09816AFF-3608-4c5d-B892-69754BA151FF”);
格式名还可以打开私有队列,指定要使用的协议
a,要访问私有队列,必须给构造函数传送字符串FormatName:PRIVATE=MachineGUID/QueueNumber,在创建队列时,会生成私有队列的队列号,队列号在<windows>/System32/msmq/storage/lqs目录下。
b,使用FormatName:DIRECT=Protocol:MachineAddress/QueueName,可以指定用于发送消息的协议。
c,FormatName:DIRect=OS:MachineName/QueueName是使用格式名指定队列的另一种方式,此时不需要指定协议,但仍可以使用机器名和格式名。
(4)发送消息
使用MessageQueue类的Send方法给队列发送消息,作为参数传给Send()方法的对象串行化到相关的队列上,Send()方法是重载的,这样才能传送标签和MessageQueueTransaction对象,路径名给服务器名指定”.”表示它本地系统,私有队列的路径名只能在本地使用。
static void Main(string[] args)
{
try
{
if(!MessageQueue.Exists(@”./Private$/MyPrivateQueue”))
{
MessageQueue.Create(@”./Private$/MyPrivateQueue”);
}
MessageQueue queue=new MessageQueue(@”./Private$/MyPrivateQueue”);
queue.Send(“Sample Message”,”Label”);
}
catch(MessageQueueException ex)
{
Console.WriteLine(ex.Message);
}
}
消息用xml格式化了,消息的格式化是消息队列附带的格式化器的功能。
发送复杂的消息,除了传送字符串之外,还可以给MessageQueue类的Send()方法传送对象,该类的类型必须满足一些特殊的要求,但它们取决于格式化器,对于二进制格式化器,该类必须用[Serializable]属性串行化,使用.net运行库的串行化功能,对所有的字段进行串行化(包括私有字段)实现ISerializable接口就可以定义定制串行化,XML串行化在使用XML格式化器时进行,在XML串行化过程中,会串行化所有的公共字段和属性,使用System.Xml.Serialization命名空间中的属性可以影响xml串行化。
(5)消息格式化器
消息传送给队列的格式取决于格式化器,MessageQueue类有一个Formatter属性,通过它可以指定格式化器,默认的格式化器XmlMessageFormatter会用XML语法格式化消息。
消息格式化器实现了IMessageFormatter接口,System.Messaging 命名空间中有3个消息格式化器:
XmlMessageFormatter是默认的格式化器,它使用XML串行化对象;使用BinaryMessageFormatter,可以用二进制格式对消息进行串行化,这些消息比使用XML格式化的消息短;ActiveXMessageFormatter是一个二进制格式化器,所以可以用COM对象读写消息,使用这个格式化器,可以用.net类把消息写入队列,使用COM对象从队列中读取消息,反之亦然。
(6)接收消息
要读取消息,也可以使用MessageQueue类,通过Receive()方法可以读取一个消息,再将该消息从队列中删除,如果消息是使用不同的优先级发送的,就读取优先级最高的消息,读取优先级相同的消息时,第一个发送的消息不一定是第一个读取的,因为消息在网络中传送顺序无法保证,要保证发送顺序和读取顺序相同,可以使用事务消息队列。
static void Main(stirng[] args)
{
MessageQueue queue=new MessageQueue(@”,/Private$/MyPrivateQueue”);
queue.Formatter=new XmlMessageFormatter(new string[]{“System.String”});
Message message=queue.Receive();
Console.WriteLine(message.Body);
}
Receive()方法将同步执行,如果队列中没有消息,就会等待队列中有消息时执行。
1,枚举消息
除了使用Receive()方法逐个消息地读取之外,还可以使用枚举器迭代所有MessageQueue类实现了IEnumerable接口,所以可以在foreach语句中使用,使用迭代器消息不会从队列中删除,但可以查看他们的内容:
MessageQueue queue=new MessageQueue(@“./Private$/MyPrivateQueue”);
queue.Formatter=new XmlMessageFormatter(new string[]{“System.String”});
foreach(Message message in queue)
{
Console.WriteLine(message.Body);
}
如果不使用IEnumerable接口,还可以使用MessageEnumerator类,MessageEnumerator实现了IEnumerator接口,有更多的特性,实现IEnumerable接口,就表示不从队列中删除消息,MessageEnumerator类的RemoveCurrent()方法可以从枚举器得当前光标位置删除消息。
使用MessageQueue类的GetMessageEnumerator()方法访问MessageEnumerator,通过MessageEnumerator的MoveNext()方法,可以逐个访问消息,MoveNext()方法重载为把一个时间段作为参数,这是使用这个枚举器的一个主要优点,现在,线程可以在指定的时间段内等待消息进入队列,之后就不等待了,IEnumerator接口定义的Current属性返回消息的一个引用。
MessageQueue queue=new MessageQueue(@“./Private$/MyPrivateQueue”);
queue.Formatter=new XmlMessageFormatter(new string[]{“System.String”});
using(MessageEnumerator messages=queue.GetMessageEnumerator())
{
while(messages.MoveNext(TimeSpan.FormMinutes(30)))
{
Message message=message.Current;
Cosole.WriteLine(message.Body);
}
}

2,异步读取
MessageQueue类的Receive方法会等到队列中的消息可以读取为止,为了避免阻碍线程的执行,可以在Receive方法的一个重载版本中指定一个超时设置,要从超时读取队列中的消息,必须再次调用Receive()方法,这里不是查询消息,而可以调用BeginReceive()异步方法,在使用BeginReceive()开始异步读取之前,应设置ReceiveCompleted事件,ReceiveCompleted事件需要委托ReceiveCompletedEventHandler,该委托引用在消息到达队列并可以读取时要调用的方法了
MessageQueue queue=new MessageQueue(@“./Private$/MyPrivateQueue”);
queue.Formatter=new XmlMessageFormatter(new string[]{“System.String”});
queue.ReceiveCompleted+=new ReceiveComletedEventHandler(MessageArrived);
queue.BeginReceive();
MessageArrived处理程序需要两个参数,第一个参数是事件源MessageQueue,第二个参数是ReceiveCompletedEventArgs类型,它包含消息和异步结果,调用EndReceive()方法,以获得异步方法的结果,即消息:
public static void MessageArrived(object source,ReceiveCompletedEventArgs e)
{
MessageQueue queue=(MessageQueue)source;
Message message=queue.EndReceive(e.AsyncResult);
Console.WriteLine(message.Body);
}
如果消息不应从队列中删除,那么BeginPeek()和EndPeek()方法就可以与异步I/O一起使用。
五,事务队列
对于可恢复的消息来说,不能保证消息的接收顺序,也不能保证消息只接收一次,网络失败可能会使消息被接收多次,如果发送程序和接收程序安装了供Message Queuing使用的多个网络协议,也会发生这种情况。
在需要确保消息的接收顺序和只接收一次的情况下,可以使用事务队列:
消息的接收顺序与其发送顺序相同,消息只接收一次
对于事务队列,一个事务不能横跨消息和发送和接收过程,Message Queuing的本质是发送和接收的时间可能非常长,而事务处理是很短的,在Message Queuing 中,第一个事务把消息发送到队列中,第二个事务把消息写入网络,第三个事务用于接收消息。
给MessageQueue.Create()方法的第二个参数传送true,就创建了一个事务消息队列。
如果要在一个事务中把多个消息写入队列,就必须实例化MessageQueueTransaction对象,调用Begin()方法,在发送完属于该事务的所有消息后,必须调用MessageQueueTranscaction对象的Commit()方法,要取消一个事务,就必须再catch块中调用Abort()方法。
static void Main(string[] args)
{
if(!MessageQueue.Exists(@”./MyTransactionalQueue”))
{
MessageQueue.Create(@”./MyTransactionalQueue”,true);
}
MessageQueue queue=new MessageQueue(@”./MyTransactionalQueue”);
MessageQueueTransaction  transaction=new MessageQueueTransaction();
try
{
transaction.Begin();
queue.Send(“a”,transaction);
queue.Send(“b”,transaction);
queue.Send(“c”,transaction);
transaction.Commit();
}
catch
{
transaction.Abort();
}
}



版权声明:本文为jinzhengquanqq原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。