Warning: a client created as a consumer cannot send messages, and a client created as a producer cannot receive them, so do not be surprised when nothing happens.
* This source code was highlighted with Source Code Highlighter.
/// <summary>
/// Client for a single ActiveMQ queue. Depending on the <c>isproducer</c> flag given to the
/// constructor, an instance creates either a message producer (used by <see cref="Send"/>) or a
/// message consumer (which raises <see cref="NewDataReceive"/>), never both.
/// </summary>
public class AmqQueClient
{
    private Connection MyConnection;
    private MessageConsumer MyConsumer;
    private ActiveMQDestination MyDestination;
    private ConnectionFactory MyFactory;
    private MessageProducer MyProducer;
    private Session MySession;

    /// <summary>
    /// Text of the last failure recorded by <see cref="AmqConnect"/>; empty when the most recent
    /// call recorded none.
    /// </summary>
    public string Er = string.Empty;

    /// <summary>Broker URL handed to the connection factory.</summary>
    public readonly string AmqUrl;

    /// <summary>Name of the queue this client is bound to.</summary>
    public readonly string AmqQue;

    /// <summary>True when this client sends messages, false when it receives them.</summary>
    public readonly bool IsProducer;

    /// <summary>Raised for every message delivered to the consumer's listener.</summary>
    public event NewDataReceive NewDataReceive;

    /// <summary>Raised after an object has been handed to the producer.</summary>
    public event AmqClientStatusEventHandler AmqClientStatusDebug;

    /// <summary>Raised with the text "AMQ Error:" followed by the exception when an operation fails.</summary>
    public event AmqClientStatusEventHandler AmqClientStatusError;

    /// <summary>
    /// Stores the connection settings. No connection is opened until <see cref="AmqConnect"/> is called.
    /// </summary>
    /// <param name="amqurl">Broker URL.</param>
    /// <param name="amqQue">Queue name.</param>
    /// <param name="isproducer">True to create a producer, false to create a consumer.</param>
    public AmqQueClient(string amqurl, string amqQue, bool isproducer)
    {
        AmqUrl = amqurl;
        AmqQue = amqQue;
        IsProducer = isproducer;
    }

    /// <summary>
    /// Opens the connection and session, resolves the queue and creates the producer or the consumer.
    /// Failures during setup do not throw: they are recorded in <see cref="Er"/> (and, for exceptions,
    /// reported through <see cref="AmqClientStatusError"/>), and everything created so far is released.
    /// </summary>
    /// <exception cref="ApplicationException">The URL or the queue name is null or empty.</exception>
    public void AmqConnect()
    {
        if (string.IsNullOrEmpty(AmqUrl)) throw new ApplicationException("AmqUrl null or empty");
        if (string.IsNullOrEmpty(AmqQue)) throw new ApplicationException("AmqQue null or empty");

        // Start from a clean state: drop a connection left over from an earlier call and
        // forget the previous error so Er always describes this attempt.
        ReleaseResources();
        Er = string.Empty;

        MyFactory = new ConnectionFactory(AmqUrl);
        try
        {
            MyConnection = MyFactory.CreateConnection() as Connection;
            if (MyConnection == null)
            {
                FailConnect("Error:AMQ Connection nullable.");
                return;
            }

            MyConnection.AsyncSend = true;
            MyConnection.AsyncClose = true;
            MyConnection.ExceptionListener += ConnectionExceptionListener;

            MySession = MyConnection.CreateSession() as Session;
            if (MySession == null)
            {
                FailConnect("Error:AMQ Session nullable.");
                return;
            }

            MyDestination = MySession.GetQueue(AmqQue) as ActiveMQDestination;
            if (MyDestination == null)
            {
                FailConnect("Error:AMQ Destination nullable.");
                return;
            }

            if (IsProducer)
            {
                MyProducer = MySession.CreateProducer(MyDestination) as MessageProducer;
                if (MyProducer == null)
                {
                    FailConnect("Error:AMQ Producer nullable.");
                }

                return;
            }

            MyConsumer = MySession.CreateConsumer(MyDestination) as MessageConsumer;
            if (MyConsumer == null)
            {
                FailConnect("Error:AMQ Consumer nullable.");
                return;
            }

            // Discard whatever can already be received synchronously, then switch to
            // listener-based delivery and start the connection.
            while (MyConsumer.ReceiveNoWait() != null)
            {
            }

            MyConsumer.Listener += OnMessage;
            MyConnection.Start();
        }
        catch (Exception ex)
        {
            Er = "Error:AMQ Connection Error.";
            // Do not leave a half-built connection (with our listener attached) behind.
            ReleaseResources();
            RaiseError(ex);
        }
    }

    /// <summary>
    /// Releases the producer/consumer, the session and the connection. Errors raised while
    /// closing are reported through <see cref="AmqClientStatusError"/> instead of being thrown.
    /// </summary>
    public void Close()
    {
        ReleaseResources();
    }

    /// <summary>
    /// Queues <paramref name="o"/> for sending on a thread-pool thread and returns immediately.
    /// Nothing is sent when the client has no producer (not connected, or created as a consumer).
    /// </summary>
    /// <param name="o">Object to wrap in an object message.</param>
    /// <param name="ts">Time-to-live assigned to the message.</param>
    public void Send(object o, TimeSpan ts)
    {
        // Delegate.BeginInvoke needs a matching EndInvoke and is not supported on .NET Core/5+,
        // so the work is queued on the thread pool directly.
        System.Threading.ThreadPool.QueueUserWorkItem(state =>
        {
            try
            {
                MySend(o, ts);
            }
            catch (Exception)
            {
                // MySend already reports its own failures; anything arriving here was thrown by
                // an error-event subscriber. As with the former fire-and-forget delegate call,
                // it must not take the process down from a pool thread.
            }
        });
    }

    /// <summary>Records a setup failure in <see cref="Er"/> and releases whatever was created.</summary>
    private void FailConnect(string message)
    {
        ReleaseResources();
        Er = message;
    }

    /// <summary>
    /// Tears down every messaging object this client holds. Each step is guarded on its own so
    /// that a failure in one (for example disposing the producer) cannot prevent the connection
    /// from being closed. Fields are cleared afterwards so later calls see "not connected".
    /// </summary>
    private void ReleaseResources()
    {
        if (MyProducer != null)
        {
            try
            {
                MyProducer.Dispose();
            }
            catch (Exception ex)
            {
                RaiseError(ex);
            }

            MyProducer = null;
        }

        if (MyConsumer != null)
        {
            try
            {
                // Detach first so no message is dispatched to this client during teardown.
                MyConsumer.Listener -= OnMessage;
                MyConsumer.Close();
                MyConsumer.Dispose();
            }
            catch (Exception ex)
            {
                RaiseError(ex);
            }

            MyConsumer = null;
        }

        if (MySession != null)
        {
            try
            {
                MySession.Dispose();
            }
            catch (Exception ex)
            {
                RaiseError(ex);
            }

            MySession = null;
        }

        if (MyConnection != null)
        {
            try
            {
                if (MyConnection.IsStarted)
                {
                    MyConnection.Stop();
                }

                MyConnection.Close();
            }
            catch (Exception ex)
            {
                RaiseError(ex);
            }

            try
            {
                MyConnection.ExceptionListener -= ConnectionExceptionListener;
                MyConnection.Dispose();
            }
            catch (Exception ex)
            {
                RaiseError(ex);
            }

            MyConnection = null;
        }

        MyDestination = null;
    }

    /// <summary>Forwards asynchronous connection errors to <see cref="AmqClientStatusError"/>.</summary>
    private void ConnectionExceptionListener(Exception ex)
    {
        RaiseError(ex);
    }

    /// <summary>Consumer listener: passes each received message to <see cref="NewDataReceive"/>.</summary>
    private void OnMessage(IMessage m)
    {
        // Copy to a local so a concurrent unsubscribe cannot null the event between check and call.
        var handler = NewDataReceive;
        if (handler != null)
        {
            handler(m);
        }
    }

    /// <summary>
    /// Builds an object message from <paramref name="o"/>, applies the time-to-live and sends it.
    /// Runs on a thread-pool thread; failures are reported through <see cref="AmqClientStatusError"/>.
    /// </summary>
    private void MySend(object o, TimeSpan ts)
    {
        try
        {
            // Work on local copies: Close() may clear the fields while this runs on another thread.
            Session session = MySession;
            MessageProducer producer = MyProducer;
            if (session == null || producer == null)
            {
                return;
            }

            IMessage request = session.CreateObjectMessage(o);
            if (request == null)
            {
                return;
            }

            request.NMSTimeToLive = ts;
            producer.Send(request);

            // Reported only after a message was actually handed to the producer.
            RaiseDebug("Object sent:" + (o == null ? "null" : o.GetType().ToString()));
        }
        catch (Exception ex)
        {
            RaiseError(ex);
        }
    }

    /// <summary>Raises <see cref="AmqClientStatusDebug"/> if anyone is subscribed.</summary>
    private void RaiseDebug(string message)
    {
        var handler = AmqClientStatusDebug;
        if (handler != null)
        {
            handler(message);
        }
    }

    /// <summary>Raises <see cref="AmqClientStatusError"/> with the text "AMQ Error:" followed by the exception.</summary>
    private void RaiseError(Exception ex)
    {
        var handler = AmqClientStatusError;
        if (handler != null)
        {
            handler("AMQ Error:" + ex);
        }
    }
}
If you have any questions, please post them in the comments.
No comments:
Post a Comment