Release Highlights
Session
Every client now starts a session process (via start_link), whether or not the client is persistent.
A client can resume a persistent session on another node in the cluster.
Session State in the broker consists of:
- The Client’s subscriptions.
- Inflight QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
- Inflight QoS 2 messages which have been received from the Client and are waiting for PUBREL, i.e. have not been completely acknowledged.
- QoS 1 and QoS 2 messages published while the Client was disconnected and pending transmission to the Client.
- Optionally, QoS 0 messages pending transmission to the Client.
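The session state listed above can be pictured as a small record. The following is only an illustrative Python sketch; the class and field names are hypothetical and do not come from emqttd's Erlang source.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SessionState:
    """Hypothetical container mirroring the list above (not emqttd's record)."""
    client_id: str
    subscriptions: dict = field(default_factory=dict)  # topic filter -> QoS
    inflight_out: dict = field(default_factory=dict)   # packet id -> message sent, not yet acked
    awaiting_rel: dict = field(default_factory=dict)   # packet id -> QoS 2 message received, awaiting PUBREL
    pending: deque = field(default_factory=deque)      # messages pending transmission to the client


state = SessionState("c1")
state.subscriptions["sensors/#"] = 1
state.pending.append(("sensors/t", b"21.5", 1))
```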
MQueue and Inflight Window
Each session has a simple in-memory message queue.
Note that MQTT is not an enterprise message queue. MQTT assumes that the client
is online most of the time.
This module implements a simple in-memory queue for MQTT persistent sessions.
If the broker restarts or crashes, all queued messages are lost.
Design of the queue:
      |<----------------- Max Len ----------------->|
      -----------------------------------------------
IN -> |    Pending Messages      | Inflight Window  | -> Out
      -----------------------------------------------
                                 |<--- Win Size --->|
- The inflight window stores the messages awaiting acknowledgement.
- Incoming messages are queued as pending when the session is offline or the inflight window is full.
- If the queue is full, QoS 0 messages are dropped if store_qos0 is true; otherwise the oldest pending message is dropped.
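The queue rules above can be sketched as a toy Python model. This is not emqttd's Erlang implementation: the class, method, and field names are hypothetical. The overflow policy is one reading of the last rule (evict a pending QoS 0 message first when store_qos0 is true, else the oldest pending message). Not queueing QoS 0 messages at all when store_qos0 is false is also an assumption.

```python
from collections import deque


class SessionQueue:
    """Toy model of a pending queue plus inflight window (not emqttd's code)."""

    def __init__(self, max_len=1000, win_size=100, store_qos0=True):
        assert max_len > win_size, "window must fit inside the queue"
        self.max_len = max_len        # cap on pending + inflight
        self.win_size = win_size      # cap on unacked QoS 1/2 messages
        self.store_qos0 = store_qos0  # keep QoS 0 messages while queued?
        self.online = False
        self.pending = deque()        # (topic, payload, qos) tuples
        self.inflight = {}            # packet id -> message awaiting ack
        self.dropped = []             # kept only so the example can inspect it
        self._next_id = 1

    def __len__(self):
        return len(self.pending) + len(self.inflight)

    def deliver(self, msg):
        """Send now if possible, otherwise park the message as pending."""
        qos = msg[2]
        if self.online and not self.pending and len(self.inflight) < self.win_size:
            self._send(msg)
        elif qos == 0 and not self.store_qos0:
            self.dropped.append(msg)  # assumption: never queued
        else:
            if len(self) >= self.max_len:
                self._drop_one()
            self.pending.append(msg)

    def _drop_one(self):
        """Assumed policy: evict a pending QoS 0 message first, else the oldest."""
        victim = self.pending[0]
        if self.store_qos0:
            victim = next((m for m in self.pending if m[2] == 0), victim)
        self.pending.remove(victim)
        self.dropped.append(victim)

    def _send(self, msg):
        if msg[2] > 0:  # QoS 0 needs no ack, so it is not tracked
            self.inflight[self._next_id] = msg
            self._next_id = self._next_id % 65535 + 1  # packet ids are 1..65535

    def _fill_window(self):
        while self.online and self.pending and len(self.inflight) < self.win_size:
            self._send(self.pending.popleft())

    def resume(self):
        self.online = True
        self._fill_window()

    def ack(self, packet_id):
        self.inflight.pop(packet_id, None)
        self._fill_window()


q = SessionQueue(max_len=3, win_size=1)
for m in [("t", b"a", 1), ("t", b"b", 0), ("t", b"c", 1), ("t", b"d", 1)]:
    q.deliver(m)  # offline: the fourth message overflows the queue
q.resume()        # the first pending message moves into the window
```

In this run the QoS 0 message is evicted to make room for the fourth message, and each acknowledgement pulls the next pending message into the window.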
Hooks
Name | Type | Description |
---|---|---|
client.connected | foreach | Run when a client has connected successfully |
client.subscribe | foldl | Run when a client subscribes to topics |
client.unsubscribe | foldl | Run when a client unsubscribes from topics |
message.publish | foldl | Run when a message is published |
message.acked | foldl | Run when a message is acked |
client.disconnected | foreach | Run when a client is disconnected |
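The two hook types in the table differ in how callback results are used. The sketch below shows the general foreach/foldl distinction in Python. It assumes nothing about emqttd's actual callback signatures or registration API, and every function name in it is hypothetical.

```python
from collections import defaultdict

_hooks = defaultdict(list)  # hook name -> callbacks in registration order


def add_hook(name, callback):
    _hooks[name].append(callback)


def run_foreach(name, *args):
    """'foreach' hooks: call every callback for its side effects only."""
    for callback in _hooks[name]:
        callback(*args)


def run_foldl(name, acc, *args):
    """'foldl' hooks: thread an accumulator through the callbacks in order."""
    for callback in _hooks[name]:
        acc = callback(acc, *args)
    return acc


seen = []
add_hook("client.connected", lambda client_id: seen.append(client_id))
add_hook("message.publish", lambda msg: {**msg, "topic": msg["topic"].lower()})
add_hook("message.publish", lambda msg: {**msg, "traced": True})

run_foreach("client.connected", "c1")
msg = run_foldl("message.publish", {"topic": "Sensors/T", "payload": b"21"})
```

With foldl, each callback receives the value returned by the previous one, so a message.publish callback can rewrite the message before the next callback sees it.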
Global Unique Message ID
End-to-End Message Route:
PktId <-- --> MsgId <-- --> MsgId <-- --> PktId
  |<--- Qos --->|<---PubSub--->|<-- Qos -->|
Globally unique ID for an MQTT message:
--------------------------------------------------------
|        Timestamp       |  NodeID + PID  |  Sequence  |
|<------- 64bits ------->|<--- 48bits --->|<- 16bits ->|
--------------------------------------------------------
- Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp
- NodeID: node() encoded as a 2-byte integer
- PID: the pid encoded as a 4-byte integer
- Sequence: a 2-byte sequence number within one process
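The 128-bit layout above (64 + 16 + 32 + 16 bits) can be illustrated with plain integer packing. This Python sketch only demonstrates the bit arithmetic. The function names are hypothetical, the timestamp unit is not specified in these notes, and how node() and the pid are reduced to integers is left out, so the fields are taken as ready-made integers.

```python
MASK64, MASK32, MASK16 = (1 << 64) - 1, (1 << 32) - 1, (1 << 16) - 1


def pack_msg_id(timestamp, node_id, pid, seq):
    """Pack the four fields into one 128-bit integer, timestamp first."""
    return ((timestamp & MASK64) << 64
            | (node_id & MASK16) << 48
            | (pid & MASK32) << 16
            | (seq & MASK16))


def unpack_msg_id(msg_id):
    """Split a 128-bit id back into (timestamp, node_id, pid, seq)."""
    return (msg_id >> 64 & MASK64,
            msg_id >> 48 & MASK16,
            msg_id >> 16 & MASK32,
            msg_id & MASK16)


# Arbitrary illustrative values, not real broker data.
msg_id = pack_msg_id(timestamp=1_440_000_000_000_000, node_id=7, pid=4242, seq=1)
raw = msg_id.to_bytes(16, "big")  # 16 bytes = 128 bits
```

Because the timestamp occupies the most significant bits, ids compared as integers sort by time first, then by node, process, and sequence number.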
Protocol Compliance
MQTT v3.1.1 protocol specification:
4.4 Message delivery retry (#166)
4.6 Message ordering (#167)
Alarm Management
Added the emqttd_alarm module, which publishes alarms in JSON format to the '$SYS/brokers/+/alarms/#' topics.
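To see which concrete topics the '$SYS/brokers/+/alarms/#' pattern covers, here is a minimal Python sketch of standard MQTT wildcard matching ('+' matches exactly one level, '#' matches the parent level and everything below it). The node name and alarm names in the example are made up for illustration, and the function is not part of emqttd.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic name matches a topic filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":  # multi-level wildcard: matches the rest
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


ALARM_FILTER = "$SYS/brokers/+/alarms/#"
hit = topic_matches(ALARM_FILTER, "$SYS/brokers/emqttd@127.0.0.1/alarms/high_cpu")
miss = topic_matches(ALARM_FILTER, "$SYS/brokers/emqttd@127.0.0.1/stats/clients")
```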
Project Structure
Merged the emqtt and emqttd apps and changed the project structure. You can now embed the emqttd broker into your own project.