As I promissed I'm going to test newest version of ActiveMQ client actually NMS library which is responsible for a connection and reconnection.
New version of demonstrator is available fo download at: Demonstrator Application NMS 1.5.0
Usage, test scenarios is the same as in the previous test Topic test. ActiveMQ client 1.4.1. NMS 1.4.0
Test results:
Tests 1 and 3 have been successfully completed. All messages were sent and received. In test numer 2 demonstrator did not reach very few messages. You can talk about a few messages to several thousand sended but it's still understandable when client connects and disconnects from broker.
Unfortunately, the last test again has not been finished successfully and memory usage continues to rise.
Lets wait for NMS 1.5.1
Competition and comparison using test scenarios between ActiveMQ RabbitMQ and Qpid. Usage examples in .net C#
Showing posts with label ActiveMQ. Show all posts
Showing posts with label ActiveMQ. Show all posts
Monday, January 17, 2011
Friday, January 7, 2011
Topic test. ActiveMQ client 1.4.1. NMS 1.4.0
In this article I'm going to test behavior of ActiveMQ .net C# client in version 1.4.1 and NMS 1.4.0.
To see how I'm doing it look at: Preparation for use Topic and My Topic usage
Let's see my four test scenarios:
For testing were used 4 brokers in version 5.4.2 in failover mode with configuration:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">
<networkConnectors>
<networkConnector name="test-net" uri="static:(tcp://192.168.44.133:61616,tcp://192.168.4.61:61616,tcp://192.168.4.62)"/>
</networkConnectors>
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
<import resource="jetty.xml"/>
</beans>
You can do the test on your own using my demonstrator application. The demonstrator application is available for download from: Demonstrator application topic test or Demonstrator application topic test.
Few words about usage:
To see how I'm doing it look at: Preparation for use Topic and My Topic usage
Let's see my four test scenarios:
- Client connect to one broker then start to generate messages (10 per second) and recive.
- Client connect to one broker then start to generate messages (10 per second) and recive but between sending messages he disconnect from broker and connect again.
- Client connect to the brokers with failover option:
failover://(tcp://192.168.44.133:61616,tcp://192.168.4.61:61616,tcp://192.168.4.62:61616,tcp://192.168.4.40:61616)
then start to generate messages (10 per second) and recive. - Client connect to the brokers with failover option:
failover://(tcp://192.168.44.133:61616,tcp://192.168.4.61:61616,tcp://192.168.4.62:61616,tcp://192.168.4.40:61616)
then start to generate messages (10 per second) and recive but between sending messages it disconnects from broker and connects again.
For testing were used 4 brokers in version 5.4.2 in failover mode with configuration:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">
<networkConnectors>
<networkConnector name="test-net" uri="static:(tcp://192.168.44.133:61616,tcp://192.168.4.61:61616,tcp://192.168.4.62)"/>
</networkConnectors>
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
<import resource="jetty.xml"/>
</beans>
You can do the test on your own using my demonstrator application. The demonstrator application is available for download from: Demonstrator application topic test or Demonstrator application topic test.
Few words about usage:
- Fill the fields in step 1 and push the button Connect
- Choose generator type, set time for test (in hours) and timer interval for sending messages (in milisecond)
Test results:
Tests 1 and 3 have been successfully completed. All messages are sent and received. In test 2 some messages disappeared but it's understandable when a client connects and disconnects from broker.
The worst falls last test. Actually it's not coming to an end. When a client running in failover connection and disconnection causes a rapid increase memory usage and the application stops responding!!!
This bug is described on ActiveMQ jira with key AMQNET-298 .
Don't worry!!! Guys from ActiveMQ team already have fixed this bug in NMS version nms-1.5.0 .
The next test will check if the bug has been fixed!
The worst falls last test. Actually it's not coming to an end. When a client running in failover connection and disconnection causes a rapid increase memory usage and the application stops responding!!!
This bug is described on ActiveMQ jira with key AMQNET-298 .
Don't worry!!! Guys from ActiveMQ team already have fixed this bug in NMS version nms-1.5.0 .
The next test will check if the bug has been fixed!
Wednesday, January 5, 2011
Topic usage
As I wrote in ActiveMQ preparation part 1 "In the future I will try to make entire code of demonstator application available to download." so now I going to show you usage of the class AmqTopicClient.
1. Connect to the broker and declare a topic.
2. Send simple message string or object type.
2. Synchronous is similar (just add "My" before method name):
-string
-object
1. Connect to the broker and declare a topic.
In the constructor, the first parameter of type string is the address of the broker and the second is the name of the topic.
Then add the event handlers for incoming messages and occuring errors. Finally connect to the broker.
Then add the event handlers for incoming messages and occuring errors. Finally connect to the broker.
* This source code was highlighted with Source Code Highlighter.
- private AmqTopicClient amqTopicClient;
- amqTopicClient = new AmqTopicClient(AMQuRL.Text, AmqTopic.Text);
- amqTopicClient.NewDataReceive += AmqTopicClient_NewDataReceive;
- amqTopicClient.AmqClientStatusError += AmqTopicClient_AmqClientStatusError;
- amqTopicClient.AmqConnect();
2. Send simple message string or object type.
To send messages are two ways. Read carefully!
1. Asynchronous send a command to send a message.
2. Synchronous send a command to send a message.
Sure it seems confusing. But if anyone reviewed the code and read what I wrote now it follows that the application would asynchronously send messages that are sent asynchronously.
1. Asynchronous send a command to send a message.
2. Synchronous send a command to send a message.
Sure it seems confusing. But if anyone reviewed the code and read what I wrote now it follows that the application would asynchronously send messages that are sent asynchronously.
Usage is very simple:
1. Asynchronous
-string
* This source code was highlighted with Source Code Highlighter.
- amqTopicClient.TxtSend(textBox1.Text + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
-object
* This source code was highlighted with Source Code Highlighter.
- amqTopicClient.Send((textBox1.Text + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"))as object);
-string
* This source code was highlighted with Source Code Highlighter.
- amqTopicClient.MyTxtSend(textBox1.Text + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
* This source code was highlighted with Source Code Highlighter.
- amqTopicClient.MySend((textBox1.Text + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"))as object);
Tuesday, January 4, 2011
Preparation part 2
Class services Queues:
Any question please post in comments.
Warning! Consumer may not send messages and producer may not receive. So do not be surprised.
* This source code was highlighted with Source Code Highlighter.
- public class AmqQueClient
- {
- private Connection MyConnection;
- private MessageConsumer MyConsumer;
- private ActiveMQDestination MyDestination;
- private ConnectionFactory MyFactory;
- private MessageProducer MyProducer;
- private Session MySession;
- public string Er = string.Empty;
- public readonly string AmqUrl;
- public readonly string AmqQue;
- public readonly bool IsProducer;
- public AmqQueClient(string amqurl, string amqQue, bool isproducer)
- {
- AmqUrl = amqurl;
- AmqQue = amqQue;
- IsProducer = isproducer;
- }
- public void AmqConnect()
- {
- if (string.IsNullOrEmpty(AmqUrl)) throw new ApplicationException("AmqUrl null or empty");
- if (string.IsNullOrEmpty(AmqQue)) throw new ApplicationException("AmqQue null or empty");
- MyFactory = new ConnectionFactory(AmqUrl);
- {
- try
- {
- MyConnection = MyFactory.CreateConnection() as Connection;
- if (MyConnection != null)
- {
- MyConnection.AsyncSend = true;
- MyConnection.AsyncClose = true;
- MyConnection.ExceptionListener += ConnectionExceptionListener;
- MySession = MyConnection.CreateSession() as Session;
- if (MySession != null)
- {
- MyDestination = MySession.GetQueue(AmqQue) as ActiveMQDestination;
- if (MyDestination != null)
- {
- if (IsProducer)
- {
- MyProducer = MySession.CreateProducer(MyDestination) as MessageProducer;
- if (MyProducer == null)
- {
- MySession.Dispose();
- MyConnection.Dispose();
- Er = "Error:AMQ Producer nullable.";
- return;
- }
- }
- if (!IsProducer)
- {
- MyConsumer = MySession.CreateConsumer(MyDestination) as MessageConsumer;
- if (MyConsumer == null)
- {
- MySession.Dispose();
- MyConnection.Dispose();
- Er = "Error:AMQ Consumer nullable.";
- return;
- }
- }
- if (!IsProducer)
- {
- while (MyConsumer.ReceiveNoWait() != null)
- {
- }
- MyConsumer.Listener += OnMassage;
- MyConnection.Start();
- }
- }
- else
- {
- MySession.Dispose();
- MyConnection.Dispose();
- Er = "Error:AMQ Destination nullable.";
- }
- }
- else
- {
- MyConnection.Dispose();
- Er = "Error:AMQ Session nullable.";
- }
- }
- else
- {
- Er = "Error:AMQ Connection nullable.";
- }
- }
- catch (Exception ex)
- {
- Er = "Error:AMQ Connection Error.";
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- }
- }
- private void ConnectionExceptionListener(Exception ex)
- {
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- public void Close()
- {
- try
- {
- if (MyProducer != null)
- {
- MyProducer.Dispose();
- }
- if (MyConsumer != null)
- {
- MyConsumer.Close();
- MyConsumer.Listener -= OnMassage;
- MyConsumer.Dispose();
- }
- if (MyConnection != null)
- {
- if (MyConnection.IsStarted)
- MyConnection.Stop();
- MyConnection.Close();
- MyConnection.ExceptionListener -= ConnectionExceptionListener;
- MyConnection.Dispose();
- }
- }
- catch (Exception ex)
- {
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- }
- public event NewDataReceive NewDataReceive;
- public event AmqClientStatusEventHandler AmqClientStatusDebug;
- public event AmqClientStatusEventHandler AmqClientStatusError;
- private void OnMassage(IMessage m)
- {
- if (NewDataReceive != null)
- NewDataReceive(m);
- }
- private delegate void MySendDelegate(object o, TimeSpan ts);
- public void Send(object o, TimeSpan ts)
- {
- (new MySendDelegate(MySend)).BeginInvoke(o, ts, null, null);
- }
- private void MySend(object o, TimeSpan ts)
- {
- try
- {
- if (MySession != null)
- {
- IMessage request = MySession.CreateObjectMessage(o);
- if (MyProducer != null)
- {
- if (request != null)
- {
- request.NMSTimeToLive = ts;
- MyProducer.Send(request);
- }
- if (AmqClientStatusDebug != null) AmqClientStatusDebug("Object sent:" + o.GetType());
- }
- }
- }
- catch (Exception ex)
- {
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- }
- }
Any question please post in comments.
Preparation part 1
So first I going to show you how I'm using ActiveMQ .net C# client in my tests. Client activemq version is 1.4.1.
In the future I will try to make entire code of demonstator application available to download.
Step 1: Prepare wrapper class to make usage little easier for topics and queues:
So let's begin:
Delegate are in the same namespace as both class(for topic and queue).
First delegate is for messages second for errors:
Topics class:
If you have any question please post a comment.
For Queues code look at Introduction part 2
In the future I will try to make entire code of demonstator application available to download.
Step 1: Prepare wrapper class to make usage little easier for topics and queues:
I need a little bit to explain how the code works. In principle, each instance of the class allows you to connect to a single topic or queue. Client connects to the topic itself as consumer and porducer but for queues you have to choose producer or consumer side:) . In my scheduled test this would be enough.
So let's begin:
Delegate are in the same namespace as both class(for topic and queue).
First delegate is for messages second for errors:
* This source code was highlighted with Source Code Highlighter.
- public delegate void NewDataReceive(IMessage m);
- public delegate void AmqClientStatusEventHandler(string msg);
Topics class:
As shown below in the method AmqConnect () has been enabled asynchronous message send, closing and confirmation messages although I'm not sure last one is needed since it is automatic confirmation mode.
Rest of the code should be quite clear but If you have any question please post comments.
Usage: class AmqTopicClient
* This source code was highlighted with Source Code Highlighter.
- public class AmqTopicClient
- {
- private Connection MyConnection;
- private MessageConsumer MyConsumer;
- private ActiveMQDestination MyDestination;
- private ConnectionFactory MyFactory;
- private MessageProducer MyProducer;
- private Session Mysession;
- public string Er = string.Empty;
- public readonly string AmqUrl;
- public readonly string AmqQue;
- public AmqTopicClient(string amqurl, string amqQue)
- {
- AmqUrl = amqurl;
- AmqQue = amqQue;
- }
- public void AmqConnect()
- {
- if (string.IsNullOrEmpty(AmqUrl)) throw new ApplicationException("AmqUrl null or empty");
- if (string.IsNullOrEmpty(AmqQue)) throw new ApplicationException("AmqQue null or empty");
- MyFactory = new ConnectionFactory(AmqUrl);
- {
- try
- {
- MyConnection = MyFactory.CreateConnection() as Connection;
- if (MyConnection != null)
- {
- MyConnection.AsyncSend = true;
- MyConnection.AsyncClose = true;
- MyConnection.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
- MyConnection.ProducerWindowSize = 1024000;
- MyConnection.SendAcksAsync = true;
- MyConnection.ExceptionListener += ConnectionExceptionListener;
- Mysession = MyConnection.CreateSession() as Session;
- if (Mysession != null)
- {
- MyDestination = Mysession.GetTopic(AmqQue) as ActiveMQDestination;
- MyProducer = Mysession.CreateProducer(MyDestination) as MessageProducer;
- if (MyProducer != null) MyProducer.DeliveryMode = MsgDeliveryMode.NonPersistent;
- MyConsumer = Mysession.CreateConsumer(MyDestination) as MessageConsumer;
- if (MyConsumer == null)
- {
- Mysession.Dispose();
- MyConnection.Dispose();
- Er = "Error:AMQ Consumer nullable.";
- return;
- }
- while (MyConsumer.ReceiveNoWait() != null)
- {
- }
- MyConsumer.Listener += OnMassage;
- MyConnection.Start();
- }
- else
- {
- MyConnection.Dispose();
- Er = "Error:AMQ Session nullable.";
- }
- }
- else
- {
- Er = "Error:AMQ Connection nullable.";
- }
- }
- catch (Exception ex)
- {
- Er = "Error:AMQ Connection Error.";
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- }
- }
- private void ConnectionExceptionListener(Exception ex)
- {
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- public void Close()
- {
- try
- {
- if (MyProducer != null)
- {
- MyProducer.Dispose();
- }
- if (MyConsumer != null)
- {
- MyConsumer.Listener -= OnMassage;
- MyConsumer.Close();
- }
- if (MyConnection != null)
- {
- if (MyConnection.IsStarted)
- MyConnection.Stop();
- MyConnection.Close();
- MyConnection.ExceptionListener -= ConnectionExceptionListener;
- }
- }
- catch (Exception ex)
- {
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- }
- public event NewDataReceive NewDataReceive;
- public event AmqClientStatusEventHandler AmqClientStatusDebug;
- public event AmqClientStatusEventHandler AmqClientStatusError;
- private void OnMassage(IMessage m)
- {
- if (NewDataReceive != null)
- NewDataReceive(m);
- }
- private delegate void MySendDelegate(object o);
- public void Send(object o)
- {
- (new MySendDelegate(MySend)).BeginInvoke(o, null, null);
- }
- public void SendSync(object o)
- {
- MySend(o);
- }
- public void MySend(object o)
- {
- try
- {
- if (Mysession != null)
- {
- ActiveMQMessage request = Mysession.CreateObjectMessage(o) as ActiveMQObjectMessage;
- if (MyProducer != null)
- {
- if (request != null)
- {
- MyProducer.Send(request);
- if (AmqClientStatusDebug != null) AmqClientStatusDebug("Object sent:" + o);
- }
- }
- }
- }
- catch (Exception ex)
- {
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- }
- private delegate void MyTxtSendDelegate(string o);
- public void TxtSend(string o)
- {
- (new MyTxtSendDelegate(MyTxtSend)).BeginInvoke(o, null, null);
- }
- public void MyTxtSend(string o)
- {
- try
- {
- if (Mysession != null)
- {
- ActiveMQMessage request = Mysession.CreateTextMessage(o) as ActiveMQTextMessage;
- if (MyProducer != null)
- {
- if (request != null)
- {
- MyProducer.Send(request);
- if (AmqClientStatusDebug != null) AmqClientStatusDebug("Text sent:" + o);
- }
- }
- }
- }
- catch (Exception ex)
- {
- if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
- }
- }
- }
For Queues code look at Introduction part 2
_
_
_
_
_
Subscribe to:
Posts (Atom)