lotuc.xnfun.core.transport-mqtt

MQTT implementation for lotuc.xnfun.core.transport/XNFunTransport.

The data being transferred is encoded into an MQTT topic and payload.

  • create-send-data: Converts a message into the topic and payload to be sent.
  • create-sub-data: Converts a subscription into a topic filter and a message adapter. The message adapter converts the received topic and payload back into the message that was sent and passes it to the handler function.

Topics:

  • Node heartbeat: <prefix>/<ver>/registry/hb/<node-id>
  • Request: <prefix>/<ver>/rpc/req/<callee-node-id>/<caller-node-id>/<request-id>
  • Response: <prefix>/<ver>/rpc/resp/<caller-node-id>/<request-id>/<callee-node-id>

<ver> is currently always v0. You can specify <prefix> when creating the transport with make-mqtt-transport.
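
As a rough illustration of the topic layouts listed above, the following sketch builds each topic from its segments. The helper names (heartbeat-topic, request-topic, response-topic) and the prefix "xnfun" are hypothetical and are not taken from the library.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helpers for illustration; these names are not part of the library.
(def ver "v0") ; the docs state that <ver> is currently always v0

(defn heartbeat-topic
  "<prefix>/<ver>/registry/hb/<node-id>"
  [prefix node-id]
  (str/join "/" [prefix ver "registry" "hb" node-id]))

(defn request-topic
  "<prefix>/<ver>/rpc/req/<callee-node-id>/<caller-node-id>/<request-id>"
  [prefix callee-node-id caller-node-id request-id]
  (str/join "/" [prefix ver "rpc" "req" callee-node-id caller-node-id request-id]))

(defn response-topic
  "<prefix>/<ver>/rpc/resp/<caller-node-id>/<request-id>/<callee-node-id>"
  [prefix caller-node-id request-id callee-node-id]
  (str/join "/" [prefix ver "rpc" "resp" caller-node-id request-id callee-node-id]))

(heartbeat-topic "xnfun" "node-0")
;; => "xnfun/v0/registry/hb/node-0"
```

Note that the request topic puts the callee first and the response topic puts the caller first, so each node can subscribe to the topics addressed to it under a single fixed leading segment.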

create-send-data

multimethod

Convert the message to MQTT topic & payload.

Arguments: [node-id, {:as msg :keys [typ data]}]

Returns: [topic, payload]
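
A minimal sketch of how a multimethod with this shape might look, assuming dispatch on the message's typ. The name send-data-sketch, the :heartbeat message type, the fixed prefix, and the EDN payload encoding are all assumptions made for illustration; the library's actual types and encoding may differ.

```clojure
;; Illustration only: not the library's implementation.
(def topic-prefix "xnfun") ; assumed prefix

(defmulti send-data-sketch
  "Dispatches on the message's :typ (hypothetical type values)."
  (fn [_node-id {:keys [typ]}] typ))

;; Hypothetical :heartbeat type; payload encoding (EDN as UTF-8 bytes) is an assumption.
(defmethod send-data-sketch :heartbeat
  [node-id {:keys [data]}]
  [(str topic-prefix "/v0/registry/hb/" node-id)
   (.getBytes (pr-str data) "UTF-8")])

;; Reject any message type that has no method defined.
(defmethod send-data-sketch :default
  [_node-id {:keys [typ]}]
  (throw (ex-info "unsupported message type" {:typ typ})))

(let [[topic payload] (send-data-sketch "node-0" {:typ :heartbeat :data {:time 1}})]
  [topic (String. ^bytes payload "UTF-8")])
;; => ["xnfun/v0/registry/hb/node-0" "{:time 1}"]
```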

create-sub-data

multimethod

Create a topic filter and a message adapter for subscription.

The message adapter converts the MQTT topic & payload back into the sent message for use by the handler function (handle-fn).

Arguments: [node-id {:as subscription :keys [handle-fn]} typ]

  • node-id: the ID of the current node

Returns: [topic-filter, message-adapter]
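
To make the topic filter and message adapter pair concrete, here is a sketch for a node subscribing to incoming requests. It uses MQTT's single-level wildcard + for the caller and request ID segments. The function names and the keys of the resulting message map are hypothetical; the library's actual message shape may differ.

```clojure
(require '[clojure.string :as str])

;; Illustration only: names and message keys are assumptions.
(defn request-topic-filter
  "Matches <prefix>/v0/rpc/req/<node-id>/<caller-node-id>/<request-id>
  for any caller and any request id."
  [prefix node-id]
  (str prefix "/v0/rpc/req/" node-id "/+/+"))

(defn request-adapter
  "Returns a function of [topic payload] that recovers the caller and
  request id from the last two topic segments and calls handle-fn."
  [handle-fn]
  (fn [topic payload]
    (let [[caller-node-id request-id] (take-last 2 (str/split topic #"/"))]
      (handle-fn {:caller-node-id caller-node-id
                  :request-id     request-id
                  :data           payload}))))

(request-topic-filter "xnfun" "node-0")
;; => "xnfun/v0/rpc/req/node-0/+/+"

((request-adapter identity) "xnfun/v0/rpc/req/node-0/node-1/req-42" "payload")
;; => {:caller-node-id "node-1", :request-id "req-42", :data "payload"}
```

Reading the segments from the end of the topic keeps the adapter correct even when the prefix itself contains slashes.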

make-mqtt-transport

(make-mqtt-transport node-id {:as mqtt-transport, :keys [topic-prefix mqtt-config]})

Creates the MQTT transport for node node-id using the given transport configuration mqtt-transport.

The MQTT transport configuration mqtt-transport contains two parts:

  • topic-prefix: prepended to every message topic; think of it as a namespace.
  • mqtt-config: the MQTT connection configuration, which contains three parts:
    • :broker: Connection uri, ex. tcp://127.0.0.1:1883
    • :client-id (optional)
    • :connect-options: a map, all fields are optional:
      • :username (string)
      • :password (string or char array)
      • :auto-reconnect (bool)
      • :connection-timeout (int)
      • :clean-session (bool)
      • :keep-alive-interval (int)
      • :max-in-flight (int)
      • :will: {:keys [topic payload qos retain]}
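
A configuration map following the structure above might look like the sketch below. All values are placeholders chosen for illustration, the will topic is hypothetical, and the units of the numeric options are not stated in this page. The call to make-mqtt-transport is shown as a comment because it needs the library on the classpath and a reachable broker.

```clojure
;; Placeholder values only; adjust for your broker.
(def mqtt-transport
  {:topic-prefix "xnfun"
   :mqtt-config
   {:broker    "tcp://127.0.0.1:1883"
    :client-id "node-0" ; optional
    :connect-options    ; every field here is optional
    {:auto-reconnect      true
     :clean-session       true
     :connection-timeout  30
     :keep-alive-interval 60
     :max-in-flight       10
     ;; hypothetical last-will message
     :will {:topic   "xnfun/will/node-0"
            :payload "offline"
            :qos     1
            :retain  false}}}})

;; With the library loaded:
;; (require '[lotuc.xnfun.core.transport-mqtt :as transport-mqtt])
;; (transport-mqtt/make-mqtt-transport "node-0" mqtt-transport)
```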