Today I’ll continue the process of building an MQTT server with F#. The basic networking capabilities I put together last time will be expanded upon to support basic MQTT functionality, specifically the MQTT Connect message.
In my previous post I set up a simple F# server listening on port 1883, but there was no MQTT functionality. Today it’s time to start wiring in some real functionality. Handling a Connect message seems like a logical place to start, so that’s where I’ll begin. Before getting into the code, I want to take a quick look at the MQTT protocol. I’m going to gloss over some details, but the MQTT Spec is always there if you want to see more.
A quick description of the client and server interaction is in order. The connection handshake is pretty basic. The client sends a connect request. Assuming all is well with the request (supported version, successful authentication, etc.), the server then sends a connection ack back to the client. At this point the connection is established, and the client sends subscribe and publish messages as appropriate. The server can now send the client any messages that meet its subscription criteria. What do MQTT packets look like? At a high level, a packet is defined by three components.
- Fixed Header - A common header across all packet types. Byte 1 contains the packet’s type and standard flags. Bytes 2+ contain the packet’s remaining length, to aid in parsing.
- Variable Header - This is a packet-specific metadata header. As a result, the content and length of this will vary depending on the packet-type in question.
- Payload - Here is the actual data. As a side note, not all packet types have a payload.
As alluded to, there are different types of messages that the server supports. Each packet type has its own packet-specific variable header and payload structure. The ones I’ll concern myself with today are Connect (client -> server message) and ConnAck (server -> client message). What does this look like when modeling MQTT packets in F#? The structure of the Connect packet is taken from the spec, and is pretty straightforward.
1 | /// Mqtt Packet type |
Now that the Connect packet is defined, it’s time to parse the byte stream. I handle this in two steps. First, determine what the packet type is by peeking at the first byte. Second, I pass the stream to the appropriate parser based on the type. The packet type is defined in the top half of the first byte of the MQTT packet. So I need to bitshift and then match to determine the packet type. Once I have that I parse a packet. Successful parsing results in a RequestPacket option, otherwise I just return None. The option type allows the calling control flow to easily process data based on parsing results.
1 | /// Given a byte (first byte from fixed header), determine the packet type |
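To make the bitshift-and-match step concrete, here is a minimal sketch of pulling the packet type out of the first byte. The `PacketType` union and the `getPacketType` name are placeholders of my own for illustration, not necessarily the types used in the server; the numeric values are the MQTT 3.1.1 control packet types.

```fsharp
/// Hypothetical subset of MQTT 3.1.1 control packet types (names are illustrative).
type PacketType =
    | Connect
    | ConnAck
    | Publish
    | Subscribe
    | Disconnect
    | Unsupported of int

/// The packet type lives in the upper four bits of the first fixed-header byte,
/// so shift the flags (lower four bits) away and match on what is left.
let getPacketType (firstByte: byte) : PacketType =
    match int (firstByte >>> 4) with
    | 1 -> Connect
    | 2 -> ConnAck
    | 3 -> Publish
    | 8 -> Subscribe
    | 14 -> Disconnect
    | other -> Unsupported other
```

Anything that falls into `Unsupported` is a natural candidate for returning None from the parser.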
There are a couple of ways I could’ve handled the parsing. My approach is to leverage the Span provided to the parsing function. This means I need to track the current position as I parse byte by byte through the packet. There is also a common pattern I’ll follow. It is often, but not always, the case that the variable header parsing will depend on values in the fixed header, and the payload will depend on both headers. I’ll use a pattern of always passing in the previously parsed headers as well as the current index, in case the current parsing step requires that data. This means sometimes passing things I don’t need, but it reduces mental overhead to just always pass along all headers. This is something that often gets overlooked in design. Unless you need to squeeze every ounce of performance out of a system, designing to ease mental load is an important goal to strive for.
1 | /// Parse buffer for a Connect packet |
This has been mostly control flow until now. Now it’s time to dig into the parsing. First is the fixed header. Based on the spec it is at least two bytes. The first byte contains the packet type, and no flags. The remaining byte(s) hold the remaining length of the packet. To accommodate variable length packets (and save space) the length is encoded in a variable length field. When in F# I often find it tempting to rewrite algorithms in a more functional style. But there also isn’t a reason to make it harder for myself than I need to. Direct translation of the provided spec is the easiest path forward. Things like mutable are usually a code smell, but in this case it’s a reasonable trade-off that makes it easy to verify that I’m matching the spec’s algorithm.
1 | /// Extract the packet's remaining length for the fixed header |
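For readers who haven’t seen the encoding, here is a rough sketch of decoding the remaining length. Each byte carries seven bits of the value, least significant group first, and the high bit says whether another byte follows, up to a maximum of four bytes. This version assumes a plain byte array rather than a Span, and the `decodeRemainingLength` name and the option-returning shape are my own assumptions for illustration.

```fsharp
/// Decode the variable-length "remaining length" field, which starts at index 1.
/// Returns Some (length, bytesUsed), or None if the field is incomplete or
/// runs past the four bytes the spec allows.
let decodeRemainingLength (buffer: byte[]) : (int * int) option =
    let mutable multiplier = 1
    let mutable value = 0
    let mutable index = 1
    let mutable finished = false
    let mutable valid = true
    while valid && not finished do
        if index >= buffer.Length || index > 4 then
            // Ran out of data, or a fifth length byte was requested.
            valid <- false
        else
            let encoded = int buffer.[index]
            // Low seven bits are data, high bit is the continuation flag.
            value <- value + (encoded &&& 127) * multiplier
            multiplier <- multiplier * 128
            index <- index + 1
            finished <- (encoded &&& 128) = 0
    if valid then Some (value, index - 1) else None
```

As a quick check, the spec’s own example of 321 is sent as the bytes 0xC1 0x02, so a buffer of `[| 0x10uy; 0xC1uy; 0x02uy |]` decodes to a length of 321 using two bytes.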
The variable header contains a bit more parsing to deal with. Specifically it contains four fields: Protocol Name, Level, Flags, and KeepAlive. The Flags field is mostly boolean flags that I extract with bitshifts and ANDing. Level is an integer identifying the supported MQTT version, and KeepAlive is a two-byte interval in seconds. The Name field is a string. The MQTT protocol follows a common pattern when storing strings: a two-byte length field followed by the UTF-8 encoded string of the specified length. This shows up a lot throughout the parsing process.
1 | /// Extract variable header (Connect) from buffer |
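The two building blocks described above, length-prefixed strings and bit flags, can be sketched roughly as follows. The helper names and the `ConnectFlags` record are assumptions made for illustration, and the sketch works on a byte array instead of a Span. The bit positions follow the MQTT 3.1.1 Connect flags layout.

```fsharp
open System.Text

/// Read a two-byte big-endian integer at the given index.
let readUInt16 (buffer: byte[]) (index: int) : int =
    (int buffer.[index] <<< 8) ||| int buffer.[index + 1]

/// Read an MQTT string: a two-byte length, then that many UTF-8 bytes.
/// Returns the string and the index of the first byte after it.
let readString (buffer: byte[]) (index: int) : string * int =
    let length = readUInt16 buffer index
    let text = Encoding.UTF8.GetString(buffer, index + 2, length)
    text, index + 2 + length

/// Hypothetical record holding a few of the Connect flags.
type ConnectFlags =
    { UserName: bool
      Password: bool
      WillQoS: int
      Will: bool
      CleanSession: bool }

/// Pull the individual flags out of the Connect flags byte.
let readConnectFlags (flags: byte) : ConnectFlags =
    let isSet bit = ((flags >>> bit) &&& 1uy) = 1uy
    { UserName = isSet 7
      Password = isSet 6
      WillQoS = int ((flags >>> 3) &&& 3uy)   // two-bit field in bits 4-3
      Will = isSet 2
      CleanSession = isSet 1 }
```

Returning the next index alongside the value keeps the position tracking explicit, which fits the pattern of passing the current index from one parsing step to the next. KeepAlive can reuse `readUInt16` directly.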
Finally, the payload. For a connect packet, what is in the payload is determined by the flags in the variable header. So here is mostly conditional extraction of values, which means most of the payload values are Option types. As an example, if the UserName flag was set in the header, then a UserName is expected in the payload.
1 | /// Extract a byte array from a header or payload. |
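Here is a small sketch of what flag-driven extraction can look like. It is deliberately simplified: it ignores the Will topic and message, which would sit between the client id and the user name, and the `ConnectPayload` record and helper names are assumptions of mine rather than the server’s actual types.

```fsharp
open System.Text

/// Hypothetical payload record; only ClientId is always present.
type ConnectPayload =
    { ClientId: string
      UserName: string option
      Password: byte[] option }

/// Read a two-byte big-endian length followed by that many bytes.
/// Returns the bytes and the index of the first byte after them.
let readBytes (buffer: byte[]) (index: int) : byte[] * int =
    let length = (int buffer.[index] <<< 8) ||| int buffer.[index + 1]
    buffer.[index + 2 .. index + 1 + length], index + 2 + length

/// Read a field only when its flag is set, otherwise leave the index untouched.
let readOptional (isPresent: bool) (buffer: byte[]) (index: int) : byte[] option * int =
    if isPresent then
        let bytes, next = readBytes buffer index
        Some bytes, next
    else
        None, index

/// Parse the payload fields in the order the spec lays them out.
let parsePayload (hasUserName: bool) (hasPassword: bool) (buffer: byte[]) (start: int) : ConnectPayload =
    let clientId, afterId = readBytes buffer start
    let userName, afterUser = readOptional hasUserName buffer afterId
    let password, _ = readOptional hasPassword buffer afterUser
    { ClientId = Encoding.UTF8.GetString(clientId)
      UserName = userName |> Option.map (fun (b: byte[]) -> Encoding.UTF8.GetString(b))
      Password = password }
```

Because a missing field returns None and the unchanged index, the next step doesn’t need to know whether the previous field was present.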
Once a Connect packet is parsed, it needs to be processed. Like the parsing code, right now the example only includes Connect. This will be expanded over future posts. The control flow is again just matching against type. Per the spec, I disconnect from the client if I get invalid data. This could either be due to failed parsing, or just an unsupported type.
1 | /// Process an mqtt request packet |
Before I can fully process a Connect packet, I need to provide some supporting functions and types. Specifically I need to define a response packet for the client in the way of ConnAck. Like the request packets, it is composed of a fixed header and variable header (ConnAck doesn’t have a payload, but many response packets do). Unlike request packets, I need to handle serialization for when sending to the client. This is easy enough to accomplish with a Serialize function attached to the type. This is a direct serialization to bytes. Serialization can get a little hairy, but really at this point the toughest parts to deal with have been the byte parsing and the serialization. Thankfully everything else has been the “easy stuff”.
1 | /// ConnAck response packet - Fixed header |
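As a point of reference, a ConnAck in MQTT 3.1.1 is always four bytes on the wire, so a stripped-down sketch of the serialization might look like this. The record layout here is an assumption for illustration (a single flat record instead of separate fixed and variable header types).

```fsharp
/// Hypothetical flattened ConnAck model; the real types may be split
/// into separate fixed and variable header records.
type ConnAck =
    { SessionPresent: bool
      ReturnCode: byte }

    /// Serialize to the four bytes MQTT 3.1.1 defines for a ConnAck packet.
    member this.Serialize() : byte[] =
        [| 0x20uy                                      // packet type 2 in the top nibble, flags 0
           0x02uy                                      // remaining length is always 2
           (if this.SessionPresent then 1uy else 0uy)  // acknowledge flags
           this.ReturnCode |]                          // 0 means connection accepted
```

For an accepted connection without a stored session, this yields the bytes 0x20 0x02 0x00 0x00.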
With that out of the way, I can look at the Connect packet processing. The connection handshake is straightforward. I can perform client authentication and prepare the ConnAck packet. Once complete, the final steps are sending the response back to the client and updating system state. That is handled with the state.Post call. I’m going to hand-wave over this part for now since the underlying implementation is involved. Just trust me for now that the serialized packet will get back to the client.
1 | /// Process a Connection packet |
At this point I’ve covered parsing of a Connect packet as well as crafting a response packet. After working with the byte-level details, I need to tie things together. There is a small detail I glossed over when discussing the original handler from the last post. There are a couple of dynamics regarding packets and the provided buffer data. MQTT is often used for lots of short messages, but it is entirely within the spec that they could also be longer than the Kestrel-provided segment length. So although Kestrel handles a lot of the networking dynamics, I’m not completely absolved of dealing with at least some of it. I need a way to deal with multiple packets in a memory segment or a packet being split across segments. There are a couple of ways to handle this. My preference is to keep the parsing code as clean as possible, which means I don’t want to add the complexity of multiple segments there. My approach is to write my own segment buffer abstraction, called SegmentBuffer. This will store multiple segments as they arrive, and I’ll write my own Item, Slice, and Length methods to hit the common use-cases. This abstraction incurs a small performance cost, but makes the parsing so much easier to process.
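To give a feel for the idea, here is a bare-bones sketch of such a buffer. It is an assumption-laden illustration, not the actual implementation: it stores segments as byte arrays, has an `Append` method I made up for adding them, and never discards consumed data.

```fsharp
/// Hypothetical sketch of a buffer that presents several segments as one sequence.
type SegmentBuffer() =
    let segments = ResizeArray<byte[]>()

    /// Store another segment as it arrives.
    member this.Append(segment: byte[]) = segments.Add segment

    /// Total number of bytes across all stored segments.
    member this.Length = segments |> Seq.sumBy (fun s -> s.Length)

    /// Byte at a logical index, found by walking segments until the index falls inside one.
    member this.Item
        with get (index: int) =
            let mutable remaining = index
            let mutable i = 0
            while i < segments.Count && remaining >= segments.[i].Length do
                remaining <- remaining - segments.[i].Length
                i <- i + 1
            if i >= segments.Count then raise (System.IndexOutOfRangeException())
            segments.[i].[remaining]

    /// Copy `count` bytes starting at `start` into one contiguous array.
    member this.Slice(start: int, count: int) : byte[] =
        Array.init count (fun offset -> this.[start + offset])
```

The linear walk in `Item` and the copy in `Slice` are where the small performance cost shows up. In exchange, the parser gets a single contiguous array even when a packet straddles two segments.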
With that out of the way I need to rework the handler by removing the echo placeholder. I modify the handler to aggregate the buffer into my abstraction. Once in a manageable structure, I can do a quick peek at the next packet. Now I can grab packet-length blocks from the buffer and pass them to the parser. Doing it this way makes the parsing easier. Considering parsing is often the messy part, it pays to simplify it as much as possible. Once parsed, I pass the packet to be processed. This is done as long as there are complete packets in the SegmentBuffer to be processed.
1 | /// Mqtt handler - using segmentbuffer |
In the echo server example client communication was simple: read from client, write to client. As expected, MQTT is more involved. The server must maintain internal state to support multiple clients, their topic subscriptions, as well as the ability to publish messages to clients based on their subscriptions. For this I’ll use a central control function to manage these interactions, which I’ll call a StateManager. Its interface will be supplied by a MailboxProcessor. This provides me asynchronous processing capability without having to worry about data or lock contention. From the main control interface, I’ll create a function for each supported MQTT message type. This allows me to centralize all state management as well as handling proper message sending to all clients. Taking a deeper look at state management requirements, I need a way to track connected clients and their subscriptions. I also need to track active messages (messages that are published, and possibly retained). This domain modeling is simple enough to do with F# records. Additionally, I have moved the context.Transport.Output.WriteAsync() functionality into the state manager, specifically a writeMessage function. I typically like to keep the read and write to clients together in the main handler, but it just isn’t a reasonable option in this case. Interacting with multiple clients based on a single client’s message requires this to be buried a layer deeper in the code. But at least I can keep all client interactions in a single place. This ends up being an overall win, and the code is still reasonably clean.
1 | module MqttState |
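If MailboxProcessor is new to you, the pattern boils down to a recursive async loop that owns the state and handles one message at a time. The sketch below is a simplified illustration only: the `StateMessage` cases, the use of a Set of client ids, and the `createStateManager` name are all assumptions, and it leaves out subscriptions, messages, and writing to clients.

```fsharp
/// Hypothetical message set; a real state manager would cover more message types.
type StateMessage =
    | ClientConnected of clientId: string
    | ClientDisconnected of clientId: string
    | CountClients of AsyncReplyChannel<int>

/// All state lives inside the agent loop and messages are handled one at a time,
/// so there is no shared mutable data to lock.
let createStateManager () =
    MailboxProcessor<StateMessage>.Start(fun inbox ->
        let rec loop (clients: Set<string>) =
            async {
                let! message = inbox.Receive()
                match message with
                | ClientConnected clientId -> return! loop (Set.add clientId clients)
                | ClientDisconnected clientId -> return! loop (Set.remove clientId clients)
                | CountClients reply ->
                    reply.Reply(Set.count clients)
                    return! loop clients
            }
        loop Set.empty)
```

Connection handlers only ever call `Post` (fire and forget) or `PostAndReply` (wait for an answer), for example `state.Post(ClientConnected "client-1")`. Since the mailbox processes messages in the order they were posted, a later `PostAndReply` sees the effect of every earlier `Post` from the same caller.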
Now that the supporting structure is implemented, I can directly address how I handle connects. When a Connect message is processed, it ultimately ends up in the stateConnect function of the StateManager (remember the hand-waving from earlier?). The server needs to keep track of clients, so a ConnectedClient record is created and added to the client tracking map. It also sends a ConnAck message back to the client, letting them know they are connected. This completes the connection handshake. I’m including the client Disconnect processing as well. This is partially because it is a nice symmetry with Connect, but also because it gets woven throughout the code to handle bad messages (through parsing and processing). The typical response to bad things in MQTT is to just disconnect the client, so it is natural to include it immediately. FWIW, there is also a specific client Disconnect message. So, here the opposite of Connect happens: the client is removed from the ConnectedClients map.
1 | /// Handle client connect |
Once the state manager is fully defined, it gets added to the MqttConnectionHandler class. As you may have noticed earlier, it gets passed into the packet processing function. This singleton is what allows me to have a shared server state that can be easily managed.
1 | /// mqtt server state |
At this point the server can handle a Connect message from the client. This includes adding the client to its internal state handling as well as responding with the proper ConnAck message. Now is a good time to see this in action. From the server side, I can see the client network connection as well as reception and consumption of its MQTT Connect packet. Upon proper connection it sends a ConnAck back to the client. From the client side, it sends the Connect and receives a ConnAck. Great! This is the short-term goal, and I’d say mission accomplished. This is also a good time to point out that my test client is prepped for a fully functional interaction, so after connection it attempts to subscribe to a topic. Obviously this hasn’t been implemented yet, so the server catches the unsupported packet error and disconnects the client. I guess I know what I need to work on next.
1 | # Server |
1 | # Client |
At this point I have the server supporting very basic MQTT functionality, specifically Connect messages. As is often the case, the beginning of a project includes a lot of setup and yak shaving before I can get to the ultimate, or even preliminary, goal. But this process is also informative. It is a view into how system state and basic MQTT types can be modeled in F#. I’m probably a little biased, but F#’s ability to model proper state and structure is a definite asset, and makes refactoring through this process a breeze. With my goal met, this is a good stopping point. I hope you found this post informative. Next time I’ll venture further into the MQTT spec and tackle Subscribe and Publish.