VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > temp > C#教程 >
  • C#消息队列应用程序

制作者:剑锋冷月 单位:无忧统计网,www.51stat.net
 

  简介

  Microsoft 近期推出了一种用于生成集成应用程序的新平台——Microsoft .NET 框架。.NET 框架允许开发人员使用任何编程语言迅速生成和部署 Web 服务和应用程序。Microsoft Intermediate Language (MSIL) 和实时 (JIT) 编译器使这种不依赖语言的框架得以实现。

  与 .NET 框架同时面世的还有一种新的编程语言 C#(读作“C sharp”)。C# 是一种简单、新颖、面向对象和类型安全的编程语言。利用 .NET 框架和 C#(除 Microsoft? Visual Basic? 和 Managed C++ 之外),用户可以编写功能强大的 Microsoft Windows? 和 Web 应用程序及服务。本文提供了这样的一个解决方案,它的重点是 .NET 框架和 C# 而不是编程语言。C# 语言的介绍可以在“ C# 简介和概述(英文)”找到。

  近期的文章“MSMQ:可伸缩、高可用性的负载平衡解决方案(英文)”介绍了一种解决方案,用于高可用性消息队列 (MSMQ) 的可伸缩负载平衡解决方案体系结构。此解决方案中涉及了一种将 Windows 服务用作智能消息路由器的开发方案。这样的解决方案以前只有 Microsoft Visual C++? 程序员才能实现,而 .NET 框架的出现改变了这种情况。从下面的解决方案中,您可以看到这一点。

  .NET 框架应用程序

  这里介绍的解决方案是一种用来处理若干消息队列的 Windows 服务;其中每个队列都是由多个线程进行处理(接收和处理消息)。处理程序使用循环法技术或应用程序特定值(消息 AppSpecific 属性)从目的队列列表中路由消息,并使用消息属性来调用组件方法。(示例进程也属于这种情况。)在后一种情况下,组件的要求是它能够实现给定的接口 IWebMessage。要处理错误,应用程序需要将不能处理的消息发送到错误队列中。

 

  消息应用程序的结构与以前的活动模板库 (ATL) 应用程序相似,它们之间的主要不同在于用于管理服务的代码的封装和 .NET 框架组件的使用。要创建 Windows 服务,.NET 框架用户仅仅需要创建一个从 ServiceBase(来自 System.ServiceControl 程序集)继承的类。这毫不奇怪,因为 .NET 框架是面向对象的。

  应用程序结构

  应用程序中主要的类是 ServiceControl,它是从 ServiceBase 继承的。因而,它必须实现 OnStart 和 OnStop 方法,以及可选的 OnPause 和 OnContinue 方法。事实上,类是在静态方法 Main 内构造的:

  using System;
using System.ServiceProcess;
public class ServiceControl: ServiceBase
{
 // 创建服务对象的主入口点
 public static void Main()
 {
  ServiceBase.Run(new ServiceControl());
 }
 // 定义服务参数的构造对象
 public ServiceControl()
 {
  CanPauseAndContinue = true;
  ServiceName = "MSDNMessageService";
  AutoLog = false;
 }
 protected override void OnStart(string[] args) {...}
 protected override void OnStop() {...}
 protected override void OnPause() {...}
 protected override void OnContinue() {...}
}

  ServiceControl 类创建一系列 CWorker 对象,即,为需要处理的每个消息队列创建 CWorker 类的一个实例。根据定义中处理队列所需的线程数目,CWorker 类依次创建了一系列的 CWorkerThread 对象。CWorkerThread 类创建的一个处理线程将执行实际的服务工作。

  使用 CWorker 和 CWorkerThread 类的主要目的是确认服务控件 Start、Stop、Pause 和 Continue 命令。因为这些进程必须是无阻塞的,命令操作最终将在后台处理线程上执行。

 

  CWorkerThread 是一个抽象类,被 CWorkerThreadAppSpecific 、CWorkerThreadRoundRobin 和 CWorkerThreadAssembly 继承。这些类以不同的方式处理消息。前两个类通过给另一队列发送消息来处理消息(其不同之处在于确定接收队列路径的方式),最后一个类则使用消息属性来调用组件方法。

  .NET 框架内部的错误处理是以基类 Exception 为基础的。当系统引发或捕获错误时,这些错误必须是从 Exception 中导出的类。CWorkerThreadException 类就是这样一种实现,它通过附加额外属性(用于定义服务是否应继续运行)来扩展基类。

  最后,应用程序包含两种结构。这些值类型定义了辅助进程或线程的运行时参数,以简化 CWorker 和 CWorkerThread 对象的结构。使用值类型结构(而不是引用类型类)能够确保这些运行时参数维护的是数值(而不是引用)。

  IWebMessage 接口

  CWorkerThread 的实现之一是一个调用组件方法的类。这个名为 CWorkerThreadAssembly 的类使用 IWebMessage 接口来定义服务和组件之间的约定。

  与当前版本的 Microsoft Visual Studio? 不同,C# 接口可以在任何语言中显式定义,而不需要创建和编译 IDL 文件。C# IWebMessage 接口的定义如下:

  public interface IWebMessage
{
 WebMessageReturn Process(string sMessageLabel, string sMessageBody, int iAppSpecific);
 void Release();
}

  ATL 代码中的 Process 方法是为处理消息而指定的。Process 方法的返回代码定义为枚举类型 WebMessageReturn:

  public enum WebMessageReturn
{
 ReturnGood,
 ReturnBad,
 ReturnAbort
}

 

  枚举的定义如下:Good 表示继续处理,Bad 表示将消息写入错误队列,Abort 表示终止处理。Release 方法为服务提供了轻松清除类实例的途径。因为仅在垃圾回收的过程中才调用类实例的析构函数,所以确保所有占用昂贵资源(例如数据库连接)的类都有一个能够在析构之前被调用的方法,用来释放这些资源,这是一种非常好的构思。

  名称空间

  在这里先简单介绍一下名称空间。名称空间允许在内部和外部表示中将应用程序组织成为逻辑元素。服务内的所有代码都包含在 MSDNMessageService.Service 名称空间内。尽管服务代码包含在若干文件中,但是由于它们包含在同一名称空间中,因此用户不需要引用其他文件。

  由于 IWebMessage 接口包含在 MSDNMessageService.Interface 名称空间中,因此使用此接口的线程类具有一个接口名称空间。

  服务类

  应用程序的目的是监视和处理消息队列,每一队列在收到消息时都执行不同的进程。应用程序是作为 Windows 服务来实现的。

  ServiceBase 类

  如前所述,服务的基本结构是从 ServiceBase 继承的类。重要的方法包括 OnStart、OnStop、OnPause 和 OnContinue,每一个替代方法都与一个服务控制操作直接对应。OnStart 方法的目的是创建 CWorker 对象,而 CWorker 类又创建 CWorkerThread 对象,然后在该对象中创建执行服务工作的线程。

  服务的运行时配置(以及 CWorker 和 CWorkerThread 对象的属性)是在基于 XML 的配置文件中维护的。它的名称与创建的 .exe 文件相同,但带有一个 .cfg 后缀。配置示例如下:

  <?xml version="1.0"?>
<configuration>
<ProcessList>
 <ProcessDefinition
    ProcessName="Worker1"
    ProcessDesc="Message Worker with 2 Threads"
    ProcessType="AppSpecific"
    ProcessThreads="2"
    InputQueue=".\private$\test_load1"
    ErrorQueue=".\private$\test_error">
  <OutputList>
   <OutputDefinition OutputName=".\private$\test_out11" />
   <OutputDefinition OutputName=".\private$\test_out12" />
  </OutputList>
 </ProcessDefinition>
 <ProcessDefinition
    ProcessName="Worker2"
    ProcessDesc="Assembly Worker with 1 Thread"
    ProcessType="Assembly"
    ProcessThreads="1"
    InputQueue=".\private$\test_load2"
    ErrorQueue=".\private$\test_error">
  <OutputList>
   <OutputDefinition OutputName="C:\MSDNMessageService\MessageExample.dll" />
   <OutputDefinition OutputName="MSDNMessageService.MessageSample.ExampleClass"/>
  </OutputList>
 </ProcessDefinition>
</ProcessList>
</configuration>

 

  对此信息的访问通过来自 System.Configuration 程序集的 ConfigManager 类来管理。静态 Get 方法返回信息的集合,这些集合将被枚举以获得单个属性。这些属性集的设置决定了辅助对象的运行时特征。除了这一配置文件,您还应该创建定义 XML 文件结构的图元文件,并在其中引用位于服务器 machine.cfg 配置文件中的图元文件:

  <?xml version ="1.0"?>
<MetaData xmlns="x-schema:CatMeta.xms">
 <DatabaseMeta InternalName="MessageService">
 <ServerWiring Interceptor="Core_XMLInterceptor"/>
 <Collection
   InternalName="Process" PublicName="ProcessList"
   PublicRowName="ProcessDefinition"
   SchemaGeneratorFlags="EMITXMLSCHEMA">
  <Property InternalName="ProcessName" Type="String" MetaFlags="PRIMARYKEY" />
  <Property InternalName="ProcessDesc" Type="String" />
  <Property InternalName="ProcessType" Type="Int32" DefaultValue="RoundRobin" >
   <Enum InternalName="RoundRobin" Value="0"/>
   <Enum InternalName="AppSpecific" Value="1"/>
   <Enum InternalName="Assembly" Value="2"/>
  </Property>
  <Property InternalName="ProcessThreads" Type="Int32" DefaultValue="1" />
  <Property InternalName="InputQueue" Type="String" />
  <Property InternalName="ErrorQueue" Type="String" />
  <Property InternalName="OutputName" Type="String" />
  <QueryMeta InternalName="All" MetaFlags="ALL" />
  <QueryMeta InternalName="QueryByFile" CellName="__FILE" Operator="EQUAL" />
 </Collection>
 <Collection
   InternalName="Output" PublicName="OutputList"
   PublicRowName="OutputDefinition"
   SchemaGeneratorFlags="EMITXMLSCHEMA">
  <Property InternalName="ProcessName" Type="String" MetaFlags="PRIMARYKEY" />
  <Property InternalName="OutputName" Type="String" MetaFlags="PRIMARYKEY" />
   <QueryMeta InternalName="All" MetaFlags="ALL" />
   <QueryMeta InternalName="QueryByFile" CellName="__FILE" Operator="EQUAL" />
  </Collection>
 </DatabaseMeta>
 <RelationMeta
   PrimaryTable="Process" PrimaryColumns="ProcessName"
   ForeignTable="Output" ForeignColumns="ProcessName"
   MetaFlags="USECONTAINMENT"/>
</MetaData>

 

  由于 Service 类必须维护一个已创建辅助对象的列表,因此使用了 Hashtable 集合,用于保持类型对象的名称/数值对列表。Hashtable 不仅支持枚举,还允许通过关键字来查询值。在应用程序中,XML 进程名称是唯一的关键字:

  private Hashtable htWorkers = new Hashtable();
IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new AppDomainSelector());
foreach (IConfigItem ciWorker in cWorkers)
{
 WorkerFormatter sfWorker = new WorkerFormatter();
 sfWorker.ProcessName = (string)ciWorker["ProcessName"];
 sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"];
 sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"];
 sfWorker.InputQueue = (string)ciWorker["InputQueue"];
 sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"];
 // 计算并定义进程类型
 switch ((int)ciWorker["ProcessType"])
 {
  case 0:
   sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessRoundRobin;
   break;
  case 1:
   sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessAppSpecific;
   break;
  case 2:
   sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessAssembly;
   break;
  default:
  throw new Exception("Unknown Processing Type");
 }
 // 执行更多的工作以读取输出信息
 string sProcessName = (string)ciWorker["ProcessName"];
 if (htWorkers.ContainsKey(sProcessName))
  throw new ArgumentException("Process Name Must be Unique: " + sProcessName);
  htWorkers.Add(sProcessName, new CWorker(sfWorker));
}

 

  在这段代码中没有包含的主要信息是输出数据的获取。每一个进程定义中都有一组相应的输出定义项。该信息是通过如下的简单查询读取的:

  string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" +
sfWorker.ProcessName + " AND Selector=appdomain://";
ConfigQuery qQuery = new ConfigQuery(sQuery);
IConfigCollection cOutputs = ConfigManager.Get("OutputList", qQuery);
int iSize = cOutputs.Count, iLoop = 0;
sfWorker.OutputName = new string[iSize];
foreach (IConfigItem ciOutput in cOutputs)
sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"];

  CWorkerThread 和 Cworker 类都有相应的服务控制方法,根据服务控制操作进行调用。由于 Hashtable 中引用了每一个 CWorker 对象,因此需要枚举 Hashtable 的内容,以调用适当的服务控制方法:

  foreach (CWorker cWorker in htWorkers.Values)
 cWorker.Start();

  类似地,实现的 OnPause、OnContinue 和 OnStop 方法是通过调用 CWorker 对象上的相应方法来执行操作的。

  CWorker 类

  CWorker 类的主要功能是创建和管理 CWorkerThread 对象。Start、Stop、Pause 和 Continue 方法调用相应的 CWorkerThread 方法。实际的 CWorkerThread 对象是在Start 方法中创建的。与使用 Hashtable 管理辅助对象引用的 Service 类相似,CWorker 使用 ArrayList(简单的动态数组)来维护线程对象的列表。

  在这个数组内部,CWorker 类创建了 CWorkerThread 类的一个实现版本。CWorkerThread 类(将在下面讨论)是一个必须继承的抽象类。导出类定义了消息的处理方式:

 

  aThreads = new ArrayList();
for (int idx=0; idx<sfWorker.NumberThreads; idx++)
{
 WorkerThreadFormatter wfThread = new WorkerThreadFormatter();
 wfThread.ProcessName = sfWorker.ProcessName;
 wfThread.ProcessDesc = sfWorker.ProcessDesc;
 wfThread.ThreadNumber = idx;
 wfThread.InputQueue = sfWorker.InputQueue;
 wfThread.ErrorQueue = sfWorker.ErrorQueue;
 wfThread.OutputName = sfWorker.OutputName;
 // 定义辅助类型,并将其插入辅助线程结构
 CWorkerThread wtBase;
 switch (sfWorker.ProcessType)
 {
  case WorkerFormatter.SFProcessType.ProcessRoundRobin:
   wtBase = new CWorkerThreadRoundRobin(this, wfThread);
   break;
  case WorkerFormatter.SFProcessType.ProcessAppSpecific:
   wtBase = new CWorkerThreadAppSpecific(this, wfThread);
   break;
  case WorkerFormatter.SFProcessType.ProcessAssembly:
   wtBase = new CWorkerThreadAssembly(this, wfThread);
   break;
  default:
   throw new Exception("Unknown Processing Type");
 }
 // 添加对数组的调用
 aThreads.Insert(idx, wtBase);
}

  一旦所有的对象都已创建,就可以通过调用每个线程对象的 Start 方法来启动它们:

  foreach(CWorkerThread cThread in aThreads)
 cThread.Start();

  Stop、Pause 和 Continue 方法在 foreach 循环里执行的操作类似。Stop 方法具有如下的垃圾收集操作:

 

  GC.SuppressFinalize(this);

  在类析构函数中将调用 Stop 方法,这样,在没有显式调用 Stop 方法的情况下也可以正确地终止对象。如果调用了 Stop 方法,将不需要析构函数。SuppressFinalize 方法能够防止调用对象的 Finalize 方法(析构函数的实际实现)。

  CWorkerThread 抽象类

  CWorkerThread 是一个由 CWorkerThreadAppSpecifc、CWorkerThreadRoundRobin 和 CWorkerThreadAssembly 继承的抽象类。无论如何处理消息,队列的大部分处理是相同的,所以 CWorkerThread 类提供了这一功能。这个类提供了抽象方法(必须被实际方法替代)以管理资源和处理消息。

  类的工作再一次通过 Start、Stop、Pause 和 Continue 方法来实现。在 Start 方法中引用了输入和错误队列。在 .NET 框架中,消息由 System.Messaging 名称空间处理:

  // 尝试打开队列,并设置默认的读写属性
MessageQueue mqInput = new MessageQueue(sInputQueue);
mqInput.MessageReadPropertyFilter.Body = true;
mqInput.MessageReadPropertyFilter.AppSpecific = true;
MessageQueue mqError = new MessageQueue(sErrorQueue);
// 如果使用 MSMQ COM,则将格式化程序设置为 ActiveX
mqInput.Formatter = new ActiveXMessageFormatter();
mqError.Formatter = new ActiveXMessageFormatter();

  一旦定义了消息队列引用,即会创建一个线程用于实际的处理函数(称为 ProcessMessages)。在 .NET 框架中,使用 System.Threading 名称空间很容易实现线程处理:

  procMessage = new Thread(new ThreadStart(ProcessMessages));
procMessage.Start();

 

  ProcessMessages 函数是基于 Boolean 值的处理循环。当数值设为 False,处理循环将终止。因此,线程对象的 Stop 方法只设置这一 Boolean 值,然后关闭打开的消息队列,并加入带有主线程的线程:

  // 加入服务线程和处理线程
bRun = false;
procMessage.Join();
// 关闭打开的消息队列
mqInput.Close();
mqError.Close();

  Pause 方法只设置一个 Boolean 值,使处理线程休眠半秒钟:

  if (bPause)
 Thread.Sleep(500);

  最后,每一个 Start、Stop、Pause 和 Continue 方法将调用抽象的 OnStart、OnStop、OnPause 和 OnContinue 方法。这些抽象方法为实现的类提供了挂钩,以捕获和释放所需的资源。

  ProcessMessages 循环具有如下基本结构:

  1、接收 Message。

  2、如果 Message 具有成功的 Receive,则调用抽象 ProcessMessage 方法。

  3、如果 Receive 或 ProcessMessage 失败,将 Message 发送至错误队列中。

  Message mInput;
try
{
 // 从队列中读取,并等候 1 秒
 mInput = mqInput.Receive(new TimeSpan(0,0,0,1));
}
catch (MessageQueueException mqe)
{
 // 将消息设置为 null
 mInput = null;
 // 查看错误代码,了解是否超时
 if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B
 {
  // 如果未超时,发出一个错误并记录错误号
  LogError("Error: " + mqe.Message);
  throw mqe;
 }
}
if (mInput != null)
{
 // 得到一个要处理的消息,调用处理消息抽象方法
 try
 {
  ProcessMessage(mInput);
 }
 // 捕获已知异常状态的错误
 catch (CWorkerThreadException ex)
 {
  ProcessError(mInput, ex.Terminate);
 }
 // 捕获未知异常,并调用 Terminate
 catch
 {
  ProcessError(mInput, true);
 }
}

 

  消息应用程序的结构与以前的活动模板库 (ATL) 应用程序相似,它们之间的主要不同在于用于管理服务的代码的封装和 .NET 框架组件的使用。要创建 Windows 服务,.NET 框架用户仅仅需要创建一个从 ServiceBase(来自 System.ServiceControl 程序集)继承的类。这毫不奇怪,因为 .NET 框架是面向对象的。

  应用程序结构

  应用程序中主要的类是 ServiceControl,它是从 ServiceBase 继承的。因而,它必须实现 OnStart 和 OnStop 方法,以及可选的 OnPause 和 OnContinue 方法。事实上,类是在静态方法 Main 内构造的:

  using System;
using System.ServiceProcess;
public class ServiceControl: ServiceBase
{
 // 创建服务对象的主入口点
 public static void Main()
 {
  ServiceBase.Run(new ServiceControl());
 }
 // 定义服务参数的构造对象
 public ServiceControl()
 {
  CanPauseAndContinue = true;
  ServiceName = "MSDNMessageService";
  AutoLog = false;
 }
 protected override void OnStart(string[] args) {...}
 protected override void OnStop() {...}
 protected override void OnPause() {...}
 protected override void OnContinue() {...}
}

  ServiceControl 类创建一系列 CWorker 对象,即,为需要处理的每个消息队列创建 CWorker 类的一个实例。根据定义中处理队列所需的线程数目,CWorker 类依次创建了一系列的 CWorkerThread 对象。CWorkerThread 类创建的一个处理线程将执行实际的服务工作。

  使用 CWorker 和 CWorkerThread 类的主要目的是确认服务控件 Start、Stop、Pause 和 Continue 命令。因为这些进程必须是无阻塞的,命令操作最终将在后台处理线程上执行。



相关教程