Getting Started with RabbitMQ on Android – Part 1

Posted on June 3, 2011

54


I’ve been writing a series of posts about how to use RabbitMQ with .net. I initially started using RabbitMQ so I could push live price feeds to my white-label Android trading app VisualSpreads.  So I thought  it was now time to write an article about how to use RabbitMQ with Android. As Android is Java you can follow the Java getting started tutorials on the RabbitMQ website to learn the key concepts. What I will be concentrating on is the concepts and patterns related to RabbitMQ that are Android specific . I’m assuming you’ve downloaded the Android SDK and setup your environment in eclipse. [Source Code Coming Soon]

1.) Publish/Subscribe

My first example is Publish/Subscribe. What I’m going to do is use and existing Publisher(Producer) from my last .net article  and write a Subscriber(Consumer) on Android. If you are not a .net person then you can use the Publisher from the example on the RabbitMQ website.

Ok so first we need to create and Android project. Open up eclipse and go File -> New -> Android Project

#

Use the settings/names I have here. Give the package name something unique to you though 🙂 We won’t create a test project right now so hit “Finish”

Next we want to write our message consumer. We’re going to follow a similar pattern to my .net example and crate a base class that consumers and producers can inherit from.

First of all we need to include the RabbitMQ Java Client Library. If you don’t have it already get the latest version from here .  Unzip this and find the following three files:

  • rabbitmq-client.jar
  • commons-io-1.2.jar
  • commons-io-1.2

Create a folder in the root of your Android project called lib , copy and paste the jars in to the new folder. The RabbitMQ client usage should be obvious but you may wonder what the other two files are used for. The RabbitMQ client uses Java API’s not included in the Android SDK Java API. These two files add the missing API’s for the RabbitMQ client to use. If you didn’t add these files you wouldn’t notice anything until runtime  when the connection to the broker would silently fail . If you then looked through the debug logcat output you would see something like this:

WARN/System.err(9557): Caused by: java.lang.NoClassDefFoundError: com.rabbitmq.client.impl.TruncatedInputStream

Now we need to reference the jar files in the project, go Project -> Properties -> Java Build Path. Click the “Add Jar” button and navigate to the lib folder and select the three jar files and click ok.

Ok now to the base class. Add a new class IConnectToRabbitMQ as follows

/**
 * Base class for objects that connect to a RabbitMQ Broker
 */
public abstract class IConnectToRabbitMQ {
	  public String mServer;
      public String mExchange;

      protected Channel mModel = null;
      protected Connection  mConnection;

      protected boolean Running ;

      protected  String MyExchangeType ;

      /**
       *
       * @param server The server address
       * @param exchange The named exchange
       * @param exchangeType The exchange type name
       */
      public IConnectToRabbitMQ(String server, String exchange, String exchangeType)
      {
    	  mServer = server;
    	  mExchange = exchange;
          MyExchangeType = exchangeType;
      }

      public void Dispose()
      {
          Running = false;

			try {
				if (mConnection!=null)
					mConnection.close();
				if (mModel != null)
					mModel.abort();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

      }

      /**
       * Connect to the broker and create the exchange
       * @return success
       */
      public boolean connectToRabbitMQ()
      {
    	  if(mModel!= null && mModel.isOpen() )//already declared
    		  return true;
          try
          {
        	  ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setHost(mServer);
              mConnection = connectionFactory.newConnection();
              mModel = mConnection.createChannel();
              mModel.exchangeDeclare(mExchange, MyExchangeType, true);

              return true;
          }
          catch (Exception e)
          {
        	  e.printStackTrace();
              return false;
          }
      }
}

You should be able to see the similarities here to the .net example. What I’ve tried to do is keep the Android and .net code as similar as possible as I switch between the two quite often.  Here is a summary of the main differences

The use of a concrete Chanel and Connection


//Java

protected Channel mModel = null;

protected Connection  mConnection;

Instead of a IModel and IConnection


//.net

protected IModel Model { get; set; }

protected IConnection Connection { get; set; }

The Connection and Model initialisation code is also slightly different


//Java

connectionFactory.setHost(mServer);

mConnection = connectionFactory.newConnection()

mModel = mConnection.createChannel();

As opposed to in .net


//.net

connectionFactory.HostName = Server;

Connection = connectionFactory.CreateConnection();

Model = Connection.CreateModel();

Now we need to write our Consumer that will inherit from this base class. There are a few more differences here when compared to .net but the basic pattern is exactly the same.

/**
 *Consumes messages from a RabbitMQ broker
 *
 */
public class MessageConsumer extends  IConnectToRabbitMQ{

	public MessageConsumer(String server, String exchange, String exchangeType) {
		super(server, exchange, exchangeType);
	}

	//The Queue name for this consumer
    private String mQueue;
    private QueueingConsumer MySubscription;

    //last message to post back
    private byte[] mLastMessage;

    // An interface to be implemented by an object that is interested in messages(listener)
    public interface OnReceiveMessageHandler{
    	public void onReceiveMessage(byte[] message);
    };

    //A reference to the listener, we can only have one at a time(for now)
    private OnReceiveMessageHandler mOnReceiveMessageHandler;

    /**
     *
     * Set the callback for received messages
     * @param handler The callback
     */
    public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler){
    	mOnReceiveMessageHandler = handler;
    };

    private Handler mMessageHandler = new Handler();
    private Handler mConsumeHandler = new Handler();

    // Create runnable for posting back to main thread
    final Runnable mReturnMessage = new Runnable() {
        public void run() {
        	mOnReceiveMessageHandler.onReceiveMessage(mLastMessage);
        }
    };

    final Runnable mConsumeRunner = new Runnable() {
        public void run() {
        	Consume();
        }
    };

    /**
     * Create Exchange and then start consuming. A binding needs to be added before any messages will be delivered
     */
    @Override
    public boolean connectToRabbitMQ()
    {
       if(super.connectToRabbitMQ())
       {

           try {
        	   mQueue = mModel.queueDeclare().getQueue();
        	   MySubscription = new QueueingConsumer(mModel);
        	   mModel.basicConsume(mQueue, false, MySubscription);
			} catch (IOException e) {
				e.printStackTrace();
				return false;
			}
			 if (MyExchangeType == "fanout")
	               AddBinding("");//fanout has default binding

	        Running = true;
			mConsumeHandler.post(mConsumeRunner);

           return true;
       }
       return false;
    }

    /**
     * Add a binding between this consumers Queue and the Exchange with routingKey
     * @param routingKey the binding key eg GOOG
     */
    public void AddBinding(String routingKey)
    {
        try {
			mModel.queueBind(mQueue, mExchange, routingKey);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }

    /**
     * Remove binding between this consumers Queue and the Exchange with routingKey
     * @param routingKey the binding key eg GOOG
     */
    public void RemoveBinding(String routingKey)
    {
        try {
			mModel.queueUnbind(mQueue, mExchange, routingKey);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }

    private void Consume()
    {
    	Thread thread = new Thread()
    	{

    		 @Override
    		    public void run() {
    			 while(Running){
			    	QueueingConsumer.Delivery delivery;
				    try {
				        delivery = MySubscription.nextDelivery();
			        	mLastMessage = delivery.getBody();
			        	mMessageHandler.post(mReturnMessage);
			    	    try {
							mModel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
						} catch (IOException e) {
							e.printStackTrace();
						}
				    } catch (InterruptedException ie) {
				    	ie.printStackTrace();
				    }
    			 }
    		 }
    	};
    	thread.start();

    }

    public void dispose(){
    	Running = false;
    }
}

Ok, lets look at the Android specific bits here.  The first code of interest is this

    // An interface to be implemented by an object that is interested in messages(listener)
    public interface OnReceiveMessageHandler{
    	public void onReceiveMessage(byte[] message);
    };

    //A reference to the listener, we can only have one at a time(for now)
    private OnReceiveMessageHandler mOnReceiveMessageHandler;

    /**
     *
     * Set the callback for received messages
     * @param handler The callback
     */
    public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler){
    	mOnReceiveMessageHandler = handler;
    };

What we are doing here is declaring an interface OnReceiveMessageHandler{} that a message listener will implement. We want to have one listener at a time which we keep a reference to in  mOnReceiveMessageHandler. We can set the listener by using the method setOnReceiveMessageHandler(OnReceiveMessageHandler handler) . This is a standard Android pattern and is used throughout the SDK for onClick listeners etc.

Next we have

private Handler mMessageHandler = new Handler();
private Handler mConsumeHandler = new Handler();

Here we are declaring two Handler. A handler is a special object in Android that is used to safely pass data between processes (Threads) and to execute Runnables. It’s vaguely like a delegate in .net

Next we have two Runnable.

// Create runnable for posting back to main thread
    final Runnable mReturnMessage = new Runnable() {
        public void run() {
        	mOnReceiveMessageHandler.onReceiveMessage(mLastMessage);
        }
    };

    final Runnable mConsumeRunner = new Runnable() {
        public void run() {
        	Consume();
        }
    };

A runnable is a block of work that can be passed to a new Thread for execution. We will be using our Handlers to run(or post) these Runnables . This is slightly confusing but a very important concept to get a grasp of if you plan to do anything but trivial Android development.

Now within the connectToRabbitMq()  method we have this:

</pre>
mConsumeHandler.post(mConsumeRunner);

This is posting the  mConsumeRunner runnable to the mConsumeHandler for execution on a new Thread. mConsumeHandler will call the run() method of the mConsumeRunner. This in turn calls the Consume() method which spawns a new blocking message-listening/de-queuing thread.

When we have a new message we then call

mMessageHandler.post(mReturnMessage);

This invokes the mReturnMessage runnable which in turn calls the mOnReceiveMessageHandler.onReceiveMessage(mLastMessage)  listener/handler, phew 🙂

If this is looking confusing then please read more about Runnables, Threads and Handlers. It is very important.

We now have our consuming code, lets use it. We need to output the messages somewhere so we need a TextView to display it. In the project folder structure open /res/layout/ there should be a file in there called main.xml. Open this file up and make sure it looks like this:

<?xml version="1.0" encoding="utf-8"?>
<ScrollView xmlns:android="http://schemas.android.com/apk/res/android"
    android:orientation="vertical"
    android:layout_width="fill_parent"
    android:layout_height="wrap_content"
    >
<TextView
	android:id="@+id/output"
    android:layout_width="fill_parent"
    android:layout_height="wrap_content"
    android:text="@string/hello"
    />
</ScrollView>

What we have here is a root ScrollView containing one TextView with an ID of output.

Next open up the ActivityHome.java file we created with the project. Make it look like this.

public class ActivityHome extends Activity {
	private MessageConsumer mConsumer;
	private TextView mOutput;

    /** Called when the activity is first created. */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.main);

        //The output TextView we'll use to display messages
        mOutput =  (TextView) findViewById(R.id.output);

        //Create the consumer
        mConsumer = new MessageConsumer("192.168.1.67",
        		"logs",
        		"fanout");

        //Connect to broker
        mConsumer.connectToRabbitMQ();

        //register for messages
        mConsumer.setOnReceiveMessageHandler(new OnReceiveMessageHandler(){

			public void onReceiveMessage(byte[] message) {
				String text = "";
				try {
					text = new String(message, "UTF8");
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}

				mOutput.append("\n"+text);
			}
        });

    }

    @Override
	protected void onResume() {
		super.onPause();
		mConsumer.connectToRabbitMQ();
	}

	@Override
	protected void onPause() {
		super.onPause();
		mConsumer.dispose();
	}
}

This should all be fairly self explanatory. We get a reference to our output view, create the and connect the consumer. When creating the consumer you must use the IP of your rabbitMQ broker machine(in may case “192.168.1.67”) on your local network not “localhost”.  We then listen for messages with a new setOnReceiveMessageHandler.

We are stopping the Consumer onPause. This is very good practise as it stops us draining the device battery and data allowance.

Now we need to test it!

If we ran this now it wouldn’t work. We need to tell the Android system this app is allowed to access the internet(and in turn the broker). In the root of the project open AndroidManifest.xml. Just before the closing /manifest tag, add this permission.

<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
      package="com.atplatforms.rabbitmq"
      android:versionCode="1"
      android:versionName="1.0">
     ......
    <uses-permission android:name="android.permission.INTERNET"></uses-permission>
</manifest>

Hit Run -> Run. The app will start in the emulator or you connected Android device.

Then startup the .net Form app from the last .net example.

Start up a consumer and send a message. You should get the same message on both the Consumer running on your desktop machine and on your Android device. Nice 🙂

Take a moment to consider this. What we have here is very cool, we can send a message to all connected consumers without ever knowing they are there or how many there are.  We are sending messages cross language , platform and OS with very little work. We could use this exact same code to send these messages to thousands of Android hansets with no problem if we wanted to.

In the next example I’ll start to get a little more complex sending some more interesting messages using both .net and Android, I’ll also be looking at routing. [Code available soon]

Posted in: Android, RabbitMQ