I’ve been writing a series of posts about how to use RabbitMQ with .net. I initially started using RabbitMQ so I could push live price feeds to my white-label Android trading app, VisualSpreads, so I thought it was time to write an article about how to use RabbitMQ with Android. As Android is programmed in Java, you can follow the Java getting-started tutorials on the RabbitMQ website to learn the key concepts. What I will concentrate on here is the RabbitMQ concepts and patterns that are Android specific. I’m assuming you’ve downloaded the Android SDK and set up your environment in Eclipse. [Source Code Coming Soon]
1.) Publish/Subscribe
My first example is Publish/Subscribe. I’m going to use an existing Publisher (Producer) from my last .net article and write a Subscriber (Consumer) on Android. If you are not a .net person, you can use the Publisher from the example on the RabbitMQ website.
OK, so first we need to create an Android project. Open up Eclipse and go to File -> New -> Android Project.
Use the settings/names I have here. Give the package a name unique to you though 🙂 We won’t create a test project right now, so hit “Finish”.
Next we want to write our message consumer. We’re going to follow a similar pattern to my .net example and create a base class that consumers and producers can inherit from.
First of all we need to include the RabbitMQ Java Client Library. If you don’t have it already, get the latest version from here. Unzip it and find the following three files:
- rabbitmq-client.jar
- commons-io-1.2.jar
- commons-cli-1.1.jar
Create a folder called lib in the root of your Android project, then copy and paste the jars into the new folder. The RabbitMQ client’s purpose should be obvious, but you may wonder what the other two files are for. The RabbitMQ client uses Java APIs that are not included in the Android SDK, and these two files supply the missing APIs. If you didn’t add them you wouldn’t notice anything until runtime, when the connection to the broker would fail without any visible error. If you then looked through the debug logcat output you would see something like this:
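Because this failure only shows up at runtime, it can help to check for the client classes early and log a clear message. The following is a minimal plain-Java sketch, not part of the original project: ClasspathCheck and isClassAvailable are hypothetical names, and the class name checked in main is just one class from the RabbitMQ client that you might pick.

```java
final class ClasspathCheck {
    private ClasspathCheck() {}

    /** Returns true if the named class can be located by the current class loader. */
    static boolean isClassAvailable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError for missing dependencies
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "com.rabbitmq.client.ConnectionFactory";
        if (!isClassAvailable(name)) {
            System.err.println("Missing class " + name + ": check the jars in the lib folder");
        }
    }
}
```

Calling a check like this before connecting turns a buried logcat warning into an explicit message.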
WARN/System.err(9557): Caused by: java.lang.NoClassDefFoundError: com.rabbitmq.client.impl.TruncatedInputStream
Now we need to reference the jar files in the project. Go to Project -> Properties -> Java Build Path, click the “Add Jar” button, navigate to the lib folder, select the three jar files and click OK.
OK, now to the base class. Add a new class IConnectToRabbitMQ as follows:
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Base class for objects that connect to a RabbitMQ Broker
 */
public abstract class IConnectToRabbitMQ {
    public String mServer;
    public String mExchange;

    protected Channel mModel = null;
    protected Connection mConnection;

    protected boolean Running;

    protected String MyExchangeType;

    /**
     * @param server The server address
     * @param exchange The named exchange
     * @param exchangeType The exchange type name
     */
    public IConnectToRabbitMQ(String server, String exchange, String exchangeType) {
        mServer = server;
        mExchange = exchange;
        MyExchangeType = exchangeType;
    }

    public void Dispose() {
        Running = false;
        try {
            if (mConnection != null)
                mConnection.close();
            if (mModel != null)
                mModel.abort();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Connect to the broker and create the exchange
     * @return success
     */
    public boolean connectToRabbitMQ() {
        if (mModel != null && mModel.isOpen()) // already declared
            return true;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(mServer);
            mConnection = connectionFactory.newConnection();
            mModel = mConnection.createChannel();
            // third argument: declare the exchange as durable
            mModel.exchangeDeclare(mExchange, MyExchangeType, true);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}
You should be able to see the similarities here to the .net example. I’ve tried to keep the Android and .net code as similar as possible, as I switch between the two quite often. Here is a summary of the main differences.
The use of Channel and Connection:
//Java
protected Channel mModel = null;
protected Connection mConnection;
Instead of IModel and IConnection:
//.net
protected IModel Model { get; set; }
protected IConnection Connection { get; set; }
The Connection and Model initialisation code is also slightly different:
//Java
connectionFactory.setHost(mServer);
mConnection = connectionFactory.newConnection();
mModel = mConnection.createChannel();
As opposed to in .net:
//.net
connectionFactory.HostName = Server;
Connection = connectionFactory.CreateConnection();
Model = Connection.CreateModel();
Now we need to write our Consumer that will inherit from this base class. There are a few more differences here when compared to .net but the basic pattern is exactly the same.
import java.io.IOException;

import android.os.Handler;

import com.rabbitmq.client.QueueingConsumer;

/**
 * Consumes messages from a RabbitMQ broker
 */
public class MessageConsumer extends IConnectToRabbitMQ {

    public MessageConsumer(String server, String exchange, String exchangeType) {
        super(server, exchange, exchangeType);
    }

    // The Queue name for this consumer
    private String mQueue;
    private QueueingConsumer MySubscription;

    // last message to post back
    private byte[] mLastMessage;

    // An interface to be implemented by an object that is interested in messages (listener)
    public interface OnReceiveMessageHandler {
        public void onReceiveMessage(byte[] message);
    }

    // A reference to the listener, we can only have one at a time (for now)
    private OnReceiveMessageHandler mOnReceiveMessageHandler;

    /**
     * Set the callback for received messages
     * @param handler The callback
     */
    public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler) {
        mOnReceiveMessageHandler = handler;
    }

    private Handler mMessageHandler = new Handler();
    private Handler mConsumeHandler = new Handler();

    // Create runnable for posting back to main thread
    final Runnable mReturnMessage = new Runnable() {
        public void run() {
            mOnReceiveMessageHandler.onReceiveMessage(mLastMessage);
        }
    };

    final Runnable mConsumeRunner = new Runnable() {
        public void run() {
            Consume();
        }
    };

    /**
     * Create Exchange and then start consuming. A binding needs to be added
     * before any messages will be delivered
     */
    @Override
    public boolean connectToRabbitMQ() {
        if (super.connectToRabbitMQ()) {
            try {
                mQueue = mModel.queueDeclare().getQueue();
                MySubscription = new QueueingConsumer(mModel);
                mModel.basicConsume(mQueue, false, MySubscription);
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            // compare string contents with equals(), not references with ==
            if ("fanout".equals(MyExchangeType))
                AddBinding(""); // fanout has default binding

            Running = true;
            mConsumeHandler.post(mConsumeRunner);
            return true;
        }
        return false;
    }

    /**
     * Add a binding between this consumer's Queue and the Exchange with routingKey
     * @param routingKey the binding key eg GOOG
     */
    public void AddBinding(String routingKey) {
        try {
            mModel.queueBind(mQueue, mExchange, routingKey);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Remove binding between this consumer's Queue and the Exchange with routingKey
     * @param routingKey the binding key eg GOOG
     */
    public void RemoveBinding(String routingKey) {
        try {
            mModel.queueUnbind(mQueue, mExchange, routingKey);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void Consume() {
        Thread thread = new Thread() {
            @Override
            public void run() {
                while (Running) {
                    QueueingConsumer.Delivery delivery;
                    try {
                        // blocks until the next message arrives
                        delivery = MySubscription.nextDelivery();
                        mLastMessage = delivery.getBody();
                        mMessageHandler.post(mReturnMessage);
                        try {
                            mModel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
                }
            }
        };
        thread.start();
    }

    // Stops the consume loop; the connection itself is left open (Dispose() closes it)
    public void dispose() {
        Running = false;
    }
}
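One Java detail is worth knowing when checking the exchange type: == on a String compares object references rather than contents, so a comparison against the literal "fanout" can be false for a string that was built at runtime (read from a config file, say). A small plain-Java sketch of the difference follows; ExchangeTypes and isFanout are hypothetical helper names, not part of the RabbitMQ client.

```java
final class ExchangeTypes {
    private ExchangeTypes() {}

    /** Null-safe content comparison against the fanout exchange type name. */
    static boolean isFanout(String exchangeType) {
        return "fanout".equals(exchangeType);
    }

    public static void main(String[] args) {
        // Built at runtime, so it is a different object from the literal "fanout"
        String runtimeValue = new String("fanout");
        System.out.println(runtimeValue == "fanout"); // reference comparison: false
        System.out.println(isFanout(runtimeValue));   // content comparison: true
    }
}
```

Putting the literal on the left of equals() also means a null exchange type returns false instead of throwing.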
OK, let’s look at the Android-specific bits here. The first code of interest is this:
// An interface to be implemented by an object that is interested in messages (listener)
public interface OnReceiveMessageHandler {
    public void onReceiveMessage(byte[] message);
}

// A reference to the listener, we can only have one at a time (for now)
private OnReceiveMessageHandler mOnReceiveMessageHandler;

/**
 * Set the callback for received messages
 * @param handler The callback
 */
public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler) {
    mOnReceiveMessageHandler = handler;
}
What we are doing here is declaring an interface OnReceiveMessageHandler{} that a message listener will implement. We want to have one listener at a time which we keep a reference to in mOnReceiveMessageHandler. We can set the listener by using the method setOnReceiveMessageHandler(OnReceiveMessageHandler handler) . This is a standard Android pattern and is used throughout the SDK for onClick listeners etc.
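Stripped of the RabbitMQ parts, the listener pattern looks like this in plain Java. This is an illustrative sketch rather than the class above: MessageSource and deliver are hypothetical names, and the null check is an addition that guards against a message arriving before any listener has been registered.

```java
import java.nio.charset.StandardCharsets;

final class MessageSource {
    /** Implemented by whoever wants to receive messages. */
    interface OnReceiveMessageHandler {
        void onReceiveMessage(byte[] message);
    }

    // Only one listener at a time; volatile so a listener set on one thread is seen by another
    private volatile OnReceiveMessageHandler handler;

    void setOnReceiveMessageHandler(OnReceiveMessageHandler handler) {
        this.handler = handler;
    }

    /** Passes a message to the listener; returns false if nobody is listening. */
    boolean deliver(byte[] message) {
        OnReceiveMessageHandler current = handler; // read once to avoid a race with the setter
        if (current == null) {
            return false;
        }
        current.onReceiveMessage(message);
        return true;
    }

    public static void main(String[] args) {
        MessageSource source = new MessageSource();
        source.setOnReceiveMessageHandler(new OnReceiveMessageHandler() {
            @Override
            public void onReceiveMessage(byte[] message) {
                System.out.println(new String(message, StandardCharsets.UTF_8));
            }
        });
        source.deliver("hello".getBytes(StandardCharsets.UTF_8));
    }
}
```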
Next we have
private Handler mMessageHandler = new Handler();
private Handler mConsumeHandler = new Handler();
Here we are declaring two Handlers. A Handler is a special object in Android that is used to safely pass messages and Runnables between threads; whatever you post to it runs on the thread the Handler was created on (here, the main thread). It’s vaguely like a delegate in .net.
Next we have two Runnables.
// Create runnable for posting back to main thread
final Runnable mReturnMessage = new Runnable() {
    public void run() {
        mOnReceiveMessageHandler.onReceiveMessage(mLastMessage);
    }
};

final Runnable mConsumeRunner = new Runnable() {
    public void run() {
        Consume();
    }
};
A Runnable is a block of work that can be passed to a Thread (or a Handler) for execution. We will be using our Handlers to run (or post) these Runnables. This is slightly confusing, but it is a very important concept to grasp if you plan to do anything but trivial Android development.
Now within the connectToRabbitMq() method we have this:
mConsumeHandler.post(mConsumeRunner);
This posts the mConsumeRunner runnable to mConsumeHandler, which calls its run() method on the handler’s own thread (the main thread). That in turn calls the Consume() method, which spawns a new thread that blocks while listening for and de-queuing messages.
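The threading model can be tried out in plain Java without the Android SDK. In the sketch below a single-thread ExecutorService stands in for a Handler bound to the main thread (an assumption made purely so the example runs on a desktop JVM, Java 8 or later). Work starts on a worker thread and the result is posted back to the one dedicated "main" thread. PostBackDemo and runAndPostBack are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PostBackDemo {
    private PostBackDemo() {}

    /**
     * Does work on a worker thread, then posts the result to the single "main" thread.
     * Returns the name of the thread that handled the result, or null on timeout.
     */
    static String runAndPostBack() {
        // Stand-in for a Handler bound to the main thread
        final ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        final String[] handledOn = new String[1];
        final CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            // ... blocking work such as waiting for a message would go here ...
            mainThread.execute(() -> {          // comparable to handler.post(runnable)
                handledOn[0] = Thread.currentThread().getName();
                done.countDown();
            });
        }, "worker");
        worker.start();

        try {
            return done.await(5, TimeUnit.SECONDS) ? handledOn[0] : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Result handled on: " + runAndPostBack());
    }
}
```

The point to take away is that the posted block runs on the receiving thread, not on the thread that posted it.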
When we have a new message we then call
mMessageHandler.post(mReturnMessage);
This invokes the mReturnMessage runnable which in turn calls the mOnReceiveMessageHandler.onReceiveMessage(mLastMessage) listener/handler, phew 🙂
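One thing to watch with this design: mLastMessage is a single shared field, so if two messages arrive before the first runnable has run, the listener could be handed the second message twice. A common alternative is to capture each message in its own Runnable. Here is a plain-Java sketch of that idea, again with a single-thread executor standing in for the Handler; the names are hypothetical and the lambdas need Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PerMessageRunnable {
    private PerMessageRunnable() {}

    /** Posts each message in its own Runnable and returns them in the order handled. */
    static List<String> dispatchAll(String... messages) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor(); // Handler stand-in
        final List<String> received = Collections.synchronizedList(new ArrayList<String>());
        for (final String message : messages) {
            // The Runnable captures this message only, so later messages cannot overwrite it
            mainThread.execute(() -> received.add(message));
        }
        mainThread.shutdown(); // already-posted work still runs
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(dispatchAll("first", "second", "third"));
    }
}
```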
If this is looking confusing then please read more about Runnables, Threads and Handlers. It is very important.
We now have our consuming code, so let’s use it. We need to output the messages somewhere, so we need a TextView to display them. In the project folder structure open /res/layout/; there should be a file in there called main.xml. Open this file and make sure it looks like this:
<?xml version="1.0" encoding="utf-8"?>
<ScrollView xmlns:android="http://schemas.android.com/apk/res/android"
    android:orientation="vertical"
    android:layout_width="fill_parent"
    android:layout_height="wrap_content" >
    <TextView
        android:id="@+id/output"
        android:layout_width="fill_parent"
        android:layout_height="wrap_content"
        android:text="@string/hello" />
</ScrollView>
What we have here is a root ScrollView containing one TextView with an ID of output.
Next open up the ActivityHome.java file we created with the project. Make it look like this.
import java.io.UnsupportedEncodingException;

import android.app.Activity;
import android.os.Bundle;
import android.widget.TextView;

public class ActivityHome extends Activity {
    private MessageConsumer mConsumer;
    private TextView mOutput;

    /** Called when the activity is first created. */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.main);

        // The output TextView we'll use to display messages
        mOutput = (TextView) findViewById(R.id.output);

        // Create the consumer
        mConsumer = new MessageConsumer("192.168.1.67", "logs", "fanout");

        // Connect to broker
        mConsumer.connectToRabbitMQ();

        // Register for messages (the interface is nested inside MessageConsumer)
        mConsumer.setOnReceiveMessageHandler(new MessageConsumer.OnReceiveMessageHandler() {
            public void onReceiveMessage(byte[] message) {
                String text = "";
                try {
                    text = new String(message, "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                mOutput.append("\n" + text);
            }
        });
    }

    @Override
    protected void onResume() {
        super.onResume();
        mConsumer.connectToRabbitMQ();
    }

    @Override
    protected void onPause() {
        super.onPause();
        mConsumer.dispose();
    }
}
This should all be fairly self-explanatory. We get a reference to our output view, then create and connect the consumer. When creating the consumer you must use the IP address of your RabbitMQ broker machine on your local network (in my case “192.168.1.67”), not “localhost”. We then listen for messages by registering a new OnReceiveMessageHandler with setOnReceiveMessageHandler.
We are stopping the Consumer in onPause. This is very good practice, as it stops us draining the device battery and data allowance.
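If your build targets Java 7 or later (on Android that means, as far as I know, API level 19 and up), the StandardCharsets constant avoids both the checked exception and any doubt about the charset name. A small sketch, with MessageText and decode as hypothetical helper names:

```java
import java.nio.charset.StandardCharsets;

final class MessageText {
    private MessageText() {}

    /** Decodes a message body as UTF-8; a null body becomes an empty string. */
    static String decode(byte[] body) {
        return body == null ? "" : new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u20ac is the euro sign, which takes three bytes in UTF-8
        byte[] body = "price in \u20ac".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(body));
    }
}
```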
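Note that setting a flag alone does not wake a thread that is blocked waiting for the next message. The sketch below shows one way to make a consumer loop stoppable in plain Java. A BlockingQueue stands in for the RabbitMQ subscription (an assumption for illustration only), and StoppableConsumer and its methods are hypothetical names. The loop polls with a timeout so it re-checks a volatile flag regularly, and stop() also interrupts the thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class StoppableConsumer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // subscription stand-in
    private final AtomicInteger consumed = new AtomicInteger();
    private volatile boolean running; // volatile: written by one thread, read by another
    private Thread thread;

    void offer(String message) { queue.add(message); }

    int consumedCount() { return consumed.get(); }

    synchronized void start() {
        if (running) return;
        running = true;
        thread = new Thread(() -> {
            while (running) {
                try {
                    // Wait briefly instead of forever, so the flag is re-checked
                    String message = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) consumed.incrementAndGet();
                } catch (InterruptedException e) {
                    return; // interrupted by stop(): leave the loop
                }
            }
        }, "consumer");
        thread.start();
    }

    /** Stops the loop and waits for the thread to finish; returns true if it has ended. */
    synchronized boolean stop() {
        running = false;
        if (thread == null) return true;
        thread.interrupt();
        try {
            thread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        StoppableConsumer consumer = new StoppableConsumer();
        consumer.start();                 // e.g. from onResume()
        consumer.offer("hello");
        System.out.println("stopped cleanly: " + consumer.stop()); // e.g. from onPause()
    }
}
```

With the RabbitMQ client, I believe QueueingConsumer offers a nextDelivery overload that takes a timeout and could play the same role as poll here, but check the version you are using.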
Now we need to test it!
If we ran this now it wouldn’t work. We need to tell the Android system that this app is allowed to access the internet (and in turn the broker). In the root of the project open AndroidManifest.xml and, just before the closing /manifest tag, add this permission.
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="com.atplatforms.rabbitmq"
    android:versionCode="1"
    android:versionName="1.0">
    ......
    <uses-permission android:name="android.permission.INTERNET"></uses-permission>
</manifest>
Hit Run -> Run. The app will start in the emulator or on your connected Android device.
Then start up the .net Form app from the last .net example.
Start up a consumer and send a message. You should get the same message on both the Consumer running on your desktop machine and on your Android device. Nice 🙂
Take a moment to consider this. What we have here is very cool: we can send a message to all connected consumers without ever knowing they are there or how many there are. We are sending messages across languages, platforms and operating systems with very little work. We could use this exact same code to send these messages to thousands of Android handsets with no problem if we wanted to.
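To make the fanout idea concrete, here is a toy in-memory model in plain Java. It is not how the broker is implemented, just a sketch of the behaviour described above: the publisher hands a message to the exchange, and every bound queue gets its own copy without the publisher knowing who is listening. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ToyFanoutExchange {
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    /** Creates a new queue, binds it to this exchange and returns it to the consumer. */
    Queue<String> bindNewQueue() {
        Queue<String> queue = new ArrayDeque<>();
        boundQueues.add(queue);
        return queue;
    }

    /** Copies the message to every bound queue; returns how many queues received it. */
    int publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        ToyFanoutExchange logs = new ToyFanoutExchange();
        Queue<String> desktop = logs.bindNewQueue();
        Queue<String> android = logs.bindNewQueue();
        logs.publish("hello");
        System.out.println(desktop.peek() + " / " + android.peek()); // hello / hello
    }
}
```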
In the next example I’ll start to get a little more complex sending some more interesting messages using both .net and Android, I’ll also be looking at routing. [Code available soon]
wanderer99
June 10, 2011
Simon it’s absolutely great that you’re doing this! And BTW, VisualSpread looks amazing. Keep up the good work!
simonwdixon
June 10, 2011
Thanks and thanks! The work will be continued 🙂
skhayman
June 10, 2011
Thanks you so much for this tutorial!
Can you give us your source code please? 🙂
Steve Gaines
July 29, 2011
Fantastic – exactly what I was looking for. Thanks
Steve Gaines
July 29, 2011
p.s. I agree with your comments – this is very significant. Cant wait to test it out. Have you used the public RabbitMQ Broker to do WAN testing?
simonwdixon
August 19, 2011
Steve, I have used the public WAN for testing but in the end I setup a test broker on a PC on my home network. I then used DynDns to make it available from the outside world. You’d be surprised how well this works! http://dyn.com/dns/
ah
December 12, 2011
What is coming in the second part? Will the code be shared?
Tx & best
simonwdixon
January 16, 2012
hi, in the second part I’ll be sending JSON between a .net app and Android, I’ll also be sending messages between Android handsets. Code coming shortly.
BenC
December 17, 2011
Yes, this is awesome! Thanks for sharing. I am eagerly awaiting the 2nd part.
Daniel
January 16, 2012
Hi everyone. I will try to implement this code as soon as possible but first I need to know if it helps me. So i’m trying to implement an event notification for Android clients. A ReSTful service (using Ruby on Rails) will publish messages in the brooker (like you did with .Net application) and try to receive notifications with the smartphone. There is any problem with this approach? Thanks in advance
simonwdixon
January 16, 2012
Hi Daniel. This will indeed help you. You’ll need to write custom Rails code to push messages to the broker, such as this: http://4loc.wordpress.com/2010/03/01/rabbitmq-and-amqp-in-ruby/ . The Android client code will be exactly the same 🙂
Daniel
March 31, 2012
Hi again!
I really need to get this done. I’m having lots of problems and tried almost everything i found.
Thanks to your advice, I already figure out how Rails server publish messages to the rabbitMQ… however i just can’t subscribe those messages in android 😦
So Simon, could you share with us android source code? Please!
Thanks in advance
Kamran
April 1, 2012
Hi Daniel,
It’s easy: all you need for publishing is an AsyncTask. Implement the publish commands in its doInBackground() method.
Regards,
Kamran
Daniel
April 2, 2012
Ok, now it’s working… just need some tune. Thanks Kamran for your reply, it saves me a lot of time!
ah
February 8, 2012
Hi Simon,
i couldnt get it working until changing line in IConnectToRabbitMQ to:
mChannel.exchangeDeclare(mExchange, MyExchangeType); //tutorialstyle
Havent fully reflected the semantic. Did u?
Best, ah
cmdrdats0
February 13, 2012
Hey Simon, thanks for the awesome walkthrough on getting Android going with RabbitMQ! Just a quick note though – the pausing and resuming doesn’t really work correctly, your MessageConsumer.dispose() overrides the IConnectToRabbitMQ.dispose(), so it never disconnects properly on pause. Then on resume, it starts a new connection, which opens a brand new queue, resulting with a growing list of queues that the client is connecting to but no longer listening to.
A better solution would involve unsubscribing to the queue, but keeping it open onPause. Then when you resume, it should resubscribe to the same queue. That way you’ll receive the messages waiting for you. Of course, if you did want to drop those messages, disconnecting would be fine, but better to do it all the way then and clean up 🙂
Also, your onResume() calls super.onPause()…
Otherwise, very awesome – this article helped me get rabbitMQ up and running really quickly 🙂 Thank you again!
Kamran
February 19, 2012
Hi Simon, great tutorial. Any thoughts on sending back messages from Android phone to a java file? I tried the “basicpublish” method with/without handlers and it is not working.
Kamran
February 20, 2012
The code for android is given below, but it gives an error “unable to open stack traces”
I also tried permission “write external storage”
    TextView mOutput;

    /** Called when the activity is first created. */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.main);
        mOutput = (TextView) findViewById(R.id.output);
        Handler handler = new Handler();
        Runnable send = new Runnable(){
            public void run() {
                // TODO Auto-generated method stub
                send();
            }
        };
        mOutput.append("/nBefore");
        handler.post(send);
        mOutput.append("/nAfter");
    }

    private void send(){
        Thread thread = new Thread(){
            @Override
            public void run(){
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("sethost");
                Connection connection = null;
                try {
                    connection = factory.newConnection();
                    mOutput.append("/ninstance1");
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                Channel channel = null;
                try {
                    mOutput.append("/ninstance2");
                    channel = connection.createChannel();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                try {
                    mOutput.append("/ninstance3");
                    channel.queueDeclare("hello", false, false, false, null);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                String message = "Kamran Android E";
                try {
                    mOutput.append("/ninstance4");
                    channel.basicPublish("logs","hello", null, message.getBytes());
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        };
        thread.start();
    }
}
shasanka
April 2, 2012
Hello Kamran,
I am writing a similar type of application in Android. If you can share more details we can work to reach a solution. I am able to produce messages from my android device.
Kamran
April 2, 2012
Thanks Shasanka,
I was able to over come the problem. We need to implement it in the Async task. I am working for a project in my university related RabbitMQ/Android. Though I am done with my RabbitMQ part.
You need to implement the publish commands in the doInBackground() method of AsyncTask.
Regards!
shasanka
April 4, 2012
Hello Kamran,
Thanks for the reply
I am facing some issues while publishing my message from android. Can you please help. Can you provide me some working code to publish from android device. You can mail me on shasanka.cet@gmail.com
plz help. Currently my code for publish is:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import android.app.Activity;
import android.os.Bundle;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;

public class TestRAActivity extends Activity {
    private EditText editText;
    private static final String EXCHANGE_NAME = "xyz";

    /** Called when the activity is first created. */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.main);
        //The edit_text_out EditText we'll use to edit messages
        editText = (EditText) findViewById(R.id.edit_text_out);
        final Button button= (Button)findViewById(R.id.button_send);
        button.setOnClickListener(new View.OnClickListener() {
            public void onClick(View v) {
                // Perform action on click
                String message= editText.getText().toString();
                try{
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("localhost");
                    Connection connection = factory.newConnection();
                    Channel channel = connection.createChannel();
                    channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
                    channel.basicPublish(EXCHANGE_NAME, "hi", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                    System.out.println(" [x] Sent '" + message.getBytes() + "'");
                    System.out.println(" [x] Sent '" + message + "'");
                    channel.close();
                    connection.close();
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        });
    }
}
Kamran
April 4, 2012
Hi shasanka,
The code would not run this way. You need to use handlers, runnables and threads. Try to implement the code with “try/catch” section in a “thread()”.
http://stackoverflow.com/questions/1921514/how-to-run-a-runnable-thread-in-android
For further understanding refer to “painless threading” at developer.android.com
kingkong
February 26, 2012
Hi,
I am trying to implement this however I have this problem. On the line mConsumer.connectToRabbitMQ() in the ActivityHome, I have changed it to:
mOutput.append("\n"+mConsumer.connectToRabbitMQ());
now when I have the wrong server, it prints false as expected however when I have the write server it does not print anything. In fact it does not even print the @string/hello which it should anyways..
The app does not consume messages and I think this could be the reason.. Also I have used loger and found out that the error must be on this method because I can logg before the call however I cannot logg anything after the call.
Any help would be greatly appreciated..
Thanks!
shasanka
March 28, 2012
Hello Simon,
I really need the 2nd part of the android-rabbitMQ series as soon as possible. I am new to RabbitMQ. Please, help and if possible send me some codes as per the functionality you mentioned in your 2nd part yet to come.
Kamran
April 18, 2012
Hi Simon,
When ever i try to implement your tutorial my android application crashes (The application has stopped unexpectedly). I am using windows, though I ran the same application on another system on unix/linux it was working. Is there something specific I should look for?
Kamran
April 19, 2012
I believe that there is a bug in rabbitmq 2.8.1 because I have made other applications for rabbitmq.
Kamran
April 19, 2012
OK I found the solution sometimes when we update our eclipse or Android SDK we might face this problem. Though if we right-click “lib” -> build-path -> add as source folder…. then hopefully the application won’t crash.
Cbal
July 17, 2012
For some reason Eclipse is not able to compile due to line 23 in ActivityHome.java. It says
“The methodsetOnReceiveMessageHandler(MessageConsumer.OnReceiveMessageHandler) in the type MessageConsumer is not applicable for the arguments (new OnReceiveMessageHandler(){})”. Has anyone else had this issue?
Cbal
July 17, 2012
Nevermind, the method call is supposed to pass in new MessageConsumer.OnReceiveMessageHandler() instead, from what I understand.
Cbal
July 18, 2012
I’m having a really hard time running this with Send.java from the RabbitMQ tutorials (http://www.rabbitmq.com/tutorials/tutorial-one-java.html). I noticed that the tutorial does not include the entire manifest, and I think that might be part of the problem. Does anybody have suggestions on how to get this to work with the RabbitMQ Send.java and not the .NET version?
rahuj
July 29, 2012
i am unable to start rabbit
my logcat is
07-28 13:59:31.107: I/global(372): Default buffer size used in BufferedInputStream constructor. It would be better to be explicit if an 8k buffer is required.
07-28 13:59:31.107: I/global(372): Default buffer size used in BufferedOutputStream constructor. It would be better to be explicit if an 8k buffer is required.
07-28 13:59:38.222: W/ActivityManager(58): Launch timeout has expired, giving up wake lock!
07-28 13:59:38.632: W/ActivityManager(58): Activity idle timeout for HistoryRecord{44ebbc30 com.shrey.connec/.androidconn}
07-28 13:59:43.708: D/dalvikvm(117): GC_EXPLICIT freed 820 objects / 45864 bytes in 71ms
07-28 13:59:48.708: D/dalvikvm(218): GC_EXPLICIT freed 100 objects / 4056 bytes in 65ms
07-28 13:59:53.708: D/dalvikvm(259): GC_EXPLICIT freed 6 objects / 176 bytes in 70ms
07-28 13:59:54.958: I/ActivityManager(58): Displayed activity com.shrey.connec/.androidconn: 26738 ms (total 26738 ms)
and my code is
public abstract class rabbitmq {
public String mServer;
public String mExchange;
protected Channel mModel = null;
protected Connection mConnection;
protected boolean Running ;
protected String MyExchangeType ;
/**
*
* @param server The server address
* @param exchange The named exchange
* @param exchangeType The exchange type name
*/
public rabbitmq(String server, String exchange, String exchangeType)
{
mServer = server;
mExchange = exchange;
MyExchangeType = exchangeType;
}
public void Dispose()
{
Running = false;
try {
if (mConnection!=null)
mConnection.close();
if (mModel != null)
mModel.abort();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* Connect to the broker and create the exchange
* @return success
*/
public boolean connectToRabbitMQ()
{
if(mModel!= null && mModel.isOpen() )//already declared
return true;
try
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(mServer);
mConnection = connectionFactory.newConnection();
mModel = mConnection.createChannel();
mModel.exchangeDeclare(mExchange, MyExchangeType, true);
return true;
}
catch (Exception e)
{
e.printStackTrace();
return false;
}
}
}
rabbitmqclient is
public class rabbitmqclient extends rabbitmq {
public rabbitmqclient(String server, String exchange, String exchangeType) {
super(server, exchange, exchangeType);
}
// The Queue name for this consumer
private String mQueue;
private QueueingConsumer MySubscription;
// last message to post back
private byte[] mLastMessage;
// An interface to be implemented by an object that is interested in
// messages(listener)
public interface OnReceiveMessageHandler {
public void onReceiveMessage(byte[] message);
};
// A reference to the listener, we can only have one at a time(for now)
private OnReceiveMessageHandler mOnReceiveMessageHandler;
/**
*
* Set the callback for received messages
*
* @param handler
* The callback
*/
public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler) {
mOnReceiveMessageHandler = handler;
};
private Handler mMessageHandler = new Handler();
private Handler mConsumeHandler = new Handler();
// Create runnable for posting back to main thread
final Runnable mReturnMessage = new Runnable() {
public void run() {
mOnReceiveMessageHandler.onReceiveMessage(mLastMessage);
}
};
final Runnable mConsumeRunner = new Runnable() {
public void run() {
Consume();
}
};
/**
* Create Exchange and then start consuming. A binding needs to be added
* before any messages will be delivered
*/
@Override
public boolean connectToRabbitMQ() {
if (super.connectToRabbitMQ()) {
try {
mQueue = mModel.queueDeclare().getQueue();
MySubscription = new QueueingConsumer(mModel);
mModel.basicConsume(mQueue, false, MySubscription);
} catch (IOException e) {
e.printStackTrace();
return false;
}
if (MyExchangeType == "fanout")
AddBinding("");// fanout has default binding
Running = true;
mConsumeHandler.post(mConsumeRunner);
return true;
}
return false;
}
/**
* Add a binding between this consumers Queue and the Exchange with
* routingKey
*
* @param routingKey
* the binding key eg GOOG
*/
public void AddBinding(String routingKey) {
try {
mModel.queueBind(mQueue, mExchange, routingKey);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* Remove binding between this consumers Queue and the Exchange with
* routingKey
*
* @param routingKey
* the binding key eg GOOG
*/
public void RemoveBinding(String routingKey) {
try {
mModel.queueUnbind(mQueue, mExchange, routingKey);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void Consume() {
Thread thread = new Thread() {
@Override
public void run() {
while (Running) {
QueueingConsumer.Delivery delivery;
try {
delivery = MySubscription.nextDelivery();
mLastMessage = delivery.getBody();
mMessageHandler.post(mReturnMessage);
try {
mModel.basicAck(delivery.getEnvelope()
.getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
};
thread.start();
}
public void dispose() {
Running = false;
}
}
and activity code is
public class androidconn extends Activity {
private rabbitmqclient mConsumer;
private TextView mOutput;
/** Called when the activity is first created. */
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//The output TextView we’ll use to display messages
mOutput = (TextView) findViewById(R.id.moutput);
//Create the consumer
mConsumer = new rabbitmqclient("115.245.143.22 :5672",
"logs",
"fanout");
//Connect to broker
mConsumer.connectToRabbitMQ();
//register for messages
mConsumer.setOnReceiveMessageHandler(new OnReceiveMessageHandler(){
public void onReceiveMessage(byte[] message) {
String text = "";
try {
text = new String(message, "UTF");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
mOutput.append("\n"+text);
}
});
}
@Override
protected void onResume() {
super.onPause();
mConsumer.connectToRabbitMQ();
}
@Override
protected void onPause() {
super.onPause();
mConsumer.dispose();
}
}
and the messanger code is from which i am wishing to send the message
public class rabbitchat {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("115.245.143.22 ");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
my logcat is
07-28 13:59:31.107: I/global(372): Default buffer size used in BufferedInputStream constructor. It would be better to be explicit if an 8k buffer is required.
07-28 13:59:31.107: I/global(372): Default buffer size used in BufferedOutputStream constructor. It would be better to be explicit if an 8k buffer is required.
07-28 13:59:38.222: W/ActivityManager(58): Launch timeout has expired, giving up wake lock!
07-28 13:59:38.632: W/ActivityManager(58): Activity idle timeout for HistoryRecord{44ebbc30 com.shrey.connec/.androidconn}
07-28 13:59:43.708: D/dalvikvm(117): GC_EXPLICIT freed 820 objects / 45864 bytes in 71ms
07-28 13:59:48.708: D/dalvikvm(218): GC_EXPLICIT freed 100 objects / 4056 bytes in 65ms
07-28 13:59:53.708: D/dalvikvm(259): GC_EXPLICIT freed 6 objects / 176 bytes in 70ms
07-28 13:59:54.958: I/ActivityManager(58): Displayed activity com.shrey.connec/.androidconn: 26738 ms (total 26738 ms)
And my code is:
public abstract class rabbitmq {
    public String mServer;
    public String mExchange;
    protected Channel mModel = null;
    protected Connection mConnection;
    protected boolean Running;
    protected String MyExchangeType;
    /**
     *
     * @param server The server address
     * @param exchange The named exchange
     * @param exchangeType The exchange type name
     */
    public rabbitmq(String server, String exchange, String exchangeType)
    {
        mServer = server;
        mExchange = exchange;
        MyExchangeType = exchangeType;
    }
    public void Dispose()
    {
        Running = false;
        try {
            if (mConnection!=null)
                mConnection.close();
            if (mModel != null)
                mModel.abort();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    /**
     * Connect to the broker and create the exchange
     * @return success
     */
    public boolean connectToRabbitMQ()
    {
        if(mModel!= null && mModel.isOpen() )//already declared
            return true;
        try
        {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(mServer);
            mConnection = connectionFactory.newConnection();
            mModel = mConnection.createChannel();
            mModel.exchangeDeclare(mExchange, MyExchangeType, true);
            return true;
        }
        catch (Exception e)
        {
            e.printStackTrace();
            return false;
        }
    }
}
rabbitmqclient is:
public class rabbitmqclient extends rabbitmq {
    public rabbitmqclient(String server, String exchange, String exchangeType) {
        super(server, exchange, exchangeType);
    }
    // The Queue name for this consumer
    private String mQueue;
    private QueueingConsumer MySubscription;
    // last message to post back
    private byte[] mLastMessage;
    // An interface to be implemented by an object that is interested in
    // messages(listener)
    public interface OnReceiveMessageHandler {
        public void onReceiveMessage(byte[] message);
    };
    // A reference to the listener, we can only have one at a time(for now)
    private OnReceiveMessageHandler mOnReceiveMessageHandler;
    /**
     *
     * Set the callback for received messages
     *
     * @param handler
     *            The callback
     */
    public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler) {
        mOnReceiveMessageHandler = handler;
    };
    private Handler mMessageHandler = new Handler();
    private Handler mConsumeHandler = new Handler();
    // Create runnable for posting back to main thread
    final Runnable mReturnMessage = new Runnable() {
        public void run() {
            mOnReceiveMessageHandler.onReceiveMessage(mLastMessage);
        }
    };
    final Runnable mConsumeRunner = new Runnable() {
        public void run() {
            Consume();
        }
    };
    /**
     * Create Exchange and then start consuming. A binding needs to be added
     * before any messages will be delivered
     */
    @Override
    public boolean connectToRabbitMQ() {
        if (super.connectToRabbitMQ()) {
            try {
                mQueue = mModel.queueDeclare().getQueue();
                MySubscription = new QueueingConsumer(mModel);
                mModel.basicConsume(mQueue, false, MySubscription);
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            if (MyExchangeType == "fanout")
                AddBinding("");// fanout has default binding
            Running = true;
            mConsumeHandler.post(mConsumeRunner);
            return true;
        }
        return false;
    }
    /**
     * Add a binding between this consumers Queue and the Exchange with
     * routingKey
     *
     * @param routingKey
     *            the binding key eg GOOG
     */
    public void AddBinding(String routingKey) {
        try {
            mModel.queueBind(mQueue, mExchange, routingKey);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    /**
     * Remove binding between this consumers Queue and the Exchange with
     * routingKey
     *
     * @param routingKey
     *            the binding key eg GOOG
     */
    public void RemoveBinding(String routingKey) {
        try {
            mModel.queueUnbind(mQueue, mExchange, routingKey);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    private void Consume() {
        Thread thread = new Thread() {
            @Override
            public void run() {
                while (Running) {
                    QueueingConsumer.Delivery delivery;
                    try {
                        delivery = MySubscription.nextDelivery();
                        mLastMessage = delivery.getBody();
                        mMessageHandler.post(mReturnMessage);
                        try {
                            mModel.basicAck(delivery.getEnvelope()
                                    .getDeliveryTag(), false);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
                }
            }
        };
        thread.start();
    }
    public void dispose() {
        Running = false;
    }
}
test
August 13, 2012
Hi, is there any reason not to use the above code, security-wise? When connecting to the RabbitMQ server, the Android app is using the default guest:guest credentials. I guess we need to limit what the default user can do with access controls, but I guess an attacker could connect to the RabbitMQ server and get a list of all queues, their members, and possibly even messages that are not meant for him. Any idea how this can be prevented?
Rizki Sunaryo
October 21, 2012
Dear Mr. Simon Dixon,
Do we need to install Erlang and RabbitMQ in order to achieve this messaging?
When I tried using RedHat as the producer, it seemed RabbitMQ had to be installed first.
Jean-Baptiste DUPONT
November 25, 2012
Hello,
I keep trying to get this to work with the latest download (November 2012).
Here are some of the issues I had to fix:
In the proposed source code for MessageConsumer.java
add the following three directives:
import android.os.Handler;
import com.rabbitmq.client.QueueingConsumer;
import java.io.*;
In the IConnectToRabbitMQ.java
add the following directives:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.*;
Hope this will help someone out there.
jb
Jean-Baptiste DUPONT
November 25, 2012
In addition, in the MainActivity.java (new name for ActivityHome) you need to replace
the R.layout.main with R.layout.activity_main
I’m getting there…
Jean-Baptiste DUPONT
November 25, 2012
Now I’ve bumped into an issue I’m unable to fix because I don’t understand what the message means:
Here is the message I get in Eclipse:
**************************
“Multiple markers at this line
– The method setOnReceiveMessageHandler(MessageConsumer.OnReceiveMessageHandler) in the type MessageConsumer is not applicable for the
arguments (new OnReceiveMessageHandler(){})
– OnReceiveMessageHandler cannot be resolved to a type”
*********************************
This is in the main class, whose name is MainActivity.java in my case rather than ActivityHome.java as in the tutorial.
The code at stake is the following:
********************
//register for messages
mConsumer.setOnReceiveMessageHandler(new OnReceiveMessageHandler(){
    public void onReceiveMessage(byte[] message) {
        String text = "";
        try {
            text = new String(message, "UTF8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        mOutput.append("\n"+text);
    }
});
***************************
Any help would be greatly appreciated.
Many thanks,
Jean-Baptiste
Jean-Baptiste DUPONT
November 26, 2012
So here is what I needed to do to fix the problem reported above:
Just add the following missing directive
import com.atplatform.rabbitmqonandroid.MessageConsumer.OnReceiveMessageHandler;
And this error finally disappeared.
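For anyone hitting the same "cannot be resolved to a type" message: OnReceiveMessageHandler is declared inside MessageConsumer, so its simple name is not visible from another class without an import. An alternative to the import is to use the qualified name MessageConsumer.OnReceiveMessageHandler. Here is a minimal sketch, assuming a cut-down MessageConsumer with no RabbitMQ dependency; the deliver method and the NestedInterfaceDemo class are hypothetical and exist only for illustration.

```java
import java.nio.charset.StandardCharsets;

class NestedInterfaceDemo {
    // Registers a handler using the qualified nested-interface name, so no
    // extra import of OnReceiveMessageHandler is needed.
    static String receiveOnce(String payload) {
        final StringBuilder output = new StringBuilder();
        MessageConsumer consumer = new MessageConsumer();
        consumer.setOnReceiveMessageHandler(new MessageConsumer.OnReceiveMessageHandler() {
            @Override
            public void onReceiveMessage(byte[] message) {
                output.append(new String(message, StandardCharsets.UTF_8));
            }
        });
        consumer.deliver(payload.getBytes(StandardCharsets.UTF_8));
        return output.toString();
    }

    public static void main(String[] args) {
        System.out.println(receiveOnce("Hello World!"));
    }
}

// Cut-down stand-in for the tutorial's MessageConsumer (assumption: the real
// class also talks to the broker, which is left out here).
class MessageConsumer {
    // Nested interface: from outside this class its name is
    // MessageConsumer.OnReceiveMessageHandler.
    public interface OnReceiveMessageHandler {
        void onReceiveMessage(byte[] message);
    }

    private OnReceiveMessageHandler mOnReceiveMessageHandler;

    public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler) {
        mOnReceiveMessageHandler = handler;
    }

    // Hypothetical helper that simulates a message arriving from the broker.
    public void deliver(byte[] message) {
        if (mOnReceiveMessageHandler != null) {
            mOnReceiveMessageHandler.onReceiveMessage(message);
        }
    }
}
```

Either approach should work; the import keeps the call site shorter, while the qualified name makes it obvious where the interface lives.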
More oft better
February 7, 2013
Can I have rabbit rule them all?
An android MQ client which manages all my Jabber IDs and respective rosters? Preferably a client that does not require privacy eroding-, battery sucking GAPPS? (or amazon analogue)
and ultimately sms to wean sheeple to MQ or xmpp
seshachalam
March 18, 2013
Hi Simon,
The connection class has a public void Dispose() method that closes the connection. But when the activity is closed, onPause calls the dispose() method, which only sets the Running variable to false. Shouldn’t we be closing the connection through the Dispose method instead? Please reply ASAP.
Thanks
Droid Stars
August 16, 2013
Great post you have there! Thanks! Keep up the hard work, I’ll wait for your next tutorial.
Edward Brey
September 27, 2013
Typo after “Unzip this and find the following three files”. In the list, you have commons-io twice, and commons-cli is missing.
sowmya
October 2, 2013
I’m running the code given above. It ran, but I got an error on the main thread. I then made the connection in doInBackground of an AsyncTask, but now I can’t connect to the server: it tries to establish the connection, but the connection is never established and it times out.
Thomas
October 4, 2013
Sowmya, the code will not work on Honeycomb:
http://developer.android.com/training/articles/perf-anr.html
Your connection to RabbitMQ must be established in a new thread. You cannot use the main thread for it.
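To illustrate the point about keeping the connect off the main thread, here is a minimal sketch in plain Java. It assumes a hypothetical BlockingConnector interface standing in for the blocking connectToRabbitMQ() call, and it uses an ExecutorService rather than the tutorial's own classes, so treat it as one possible shape rather than the tutorial's method.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BackgroundConnect {
    /** Hypothetical stand-in for the blocking connectToRabbitMQ() call. */
    interface BlockingConnector {
        boolean connect() throws Exception;
    }

    /** Runs the blocking connect on a worker thread and reports failure as false. */
    static Future<Boolean> connectInBackground(ExecutorService executor,
                                               final BlockingConnector connector) {
        return executor.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() {
                try {
                    return connector.connect();
                } catch (Exception e) {
                    // Any connection error is treated as "not connected".
                    return false;
                }
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Boolean> result = connectInBackground(executor, new BlockingConnector() {
            @Override
            public boolean connect() throws InterruptedException {
                Thread.sleep(200); // simulate a slow network round trip
                return true;
            }
        });
        System.out.println("Caller thread is free while the connect runs");
        // Waiting with a timeout is fine in this console demo only.
        System.out.println("Connected: " + result.get(5, TimeUnit.SECONDS));
        executor.shutdown();
    }
}
```

In an Activity you would not block on result.get() from the main thread; the worker would instead post its result back through a Handler, as the consumer code in this thread already does for received messages.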
Mohammad M D P
July 9, 2014
Thanks for great help.
Can you please tell me how to connect to RabbitMQ with a username/password?
And what is exchangeType? I have run the RabbitMQ broker with all default configurations, and I want my Android app to subscribe to this exchange: “topicID.messages”
And one last thing: how can we handle a connection timeout? It always times out on mConnection = connectionFactory.newConnection();
itai
November 16, 2014
Hey Simon, can you please post your code? It’s very important to me.
Roberto
December 25, 2014
Remember to catch ShutdownSignalException in the try/catch in the Consume method. I lost several hours working out why my application crashed when I disconnected from the Wi-Fi network.
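The crash happens because ShutdownSignalException is an unchecked exception, so a consume loop that only catches InterruptedException lets it escape and kill the thread. The sketch below shows the shape of the fix in plain Java, with no RabbitMQ dependency: ConnectionLostException is a hypothetical stand-in for ShutdownSignalException, and DeliverySource is a hypothetical stand-in for QueueingConsumer.nextDelivery().

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ConsumeLoopSketch {
    /** Stand-in for com.rabbitmq.client.ShutdownSignalException (also unchecked). */
    static class ConnectionLostException extends RuntimeException {
        ConnectionLostException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for QueueingConsumer.nextDelivery(). */
    interface DeliverySource {
        byte[] nextDelivery() throws InterruptedException;
    }

    /** Consumes until the connection drops or the thread is interrupted. */
    static List<String> consumeUntilShutdown(DeliverySource source) {
        List<String> received = new ArrayList<String>();
        boolean running = true;
        while (running) {
            try {
                byte[] body = source.nextDelivery();
                received.add(new String(body, StandardCharsets.UTF_8));
            } catch (ConnectionLostException e) {
                // Connection dropped (e.g. Wi-Fi off): leave the loop cleanly.
                running = false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                running = false;
            }
        }
        return received;
    }

    /** Demo helper: delivers the given messages, then simulates a lost connection. */
    static DeliverySource messagesThenDisconnect(String... messages) {
        final Iterator<String> it = Arrays.asList(messages).iterator();
        return new DeliverySource() {
            @Override
            public byte[] nextDelivery() {
                if (!it.hasNext()) {
                    throw new ConnectionLostException("connection closed");
                }
                return it.next().getBytes(StandardCharsets.UTF_8);
            }
        };
    }

    public static void main(String[] args) {
        // Prints [tick 1, tick 2]
        System.out.println(consumeUntilShutdown(messagesThenDisconnect("tick 1", "tick 2")));
    }
}
```

In the real Consume method the same extra catch block would go around MySubscription.nextDelivery(), and it is a natural place to schedule a reconnect attempt.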