Monday, January 17, 2011

Topic test. ActiveMQ client 1.4.1. NMS 1.5.0(RC)

As I promised, I'm going to test the newest version of the ActiveMQ client, or more precisely the NMS library, which is responsible for connection and reconnection.

The new version of the demonstrator is available for download at: Demonstrator Application NMS 1.5.0
The usage and test scenarios are the same as in the previous test, Topic test. ActiveMQ client 1.4.1. NMS 1.4.0

Test results:
Tests 1 and 3 completed successfully. All messages were sent and received. In test number 2 a very small number of messages did not reach the demonstrator. We are talking about a few messages out of several thousand sent, which is still understandable when the client keeps connecting to and disconnecting from the broker.
Unfortunately, the last test again did not finish successfully, and memory usage continues to rise.

Let's wait for NMS 1.5.1

Friday, January 7, 2011

Topic test. ActiveMQ client 1.4.1. NMS 1.4.0

In this article I'm going to test the behavior of the ActiveMQ .NET C# client in version 1.4.1 with NMS 1.4.0.
To see how I'm doing it, look at: Preparation for use Topic and My Topic usage



Let's see my four test scenarios:
  1. The client connects to one broker, then starts to generate messages (10 per second) and receive them.
  2. The client connects to one broker, then starts to generate messages (10 per second) and receive them, but between sending messages it disconnects from the broker and connects again.
  3. The client connects to the brokers with the failover option:
    failover://(tcp://192.168.44.133:61616,tcp://192.168.4.61:61616,tcp://192.168.4.62:61616,tcp://192.168.4.40:61616)
    then starts to generate messages (10 per second) and receive them.
  4. The client connects to the brokers with the failover option:
    failover://(tcp://192.168.44.133:61616,tcp://192.168.4.61:61616,tcp://192.168.4.62:61616,tcp://192.168.4.40:61616)
    then starts to generate messages (10 per second) and receive them, but between sending messages it disconnects from the broker and connects again.
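
The failover address used in scenarios 3 and 4 is a comma-separated list of tcp:// broker addresses wrapped in failover://(...). As a small illustration, such a string can be assembled from a list of hosts. The FailoverUri helper below is hypothetical; it is not part of the demonstrator or of NMS, and it assumes all brokers listen on the same port.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (an assumption for illustration, not part of NMS).
public static class FailoverUri
{
    // Builds "failover://(tcp://host1:port,tcp://host2:port,...)".
    public static string Build(IEnumerable<string> hosts, int port)
    {
        var parts = new List<string>();
        foreach (string host in hosts)
        {
            parts.Add("tcp://" + host + ":" + port);
        }
        if (parts.Count == 0)
        {
            throw new ArgumentException("At least one broker host is required.", "hosts");
        }
        return "failover://(" + string.Join(",", parts.ToArray()) + ")";
    }
}
```

The resulting string can be passed as the first constructor argument of AmqTopicClient, exactly like a single broker address.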


Four brokers in version 5.4.2 were used for testing, in failover mode, with the following configuration:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>    
    </bean>  
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">

        <networkConnectors>
            <networkConnector name="test-net" uri="static:(tcp://192.168.44.133:61616,tcp://192.168.4.61:61616,tcp://192.168.4.62)"/>
        </networkConnectors>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>

    </broker>
    <import resource="jetty.xml"/>
  
</beans>

You can run the test yourself using my demonstrator application. It is available for download from: Demonstrator application topic test or Demonstrator application topic test.



A few words about usage:
  1. Fill in the fields in step 1 and push the Connect button
  2. Choose the generator type, set the test duration (in hours) and the timer interval for sending messages (in milliseconds)



Test results:

Tests 1 and 3 completed successfully. All messages were sent and received. In test 2 some messages disappeared, but that is understandable when a client keeps connecting to and disconnecting from the broker.
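
To put a number on the messages that disappear in test 2, sent and received messages can be matched against each other. The sketch below is only an illustration under one assumption: each message carries a sequence number that the receiver can read back. The posts here do not say how the demonstrator counts messages, and DeliveryTracker is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: matches sent and received messages by sequence number.
public class DeliveryTracker
{
    private readonly object _gate = new object();
    private readonly HashSet<long> _pending = new HashSet<long>();
    private long _sent;

    // Call when a message is sent; returns the sequence number to embed in it.
    public long MarkSent()
    {
        lock (_gate)
        {
            _sent++;
            _pending.Add(_sent);
            return _sent;
        }
    }

    // Call from the receive handler with the sequence number read from the message.
    public void MarkReceived(long sequence)
    {
        lock (_gate) { _pending.Remove(sequence); }
    }

    // Total number of messages marked as sent.
    public long SentCount { get { lock (_gate) { return _sent; } } }

    // Messages sent but not (yet) received.
    public int MissingCount { get { lock (_gate) { return _pending.Count; } } }
}
```

The locking is there because sending and receiving usually happen on different threads. MissingCount is only meaningful once the test has stopped sending and the last messages have had time to arrive.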
 

The last test fares worst. It actually never finishes. When a client using a failover connection repeatedly connects and disconnects, memory usage grows rapidly and the application stops responding!

This bug is described in the ActiveMQ JIRA under the key AMQNET-298.

Don't worry! The ActiveMQ team has already fixed this bug in NMS version nms-1.5.0.

The next test will check if the bug has been fixed!

Wednesday, January 5, 2011

Topic usage

As I wrote in ActiveMQ preparation part 1, "In the future I will try to make the entire code of the demonstrator application available for download", so now I'm going to show you how to use the class AmqTopicClient.


1. Connect to the broker and declare a topic.


In the constructor, the first parameter (a string) is the address of the broker and the second is the name of the topic.
Then add the event handlers for incoming messages and for errors. Finally, connect to the broker.

  private AmqTopicClient amqTopicClient;

  amqTopicClient = new AmqTopicClient(AMQuRL.Text, AmqTopic.Text);
  amqTopicClient.NewDataReceive += AmqTopicClient_NewDataReceive;
  amqTopicClient.AmqClientStatusError += AmqTopicClient_AmqClientStatusError;
  amqTopicClient.AmqConnect();
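
The two handlers wired up above are not shown in this post. A minimal sketch of what they might look like follows; it matches the NewDataReceive and AmqClientStatusEventHandler delegate signatures, but the TopicHandlers class, its method names and the console output are my assumptions, not the demonstrator's actual code. It needs a reference to the Apache.NMS assembly.

```csharp
using System;
using Apache.NMS;

// Hypothetical handlers for the AmqTopicClient events.
public static class TopicHandlers
{
    // Matches the NewDataReceive delegate: void (IMessage m).
    public static void OnNewData(IMessage m)
    {
        Console.WriteLine(Describe(m));
    }

    // Turns an incoming message into a short printable description.
    public static string Describe(IMessage m)
    {
        ITextMessage text = m as ITextMessage;
        if (text != null) return "Text: " + text.Text;

        IObjectMessage obj = m as IObjectMessage;
        if (obj != null) return "Object: " + obj.Body;

        return "Other: " + m.GetType().Name;
    }

    // Matches the AmqClientStatusEventHandler delegate: void (string msg).
    public static void OnError(string msg)
    {
        Console.Error.WriteLine(msg);
    }
}
```

In a Windows Forms application the message handler is not called on the UI thread, so any update of a control has to be marshalled with Control.Invoke or Control.BeginInvoke.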

2. Send a simple message of string or object type.


There are two ways to send messages. Read carefully!
1. Asynchronously issue the command that sends a message.
2. Synchronously issue the command that sends a message.

Sure, it seems confusing. But if you have reviewed the code and read what I wrote, it follows that in the first case the application asynchronously issues a send that is itself asynchronous, because AsyncSend is enabled on the connection.
Usage is very simple:
1. Asynchronous
-string

  amqTopicClient.TxtSend(textBox1.Text + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
-object

  amqTopicClient.Send((textBox1.Text + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")) as object);
2. Synchronous is similar (just add "My" before the method name):
-string

  amqTopicClient.MyTxtSend(textBox1.Text + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
-object

  amqTopicClient.MySend((textBox1.Text + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")) as object);
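
The difference between the two call styles can be shown without a broker. The sketch below is an illustration only: SendDemo and its transport parameter are hypothetical stand-ins for the producer, and it uses ThreadPool.QueueUserWorkItem instead of the delegate BeginInvoke used in AmqTopicClient. Both run the work on a thread-pool thread, but delegate BeginInvoke is not supported on .NET Core and later.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the two send styles of AmqTopicClient.
public static class SendDemo
{
    // Synchronous style: returns only after the transport call has finished.
    public static void SendSync(string text, Action<string> transport)
    {
        transport(text);
    }

    // Asynchronous style: queues the send on the thread pool and returns at once.
    public static void SendAsync(string text, Action<string> transport)
    {
        ThreadPool.QueueUserWorkItem(delegate { SendSync(text, transport); });
    }
}
```

One consequence of the asynchronous style is that several queued sends may run in a different order than they were issued, so it suits messages whose relative order does not matter.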

Tuesday, January 4, 2011

Preparation part 2

Class for handling queues:

Warning! A consumer cannot send messages and a producer cannot receive them, so do not be surprised.

  using System;
  using Apache.NMS;
  using Apache.NMS.ActiveMQ;
  using Apache.NMS.ActiveMQ.Commands;

  public class AmqQueClient
  {
    private Connection MyConnection;
    private MessageConsumer MyConsumer;
    private ActiveMQDestination MyDestination;
    private ConnectionFactory MyFactory;
    private MessageProducer MyProducer;
    private Session MySession;
    public string Er = string.Empty;
    public readonly string AmqUrl;
    public readonly string AmqQue;
    public readonly bool IsProducer;

    public AmqQueClient(string amqurl, string amqQue, bool isproducer)
    {
      AmqUrl = amqurl;
      AmqQue = amqQue;
      IsProducer = isproducer;
    }

    public void AmqConnect()
    {
      if (string.IsNullOrEmpty(AmqUrl)) throw new ApplicationException("AmqUrl null or empty");
      if (string.IsNullOrEmpty(AmqQue)) throw new ApplicationException("AmqQue null or empty");
      MyFactory = new ConnectionFactory(AmqUrl);
      try
      {
        MyConnection = MyFactory.CreateConnection() as Connection;
        if (MyConnection != null)
        {
          MyConnection.AsyncSend = true;
          MyConnection.AsyncClose = true;
          MyConnection.ExceptionListener += ConnectionExceptionListener;
          MySession = MyConnection.CreateSession() as Session;

          if (MySession != null)
          {
            MyDestination = MySession.GetQueue(AmqQue) as ActiveMQDestination;
            if (MyDestination != null)
            {
              if (IsProducer)
              {
                MyProducer = MySession.CreateProducer(MyDestination) as MessageProducer;
                if (MyProducer == null)
                {
                  MySession.Dispose();
                  MyConnection.Dispose();
                  Er = "Error:AMQ Producer nullable.";
                  return;
                }
              }
              else
              {
                MyConsumer = MySession.CreateConsumer(MyDestination) as MessageConsumer;
                if (MyConsumer == null)
                {
                  MySession.Dispose();
                  MyConnection.Dispose();
                  Er = "Error:AMQ Consumer nullable.";
                  return;
                }

                // Discard any messages already available, then switch to listener-based delivery.
                while (MyConsumer.ReceiveNoWait() != null)
                {
                }
                MyConsumer.Listener += OnMassage;
                MyConnection.Start();
              }
            }
            else
            {
              MySession.Dispose();
              MyConnection.Dispose();
              Er = "Error:AMQ Destination nullable.";
            }
          }
          else
          {
            MyConnection.Dispose();
            Er = "Error:AMQ Session nullable.";
          }
        }
        else
        {
          Er = "Error:AMQ Connection nullable.";
        }
      }
      catch (Exception ex)
      {
        Er = "Error:AMQ Connection Error.";
        if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
      }
    }

    private void ConnectionExceptionListener(Exception ex)
    {
      if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
    }

    public void Close()
    {
      try
      {
        if (MyProducer != null)
        {
          MyProducer.Dispose();
        }
        if (MyConsumer != null)
        {
          MyConsumer.Close();
          MyConsumer.Listener -= OnMassage;
          MyConsumer.Dispose();
        }

        if (MyConnection != null)
        {
          if (MyConnection.IsStarted)
            MyConnection.Stop();
          MyConnection.Close();
          MyConnection.ExceptionListener -= ConnectionExceptionListener;
          MyConnection.Dispose();
        }
      }
      catch (Exception ex)
      {
        if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
      }
    }

    public event NewDataReceive NewDataReceive;

    public event AmqClientStatusEventHandler AmqClientStatusDebug;

    public event AmqClientStatusEventHandler AmqClientStatusError;

    private void OnMassage(IMessage m)
    {
      if (NewDataReceive != null)
        NewDataReceive(m);
    }

    // Asynchronous entry point: runs MySend on a thread-pool thread.
    private delegate void MySendDelegate(object o, TimeSpan ts);
    public void Send(object o, TimeSpan ts)
    {
      (new MySendDelegate(MySend)).BeginInvoke(o, ts, null, null);
    }

    private void MySend(object o, TimeSpan ts)
    {
      try
      {
        if (MySession != null)
        {
          IMessage request = MySession.CreateObjectMessage(o);

          if (MyProducer != null)
          {
            if (request != null)
            {
              request.NMSTimeToLive = ts;
              MyProducer.Send(request);
              // Report success only when the message was actually handed to the producer.
              if (AmqClientStatusDebug != null) AmqClientStatusDebug("Object sent:" + o.GetType());
            }
          }
        }
      }
      catch (Exception ex)
      {
        if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
      }
    }
  }

If you have any questions, please post them in the comments.

Preparation part 1

First I'm going to show you how I'm using the ActiveMQ .NET C# client in my tests. The ActiveMQ client version is 1.4.1.

In the future I will try to make the entire code of the demonstrator application available for download.


Step 1: Prepare a wrapper class to make usage a little easier for topics and queues:

I need to explain a little how the code works. In principle, each instance of a class lets you connect to a single topic or queue. The topic client connects as both consumer and producer, but for queues you have to choose the producer or the consumer side :). For my planned tests this will be enough.


So let's begin:

The delegates are in the same namespace as both classes (for topic and queue).
The first delegate is for messages, the second for errors:

  public delegate void NewDataReceive(IMessage m);

  public delegate void AmqClientStatusEventHandler(string msg);

Topic class:

As shown below, the method AmqConnect() enables asynchronous message sending, asynchronous closing and asynchronous acknowledgements, although I'm not sure the last one is needed since the automatic acknowledgement mode is used.

The rest of the code should be quite clear, but if you have any questions please post a comment.

  using System;
  using Apache.NMS;
  using Apache.NMS.ActiveMQ;
  using Apache.NMS.ActiveMQ.Commands;

  public class AmqTopicClient
  {
    private Connection MyConnection;
    private MessageConsumer MyConsumer;
    private ActiveMQDestination MyDestination;
    private ConnectionFactory MyFactory;
    private MessageProducer MyProducer;
    private Session Mysession;
    public string Er = string.Empty;
    public readonly string AmqUrl;
    public readonly string AmqQue;

    public AmqTopicClient(string amqurl, string amqQue)
    {
      AmqUrl = amqurl;
      AmqQue = amqQue;
    }

    public void AmqConnect()
    {
      if (string.IsNullOrEmpty(AmqUrl)) throw new ApplicationException("AmqUrl null or empty");
      if (string.IsNullOrEmpty(AmqQue)) throw new ApplicationException("AmqQue null or empty");
      MyFactory = new ConnectionFactory(AmqUrl);

      try
      {
        MyConnection = MyFactory.CreateConnection() as Connection;

        if (MyConnection != null)
        {
          MyConnection.AsyncSend = true;
          MyConnection.AsyncClose = true;
          MyConnection.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
          MyConnection.ProducerWindowSize = 1024000;
          MyConnection.SendAcksAsync = true;
          MyConnection.ExceptionListener += ConnectionExceptionListener;
          Mysession = MyConnection.CreateSession() as Session;

          if (Mysession != null)
          {
            MyDestination = Mysession.GetTopic(AmqQue) as ActiveMQDestination;

            // The topic client is both producer and consumer on the same topic.
            MyProducer = Mysession.CreateProducer(MyDestination) as MessageProducer;
            if (MyProducer != null) MyProducer.DeliveryMode = MsgDeliveryMode.NonPersistent;
            MyConsumer = Mysession.CreateConsumer(MyDestination) as MessageConsumer;

            if (MyConsumer == null)
            {
              Mysession.Dispose();
              MyConnection.Dispose();
              Er = "Error:AMQ Consumer nullable.";
              return;
            }

            // Discard any messages already available, then switch to listener-based delivery.
            while (MyConsumer.ReceiveNoWait() != null)
            {
            }

            MyConsumer.Listener += OnMassage;
            MyConnection.Start();
          }
          else
          {
            MyConnection.Dispose();
            Er = "Error:AMQ Session nullable.";
          }
        }
        else
        {
          Er = "Error:AMQ Connection nullable.";
        }
      }
      catch (Exception ex)
      {
        Er = "Error:AMQ Connection Error.";
        if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
      }
    }

    private void ConnectionExceptionListener(Exception ex)
    {
      if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
    }

    public void Close()
    {
      try
      {
        if (MyProducer != null)
        {
          MyProducer.Dispose();
        }
        if (MyConsumer != null)
        {
          MyConsumer.Listener -= OnMassage;
          MyConsumer.Close();
        }

        if (MyConnection != null)
        {
          if (MyConnection.IsStarted)
            MyConnection.Stop();
          MyConnection.Close();
          MyConnection.ExceptionListener -= ConnectionExceptionListener;
        }
      }
      catch (Exception ex)
      {
        if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
      }
    }

    public event NewDataReceive NewDataReceive;

    public event AmqClientStatusEventHandler AmqClientStatusDebug;

    public event AmqClientStatusEventHandler AmqClientStatusError;

    private void OnMassage(IMessage m)
    {
      if (NewDataReceive != null)
        NewDataReceive(m);
    }

    // Asynchronous entry point: runs MySend on a thread-pool thread.
    private delegate void MySendDelegate(object o);
    public void Send(object o)
    {
      (new MySendDelegate(MySend)).BeginInvoke(o, null, null);
    }

    public void SendSync(object o)
    {
      MySend(o);
    }

    // Synchronous send of an object message.
    public void MySend(object o)
    {
      try
      {
        if (Mysession != null)
        {
          ActiveMQMessage request = Mysession.CreateObjectMessage(o) as ActiveMQObjectMessage;

          if (MyProducer != null)
          {
            if (request != null)
            {
              MyProducer.Send(request);
              if (AmqClientStatusDebug != null) AmqClientStatusDebug("Object sent:" + o);
            }
          }
        }
      }
      catch (Exception ex)
      {
        if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
      }
    }

    // Asynchronous entry point: runs MyTxtSend on a thread-pool thread.
    private delegate void MyTxtSendDelegate(string o);
    public void TxtSend(string o)
    {
      (new MyTxtSendDelegate(MyTxtSend)).BeginInvoke(o, null, null);
    }

    // Synchronous send of a text message.
    public void MyTxtSend(string o)
    {
      try
      {
        if (Mysession != null)
        {
          ActiveMQMessage request = Mysession.CreateTextMessage(o) as ActiveMQTextMessage;
          if (MyProducer != null)
          {
            if (request != null)
            {
              MyProducer.Send(request);
              if (AmqClientStatusDebug != null) AmqClientStatusDebug("Text sent:" + o);
            }
          }
        }
      }
      catch (Exception ex)
      {
        if (AmqClientStatusError != null) AmqClientStatusError("AMQ Error:" + ex);
      }
    }
  }
If you have any questions, please post a comment.

For the queue code, look at Preparation part 2

Monday, January 3, 2011

Demonstrator scenario and environment

  
In this article I would like to describe the environment for the brokers and the demonstrator application. The second thing to describe is what I'm going to test in the chosen open-source solutions, and how.


The most important part of my work is the use of the .NET C# client in the demonstrator application and the proper configuration of the brokers. The application's target platform is Windows Mobile, but for now it will be a Windows Forms application; later I will migrate it to Windows Mobile.
 
Demonstrator scenario for tests:
  1. Check:
    - the efficiency and speed,
    - required and used resources.

    The test will consist of sending packets of increasing size, both between one sender and one receiver and in the
    many-to-one and one-to-many models:
    - one producer sending single messages to multiple clients
    - one producer sending single messages to one client
    - multiple producers sending single messages to one client

    During the test I will measure the time needed to send a packet and the system resource usage: the average from the start of the dispatch to its end, and the momentary maximum.


  2. Check the servers' redundancy mode from the client side:
    - two servers of one type. One client sends messages and a second one listens. Then the servers are turned off alternately.

    The test is designed to check effectiveness and reliability by verifying whether, in such a situation, all messages reach their destination.

  3. Verify exception handling. In this scenario, the demonstrator application will have the following functionality to test:
    - connecting to and disconnecting from the server,
    - turning off network interfaces for testing purposes,
    - generating test exceptions and handling them on the client side (single, avalanche).
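
The measurements described in point 1 (send time, average resource usage over the dispatch, and the momentary maximum) can be collected with a small sampler. This is only a sketch under assumptions: ResourceStats is a hypothetical name, and Sample() reads the managed heap size via GC.GetTotalMemory, which is just one possible resource metric and does not cover unmanaged memory or CPU.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical sampler for timing a dispatch and tracking memory usage.
public class ResourceStats
{
    private readonly List<long> _samples = new List<long>();
    private readonly Stopwatch _clock = new Stopwatch();

    // Clears old samples and starts timing a new dispatch.
    public void Start()
    {
        _samples.Clear();
        _clock.Reset();
        _clock.Start();
    }

    // Records the current managed heap size in bytes.
    public void Sample()
    {
        AddSample(GC.GetTotalMemory(false));
    }

    // Records an externally measured value (for example from a performance counter).
    public void AddSample(long bytes)
    {
        _samples.Add(bytes);
    }

    // Stops timing and returns the elapsed time of the dispatch.
    public TimeSpan Stop()
    {
        _clock.Stop();
        return _clock.Elapsed;
    }

    // Average of all samples, or 0 when nothing was sampled.
    public double AverageBytes
    {
        get
        {
            if (_samples.Count == 0) return 0;
            double sum = 0;
            foreach (long s in _samples) sum += s;
            return sum / _samples.Count;
        }
    }

    // Largest single sample, or 0 when nothing was sampled.
    public long PeakBytes
    {
        get
        {
            long peak = 0;
            foreach (long s in _samples) if (s > peak) peak = s;
            return peak;
        }
    }
}
```

A test run would call Start() before the first message, Sample() at a fixed interval while sending, and Stop() after the last message.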


The last thing to describe is the broker environment and configuration:
- two brokers in failover mode
- the environment is CentOS 5.3

Sunday, January 2, 2011

Introduction

Hi,
   My name is Adam and I come from Poland. I'm a fifth-year student at Gdańsk University of Technology and I have been working as a software designer since August 2009. I'm now writing my thesis, which is titled:


Development and evaluation of demonstrator applications for the Windows Mobile platform using middleware software type.


Now you may ask:

"Why is he writing a blog about his thesis??"

I have two answers:

The first is that I want to identify the best open-source solution for my use case.
The winner will be chosen after the tests. Assessment criteria will include performance, resource consumption, reliability and stability (robustness), and exception handling.

The second is simple: I would like to share my experience and knowledge about message-oriented middleware that supports a .NET C# client. I hope the creators of the software (ActiveMQ, RabbitMQ, Qpid) and the programming community will help me improve my results and correct me when I make a mistake.


If you are interested in what I'm going to do and how, look at Demonstrator scenario and environment