Today I continue the MQTT server series by expanding capabilities to include Publish and Subscribe packets. This will build upon the previous F#-based server foundation.
Looking back at the series, I previously built basic networking and Connect functionality into an MQTT server. Being able to perform a connection handshake is nice, but not really useful on its own. It is time to take the next step and support core MQTT functionality, namely the ability to subscribe to topics and publish messages. As always, it is time to turn to the MQTT Spec. Once connected, a client is permitted to perform several actions. The two I’m interested in today are subscribing to topics and publishing messages.
An MQTT publish message consists of two parts, a topic and a message. The topic is how different types of messages are grouped and filtered through the system. A system may have topics like ‘/lights/status’ or ‘/weather-station/1/temperature’. The messages can be in any format, but are processed through MQTT as bytes. It is up to the MQTT user to define the format of topics and messages. One or more clients subscribe to particular topics. Additionally, clients publish messages, each with an attached topic. The MQTT server then forwards messages on to clients based on their particular subscriptions. It is a basic pub/sub model; the important note here is that subscriptions are handled at a topic level.
There are a couple of things I need to handle: parsing new MQTT packets, then processing those packets. But first I need to define the types. F# again supports this process well, as types drive everything. Since my immediate focus is on publishing, I need to be able to handle a Publish packet. I also need to be able to build a PubAck packet to send back to the publishing client. The server also needs to be able to send a Publish packet to other clients. These types are modeled from the spec, so it is pretty direct to put them together. Since the server will be sending messages out, I need to support serialization of these types as well.
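The types themselves are not shown here, so the following is only a minimal sketch of what the PubAck side might look like. The name `PubAckVariableHeader` and its `Serialize` member echo the processing code later in the post; the `uint16` field type and the `buildPubAck` helper are assumptions made for illustration. The byte layout (packet type 4 in the high nibble, remaining length of 2, then the packet identifier most significant byte first) follows the MQTT 3.1.1 spec.

```fsharp
/// Variable header of a PubAck packet (field type is an assumption).
type PubAckVariableHeader =
    { PacketIdentifier: uint16 }
    /// Packet identifier as two bytes, most significant byte first.
    member this.Serialize() : byte[] =
        [| byte (this.PacketIdentifier >>> 8)
           byte (this.PacketIdentifier &&& 0xFFus) |]

/// Hypothetical helper: build a complete PubAck packet.
/// 0x40 = packet type 4 with reserved flags 0; 0x02 = remaining length.
let buildPubAck (packetId: uint16) : byte[] =
    let header = { PubAckVariableHeader.PacketIdentifier = packetId }
    Array.append [| 0x40uy; 0x02uy |] (header.Serialize())
```

Keeping serialization as a member on each record means the processing code only has to concatenate byte arrays when it assembles an outgoing packet.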
With the types in place, it is time to dig into the parsing. Since I’m adding a new type I need to update the main control function. Beyond that, the fixed header is simple to parse. In fact, fixed header parsing is very similar across packet types. The only real difference between them is the required flags per message type. Here, I need to extract the Dup, QoS, and Retain flags, which is simply done with some bit-shifts and ANDing. The message’s topic is stored in the variable header, so I use the same string extraction technique that I used before. Messages with a QoS above 0 also carry an associated Id, also in the variable header. The payload, or message, is just an array of bytes. With MQTT, it is up to the clients to determine the payload encoding method. This is nice; I’ve used this flexibility for messages that are sometimes UTF-8 encoded strings, other times raw bytes off of a device, or even images.
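To make the bit-shifting and string extraction concrete, here is a small sketch. The function names `parsePublishFlags` and `parseString` are hypothetical, and it works on a plain `byte[]` rather than the `ReadOnlySpan<byte>` used in the server, purely to keep the example self-contained. The bit positions (Dup in bit 3, QoS in bits 2 and 1, Retain in bit 0) and the two-byte big-endian length prefix come from the MQTT 3.1.1 spec.

```fsharp
open System.Text

/// Split the first byte of a Publish packet into (Dup, QoS, Retain).
let parsePublishFlags (firstByte: byte) : bool * int * bool =
    let dup = (firstByte &&& 0x08uy) <> 0uy        // bit 3
    let qos = int ((firstByte >>> 1) &&& 0x03uy)   // bits 2 and 1
    let retain = (firstByte &&& 0x01uy) <> 0uy     // bit 0
    (dup, qos, retain)

/// Read a length-prefixed UTF-8 string starting at `position`.
/// Returns the string and the position just past it.
let parseString (buffer: byte[]) (position: int) : string * int =
    let length = (int buffer.[position] <<< 8) ||| int buffer.[position + 1]
    let text = Encoding.UTF8.GetString(buffer, position + 2, length)
    (text, position + 2 + length)
```

Returning the new position alongside the value matches the threading style of the parse functions in this post, where each step hands the next one its starting offset.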
/// Parse buffer for a Publish packet
let parsePublishPacket (buffer: ReadOnlySpan<byte>) : RequestPacket option =
    let position = 0
    let (fixedHeader, position) = parsePublishFixedHeader buffer
    let (variableHeader, position) = parsePublishVariableHeader buffer fixedHeader position
    let (payload, position) = parsePublishPayload buffer fixedHeader variableHeader position

/// Extract payload (Publish) from buffer
let parsePublishPayload (buffer: ReadOnlySpan<byte>) (fixedHeader: FixedPublishHeader) (variableHeader: VariablePublishHeader) (position: int) : (PublishPayload * int) =
    let dataLength = fixedHeader.RemainingLength - variableHeader.Size
    let data = buffer.Slice(position, dataLength).ToArray()
    let position = position + dataLength
    ({ PublishPayload.Message = data }, position)
At this point the packet is parsed and needs to be processed. Here a couple of things need to happen. The server needs to send a PubAck packet back to the publishing client. It also needs to interrogate subscriptions and forward the message on to the appropriate clients. Additionally, the server needs to retain messages for future publishing, if the message is flagged as such. The first step in all of this is construction of the appropriate packets. Once done, they are passed into the state processor, where the filtering and sending are done.
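The retention step mentioned above is not shown in this post, so here is one way it could be sketched. The `updateRetained` function and the `Map<string, byte[]>` store keyed by topic are assumptions for illustration. The rule that a retained Publish with an empty payload clears the stored message for that topic comes from the MQTT 3.1.1 spec.

```fsharp
/// Hypothetical retained-message store: at most one message per topic.
let updateRetained (retained: Map<string, byte[]>) (topic: string) (retain: bool) (payload: byte[]) : Map<string, byte[]> =
    if not retain then
        retained                        // not flagged: nothing to store
    elif payload.Length = 0 then
        Map.remove topic retained       // empty payload clears the topic
    else
        Map.add topic payload retained  // newest retained message wins
```

Because the map is immutable, this slots naturally into a state update of the form `{ state with ... }`, assuming the state record were given a field for retained messages.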
/// Process an mqtt request packet
let processPacket (state: StateManager) (context: ConnectionContext) (packet: RequestPacket option) =
    match packet with
    | Some(RequestPacket.Connect (p)) -> processConnect state context p
    | Some(RequestPacket.Publish (p)) -> processPublish state context p
    | Some(RequestPacket.Disconnect (p)) -> processDisconnect state context
    | None ->
        printfn "ERROR: Invalid packet data. Disconnecting..."
        processDisconnect state context
/// Process a Publish packet
let processPublish (state: StateManager) (context: ConnectionContext) (packet: PublishPacket) : ProcessResult option =
    // Build PubAck packet
    let ackVariableHeader = { PubAckVariableHeader.PacketIdentifier = packet.VariableHeader.PacketIdentifier }
    let variableHeader' = ackVariableHeader.Serialize()
Just like previous posts, the state manager controls message sending and tracking state. First it sends a PubAck back to the client. After that it needs to do subscription filtering and sends. The filtering happens based on text matching as defined in the spec. It supports exact string matches as well as wildcarding using the # (multi-level) and + (single-level) characters. Once the destination clients are determined it is a matter of sending. This can be done in parallel, which can be easily accomplished using Async.Parallel. I realize this isn’t truly unique to F#, but I love how easy the semantics make it to parallelize these types of workloads. This is definitely a win in the F# column. To ensure proper QoS, it needs to track messages sent to clients. This way the server can resend lost messages if appropriate, etc.
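The matching rules described above can be sketched as a small recursive function. `topicMatches` is a hypothetical name, not the server's actual filter code, and this version skips some spec details (for example, the special handling of topics that begin with `$`). It splits both the filter and the topic on `/` and compares them level by level.

```fsharp
/// True when `topic` matches `filter` under the MQTT wildcard rules:
/// '+' matches exactly one level, '#' matches all remaining levels.
let topicMatches (filter: string) (topic: string) : bool =
    let rec loop (filterLevels: string list) (topicLevels: string list) =
        match filterLevels, topicLevels with
        | [ "#" ], _ -> true                            // trailing '#' matches the rest
        | [], [] -> true                                // both exhausted: exact match
        | "+" :: fs, _ :: ts -> loop fs ts              // '+' consumes one level
        | f :: fs, t :: ts when f = t -> loop fs ts     // literal level must be equal
        | _ -> false
    loop (List.ofArray (filter.Split('/'))) (List.ofArray (topic.Split('/')))
```

For example, `topicMatches "weather-station/+/temperature" "weather-station/1/temperature"` is true, while `topicMatches "lights/+" "lights/kitchen/status"` is false because `+` only covers a single level.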
    let state' = { state with ActiveMessages = activeMessage :: state.ActiveMessages }
    return state'
}
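As an illustration of the parallel send mentioned earlier, here is a minimal sketch built on `Async.Parallel`. The `sendToClient` function is a stand-in that only simulates a network write, and `broadcast` is a hypothetical name; neither is the server's real send path.

```fsharp
/// Hypothetical send: stands in for writing bytes to a client's socket.
let sendToClient (clientId: string) (packet: byte[]) : Async<string> =
    async {
        do! Async.Sleep 10 // simulate network latency
        return sprintf "%s received %d bytes" clientId packet.Length
    }

/// Send one packet to every destination client concurrently.
/// Results come back in the same order as the input list.
let broadcast (clientIds: string list) (packet: byte[]) : Async<string[]> =
    clientIds
    |> List.map (fun clientId -> sendToClient clientId packet)
    |> Async.Parallel
```

Calling `broadcast [ "a"; "b" ] packet |> Async.RunSynchronously` runs both sends concurrently and returns one result per client.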
That wasn’t so bad. Now the server can handle Publish messages. This means I need to handle the other side of the equation, subscriptions. Publishing doesn’t mean much if no one is listening. For this interaction, the server needs to be able to handle a Subscribe message as well as return a SubAck message to the client. Like before, I start with the types. Again, I need to handle serialization so I can send these messages to clients.
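Since the SubAck types are not shown, this is a rough sketch of how they might serialize. The names `SubAckVariableHeader`, `SubAckPayload`, and `Qoses` echo the processing code further down; the field types and the `buildSubAck` helper are assumptions. Per the MQTT 3.1.1 spec, a SubAck starts with 0x90, followed by the remaining length, the packet identifier, and one return code per requested filter (the granted QoS, or 0x80 for failure).

```fsharp
type SubAckVariableHeader =
    { PacketIdentifier: uint16 }
    /// Packet identifier as two bytes, most significant byte first.
    member this.Serialize() : byte[] =
        [| byte (this.PacketIdentifier >>> 8)
           byte (this.PacketIdentifier &&& 0xFFus) |]

type SubAckPayload =
    { Qoses: byte[] }
    /// One return code per filter, in the order the filters were requested.
    member this.Serialize() : byte[] = Array.copy this.Qoses

/// Hypothetical helper: assemble a full SubAck packet.
/// Assumes the remaining length is below 128, so it fits in a single byte.
let buildSubAck (packetId: uint16) (grantedQos: byte[]) : byte[] =
    let variableHeader' = ({ SubAckVariableHeader.PacketIdentifier = packetId }).Serialize()
    let payload' = ({ SubAckPayload.Qoses = grantedQos }).Serialize()
    let remainingLength = variableHeader'.Length + payload'.Length
    Array.concat [ [| 0x90uy; byte remainingLength |]; variableHeader'; payload' ]
```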
This almost feels repetitive, but now I need to parse the incoming packets. Like Publish, there is a packet Id in the variable header for tracking. A Subscribe message can contain multiple subscriptions. As a result, the payload is a list of topic filters along with their desired QoS.
/// Extract a packet from the buffer
let parseBufferSegment (buffer: ReadOnlyMemory<byte>) : RequestPacket option =
    ...
    match packetType with
    | PacketType.Connect -> parseConnectPacket buffer'
    | PacketType.Publish -> parsePublishPacket buffer'
    | PacketType.Subscribe -> parseSubscribePacket buffer'
    | _ ->
        printfn "ERROR: Unsupported packet type: %A" packetType
        None
    ...

/// Parse buffer for a Subscribe packet
let parseSubscribePacket (buffer: ReadOnlySpan<byte>) : RequestPacket option =
    let position = 0
    let (fixedHeader, position) = parseSubscribeFixedHeader buffer
    let (variableHeader, position) = parseSubscribeVariableHeader buffer fixedHeader position
    let (payload, position) = parseSubscribePayload buffer fixedHeader variableHeader position
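The body of `parseSubscribePayload` is not shown, so here is a sketch of how the list of filters might be read. `SubscriptionFilter` and `parseSubscribeFilters` are hypothetical names, and the example uses a plain `byte[]` for simplicity. Each entry in a Subscribe payload is a length-prefixed UTF-8 topic filter followed by one byte whose low two bits hold the requested QoS.

```fsharp
open System.Text

/// Hypothetical shape of one requested subscription.
type SubscriptionFilter = { Topic: string; Qos: byte }

/// Read topic filter / QoS pairs from `position` up to (not including) `endPosition`.
let parseSubscribeFilters (buffer: byte[]) (position: int) (endPosition: int) : SubscriptionFilter[] =
    let rec loop pos acc =
        if pos >= endPosition then
            List.rev acc |> Array.ofList
        else
            // Two-byte big-endian length, then the filter text, then the QoS byte.
            let length = (int buffer.[pos] <<< 8) ||| int buffer.[pos + 1]
            let topic = Encoding.UTF8.GetString(buffer, pos + 2, length)
            let qos = buffer.[pos + 2 + length] &&& 0x03uy
            loop (pos + 3 + length) ({ Topic = topic; Qos = qos } :: acc)
    loop position []
```

This sketch does no bounds or validity checking; a real parser would want to reject a truncated payload rather than throw.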
Once parsed, we’re back to processing. The match expression is growing as I add functionality. In this case the server needs to send a SubAck back to the client. Then it can pass the subscriptions on to the stateManager for tracking. This is a simple task of just updating the specified client’s list of subscriptions. If you remember from above, this is how Publish determines who sees the messages. When broken down into its components, there isn’t much interesting to see here, but again, F# makes this process clean.
let processPacket (state: StateManager) (context: ConnectionContext) (packet: RequestPacket option) =
    match packet with
    | Some(RequestPacket.Connect (p)) -> processConnect state context p
    | Some(RequestPacket.Publish (p)) -> processPublish state context p
    | Some(RequestPacket.PubComp (p)) -> processPubComp state context p
    | Some(RequestPacket.Subscribe (p)) -> processSubscribe state context p
    | Some(RequestPacket.Disconnect (p)) -> processDisconnect state context
    | None ->
        printfn "ERROR: Invalid packet data. Disconnecting..."
        processDisconnect state context

/// Process a Subscribe packet
let processSubscribe (state: StateManager) (context: ConnectionContext) (packet: SubscribePacket) : ProcessResult option =
    let variableHeader = { SubAckVariableHeader.PacketIdentifier = packet.VariableHeader.PacketIdentifier }
    let variableHeader' = variableHeader.Serialize()
    let ackPayload = { SubAckPayload.Qoses = packet.Payload.Filters |> Array.map (fun x -> x.Qos) }
    let payload' = ackPayload.Serialize()
    // Add subscription filter
    let client = Map.tryFind connection.ConnectionId state.ConnectedClients
    let state' =
        match client with
        | Some (c) ->
            let subscriptions' = Array.concat [| subscriptions; c.Subscriptions |]
            let client' = { c with Subscriptions = subscriptions' }
            { state with ConnectedClients = clients' }
        | None -> state
    return state'
}
Now that all the parts are in place, it’s time to see it in action. I fire up the new server and run my trusty client script. The goal here is to connect, subscribe to a topic, publish to that topic, and see a result on the client side. Below you can see that is exactly what happens. On the server side you can track the messages as they arrive. The state debugging shows connected clients as well as their subscriptions. I also note what messages the server is sending back to the client. Once the client has sent its message it disconnects, causing the server to remove it from its state tracking. On the client side we get confirmation of this process: after publishing a message I can see the result come back through the appropriate channels. Not shown here, but I did several tests with multiple clients, topics, and subscriptions. All appears to work as intended.
At this point, I’ve completed the primary loop of MQTT functionality. Clients can connect, publish, and subscribe to my MQTT server. I’d say that’s pretty cool. This is by no means complete; there are still many smaller supporting bits to add, but this gets it to a point where it is easy to iteratively improve. Adding Subscribe and Publish wasn’t particularly difficult once I had the underlying foundation in place, including the types defining the system. I think this is another good example of how F# can be a valuable tool when writing servers. A solid underlying structure and a productive language can go a long way to ensure a properly functioning system. I hope you enjoyed this little adventure and found it useful.