Tuesday, January 4, 2011

Preparation part 1

So first I going to show you how I'm using ActiveMQ .net C# client in my tests. Client activemq version is 1.4.1.

In the future I will try to make entire code of demonstator application available to download.


Step 1: Prepare wrapper class to make usage little easier for topics and queues:

I need a little bit to explain how the code works. In principle, each instance of the class allows you to connect to a single topic or queue. Client connects to the topic itself as consumer and porducer but for queues you have to choose producer or consumer side:) . In my scheduled test this would be enough.


So let's begin:

Delegate are in the same namespace as both class(for topic and queue). 
First delegate is for messages second for errors:

  1. public delegate void NewDataReceive(IMessage m);
  2.  
  3. public delegate void AmqClientStatusEventHandler(string msg);
* This source code was highlighted with Source Code Highlighter.

Topics class:

As shown below in the method AmqConnect () has been enabled asynchronous message send, closing and confirmation messages although I'm not sure last one is needed since it is automatic confirmation mode. 

Rest of the code should be quite clear but If you have any question please post comments.

  1. public class AmqTopicClient
  2.   {
  3.     private Connection MyConnection;
  4.     private MessageConsumer MyConsumer;
  5.     private ActiveMQDestination MyDestination;
  6.     private ConnectionFactory MyFactory;
  7.     private MessageProducer MyProducer;
  8.     private Session Mysession;
  9.     public string Er = string.Empty;
  10.     public readonly string AmqUrl;
  11.     public readonly string AmqQue;
  12.  
  13.     public AmqTopicClient(string amqurl, string amqQue)
  14.     {
  15.       AmqUrl = amqurl;
  16.       AmqQue = amqQue;
  17.     }
  18.  
  19.     public void AmqConnect()
  20.     {
  21.      if (string.IsNullOrEmpty(AmqUrl)) throw new ApplicationException("AmqUrl null or empty");
  22.      if (string.IsNullOrEmpty(AmqQue)) throw new ApplicationException("AmqQue null or empty");
  23.       MyFactory = new ConnectionFactory(AmqUrl);
  24.  
  25.       {
  26.         try
  27.         {
  28.           MyConnection = MyFactory.CreateConnection() as Connection;
  29.  
  30.           if (MyConnection != null)
  31.           {
  32.             MyConnection.AsyncSend = true;
  33.             MyConnection.AsyncClose = true;
  34.             MyConnection.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
  35.             MyConnection.ProducerWindowSize = 1024000;
  36.             MyConnection.SendAcksAsync = true;
  37.             MyConnection.ExceptionListener += ConnectionExceptionListener;
  38.             Mysession = MyConnection.CreateSession() as Session;
  39.  
  40.             if (Mysession != null)
  41.             {
  42.               MyDestination = Mysession.GetTopic(AmqQue) as ActiveMQDestination;
  43.  
  44.               MyProducer = Mysession.CreateProducer(MyDestination) as MessageProducer;
  45.           if (MyProducer != null) MyProducer.DeliveryMode = MsgDeliveryMode.NonPersistent;
  46.               MyConsumer = Mysession.CreateConsumer(MyDestination) as MessageConsumer;
  47.  
  48.               if (MyConsumer == null)
  49.               {
  50.                 Mysession.Dispose();
  51.                 MyConnection.Dispose();
  52.                 Er = "Error:AMQ Consumer nullable.";
  53.                 return;
  54.               }
  55.  
  56.               while (MyConsumer.ReceiveNoWait() != null)
  57.               {
  58.               }
  59.  
  60.               MyConsumer.Listener += OnMassage;
  61.               MyConnection.Start();
  62.             }
  63.             else
  64.             {
  65.               MyConnection.Dispose();
  66.               Er = "Error:AMQ Session nullable.";
  67.             }
  68.           }
  69.           else
  70.           {
  71.  
  72.             Er = "Error:AMQ Connection nullable.";
  73.           }
  74.         }
  75.         catch (Exception ex)
  76.         {
  77.           Er = "Error:AMQ Connection Error.";
  78.           if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
  79.         }
  80.       }
  81.     }
  82.  
  83.     private void ConnectionExceptionListener(Exception ex)
  84.     {
  85.       if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
  86.     }
  87.  
  88.     public void Close()
  89.     {
  90.       try
  91.       {
  92.         if (MyProducer != null)
  93.         {
  94.           MyProducer.Dispose();
  95.         }
  96.         if (MyConsumer != null)
  97.         {
  98.           MyConsumer.Listener -= OnMassage;
  99.           MyConsumer.Close();
  100.         }
  101.  
  102.  
  103.         if (MyConnection != null)
  104.         {
  105.           if (MyConnection.IsStarted)
  106.             MyConnection.Stop();
  107.           MyConnection.Close();
  108.           MyConnection.ExceptionListener -= ConnectionExceptionListener;
  109.         }
  110.       }
  111.       catch (Exception ex)
  112.       {
  113.         if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
  114.       }
  115.     }
  116.  
  117.     public event NewDataReceive NewDataReceive;
  118.  
  119.     public event AmqClientStatusEventHandler AmqClientStatusDebug;
  120.  
  121.     public event AmqClientStatusEventHandler AmqClientStatusError;
  122.  
  123.     private void OnMassage(IMessage m)
  124.     {
  125.       if (NewDataReceive != null)
  126.         NewDataReceive(m);
  127.     }
  128.  
  129.  
  130.     private delegate void MySendDelegate(object o);
  131.     public void Send(object o)
  132.     {
  133.       (new MySendDelegate(MySend)).BeginInvoke(o, null, null);
  134.     }
  135.     public void SendSync(object o)
  136.     {
  137.       MySend(o);
  138.     }
  139.     public void MySend(object o)
  140.     {
  141.       try
  142.       {
  143.         if (Mysession != null)
  144.         {
  145.         ActiveMQMessage request = Mysession.CreateObjectMessage(o) as ActiveMQObjectMessage;
  146.           
  147.           if (MyProducer != null)
  148.           {
  149.             if (request != null)
  150.             {              
  151.               MyProducer.Send(request);
  152.              if (AmqClientStatusDebug != null) AmqClientStatusDebug("Object sent:" + o);
  153.  
  154.             }
  155.           }
  156.         }
  157.       }
  158.       catch (Exception ex)
  159.       {
  160.         if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
  161.  
  162.       }
  163.  
  164.     }
  165.     private delegate void MyTxtSendDelegate(string o);
  166.     public void TxtSend(string o)
  167.     {
  168.       (new MyTxtSendDelegate(MyTxtSend)).BeginInvoke(o, null, null);
  169.     }
  170.     public void MyTxtSend(string o)
  171.     {
  172.       try
  173.       {
  174.         if (Mysession != null)
  175.         {
  176.           ActiveMQMessage request = Mysession.CreateTextMessage(o) as ActiveMQTextMessage;
  177.           if (MyProducer != null)
  178.           {
  179.             if (request != null)
  180.             {
  181.              MyProducer.Send(request);
  182.              if (AmqClientStatusDebug != null) AmqClientStatusDebug("Text sent:" + o);
  183.             }
  184.           }
  185.         }
  186.       }
  187.       catch (Exception ex)
  188.       {
  189.         if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
  190.       }
  191.     }
  192.   }
* This source code was highlighted with Source Code Highlighter.
If you have any question please post a comment.

For Queues code look at Introduction part 2
_
_
_

No comments:

Post a Comment