Today's post is an overview of how to use the RabbitMQ message broker with F#. The RabbitMQ tutorials have a lot of great getting-started information, but their .NET section focuses on C#, so I wanted to provide an F# angle on the general use cases.
There are several different use cases for RabbitMQ. My examples track most of the tutorials, but with my own twist. The producer generates a datastream of floats and publishes it through RabbitMQ. Depending on the use case, consumers filter and process messages out of RabbitMQ. I want each example to be a single file, so I also use Async workflows to run both producer and consumers in the same process. To get started, you will need to have .NET Core installed. Once that is done, the application can be set up.
    dotnet new console -lang F# -n RabbitMQ
    cd RabbitMQ
    dotnet add package RabbitMQ.Client --version 5.1.2
Based on the types of interaction, I’ve broken the post into sections. For quick reference, the sections cover work queues, fanout (publish/subscribe), direct routing, and topic exchanges.
There is a lot of repetition, but I want each example to stand on its own. All of the examples below use the same libraries and namespaces, so this single header applies to all of them. The primary difference between the examples is how the exchange, queue, and routing key are defined and used.
    open System
    open System.Text
    open System.Threading
    open RabbitMQ.Client
    open RabbitMQ.Client.Events
Work Queues
For this example, the interaction is pretty direct. The producer takes its datastream and publishes it to the defined queue every 500 milliseconds. The consumer(s) read from the same queue, processing data as it becomes available. The consumer is set up in a similar fashion to the producer; they both need a connection and a channel. It has the extra need to set up a handler for receiving messages. Once everything is set up, the consumer must be started for processing. The final piece is running the producer and consumer workflows. Since this is a quick test, I’ll run everything for 5 seconds, then kill the workflows.
    let producer hostname queue routingKey (token: CancellationTokenSource) = async {
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        let result = channel.QueueDeclare(queue = queue, durable = false, exclusive = false, autoDelete = false, arguments = null)

        let rand = Random()
        while not token.IsCancellationRequested do
            let message = sprintf "%f" (rand.NextDouble())
            let body = Encoding.UTF8.GetBytes(message)
            printfn "publish : %s" message
            channel.BasicPublish(exchange = "", routingKey = routingKey, basicProperties = null, body = body)
            Thread.Sleep(500)
    }
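    let consumer id hostname queue (token: CancellationTokenSource) = async {
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        let result = channel.QueueDeclare(queue = queue, durable = false, exclusive = false, autoDelete = false, arguments = null)

        // Print every message delivered to this consumer.
        let consumer = EventingBasicConsumer(channel)
        consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data:BasicDeliverEventArgs) ->
            let body = data.Body
            let message = Encoding.UTF8.GetString(body)
            printfn "consumed [%s]: %s" id message))

        // The listing breaks off here. Assumed remainder: start the consumer with
        // auto-ack, then keep the channel open until cancellation is requested.
        let consumeResult = channel.BasicConsume(queue = queue, autoAck = true, consumer = consumer)
        while not token.IsCancellationRequested do
            Thread.Sleep(500)
    }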
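The last step described above, running the workflows side by side for a few seconds and then cancelling them, could look roughly like the sketch below. The `runFor` helper and the `ticker` stand-in are hypothetical names used only for illustration; the stand-in has the same shape as the producer and consumer (it takes a `CancellationTokenSource` and returns an `Async<unit>`), so the block runs without a broker.

```fsharp
open System
open System.Threading

// Hypothetical helper (not part of RabbitMQ.Client): start each workflow with a
// shared CancellationTokenSource, wait for the given time, then cancel.
let runFor (milliseconds: int) (workflows: (CancellationTokenSource -> Async<unit>) list) =
    let cts = new CancellationTokenSource()
    for workflow in workflows do
        Async.Start(workflow cts, cts.Token)
    Thread.Sleep(milliseconds)
    cts.Cancel()

// Stand-in workflow with the same shape as the producer and consumer:
// it loops until cancellation is requested.
let ticker (name: string) (token: CancellationTokenSource) = async {
    while not token.IsCancellationRequested do
        printfn "%s: tick" name
        do! Async.Sleep 200
}

// Run two stand-ins side by side for one second.
runFor 1000 [ ticker "producer"; ticker "consumer" ]
```

With the work queue functions, the equivalent call would be something like `runFor 5000 [ producer "localhost" "work" "work"; consumer "1" "localhost" "work" ]`, where the host and queue names are placeholders.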
In contrast to the previous example, a Fanout exchange distributes each published message to all subscribed consumers. Of note, here I use an exchange and a routing key (a fanout exchange ignores the routing key when delivering, but the publish and bind calls still take one). The consumer also needs a queue binding, which wasn’t required in the previous example.
    let producer hostname exchange routingKey (token: CancellationTokenSource) = async {
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Fanout, durable = false, autoDelete = false, arguments = null)

        let rand = Random()
        while not token.IsCancellationRequested do
            let message = sprintf "%f" (rand.NextDouble())
            let body = Encoding.UTF8.GetBytes(message)
            printfn "publish: %s" message
            channel.BasicPublish(exchange = exchange, routingKey = routingKey, basicProperties = null, body = body)
            Thread.Sleep(500)
    }
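    let consumer id hostname exchange routingKey (token: CancellationTokenSource) = async {
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Fanout)

        // Print every message delivered to this consumer.
        let consumer = EventingBasicConsumer(channel)
        consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data:BasicDeliverEventArgs) ->
            let body = data.Body
            let message = Encoding.UTF8.GetString(body)
            printfn "consumed [%s]: %A" id message))

        // The listing breaks off here. Assumed remainder: bind a server-named queue
        // to the exchange, start consuming, and wait for cancellation.
        let queueName = channel.QueueDeclare(queue = "", durable = false, exclusive = true, autoDelete = true, arguments = null).QueueName
        channel.QueueBind(queue = queueName, exchange = exchange, routingKey = routingKey, arguments = null)
        let consumeResult = channel.BasicConsume(queue = queueName, autoAck = true, consumer = consumer)
        while not token.IsCancellationRequested do
            Thread.Sleep(500)
    }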
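To make the contrast with the work queue concrete, here is a toy model with no broker involved. It assumes RabbitMQ's default round-robin dispatch when several consumers share one queue, and one queue per consumer in the fanout case. The `workQueue` and `fanout` functions and the consumer names are made up for illustration.

```fsharp
// Toy model only: consumers are just names, messages are just strings.
let consumers = [ "c1"; "c2"; "c3" ]
let messages = [ "0.12"; "0.57"; "0.93"; "0.40" ]

// Work queue: each message goes to exactly one consumer, taken in turn.
let workQueue (consumers: string list) (messages: string list) =
    messages
    |> List.mapi (fun i m -> (consumers.[i % consumers.Length], m))

// Fanout: every bound queue gets its own copy of each message.
let fanout (consumers: string list) (messages: string list) =
    [ for m in messages do
        for c in consumers -> (c, m) ]

printfn "work queue: %A" (workQueue consumers messages)
printfn "fanout:     %A" (fanout consumers messages)
```

Four messages produce four deliveries in the work queue model and twelve in the fanout model, which is the behaviour to expect when running three consumers against each example.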
This example uses a Direct exchange. The producer looks at the first digit after the decimal point and routes the message to the ‘even’ or ‘odd’ route accordingly. Two consumers are set up, one to listen for even messages and the other to listen for odd messages.
    let producer hostname exchange (routingKeys:string list) (token: CancellationTokenSource) = async {
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Direct, durable = false, autoDelete = false, arguments = null)

        let rand = Random()
        while not token.IsCancellationRequested do
            let data = rand.NextDouble()
            let message = sprintf "%f" data
            let body = Encoding.UTF8.GetBytes(message)
            printfn "publish : %s" message
            let routingKey = if int ((data * 10.)) % 2 = 0 then routingKeys.[0] else routingKeys.[1]
            channel.BasicPublish(exchange = exchange, routingKey = routingKey, basicProperties = null, body = body)
            Thread.Sleep(500)
    }
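    let consumer id hostname exchange routingKey (token: CancellationTokenSource) = async {
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Direct)

        // Print every message delivered to this consumer.
        let consumer = EventingBasicConsumer(channel)
        consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data:BasicDeliverEventArgs) ->
            let body = data.Body
            let message = Encoding.UTF8.GetString(body)
            printfn "consumed [%s]: %s" id message))

        // The listing breaks off here. Assumed remainder: bind a server-named queue
        // with this consumer's routing key, start consuming, and wait for cancellation.
        let queueName = channel.QueueDeclare(queue = "", durable = false, exclusive = true, autoDelete = true, arguments = null).QueueName
        channel.QueueBind(queue = queueName, exchange = exchange, routingKey = routingKey, arguments = null)
        let consumeResult = channel.BasicConsume(queue = queueName, autoAck = true, consumer = consumer)
        while not token.IsCancellationRequested do
            Thread.Sleep(500)
    }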
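The routing decision can be tried out without a broker. The sketch below pulls the producer's even/odd rule into a small function and pairs it with a toy model of a Direct exchange, where a message reaches only the queues whose binding key equals its routing key. `routeFor`, `bindings`, `deliver`, and the queue names are hypothetical and exist only for this illustration.

```fsharp
// Same rule as the producer: the first digit after the decimal point decides the key.
let routeFor (evenKey: string) (oddKey: string) (data: float) =
    if int (data * 10.) % 2 = 0 then evenKey else oddKey

// Toy Direct exchange: each binding is (binding key, queue name).
let bindings = [ ("even", "queue-0"); ("odd", "queue-1") ]

// A message is delivered to every queue whose binding key matches exactly.
let deliver (routingKey: string) =
    bindings
    |> List.filter (fun (bindingKey, _) -> bindingKey = routingKey)
    |> List.map snd

for data in [ 0.05; 0.25; 0.35; 0.75 ] do
    let key = routeFor "even" "odd" data
    printfn "%f -> %s -> %A" data key (deliver key)
```

A routing key with no matching binding is delivered nowhere in this model, which mirrors how a Direct exchange drops messages that no queue is bound for.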
The Topic exchange type provides a mechanism for consumers to apply advanced filtering to the messages they process. In this example, the same even/odd producer is used. The consumers use routing keys that support wildcards, where * stands for exactly one dot-separated word. all.* will process messages whose routing key starts with ‘all’, while *.odd will process messages whose routing key ends in ‘odd’. This functionality allows you to set up topics in a way that provides more advanced consumer processing.
    let producer hostname exchange (routingKeys:string list) (token: CancellationTokenSource) = async {
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Topic, durable = false, autoDelete = false, arguments = null)

        let rand = Random()
        while not token.IsCancellationRequested do
            let data = rand.NextDouble()
            let message = sprintf "%f" data
            let body = Encoding.UTF8.GetBytes(message)
            printfn "publish: %s" message
            let routingKey = if int ((data * 10.)) % 2 = 0 then routingKeys.[0] else routingKeys.[1]
            channel.BasicPublish(exchange = exchange, routingKey = routingKey, basicProperties = null, body = body)
            Thread.Sleep(500)
    }
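    let consumer id hostname exchange routingKey (token: CancellationTokenSource) = async {
        let factory = ConnectionFactory(HostName = hostname)
        use connection = factory.CreateConnection()
        use channel = connection.CreateModel()
        channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Topic) //, durable = false, autoDelete = false, arguments = null)

        // Print every message delivered to this consumer.
        let consumer = EventingBasicConsumer(channel)
        consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data:BasicDeliverEventArgs) ->
            let body = data.Body
            let message = Encoding.UTF8.GetString(body)
            printfn "consumed [%s]: %A" id message))

        // The listing breaks off here. Assumed remainder: bind a server-named queue
        // with this consumer's wildcard key, start consuming, and wait for cancellation.
        let queueName = channel.QueueDeclare(queue = "", durable = false, exclusive = true, autoDelete = true, arguments = null).QueueName
        channel.QueueBind(queue = queueName, exchange = exchange, routingKey = routingKey, arguments = null)
        let consumeResult = channel.BasicConsume(queue = queueName, autoAck = true, consumer = consumer)
        while not token.IsCancellationRequested do
            Thread.Sleep(500)
    }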
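To see which wildcard keys pick up which messages, here is a small stand-alone re-implementation of topic matching. It is a toy for illustration, not the broker's actual code: `topicMatches` is a hypothetical helper, and the routing keys all.even and all.odd are assumed values for what the producer publishes. It follows the documented rules that * matches exactly one word and # matches zero or more words.

```fsharp
// Toy topic matcher: '*' is exactly one dot-separated word, '#' is zero or more.
let topicMatches (bindingKey: string) (routingKey: string) =
    let rec go (pattern: string list) (words: string list) =
        match pattern, words with
        | [], [] -> true
        | "#" :: rest, _ ->
            // '#' either absorbs nothing, or absorbs one word and tries again.
            let skipWord =
                match words with
                | _ :: tail -> go pattern tail
                | [] -> false
            go rest words || skipWord
        | "*" :: rest, _ :: tail -> go rest tail
        | p :: rest, w :: tail when p = w -> go rest tail
        | _ -> false
    go (List.ofArray (bindingKey.Split([| '.' |]))) (List.ofArray (routingKey.Split([| '.' |])))

// Compare the two wildcard keys from the text against the assumed routing keys.
for bindingKey in [ "all.*"; "*.odd" ] do
    for routingKey in [ "all.even"; "all.odd" ] do
        printfn "%s vs %s: %b" bindingKey routingKey (topicMatches bindingKey routingKey)
```

Under these assumptions a consumer bound with all.* sees both even and odd messages, while one bound with *.odd sees only the odd ones.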
The below results show consumers pulling their appropriately filtered messages. Even messages are processed by consumers 0, 2, and 3. Odd messages are processed by consumers 1 and 2.