Tuesday, January 4, 2011

Preparation part 2

A client class for working with queues:

Warning! A consumer cannot send messages and a producer cannot receive them, so do not be surprised.

  /// <summary>
  /// Thin wrapper around a single ActiveMQ queue that acts either as a producer
  /// (sends object messages) or as a consumer (raises <see cref="NewDataReceive"/>
  /// for every delivered message), depending on the <c>isproducer</c> constructor flag.
  /// </summary>
  public class AmqQueClient
  {
    private Connection MyConnection;
    private MessageConsumer MyConsumer;
    private ActiveMQDestination MyDestination;
    private ConnectionFactory MyFactory;
    private MessageProducer MyProducer;
    private Session MySession;

    /// <summary>
    /// Text of the last connection failure recorded by <see cref="AmqConnect"/>;
    /// empty when the last attempt recorded no error.
    /// </summary>
    public string Er = string.Empty;

    /// <summary>Broker URL passed to the connection factory.</summary>
    public readonly string AmqUrl;

    /// <summary>Name of the queue this client is bound to.</summary>
    public readonly string AmqQue;

    /// <summary>True when the client sends messages, false when it receives them.</summary>
    public readonly bool IsProducer;

    /// <summary>Raised with each message delivered to the consumer.</summary>
    public event NewDataReceive NewDataReceive;

    /// <summary>Raised with a diagnostic text after an object has been sent.</summary>
    public event AmqClientStatusEventHandler AmqClientStatusDebug;

    /// <summary>Raised with an error text whenever an operation fails.</summary>
    public event AmqClientStatusEventHandler AmqClientStatusError;

    /// <summary>
    /// Stores the connection settings. No connection is opened until
    /// <see cref="AmqConnect"/> is called.
    /// </summary>
    /// <param name="amqurl">Broker URL.</param>
    /// <param name="amqQue">Queue name.</param>
    /// <param name="isproducer">True to create a producer, false to create a consumer.</param>
    public AmqQueClient(string amqurl, string amqQue, bool isproducer)
    {
      AmqUrl = amqurl;
      AmqQue = amqQue;
      IsProducer = isproducer;
    }

    /// <summary>
    /// Opens the connection, the session and the queue, then creates either the
    /// producer or the consumer. Failures after the factory has been created are
    /// not thrown: they are recorded in <see cref="Er"/> (and, for exceptions,
    /// reported through <see cref="AmqClientStatusError"/>), and everything that
    /// was created so far is released again.
    /// </summary>
    /// <exception cref="ApplicationException">
    /// <see cref="AmqUrl"/> or <see cref="AmqQue"/> is null or empty.
    /// </exception>
    public void AmqConnect()
    {
      if (string.IsNullOrEmpty(AmqUrl))
      {
        throw new ApplicationException("AmqUrl null or empty");
      }
      if (string.IsNullOrEmpty(AmqQue))
      {
        throw new ApplicationException("AmqQue null or empty");
      }

      // Start from a clean state: drop anything left over from an earlier
      // connect and forget the previous error so Er reflects this attempt only.
      ReleaseResources();
      Er = string.Empty;

      // Deliberately outside the try block, as before: a factory construction
      // failure propagates to the caller.
      MyFactory = new ConnectionFactory(AmqUrl);

      try
      {
        MyConnection = MyFactory.CreateConnection() as Connection;
        if (MyConnection == null)
        {
          FailConnect("Error:AMQ Connection nullable.");
          return;
        }

        MyConnection.AsyncSend = true;
        MyConnection.AsyncClose = true;
        MyConnection.ExceptionListener += ConnectionExceptionListener;

        MySession = MyConnection.CreateSession() as Session;
        if (MySession == null)
        {
          FailConnect("Error:AMQ Session nullable.");
          return;
        }

        MyDestination = MySession.GetQueue(AmqQue) as ActiveMQDestination;
        if (MyDestination == null)
        {
          FailConnect("Error:AMQ Destination nullable.");
          return;
        }

        if (IsProducer)
        {
          MyProducer = MySession.CreateProducer(MyDestination) as MessageProducer;
          if (MyProducer == null)
          {
            FailConnect("Error:AMQ Producer nullable.");
          }
          return;
        }

        MyConsumer = MySession.CreateConsumer(MyDestination) as MessageConsumer;
        if (MyConsumer == null)
        {
          FailConnect("Error:AMQ Consumer nullable.");
          return;
        }

        // Discard whatever can be received immediately before the listener is attached.
        // NOTE(review): the connection has not been started yet at this point, so
        // this loop may never receive anything - confirm the intent.
        while (MyConsumer.ReceiveNoWait() != null)
        {
        }

        MyConsumer.Listener += OnMassage;
        MyConnection.Start();
      }
      catch (Exception ex)
      {
        Er = "Error:AMQ Connection Error.";
        RaiseError("AMQ Error:" + ex);

        // Do not keep a half-built connection/session alive after a failure.
        ReleaseResources();
      }
    }

    /// <summary>
    /// Releases the producer/consumer, the session and the connection. Errors
    /// are reported through <see cref="AmqClientStatusError"/> and never thrown.
    /// Calling it when nothing is open does nothing.
    /// </summary>
    public void Close()
    {
      ReleaseResources();
    }

    /// <summary>
    /// Queues <paramref name="o"/> for sending on a thread-pool thread and returns
    /// immediately. The outcome is reported through
    /// <see cref="AmqClientStatusDebug"/> or <see cref="AmqClientStatusError"/>.
    /// </summary>
    /// <param name="o">Object to wrap in an object message.</param>
    /// <param name="ts">Time to live assigned to the message.</param>
    public void Send(object o, TimeSpan ts)
    {
      // ThreadPool is used instead of Delegate.BeginInvoke, which must always be
      // paired with EndInvoke and is not supported on every runtime.
      System.Threading.ThreadPool.QueueUserWorkItem(delegate(object state)
      {
        try
        {
          MySend(o, ts);
        }
        catch (Exception)
        {
          // MySend reports its own failures; the only thing that can arrive here
          // is an exception thrown by an event subscriber. It is dropped, exactly
          // as the previous fire-and-forget call did, because letting it escape a
          // pool thread would terminate the process.
        }
      });
    }

    /// <summary>Records a connect failure and releases everything created so far.</summary>
    private void FailConnect(string error)
    {
      ReleaseResources();
      Er = error;
    }

    /// <summary>
    /// Detaches the fields first and then disposes each resource in its own
    /// try/catch, so one failing step cannot prevent the others from being released.
    /// </summary>
    private void ReleaseResources()
    {
      MessageProducer producer = MyProducer;
      MessageConsumer consumer = MyConsumer;
      Session session = MySession;
      Connection connection = MyConnection;

      // Clear the fields up front so later calls see "not connected" instead of
      // disposed objects.
      MyProducer = null;
      MyConsumer = null;
      MySession = null;
      MyDestination = null;
      MyConnection = null;

      if (producer != null)
      {
        try
        {
          producer.Dispose();
        }
        catch (Exception ex)
        {
          RaiseError("AMQ Error:" + ex);
        }
      }

      if (consumer != null)
      {
        try
        {
          // Unsubscribe first so no message reaches the handler during shutdown.
          consumer.Listener -= OnMassage;
          consumer.Close();
          consumer.Dispose();
        }
        catch (Exception ex)
        {
          RaiseError("AMQ Error:" + ex);
        }
      }

      if (session != null)
      {
        try
        {
          session.Dispose();
        }
        catch (Exception ex)
        {
          RaiseError("AMQ Error:" + ex);
        }
      }

      if (connection != null)
      {
        try
        {
          if (connection.IsStarted)
          {
            connection.Stop();
          }
          connection.Close();
          connection.ExceptionListener -= ConnectionExceptionListener;
          connection.Dispose();
        }
        catch (Exception ex)
        {
          RaiseError("AMQ Error:" + ex);
        }
      }
    }

    /// <summary>Forwards connection-level exceptions to <see cref="AmqClientStatusError"/>.</summary>
    private void ConnectionExceptionListener(Exception ex)
    {
      RaiseError("AMQ Error:" + ex);
    }

    /// <summary>Consumer callback: forwards the message to <see cref="NewDataReceive"/>.</summary>
    private void OnMassage(IMessage m)
    {
      // Copy to a local so a concurrent unsubscribe cannot null the delegate
      // between the check and the call.
      NewDataReceive handler = NewDataReceive;
      if (handler != null)
      {
        handler(m);
      }
    }

    /// <summary>
    /// Creates an object message for <paramref name="o"/>, applies the time to
    /// live and sends it through the producer. Runs on a thread-pool thread.
    /// </summary>
    private void MySend(object o, TimeSpan ts)
    {
      try
      {
        Session session = MySession;
        MessageProducer producer = MyProducer;
        if (session == null || producer == null)
        {
          // Previously this case was skipped without any notification.
          RaiseError("AMQ Error:Send ignored, client is not connected as a producer.");
          return;
        }

        IMessage request = session.CreateObjectMessage(o);
        if (request == null)
        {
          RaiseError("AMQ Error:Send ignored, object message could not be created.");
          return;
        }

        request.NMSTimeToLive = ts;
        producer.Send(request);

        // Only report success after the message has actually been handed to the producer.
        RaiseDebug("Object sent:" + (o == null ? "null" : o.GetType().ToString()));
      }
      catch (Exception ex)
      {
        RaiseError("AMQ Error:" + ex);
      }
    }

    /// <summary>Raises <see cref="AmqClientStatusError"/> if anyone is subscribed.</summary>
    private void RaiseError(string message)
    {
      AmqClientStatusEventHandler handler = AmqClientStatusError;
      if (handler != null)
      {
        handler(message);
      }
    }

    /// <summary>Raises <see cref="AmqClientStatusDebug"/> if anyone is subscribed.</summary>
    private void RaiseDebug(string message)
    {
      AmqClientStatusEventHandler handler = AmqClientStatusDebug;
      if (handler != null)
      {
        handler(message);
      }
    }
  }
* This source code was highlighted with Source Code Highlighter.

If you have any questions, please post them in the comments.

No comments:

Post a Comment