In the future I will try to make the entire code of the demonstrator application available for download.
Step 1: Prepare a wrapper class to make topics and queues a little easier to use:
I need to explain a little about how the code works. In principle, each instance of the class lets you connect to a single topic or queue. For a topic, the client connects as both consumer and producer, but for a queue you have to choose either the producer or the consumer side. :) For my planned tests this is enough.
So let's begin:
The delegates are in the same namespace as both classes (for topics and queues).
The first delegate is for received messages, the second for status and error notifications:
* This source code was highlighted with Source Code Highlighter.
/// <summary>
/// Signature of the callback that receives a message delivered by the broker.
/// </summary>
/// <param name="m">The message that was received.</param>
public delegate void NewDataReceive(IMessage m);

/// <summary>
/// Signature of the callback that receives a textual status notification
/// (used for both debug and error notifications).
/// </summary>
/// <param name="msg">The notification text.</param>
public delegate void AmqClientStatusEventHandler(string msg);
Topics class:
As shown below, the AmqConnect() method enables asynchronous message sending, asynchronous closing and asynchronous acknowledgements, although I'm not sure the last one is needed since the automatic acknowledgement mode is used.
The rest of the code should be quite clear, but if you have any questions please post a comment.
Usage: class AmqTopicClient
* This source code was highlighted with Source Code Highlighter.
/// <summary>
/// Wrapper around a single ActiveMQ topic: one instance owns one connection,
/// one session, one producer and one consumer for the topic named by
/// <see cref="AmqQue"/> on the broker at <see cref="AmqUrl"/>.
/// </summary>
/// <remarks>
/// Failures are reported through <see cref="AmqClientStatusError"/> and the
/// <see cref="Er"/> field rather than by throwing, except for the argument
/// checks at the start of <see cref="AmqConnect"/>.
/// </remarks>
public class AmqTopicClient : IDisposable
{
    // Serializes use of the session/producer: Send and TxtSend run on
    // thread-pool threads, so several sends can be in flight at once.
    private readonly object MySendLock = new object();

    private Connection MyConnection;
    private MessageConsumer MyConsumer;
    private ActiveMQDestination MyDestination;
    private ConnectionFactory MyFactory;
    private MessageProducer MyProducer;
    private Session Mysession;

    /// <summary>Text of the last connect failure; empty when the last connect attempt reported none.</summary>
    public string Er = string.Empty;

    /// <summary>Broker URL passed to the connection factory.</summary>
    public readonly string AmqUrl;

    /// <summary>Name of the topic this client publishes to and consumes from.</summary>
    public readonly string AmqQue;

    /// <summary>Raised for every message delivered to the topic consumer.</summary>
    public event NewDataReceive NewDataReceive;

    /// <summary>Raised after a message has been handed to the producer.</summary>
    public event AmqClientStatusEventHandler AmqClientStatusDebug;

    /// <summary>Raised when connecting, sending, closing or the connection itself fails.</summary>
    public event AmqClientStatusEventHandler AmqClientStatusError;

    /// <summary>
    /// Stores the broker URL and topic name. No connection is made until
    /// <see cref="AmqConnect"/> is called.
    /// </summary>
    /// <param name="amqurl">Broker URL.</param>
    /// <param name="amqQue">Topic name.</param>
    public AmqTopicClient(string amqurl, string amqQue)
    {
        AmqUrl = amqurl;
        AmqQue = amqQue;
    }

    /// <summary>
    /// Connects to the broker, creates the session, producer and consumer for
    /// the topic, and starts message delivery.
    /// </summary>
    /// <remarks>
    /// On failure <see cref="Er"/> is set, everything created so far is
    /// released, and (for exceptions) <see cref="AmqClientStatusError"/> is raised.
    /// </remarks>
    /// <exception cref="ApplicationException">
    /// <see cref="AmqUrl"/> or <see cref="AmqQue"/> is null or empty.
    /// </exception>
    public void AmqConnect()
    {
        if (string.IsNullOrEmpty(AmqUrl)) throw new ApplicationException("AmqUrl null or empty");
        if (string.IsNullOrEmpty(AmqQue)) throw new ApplicationException("AmqQue null or empty");

        // Release whatever an earlier call left open so reconnecting does not
        // leak the previous connection, and forget the previous failure text.
        Close();
        Er = string.Empty;

        MyFactory = new ConnectionFactory(AmqUrl);
        try
        {
            MyConnection = MyFactory.CreateConnection() as Connection;
            if (MyConnection == null)
            {
                Er = "Error:AMQ Connection nullable.";
                return;
            }

            MyConnection.AsyncSend = true;
            MyConnection.AsyncClose = true;
            MyConnection.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
            MyConnection.ProducerWindowSize = 1024000;
            MyConnection.SendAcksAsync = true;
            MyConnection.ExceptionListener += ConnectionExceptionListener;

            Mysession = MyConnection.CreateSession() as Session;
            if (Mysession == null)
            {
                Er = "Error:AMQ Session nullable.";
                Close();
                return;
            }

            MyDestination = Mysession.GetTopic(AmqQue) as ActiveMQDestination;
            MyProducer = Mysession.CreateProducer(MyDestination) as MessageProducer;
            if (MyProducer != null) MyProducer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            MyConsumer = Mysession.CreateConsumer(MyDestination) as MessageConsumer;
            if (MyConsumer == null)
            {
                Er = "Error:AMQ Consumer nullable.";
                Close();
                return;
            }

            // Discard anything already waiting on the consumer before the
            // listener is attached.
            while (MyConsumer.ReceiveNoWait() != null)
            {
            }

            MyConsumer.Listener += OnMassage;
            MyConnection.Start();
        }
        catch (Exception ex)
        {
            Er = "Error:AMQ Connection Error.";
            RaiseError("AMQ Error:" + ex);

            // Do not leave a half-built connection/session open.
            Close();
        }
    }

    /// <summary>
    /// Releases the producer, consumer, session and connection. Safe to call
    /// when not connected and safe to call more than once. A failure while
    /// releasing one resource is reported through
    /// <see cref="AmqClientStatusError"/> and does not prevent the remaining
    /// resources from being released.
    /// </summary>
    public void Close()
    {
        MessageProducer producer;
        MessageConsumer consumer;
        Session session;
        Connection connection;

        // Detach the fields first so a concurrent send sees "not connected"
        // instead of a producer that is in the middle of being closed.
        lock (MySendLock)
        {
            producer = MyProducer;
            consumer = MyConsumer;
            session = Mysession;
            connection = MyConnection;

            MyProducer = null;
            MyConsumer = null;
            Mysession = null;
            MyConnection = null;
            MyDestination = null;
        }

        if (producer != null)
        {
            try
            {
                producer.Dispose();
            }
            catch (Exception ex)
            {
                RaiseError("AMQ Error:" + ex);
            }
        }

        if (consumer != null)
        {
            try
            {
                consumer.Listener -= OnMassage;
                consumer.Close();
            }
            catch (Exception ex)
            {
                RaiseError("AMQ Error:" + ex);
            }
        }

        if (session != null)
        {
            try
            {
                session.Dispose();
            }
            catch (Exception ex)
            {
                RaiseError("AMQ Error:" + ex);
            }
        }

        if (connection != null)
        {
            try
            {
                if (connection.IsStarted)
                    connection.Stop();
                connection.Close();
            }
            catch (Exception ex)
            {
                RaiseError("AMQ Error:" + ex);
            }
            finally
            {
                connection.ExceptionListener -= ConnectionExceptionListener;
            }
        }
    }

    /// <summary>Releases all broker resources; equivalent to <see cref="Close"/>.</summary>
    public void Dispose()
    {
        Close();
    }

    /// <summary>
    /// Sends <paramref name="o"/> as an object message without blocking the
    /// caller; the work is queued to the thread pool. Ordering between
    /// separate calls is not guaranteed.
    /// </summary>
    /// <param name="o">Payload for the object message.</param>
    public void Send(object o)
    {
        // Delegate.BeginInvoke is unsupported on .NET Core/.NET 5+ and needs a
        // matching EndInvoke elsewhere; the thread pool has neither problem.
        System.Threading.ThreadPool.QueueUserWorkItem(MySend, o);
    }

    /// <summary>Sends <paramref name="o"/> as an object message on the calling thread.</summary>
    /// <param name="o">Payload for the object message.</param>
    public void SendSync(object o)
    {
        MySend(o);
    }

    /// <summary>
    /// Creates an object message from <paramref name="o"/> and publishes it.
    /// Raises <see cref="AmqClientStatusDebug"/> on success and
    /// <see cref="AmqClientStatusError"/> on failure; never throws for send errors.
    /// </summary>
    /// <param name="o">Payload for the object message.</param>
    public void MySend(object o)
    {
        try
        {
            bool sent = false;
            lock (MySendLock)
            {
                if (Mysession != null && MyProducer != null)
                {
                    ActiveMQMessage request = Mysession.CreateObjectMessage(o) as ActiveMQObjectMessage;
                    if (request != null)
                    {
                        MyProducer.Send(request);
                        sent = true;
                    }
                }
            }

            // Notify outside the lock so a slow or re-entrant handler cannot
            // stall other senders.
            if (sent)
                RaiseDebug("Object sent:" + o);
            else
                RaiseError("AMQ Error:Object not sent (client not connected or message could not be created).");
        }
        catch (Exception ex)
        {
            RaiseError("AMQ Error:" + ex);
        }
    }

    /// <summary>
    /// Sends <paramref name="o"/> as a text message without blocking the
    /// caller; the work is queued to the thread pool. Ordering between
    /// separate calls is not guaranteed.
    /// </summary>
    /// <param name="o">Text of the message.</param>
    public void TxtSend(string o)
    {
        System.Threading.ThreadPool.QueueUserWorkItem(delegate { MyTxtSend(o); });
    }

    /// <summary>
    /// Creates a text message from <paramref name="o"/> and publishes it.
    /// Raises <see cref="AmqClientStatusDebug"/> on success and
    /// <see cref="AmqClientStatusError"/> on failure; never throws for send errors.
    /// </summary>
    /// <param name="o">Text of the message.</param>
    public void MyTxtSend(string o)
    {
        try
        {
            bool sent = false;
            lock (MySendLock)
            {
                if (Mysession != null && MyProducer != null)
                {
                    ActiveMQMessage request = Mysession.CreateTextMessage(o) as ActiveMQTextMessage;
                    if (request != null)
                    {
                        MyProducer.Send(request);
                        sent = true;
                    }
                }
            }

            if (sent)
                RaiseDebug("Text sent:" + o);
            else
                RaiseError("AMQ Error:Text not sent (client not connected or message could not be created).");
        }
        catch (Exception ex)
        {
            RaiseError("AMQ Error:" + ex);
        }
    }

    // Forwards asynchronous connection failures to AmqClientStatusError.
    private void ConnectionExceptionListener(Exception ex)
    {
        RaiseError("AMQ Error:" + ex);
    }

    // Consumer listener: forwards every delivered message to NewDataReceive.
    private void OnMassage(IMessage m)
    {
        // Copy to a local so an unsubscribe on another thread between the
        // null check and the call cannot cause a NullReferenceException.
        NewDataReceive handler = NewDataReceive;
        if (handler != null)
            handler(m);
    }

    // Raises AmqClientStatusError if anyone is subscribed.
    private void RaiseError(string msg)
    {
        AmqClientStatusEventHandler handler = AmqClientStatusError;
        if (handler != null)
            handler(msg);
    }

    // Raises AmqClientStatusDebug if anyone is subscribed.
    private void RaiseDebug(string msg)
    {
        AmqClientStatusEventHandler handler = AmqClientStatusDebug;
        if (handler != null)
            handler(msg);
    }
}
For the queue code, see Introduction part 2.
_
_
_
_
_
No comments:
Post a Comment