In the previous two examples I built a simple .net application to demonstrate first two sections the RabbitMQ getting started guide in .net. In this post I’ll be looking at the third. Download the Source
3.) Publish/Subscribe
The original article (in Java) is here:http://www.rabbitmq.com/tutorials/tutorial-three-java.html
I’m going to take a slightly different approach to my previous two examples and split the Producer and Consumer into two different Windows Forms. This will allow us to run as many Consumers as we like and so demonstrate Pub/Sub effectively.
First up is the Producer.
Create a new Form and add the Input TextBox and Button as in the first two examples. Also and a new Button “Start New Consumer” .
Next create the consumer. We only need to output messages so we only need one RichTextBox
In the previous two examples we had pretty much duplicate constructors for both Consumers and Producers. We will now fix this by creating a base class that these can both inherit from. Create a new class called IConnectToRabbitMQ.
public abstract class IConnectToRabbitMQ : IDisposable { protected IModel Model { get; set; } protected IConnection Connection { get; set; } public string Server { get; set; } public string ExchangeName{ get; set; } public string ExchangeTypeName { get; set; } public IConnectToRabbitMQ(string server, string exchange, string exchangeType) { Server = server; Exchange = exchange; ExchangeTypeName = exchangeType; } //Create the connection, Model and Exchange(if one is required) public virtual bool ConnectToRabbitMQ() { try { var connectionFactory = new ConnectionFactory(); connectionFactory.HostName = Server; Connection = connectionFactory.CreateConnection(); Model = Connection.CreateModel(); bool durable = true; if (!String.IsNullOrEmpty(Exchange)) Model.ExchangeDeclare(Exchange, ExchangeTypeName, durable); return true; } catch (BrokerUnreachableException e) { return false; } } public void Dispose() { if (Connection != null) Connection.Close(); if (Model != null) Model.Abort(); } }
The class name may look a little odd to most as it begins with an “I”, this is usually the naming convention for an Interface but I’m using what I like to call Simon Says naming convention. I’ll be writing a post about this in the near future. The main gist of it, is I like to have classes tell me what they do. For example, a class which calls a remote service might inherit from a class(or interface) called ICallRemoteServices. So the full class name definition would be FooService : ICallRemoteServices. There would also be a abstract method defined that implements the action e.g CallRemoteService. Other example are IAmAnOrder(for a value object), ICalculateShipping, IDeliverEmail etc. This may seem a little weird but I like it :).
So enough of that for now, lets go through the class. First we declare fields to hold the familiar IModel and Connection instances. Next up are fields to store the details of the Server, Exchange and ExchangeTypeName. Exchange is the name of the exchange we want to publish/consume messages from and ExchangeTypeName holds the type of exchange we want to use(in this example it will be “fanout”). ExchangeType is set from a constant declared in the RabbitMQ.Client.ExchangeType class, so for us it will be ExchangeType .Fanout(More on this later.) Next we have the ConnectToRabbitMQ() method, this is almost exactly the same as the Constructor methods of the Producer/Consumer methods in my previous two examples. We have this additional block which declares the Exchange.
bool durable = true; if (!String.IsNullOrEmpty(ExchangeName)) Model.ExchangeDeclare(ExchangeName, ExchangeTypeName, durable);
We are declaring a durable exchange of the type ExchangeTypeName with the name ExchangeName. If this exchange had already been declared by another Producer or Consumer a new one is not created, the existing one will be used.
Now we’ll write our Producer.
public class Producer : IConnectToRabbitMQ { public Producer(string server, string exchange, string exchangeType) : base(server, exchange, exchangeType) { } public void SendMessage(byte[] message) { IBasicProperties basicProperties = Model.CreateBasicProperties(); basicProperties.SetPersistent(true); Model.BasicPublish(ExchangeName, "", basicProperties, message); } } }
Here we a have a nice lightweight publisher, the only difference from our previous examples is we are publishing to a named exchange called ExchangeName. We do not know about or use a Queue.
Then it’s our consumer, this is slightly more complicated.
public class Consumer : IConnectToRabbitMQ { protected bool isConsuming; protected string QueueName; // used to pass messages back to UI for processing public delegate void onReceiveMessage(byte[] message); public event onReceiveMessage onMessageReceived; public Consumer(string server, string exchange, string exchangeType) : base(server, exchange, exchangeType) { } //internal delegate to run the consuming queue on a seperate thread private delegate void ConsumeDelegate(); public void StartConsuming() { Model.BasicQos(0, 1, false); QueueName = Model.QueueDeclare(); Model.QueueBind(QueueName, ExchangeName, ""); isConsuming = true; ConsumeDelegate c = new ConsumeDelegate(Consume); c.BeginInvoke(null, null); } protected Subscription mSubscription { get; set; } private void Consume() { bool autoAck = false; //create a subscription mSubscription = new Subscription(Model, QueueName, autoAck); while (isConsuming) { BasicDeliverEventArgs e = mSubscription.Next(); byte[] body = e.Body; onMessageReceived(body); mSubscription.Ack(e); } } public void Dispose() { isConsuming = false; base.Dispose(); } }
We need to store the name of our Queue that we will be binding to the exchange so we have a field QueueName for this purpose. The next code of interest is the StartConsuming() method. Most of this is familiar with this additional block:
QueueName = Model.QueueDeclare(); Model.QueueBind(QueueName, ExchangeName, "");
What we are doing here is asking the model to declare a temporary queue for us and give it a random unique name(stored in QueueName), we then bind this queue to the Exchange called ExchangeName.
This a key concept to exchanges in RabbitMQ, a publisher/producer only knows about the Exchange, it will publish messages directly to the Exchange and has no concept of a queue. Each consumer knows about the Exchange but they will also have a queue that is bound to the Exchange. The way I look at it is the one or more Producers own an Exchange(and publish to it) and each Consumer owns a Queue(which is bound to an Exchange.)
The Consume() method is very different to what we have seen before(and the Java Example). Instead of using a QueueingBasicConsumer we are using a Subscription. Subscription is part of the RabbitMQ.Client.MessagePatterns package in the .net client Library. It give us a nice wrapper to the boilerplate message de-queuing code. More info is here.
mSubscription = new Subscription(Model, QueueName, autoAck); ..... BasicDeliverEventArgs e = mSubscription.Next(); ..... mSubscription.Ack(e);
Now we need to add the code for our Producer Form
public string HOST_NAME = "localhost"; public string EXCHANGE_NAME = "logs"; private Producer producer; //delegate to show messages on the UI thread private delegate void showMessageDelegate(string message); public PubSub_Producer() { InitializeComponent(); //Declare the producer producer = new Producer(HOST_NAME, EXCHANGE_NAME, ExchangeType.Fanout); //connect to RabbitMQ if(!producer.ConnectToRabbitMQ()) { //Show a basic error if we fail MessageBox.Show("Could not connect to Broker"); } } private int count = 0; private void button1_Click(object sender, EventArgs e) { string message = String.Format("{0} - {1}", count++, textBox1.Text); producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(message)); } private void button2_Click(object sender, EventArgs e) { //Open a new Consumer Form PubSub_Consumer consumer = new PubSub_Consumer(); consumer.Show(); }
This should be fairly self explanatory. The producer.ConnectToRabbitMQ() call is handled in the base IConnectToRabbitMQ class. We’ve added little error handling code just in case the broker is unavailable(if it is run rabbitmq-server -detached from the command line .) There’s also a method to handle clicks on the “Start New Consumer” Button which spawns a new Consumer Form.
Then we have our Consumer Form.
public partial class PubSub_Consumer : Form { public string HOST_NAME = "localhost"; public string EXCHANGE_NAME = "logs"; private Consumer consumer; public PubSub_Consumer() { InitializeComponent(); //create the consumer consumer = new Consumer(HOST_NAME, EXCHANGE_NAME, ExchangeType.Fanout); //connect to RabbitMQ if (!consumer.ConnectToRabbitMQ()) { //Show a basic error if we fail MessageBox.Show("Could not connect to Broker"); } //Register for message event consumer.onMessageReceived += handleMessage; //Start consuming consumer.StartConsuming(); } //delegate to post to UI thread private delegate void showMessageDelegate(string message); //Callback for message receive public void handleMessage(byte[] message) { showMessageDelegate s = new showMessageDelegate(richTextBox1.AppendText); this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine); } }
This is exactly the same as previous Consumer examples with the additional call to the base class.
Now we can run the project after making sure the correct Form is opened on starup
[STAThread] static void Main() { Application.EnableVisualStyles(); Application.SetCompatibleTextRenderingDefault(false); Application.Run(new PubSub_Producer()); }
Click the “Start New Consumer” Button a couple of time to get a few consumers running, then put your message in the “Producer Input” TextBox and hit send. You should see the message appear in all the Consumer output windows. Good stuff 🙂
Summary
What we have done here is create a Fanout Exchange named “logs”, we’ve created some Consumers(three in my example above) each with their own unique temporary queue bound to the exchange. We have then published a message to the exchange using our Producer, the exchange then routes the message to all bound queues which in turn delivers it to the Consumers. Download the Source
Laval
May 25, 2011
Thanks for your blog. Very nice articles!
simonwdixon
May 25, 2011
Laval, Thanks! keep checking back I’ve got a lot more in the pipeline.
Max
May 29, 2011
Nice implementation! Forms are, as you mentioned in the first part, more familiar to .NET:ers than the command line.
I can’t find the source for download under the provided link (“This project currently has no downloads.”).
You learn more when you have to code it yourself though 🙂
Max
May 29, 2011
Never mind, I found the SVN-link now…
simonwdixon
May 29, 2011
Thanks for pointing that out! I’ll upload some sort of archive for non-svn users after I’ve finished the next section.
Tim
August 23, 2011
Awesome examples of RabbitMQ in .Net. Any plans on doing an RPC example?
BryanB
November 4, 2011
In IConnectToRabbitMQ in the constructor you have Exchange = exchange, it should read ExchangeName = exchange
BryanB
November 4, 2011
25 if (!String.IsNullOrEmpty(Exchange))
26 Model.ExchangeDeclare(Exchange, ExchangeTypeName, durable);
Should be ExchangeName
itskumaranand
December 9, 2011
part 4.. i completed the example 4..
http://www.4shared.com/file/lreFq1gf/getting-started-with-rabbitmq.html?
hope this helps..
Issi
February 7, 2012
Thanks a lot for this blog. It’s really clear and helps a lot.
Is anyone working on the fifth and the sixth part?
I’m working on it and it takes me a lot of time to complete the whole code.
I would appreciate to know if someone has completed them
Thanks for all help.
Ant
February 9, 2012
+1 Thanks for the posts
glo
February 22, 2013
Excellent Example!!!!!
Heller
November 25, 2014
Very good blog! Many Thanks!
bicervolkan
May 29, 2015
Hi, thanks for the post it is really useful, but i have question. What if rabbitMQ server is down and then back, how can i keep continue consuming. Because i lost connection.
manjerekar
October 28, 2015
hi,
I’m unable to connect to my rabbitmq server.
DatDT
October 30, 2015
Thanks for you, very good for me. 🙂
John G
November 27, 2023
Subscription is deprecated. Did you update your example