精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
锐英源精品原创,禁止全文或局部转载,禁止任何形式的非法使用,侵权必究。点名“简易百科”和闲暇巴盗用锐英源原创内容。
进程内的消息队列可以在线程之间传递消息,要在进程之间传递消息就要用能跨进程的消息队列,在平台越来越复杂的当今,跨进程消息队列也是个热点,对应的开源项目有RabbitMQ、RocketMQ、MSMQ和IBMMQ等等,这些开源项目各有特点,也都是成熟产品,在写架构复杂平台时使用,可以择一使用。
不过如果架构不复杂的情况下,也不想引用很多库,就要用一些轻量级的消息队列MQ了,DotNetMQ就是这类开源项目,虽然是轻量级的,但是也比较复杂,例子代码也过了1M,里面也有很多部分。如果想学习,请找锐英源软件。注意,看不懂codeproject,请找锐英源软件。
本文翻译自codeproject,特此注明。
在本文中,我将介绍一个全新的、独立的开源消息队列系统,它完全构建在 C# 和 .NET framework 3.5 中。DotNetMQ是一个消息代理,具有多种功能,包括保证交付、路由、负载平衡、服务器图......等等。我将首先解释消息传递概念和对消息代理的需求。然后我将研究 DotNetMQ 是什么以及如何使用它。
消息传递是在相同或不同机器上运行的应用程序异步通信的一种方式,具有可靠的传递。程序通过相互发送称为消息的数据包进行通信。
消息可以是字符串、字节数组、对象...等。通常,发送者(生产者)程序创建消息并将其推送到消息队列,接收者(消费者)程序从队列中获取消息并处理它。发送方和接收方程序不必同时运行,因为消息传递是一个异步过程。这称为松散耦合通信。
另一方面,Web Service 方法调用(Remote Method Invocation)是一种紧密耦合的同步通信(在整个通信过程中,两个应用程序都必须处于运行状态且可用;如果 Web Service 离线或在调用过程中发生错误)方法调用,客户端应用程序获取异常)。
在上图中,两个应用程序以松散耦合的方式通过消息队列进行通信。如果接收方消耗消息的速度比发送方产生消息的速度慢,那么队列上的消息计数将会增加。此外,在发送者发送消息时,接收者可能处于离线状态。在这种情况下,接收者在联机时(当它启动并加入队列时)从队列中获取消息。
消息队列通常由消息代理提供。消息代理是其他应用程序连接并发送/接收消息的独立应用程序(服务)。消息代理负责存储消息,直到接收者收到它们。消息代理可以跨机器路由消息以将消息传递到目标应用程序,并且可以尝试传递消息直到接收者正确处理它。消息代理有时称为面向消息的中间件( MOM ) 或简称为消息队列( MQ )。
DotNetMQ是一个开源消息代理,具有以下几个特性:
在第一次创建 DotNetMQ 时,我更喜欢将其命名为 MDS(消息传递系统)。因为它不仅被设计为消息队列,而且还被设计为直接将消息传递给应用程序的系统和提供构建应用程序服务的框架的环境。我称它为DotNetMQ,因为它完全是使用 .NET 开发的,而且 DotNetMQ 的名称更令人难忘。因此,它的原始名称(和内部项目名称)是 MDS 并且应用程序有许多带有前缀MDS的类。
首先,我将演示一个需要消息代理的简单情况。
在我的商业生活经验中,我观察到非常糟糕且不常见的异步企业应用程序集成解决方案。通常有一个应用程序在服务器上运行并执行一些任务并产生数据,然后将结果数据发送到另一个服务器上的另一个应用程序。第二个应用程序对数据执行其他任务或评估结果(服务器位于同一网络上或通过 Internet 连接)。此外,消息数据必须是持久的。即使远程应用程序不工作或网络不可用,也必须在第一时间传递消息。
让我们看看下图中的设计。
Application - 1和Application - 2是可执行应用程序(或 Windows 服务),Sender Service是 Windows 服务。应用程序 - 1 执行一些任务,生成数据,并在服务器 - B上调用远程 Web 服务方法来传输数据。此 Web 服务将数据插入到数据库表中。应用程序 - 2 定期检查表中是否有新的传入数据行并处理它们(并将它们从表中删除或将它们标记为已处理以不再处理相同的数据)。
如果在 Web Service 调用期间或在 Web Service 中处理数据时发生错误,则数据不得丢失,必须稍后发送。但是,Application-1 有其他任务要做,所以它不能一次又一次地尝试发送数据。它只是将数据插入到数据库表中。另一个 Windows 服务(或 Application - 1 中的一个线程,如果应用程序始终运行)定期检查此表并尝试将数据发送到 Web 服务,直到成功发送数据。
这种情况确实可靠(保证消息传递),但不是两个应用程序之间通信的有效方式。这个解决方案有一些非常关键的问题:
消息代理完成所有这些工作并承担以最有效的方式将消息传递到远程应用程序的所有责任。使用 DotNetMQ 的相同应用程序集成如下图所示。
DotNetMQ 是在Server-A和Server-B上运行的独立 Windows 服务。因此,您只需要编写代码来与 DotNetMQ 进行通信。使用 DotNetMQ 客户端库,可以非常轻松快速地连接和发送/接收来自 DotNetMQ 服务的消息。Application - 1准备消息,设置目的地,并将消息传递给 DotNetMQ Broker。DotNetMQ 代理将以最有效和最快的方式将消息传递到Application - 2 。
很明显,消息代理需要集成应用程序。我在网上搜索并阅读书籍以找到一个易于与 .NET 一起使用的免费(和开源,如果可用)消息代理。让我们谈谈我的发现:
您会看到上面列表中没有完全使用 .NET 开发的 Message Broker。
从用户的角度来看,我只想将“消息数据、目标服务器和应用程序名称”传递给我的本地代理。我对其余的不感兴趣。它将根据需要多次通过网络路由消息,并将消息传递到目标服务器上的目标应用程序。我的消息系统必须为我提供这种简单性。这是我的第一个起点,我根据这一点评估了 Message Brokers。下图显示了我想要的。
Application - 1将消息传递给本地服务器( Server - A )中的Message Broker:
Server-A与Server-D没有直接连接。所以Message Brokers通过服务器转发消息(消息依次通过Server-A、Server-B、Server-C、Server-D),最终到达Server-D中的Message Broker传递消息到应用程序- 2。请注意,在 Server-E 上运行的 Application-2 的另一个实例,但它没有收到此消息,因为消息的目标服务器是Server-D。
DotNetMQ 提供了这种功能和简单性。它在图上找到从源服务器到目标服务器的最佳(最短)路径并转发消息。
综合介绍完之后,让我们看看如何在实践中使用 DotNetMQ。
目前没有自动安装,但安装 DotNetMQ 非常容易。从文章顶部下载并解压缩二进制文件下载文件。只需将所有内容从那里复制到C:\Program Files\DotNetMQ\并运行INSTALL_x86.bat(或INSTALL_x64.bat,如果您使用的是 64 位操作系统)。
您可以检查 Windows 服务以查看 DotNetMQ 是否已安装并正常工作。
让我们看看 DotNetMQ 的实际应用。为了使第一个应用程序最简单,我假设有两个控制台应用程序在同一台机器上运行(事实上(我们将在本文档后面看到)如果应用程序在不同的机器上没有显着差异;唯一的区别正在正确设置消息中目标服务器的名称)。
我们需要注册一次应用程序才能将它们与 DotNetMQ 一起使用。这是一个非常简单的过程。运行DotNetMQ 管理器(DotNetMQ程序文件夹中的 MDSManager.exe(默认:C:\Program Files\DotNetMQ\ )),然后从应用程序菜单中打开应用程序列表。单击添加新应用程序按钮并输入应用程序的名称。
如上所述,将Application1和Application2应用程序添加到 DotNetMQ。最后,您的应用程序列表必须如下所示。
此屏幕显示已注册到 DotNetMQ 的应用程序。Connected Clients 列显示当前连接到 DotNetMQ 的应用程序实例的计数。由于此屏幕的更改,不需要重新启动DotNetMQ。
在 Visual Studio 中创建一个名为Application1的新控制台应用程序,并添加对MDSCommonLib.dll的引用,该引用提供了连接到 DotNetMQ 所需的类。然后在 Program.cs 文件中编写如下代码:
using System; using System.Text; using MDS.Client; namespace Application1 { class Program { static void Main(string[] args) { //Create MDSClient object to connect to DotNetMQ //Name of this application: Application1 var mdsClient = new MDSClient("Application1"); //Connect to DotNetMQ server mdsClient.Connect(); Console.WriteLine("Write a text and press enter to send " + "to Application2. Write 'exit' to stop application."); while (true) { //Get a message from user var messageText = Console.ReadLine(); if (string.IsNullOrEmpty(messageText) || messageText == "exit") { break; } //Create a DotNetMQ Message to send to Application2 var message = mdsClient.CreateMessage(); //Set destination application name message.DestinationApplicationName = "Application2"; //Set message data message.MessageData = Encoding.UTF8.GetBytes(messageText); //Send message message.Send(); } //Disconnect from DotNetMQ server mdsClient.Disconnect(); } } }
创建MDSClient对象时,我们传递连接到 DotNetMQ 的应用程序名称。使用此构造函数,我们使用默认端口号 (10905) 连接到本地服务器 (127.0.0.1) 上的 DotNetMQ。重载的构造函数可用于连接到另一个服务器和端口。
MDSClient的CreateMessage方法返回一个类型的对象IOutgoingMessage。该MessageData属性是要发送到目标应用程序的实际数据。它是一个字节数组。我们正在使用 UTF8 编码将用户输入文本转换为字节数组。DestinationApplicationName和DestinationServerName属性用于设置消息的目的地址。如果我们不指定目标服务器,则假定它是本地服务器。最后,我们发送消息。
在 Visual Studio 中创建一个名为Application2的新控制台应用程序,添加对MDSCommonLib.dll的引用并编写以下代码:
using System; using System.Text; using MDS.Client; namespace Application2 { class Program { static void Main(string[] args) { //Create MDSClient object to connect to DotNetMQ //Name of this application: Application2 var mdsClient = new MDSClient("Application2"); //Register to MessageReceived event to get messages. mdsClient.MessageReceived += MDSClient_MessageReceived; //Connect to DotNetMQ server mdsClient.Connect(); //Wait user to press enter to terminate application Console.WriteLine("Press enter to exit..."); Console.ReadLine(); //Disconnect from DotNetMQ server mdsClient.Disconnect(); }
/// <summary> /// This method handles received messages from other applications via DotNetMQ. /// </summary> /// <param name="sender"></param> /// <param name="e">Message parameters</param>
static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Get message var messageText = Encoding.UTF8.GetString(e.Message.MessageData); //Process message Console.WriteLine(); Console.WriteLine("Text message received : " + messageText); Console.WriteLine("Source application : " + e.Message.SourceApplicationName); //Acknowledge that message is properly handled //and processed. So, it will be deleted from queue. e.Message.Acknowledge(); } } }
创建MDSClient对象与Application1类似,但应用名称为Application2。MessageReceived要接收应用程序的消息,它需要注册到MDSClient. 然后我们连接到 DotNetMQ 并保持连接,直到用户按下 Enter。
当消息发送到 Application2 时,该MDSClient_MessageReceived方法处理该事件。我们从 的Message属性中得到消息MessageReceivedEventArgs。消息的类型是IIncomingMessage。的MessageData属性IIncomingMessage包含由 Application1 发送的实际消息数据。由于它是一个字节数组,我们使用 UTF8 编码将其转换为字符串。我们将 Application1 发送的消息文本写入控制台屏幕。
处理传入消息后,需要确认消息。这意味着消息被正确接收和正确处理。DotNetMQ 然后从消息队列中删除该消息。我们也可以使用Reject方法(如果我们无法处理错误情况下的消息)。在这种情况下,消息会返回消息队列并稍后发送到目标应用程序(或者如果存在,它将发送到同一服务器上的另一个 Application2 实例)。这是 DotNetMQ 系统的强大机制。因此,保证消息不会丢失并且绝对被处理。如果您不确认或拒绝消息,则将其假定为已拒绝。因此,即使您的应用程序崩溃,您的消息也会在稍后发送回您的应用程序。
如果您运行Application2 的多个实例,哪一个会收到消息?在这种情况下,DotNetMQ 按顺序将消息传递给应用程序。因此,您可以创建多发送者/接收者系统。一条消息仅由一个应用程序实例接收(应用程序接收不同的消息)。DotNetMQ 提供所有功能和同步。
在发送消息之前,您可以像这样设置消息的传输规则:
message.TransmitRule = MessageTransmitRules.NonPersistent;
传输规则分为三种:
由于默认传输规则是StoreAndForward,让我们尝试一下:
即使您在从 Application1 发送消息后从 Windows 服务中停止 DotNetMQ 服务,您的消息也不会丢失。这就是所谓的坚持。
默认情况下,应用程序可以使用 MDSClient ( CommunicationWays.SendAndReceive) 发送和接收消息。如果应用程序不想接收消息,它必须将CommunicationWay属性设置为CommunicationWays.Send。可以在连接之前或与 DotNetMQ 通信期间更改此属性。
默认情况下,如果 MDSClient 断开连接,它会自动重新连接到 DotNetMQ。因此,即使您重新启动 DotNetMQ,也不需要重新启动连接到 DotNetMQ 的应用程序。您可以将该ReConnectServerOnError属性设置false为禁用自动重新连接。
默认情况下,您必须在MessageReceived事件中明确确认消息。否则,它被假定为Rejected。如果您想反对这种方法,则必须将AutoAcknowledgeMessages属性设置为true。在这种情况下,如果您的 MessageReceived事件处理程序没有抛出异常,或者您没有明确地确认/拒绝消息,则会自动确认(如果抛出异常,则消息被拒绝)。
您可以通过两种方式配置 DotNetMQ:使用XML 设置文件或DotNetMQ 管理器(Windows 窗体应用程序)。在这里,我将展示两种方法。某些配置要求您重新启动DotNetMQ,而另一些则不需要。
您只能在一台服务器上运行 DotNetMQ 。在这种情况下,无需为服务器配置任何内容。但是,如果您想在多台服务器上运行 DotNetMQ并使它们与其他服务器通信,则必须定义您的服务器图。
服务器图由两个或多个节点组成。每个节点都是具有IP 地址和TCP 端口(由 DotNetMQ 使用)的服务器。您可以使用 DotNetMQ 管理器配置/设计服务器图。
在上图中,您会看到一个由五个节点组成的服务器图。红色节点代表此服务器(此服务器表示您与 DotNetMQ Manager 连接的服务器)。一条线表示两个节点(它们称为相邻节点)之间存在连接(它们可以发送/接收消息)。图中服务器/节点的名称很重要,在向服务器发送消息时使用。
您可以双击图表中的服务器以更改其属性。要连接两个服务器,请按住 Ctrl,单击第一个,然后单击第二个(要断开连接,请再次执行相同操作)。您可以通过右键单击并选择Set as this server 将服务器设置为此服务器。您还可以使用右键菜单从图表中删除服务器或添加新服务器。最后,您可以通过拖动添加移动服务器。
设计服务器图表后,您必须单击Save & Update Graph按钮以保存更改。更改将保存到DotNetMQ安装文件夹中的 MDSSettings.xml 文件中。您必须重新启动DotNetMQ 才能应用更改。
对于上面的服务器图,对应的MDSSettings.xml设置如下所示:
<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
<Settings>
...
</Settings>
<Servers>
<Server Name="halil_pc" IpAddress="192.168.10.105"
Port="10099" Adjacents="emre_pc" />
<Server Name="emre_pc" IpAddress="192.168.10.244" Port="10099"
Adjacents="halil_pc,out_server,webserver1,webserver2" />
<Server Name="out_server" IpAddress="85.19.100.185"
Port="10099" Adjacents="emre_pc" />
<Server Name="webserver1" IpAddress="192.168.10.263"
Port="10099" Adjacents="emre_pc,webserver2" />
<Server Name="webserver2" IpAddress="192.168.10.44"
Port="10099" Adjacents="emre_pc,webserver1" />
</Servers>
<Applications>
...
</Applications>
<Routes>
...
</Routes>
</MDSConfiguration>
当然,此配置是根据您的实际网络进行的。您必须在图中的所有服务器上安装 DotNetMQ。此外,您必须在所有服务器上配置相同的图表(您可以轻松地将服务器节点从 XML 复制到其他服务器)。
DotNetMQ 使用短路径算法来发送消息(如果设置文件中没有定义手动路由)。考虑在halil_pc上运行的应用程序 A并向webserver2上的应用程序 B发送消息。路径很简单:Application A -> halil_pc -> emre_pc -> webserver2 -> Application B。halil_pc 通过使用服务器图定义知道下一个转发服务器(emre_pc)。
最后,MDSSettings.design.xml文件包含服务器设计信息(屏幕上节点的位置)。这只是 DotNetMQ 管理器的服务器图形窗口中需要的,而不是 DotNetMQ 的运行时所需要的。
如图 - 5 所示,您可以添加/删除使用 DotNetMQ 作为消息代理的应用程序。不需要为这些更改重新启动 DotNetMQ。应用程序设置也保存到MDSSettings.xml文件中,如下所示。
<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
...
<Applications>
<Application Name="Application1" />
<Application Name="Application2" />
</Applications>
...
</MDSConfiguration>
应用程序必须在此列表中才能连接到 DotNetMQ。如果直接更改 XML 文件,则必须重新启动 DotNetMQ 服务器。
DotNetMQ 的一个可用特性是路由。路由设置(目前)仅在 XML 设置文件 ( MDSSettings.xml ) 中配置。您可以在下面的设置文件中看到两种类型的路由:
<?xml version="1.0" encoding="utf-8" ?>
<MDSConfiguration>
...
<Routes>
<Route Name="Route-App2" DistributionType="Sequential" >
<Filters>
<Filter DestinationServer="this" DestinationApplication="Application1" />
</Filters>
<Destinations>
<Destination Server="Server-A" Application="Application1" RouteFactor="1" />
<Destination Server="Server-B" Application="Application1" RouteFactor="1" />
<Destination Server="Server-C" Application="Application1" RouteFactor="1" />
</Destinations>
</Route>
<Route Name="Route-App2" DistributionType="Random" >
<Filters>
<Filter DestinationServer="this" DestinationApplication="Application2" />
<Filter SourceApplication="Application2" TransmitRule="StoreAndForward" />
</Filters>
<Destinations>
<Destination Server="Server-A" Application="Application2" RouteFactor="1" />
<Destination Server="Server-B" Application="Application2" RouteFactor="3" />
</Destinations>
</Route>
</Routes>
...
</MDSConfiguration>
一个Route节点有两个属性:Name是Route条目的用户友好名称(不影响路由),DistributionType是路由的策略。路由策略有两种:
过滤器用于决定消息使用哪个路由。如果消息的属性适用于其中一个过滤器,则路由该消息。定义过滤器有五个条件(XML 属性):
如果未声明一个或多个条件,则在过滤消息时不会考虑它。因此,如果所有条件都为空(或未声明),则所有消息都适合此过滤器。仅当所有条件都适合消息时,才会为消息选择过滤器。如果消息适合(至少)一个路由过滤器,则选择并使用该路由。
目的地用于将消息路由到其他服务器。根据条目的DistributionType属性(前面已解释)选择目的地之一。Route目的地必须定义三个属性:
更改路由后必须重新启动DotNetMQ 。
DotNetMQ 目前支持三种存储类型:SQLite(默认)、MySQL和Memory。您可以在MDSSettings.xml文件中更改存储类型。
<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
...
<Settings>
<Setting Key="ThisServerName" Value="halil_pc" />
<Setting Key="StorageType" Value="SQLite" />
</Settings>
...
</MDSConfiguration>
存储类型必须是以下值之一:
这是使用MySQL-ODBC存储类型的示例配置:
<Settings>
<Setting Key="ThisServerName" Value="halil_pc" />
<Setting Key="StorageType" Value="MySQL-ODBC" />
<Setting Key="ConnectionString"
Value="uid=root;server=localhost;driver={MySQL ODBC 3.51 Driver};database=mds" />
</Settings>
您可以在Setup\Databases文件夹(在 DotNetMQ 安装文件夹中)找到创建 DotNetMQ 使用的数据库和表所需的文件。如果您有问题,请随时向我提问。
还有另一个设置来定义当前/此服务器的名称 ( ThisServerName)。它必须是该Servers部分中的服务器之一。如果您使用 DotNetMQ 管理器来编辑您的服务器图,它会自动设置。
向远程服务器上的应用程序发送消息就像向当前服务器上的应用程序发送消息一样简单。
让我们考虑下面的网络。
ServerA 上运行的应用程序 (Application1) 想要向 ServerC 上的另一个应用程序 (Application2) 发送消息,并且由于防火墙规则,ServerA 和 ServerC 之间没有直接连接。让我们更改我们在第一个应用程序部分中开发的应用程序。
Application2 中甚至没有任何变化。只需在 ServerC 中运行 Application2 并等待传入消息。
Application1 在我们发送消息的方式上有一个小的变化。它必须将DestinationServerName消息设置为ServerC。
var message = mdsClient.CreateMessage();
message.DestinationServerName = "ServerC"; //Set destination server name here!
message.DestinationApplicationName = "Application2";
message.MessageData = Encoding.UTF8.GetBytes(messageText);
message.Send();
就这样。您不必知道 ServerC 在哪里,直接连接到 ServerC...它们都在 DotNetMQ 设置中定义。请注意,如果您不设置DestinationServerName消息的,则假定它是当前/此服务器,并且 DotNetMQ 将消息发送到同一服务器上的应用程序。此外,如果您定义了必要的路由,则不必设置目标服务器:它由 DotNetMQ 自动路由。
当然,必须根据服务器连接(服务器图)正确设置 DotNetMQ 设置,并且必须将 Application1 和 Application2注册到 DotNetMQ 服务器,如配置 DotNetMQ部分所述。
正如您已经看到的,DotNetMQ 可用于构建分布式、负载平衡的应用程序系统。在本节中,我将讨论一个真实的场景:分布式 SMS 处理系统。
假设存在用于投票音乐比赛的短消息(SMS)服务。在所有参赛者都唱完他们的歌曲后,观众会向我们的短信服务发送类似“VOTE 103”的消息,为他们最喜欢的参赛者投票(103 是投票给特定参赛者的示例代码)。并假设此轮询仅在 30 分钟内完成,大约 500 万人将向我们的服务发送 SMS。
我们将接收每条消息,对其进行处理(解析 SMS 文本,更新数据库以增加竞争对手的投票数)并向 SMS 的发送者发送确认消息。我们必须从两台服务器接收消息,在四台服务器上处理消息,并从两台服务器发送确认消息。我们总共有八台服务器。让我们看看我们完整的系统图:
共有三种类型的应用程序:Receiver、Processor和Sender。在这种情况下,您可以使用 DotNetMQ 作为消息队列和负载均衡器,通过配置服务器图和路由来构建分布式、可扩展的消息处理系统,如配置 DotNetMQ部分所述。
在大多数情况下,一个应用程序向另一个应用程序发送消息并获得响应消息。DotNetMQ内置了对这种类型的消息传递的支持。考虑一个用于查询库存状态的服务。有两种类型的消息:
[Serializable] public class StockQueryMessage { public string StockCode { get; set; } } [Serializable] public class StockQueryResultMessage { public string StockCode { get; set; } public int ReservedStockCount { get; set; } public int TotalStockCount { get; set; } }
一个简单的Stock 服务器代码如下所示。
using System; using MDS; using MDS.Client; using StockCommonLib; namespace StockServer { class Program { static void Main(string[] args) { var mdsClient = new MDSClient("StockServer"); mdsClient.MessageReceived += MDSClient_MessageReceived; mdsClient.Connect(); Console.WriteLine("Press enter to exit..."); Console.ReadLine(); mdsClient.Disconnect(); } static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Get message var stockQueryMessage = GeneralHelper.DeserializeObject(e.Message.MessageData) as StockQueryMessage; if (stockQueryMessage == null) { return; } //Write message content Console.WriteLine("Stock Query Message for: " + stockQueryMessage.StockCode); //Get stock counts from a database... int reservedStockCount; int totalStockCount; switch (stockQueryMessage.StockCode) { case "S01": reservedStockCount = 14; totalStockCount = 80; break; case "S02": reservedStockCount = 0; totalStockCount = 25; break; default: //Stock does not exists! reservedStockCount = -1; totalStockCount = -1; break; } //Create a reply message for stock query var stockQueryResult = new StockQueryResultMessage { StockCode = stockQueryMessage.StockCode, ReservedStockCount = reservedStockCount, TotalStockCount = totalStockCount }; //Create a MDS response message to send to client var responseMessage = e.Message.CreateResponseMessage(); responseMessage.MessageData = GeneralHelper.SerializeObject(stockQueryResult); //Send message responseMessage.Send(); //Acknowledge the original request message. //So, it will be deleted from queue. e.Message.Acknowledge(); } } }
库存服务器侦听传入的StockQueryMessage对象并将 a 发送StockQueryResultMessage给发送者。简单地说,我没有从数据库中选择股票。响应消息由传入消息的CreateResponseMessage()方法创建。最后,在发送响应后确认消息。现在我将展示一个简单的股票客户端代码来从服务器获取股票信息:
using System; using MDS; using MDS.Client; using MDS.Communication.Messages; using StockCommonLib; namespace StockApplication { class Program { static void Main(string[] args) { Console.WriteLine("Press enter to query a stock status"); Console.ReadLine(); //Connect to DotNetMQ var mdsClient = new MDSClient("StockClient"); mdsClient.MessageReceived += mdsClient_MessageReceived; mdsClient.Connect(); //Create a stock request message var stockQueryMessage = new StockQueryMessage { StockCode = "S01" }; //Create a MDS message var requestMessage = mdsClient.CreateMessage(); requestMessage.DestinationApplicationName = "StockServer"; requestMessage.TransmitRule = MessageTransmitRules.NonPersistent; requestMessage.MessageData = GeneralHelper.SerializeObject(stockQueryMessage); //Send message and get response var responseMessage = requestMessage.SendAndGetResponse(); //Get stock query result message from response message var stockResult = (StockQueryResultMessage) GeneralHelper.DeserializeObject(responseMessage.MessageData); //Write stock query result Console.WriteLine("StockCode = " + stockResult.StockCode); Console.WriteLine("ReservedStockCount = " + stockResult.ReservedStockCount); Console.WriteLine("TotalStockCount = " + stockResult.TotalStockCount); //Acknowledge received message responseMessage.Acknowledge(); Console.ReadLine(); //Disconnect from DotNetMQ server. mdsClient.Disconnect(); } static void mdsClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Simply acknowledge other received messages e.Message.Acknowledge(); } } }
在上面的示例中,TransmitRule选择为NonPersistent显示示例用法。当然,您可以发送StoreAndForward(持久)消息。这是正在运行的应用程序的示例屏幕截图:
SOA(面向服务的架构)多年来一直是一个流行的概念。Web 服务和 WCF 是 SOA 的两个主要解决方案。通常,不期望消息队列系统支持 SOA。此外,消息传递是一个异步且松耦合的过程,而 Web 服务方法调用通常是同步且紧耦合的。甚至(正如您在前面的示例应用程序中看到的)消息传递并不像调用远程方法那么容易。但是,当您的消息数量增加时,您的应用程序会变得复杂且难以维护。
DotNetMQ 支持基于持久或非持久消息的远程方法调用机制。所以可以异步调用远程方法,保证调用,保证成功!
在这里,我们将开发一个简单的服务,可以用来发送短信和电子邮件。也许不需要编写发送电子邮件/短信的服务,所有应用程序都可以自己完成。但是想象一下,您有许多正在发送电子邮件的应用程序。如果邮件服务器在发送电子邮件时出现问题怎么办?应用程序必须尝试直到成功发送电子邮件。所以你必须在你的应用程序中建立一个队列机制来尝试一次又一次地发送电子邮件。在最坏的情况下,您的应用程序可能是一个短时间运行的应用程序(例如 Web 服务),或者必须在发送电子邮件之前关闭。但是您必须在邮件服务器上线时发送电子邮件,并且邮件不能丢失。
在这种情况下,您可以开发一个单独的邮件/短信服务,该服务将尝试发送短信/邮件,直到成功发送。您可以开发一个邮件服务,通过 DotNetMQ 接收邮件请求并仅在成功发送电子邮件时确认请求(消息)。如果发送失败,只是不确认(或拒绝)该消息,因此稍后将再次尝试。
我们将首先开发邮件/短信服务。为此,我们必须定义一个从MDSService基类传递的类:
using System; using MDS.Client.MDSServices; namespace SmsMailServer { [MDSService(Description = "This service is a " + "sample mail/sms service.", Version = "1.0.0.0")] public class MyMailSmsService : MDSService { //All parameters and return values can be defined. [MDSServiceMethod(Description = "This method is used send an SMS.")] public void SendSms( [MDSServiceMethodParameter("Phone number to send SMS.")] string phone, [MDSServiceMethodParameter("SMS text to be sent.")] string smsText) { //Process SMS Console.WriteLine("Sending SMS to phone: " + phone); Console.WriteLine("Sms Text: " + smsText); //Acknowledge the message IncomingMessage.Acknowledge(); } //You do not have to define any parameters [MDSServiceMethod] public void SendEmail(string emailAddress, string header, string body) { //Process email Console.WriteLine("Sending an email to " + emailAddress); Console.WriteLine("Header: " + header); Console.WriteLine("Body : " + body); //Acknowledge the message IncomingMessage.Acknowledge(); } // A simple method just to show return values. [MDSServiceMethod] [return: MDSServiceMethodParameter("True, if phone number is valid.")] public bool IsValidPhone([MDSServiceMethodParameter( "Phone number to send SMS.")] string phone) { //Acknowledge the message IncomingMessage.Acknowledge(); //Return result return (phone.Length == 10); } } }
如您所见,它只是一个用属性修饰的常规 C# 类。和属性必须定义,所有其他属性都是可选的(但最好写出来)MDSService。MDSServiceMethod我们很快就会看到为什么使用它们)。您的服务方法必须具有该MDSServiceMethod属性。如果您不想公开某些方法,只需不要添加该MDSServiceMethod属性。
我们还必须在服务方法中确认消息。否则,消息(导致此方法调用的)不会从消息队列中删除,我们的方法将被再次调用。如果我们无法处理它,我们也可以拒绝该消息(例如,如果邮件服务器不工作并且我们无法发送电子邮件)。如果我们拒绝该消息,它将稍后发送给我们(可靠性)。您可以使用类的IncomingMessage属性访问原始消息MDSService。RemoteApplication此外,您可以使用该属性获取远程应用程序(称为服务方法)的信息。
创建适当的服务类后,我们必须创建一个应用程序来运行它。这是一个运行我们的简单控制台应用程序MyMailSmsService:
using System; using MDS.Client.MDSServices; namespace SmsMailServer { class Program { static void Main(string[] args) { using (var service = new MDSServiceApplication("MyMailSmsService")) { service.AddService(new MyMailSmsService()); service.Connect(); Console.WriteLine("Press any key to stop service"); Console.ReadLine(); } } } }
如您所见,创建和运行服务只需三行代码。由于MDSService是可销毁的,您可以使用using声明。此外,您可以使用MDSServiceApplication 的Disconnect方法手动关闭服务。您可以使用MDSServiceApplication的AddService方法在单个服务上运行多个服务。
要开发使用 DotNetMQ 服务的应用程序,您必须创建服务代理(如 Web 服务和 WCF)。为此,您可以使用MDSServiceProxyGenerator工具。首先,编译您的服务项目,然后运行MDSServiceProxyGenerator.exe(在 DotNetMQ 安装文件夹中)。
选择您的服务程序集文件(此示例项目中的SmsMailServer.exe)。您可以选择服务类或生成给定程序集中所有服务的代理。输入命名空间和目标文件夹以生成代理类。生成代理类后,您可以将其添加到您的项目中。
我不会展示这个代理类的内部结构,你必须知道它(你可以在源代码中看到它,它是一个非常简单的类)。您的方法/参数属性用于在此代理文件中生成代码注释。
将生成的代理类添加到我们的项目后,我们可以像简单的方法调用一样简单地向服务发送消息:
using System; using MDS.Client; using MDS.Client.MDSServices; using SampleService; namespace SmsMailClient { class Program { static void Main(string[] args) { Console.WriteLine("Press enter to test SendSms method"); Console.ReadLine(); //Application3 is name of an application that sends sms/email. using (var serviceConsumer = new MDSServiceConsumer("Application3")) { //Connect to DotNetMQ server serviceConsumer.Connect(); //Create service proxy to call remote methods var service = new MyMailSmsServiceProxy(serviceConsumer, new MDSRemoteAppEndPoint("MyMailSmsService")); //Call SendSms method service.SendSms("3221234567", "Hello service!"); } } } }
您还可以调用服务的其他方法,并像常规方法调用一样获取返回值。实际上,您的方法调用被转换为可靠的消息。例如,即使远程应用MyMailSmsService程序SendSms(
TransmitRule您可以使用服务代理的属性更改消息传递的传输规则。如果一个服务方法返回void,它的传输规则是StoreAndForward默认的。如果服务方法返回值,则方法调用不能可靠(因为方法调用是同步的并等待结果),它的规则是DirectlySend.
您可以选择任何类型作为方法参数。如果它是主要类型(string, int, byte...),则无需额外设置,但如果您想使用自己的类作为方法参数,则必须将类标记为,Serializable因为 DotNetMQ 使用二进制序列化作为参数。
请注意,在运行此示例之前,您必须将 MyMailSmsService和Application3注册到 DotNetMQ。
当然,您可以在 Web 服务中连接到 DotNetMQ,因为它也是一个 .NET 应用程序。但是,如果您想编写一个 ASP.NET Web 方法来处理应用程序的消息(并且可以在同一上下文中回复消息),该怎么办?Web 服务适合这种请求/回复风格的方法调用。
DotNetMQ 支持 ASP.NET Web 服务并且可以将消息传递到 Web 服务。示例(在下载文件中)中有一个模板 Web 服务来完成此操作。定义如下:
using System;
using System.Web.Services;
using MDS.Client.WebServices;
[WebService(Namespace = "http://www.dotnetmq.com/mds")]
[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
public class MDSAppService : WebService
{
/// <summary>
/// MDS server sends messages to this method.
/// </summary>
/// <param name="bytesOfMessage">Byte array form of message</param>
/// <returns>Response message to incoming message</returns>
[WebMethod(Description = "Receives incoming messages to this web service.")]
public byte[] ReceiveMDSMessage(byte[] bytesOfMessage)
{
var message = WebServiceHelper.DeserializeMessage(bytesOfMessage);
try
{
var response = ProcessMDSMessage(message);
return WebServiceHelper.SerializeMessage(response);
}
catch (Exception ex)
{
var response = message.CreateResponseMessage();
response.Result.Success = false;
response.Result.ResultText =
"Error in ProcessMDSMessage method: " + ex.Message;
return WebServiceHelper.SerializeMessage(response);
}
}
/// <summary>
/// Processes incoming messages to this web service.
/// </summary>
/// <param name="message">Message to process</param>
/// <returns>Response Message</returns>
private IWebServiceResponseMessage
ProcessMDSMessage(IWebServiceIncomingMessage message)
{
//Process message
//Send response/result
var response = message.CreateResponseMessage();
response.Result.Success = true;
return response;
}
}
您不更改ReceiveMDSMessage方法,并且必须如上所示在ProcessMDSMessage方法中处理消息。此外,您必须在MDSSettings.xml中定义Web 服务的地址,如下所示。您还可以使用 DotNetMQ 管理工具添加 Web 服务。
...
<Applications>
<Application Name="SampleWebServiceApp">
<Communication Type="WebService"
Url="http://localhost/SampleWebApplication/SampleService.asmx" />
</Application>
</Applications>
...
DotNetMQ 中的消息传递有一些测试结果:
消息:
方法调用(在 DotNetMQ 服务中)
测试平台:Intel Core 2 Duo 3,00 GHz CPU。2 GB RAM 电脑。在同一台计算机上运行的两个应用程序之间进行消息/调用。