Introduction to xnfun (Cross Node Function)

xnfun is designed around the abstraction of a node. Think of a node as a container for a group of functions: it manages the execution state of local and remote function calls, and nodes are interconnected through transports.

Check out the source code at lotuc/xnfun and view the API documentation at xnfun.

The rest of this article goes through the two kinds of functions that xnfun supports:

  • Unary RPC
  • Bidirectional RPC

Unary RPC

At present, only the MQTT transport is implemented, so we use it as the example.

First, start an MQTT broker.

cat > mosquitto.conf <<EOF
allow_anonymous true
listener 1883
EOF

docker run --rm -it -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf \
    -p 1883:1883 eclipse-mosquitto

# Now you've started a broker at 1883 which can be connected anonymously.

Next, start a Clojure REPL that loads the dependency.

cat > deps.edn <<EOF
{:deps {lotuc.xnfun/xnfun {:git/sha "1d0b91d316260bce3bd5cc04504421b9c48eac3a"
                           :git/url ""}}}
EOF

clj
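Once the REPL is up, it can be handy to confirm that the broker is reachable before starting any node. The following is a minimal sketch that uses only JVM classes; the helper name port-open? is hypothetical and not part of xnfun, and the address 127.0.0.1:1883 is an assumption based on the docker port mapping above.

```clojure
(import '(java.net Socket InetSocketAddress))

(defn port-open?
  "Returns true if a TCP connection to host:port succeeds within timeout-ms,
  false if the connection is refused or times out."
  [host port timeout-ms]
  (try
    (with-open [s (Socket.)]
      (.connect s (InetSocketAddress. ^String host (int port)) (int timeout-ms))
      true)
    (catch java.io.IOException _ false)))

;; Assumption: the broker from the previous step is published on localhost:1883.
(println "broker reachable?" (port-open? "127.0.0.1" 1883 500))
```

If this prints false, check that the container is still running and that port 1883 is not already used by another process.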


From the REPL, we start two xnfun nodes and register some functions on each.

(require '[lotuc.xnfun.api :refer [start-node add-function call]])

(def n0 (start-node {:transport
                     {:xnfun/module 'xnfun.mqtt
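                      :mqtt-topic-prefix ""
                      ;; Assumed key name for the broker settings; verify it
                      ;; against the API documentation.
                      :mqtt-config
                      {:broker "tcp://"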
                       :connect-options {:max-in-flight 1000
                                         :auto-reconnect true}}}}))
;; The MQTT transport shown above is actually the default transport,
;; so we can start another node without passing any options.
(def n1 (start-node {}))

;; Register function "add" to n0
(add-function n0 "add" (fn [[a b]] (+ a b)))

;; Register function "sub" to n1
(add-function n1 "sub" (fn [[a b]] (- a b)))

Since nodes n0 and n1 are connected to the same MQTT transport, they are effectively interconnected, and each can call the functions registered on the other.

@(call n0 "sub" [4 2])
@(call n1 "add" [4 2])

;; And of course a node can also call the functions registered on itself
@(call n0 "add" [4 2])
@(call n1 "sub" [4 2])
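To make the routing idea concrete, here is a toy in-process model of two nodes sharing one transport. It is only a sketch of the concept and not how xnfun is implemented: a plain atom stands in for the MQTT transport, and all names (make-transport, toy-start-node, toy-add-function, toy-call) are hypothetical.

```clojure
;; Toy model only: a shared atom stands in for the transport.
;; It maps function names to implementations.
(defn make-transport [] (atom {}))

(defn toy-start-node
  "A toy node is just a handle on the shared transport."
  [transport]
  {:transport transport})

(defn toy-add-function
  "Registers f under fn-name so every node on the transport can find it."
  [node fn-name f]
  (swap! (:transport node) assoc fn-name f)
  node)

(defn toy-call
  "Looks fn-name up on the shared transport and applies it to arg.
  Returns an already delivered promise, so the result is read with @
  just like in the examples above."
  [node fn-name arg]
  (if-let [f (get @(:transport node) fn-name)]
    (doto (promise) (deliver (f arg)))
    (throw (ex-info "function not found" {:fn-name fn-name}))))

(def t  (make-transport))
(def m0 (toy-start-node t))
(def m1 (toy-start-node t))

(toy-add-function m0 "add" (fn [[a b]] (+ a b)))
(toy-add-function m1 "sub" (fn [[a b]] (- a b)))

(println @(toy-call m0 "sub" [4 2])) ; prints 2
(println @(toy-call m1 "add" [4 2])) ; prints 6
```

In the toy model the lookup is a map read. In xnfun the same role is played by the transport, which carries the request to whichever node registered the function.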

Bidirectional RPC

This mode is similar to gRPC's bidirectional streaming RPC.

To create a function that supports bidirectional calls, give it a second argument, from which you can retrieve two channels, in-c and out-c. On in-c you listen for the caller's messages, and via out-c you send messages to the caller. See lotuc.xnfun.api/add-function for details.

(require '[clojure.core.async :refer [<! go-loop put!]])

(defn echo-server
  [arg {:keys [in-c out-c]
        {:as req-meta :keys [req-id hb-interval-ms]} :req-meta}]
  (let [p (promise)]
    ;; Keep the heartbeat.
    (future (loop []
              (put! out-c {:typ :xnfun/hb})
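              (when (= :timeout (deref p hb-interval-ms :timeout))
                (recur))))
    ;; Handle the caller's messages until the call is cancelled.
    (go-loop []
      (when-let [{:keys [typ data] :as d} (<! in-c)]
        (case typ
          :xnfun/cancel
          (deliver p :cancel) ; quit

          :xnfun/to-callee
          (do (put! out-c {:typ :xnfun/to-caller :data data})
              (recur))

          (deliver p {:unknown d}))))
    ;; Block until the loop finishes; the delivered value is the result.
    @p))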
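The message flow over in-c and out-c can also be tried locally, without a broker or a node. The sketch below simulates it with plain core.async channels and assumes core.async is on the classpath. The function toy-echo and the channel names sim-in and sim-out are hypothetical, heartbeats are left out for brevity, and only the :xnfun/to-callee, :xnfun/to-caller and :xnfun/cancel message types from this article are used.

```clojure
(require '[clojure.core.async :refer [<! >! <!! >!! chan close! go-loop]])

(defn toy-echo
  "Echoes every :xnfun/to-callee message back as :xnfun/to-caller and
  stops on :xnfun/cancel. Returns a channel that yields the final result."
  [in-c out-c]
  (go-loop []
    (let [{:keys [typ data]} (<! in-c)]
      (case typ
        :xnfun/to-callee (do (>! out-c {:typ :xnfun/to-caller :data data})
                             (recur))
        :xnfun/cancel    (do (close! out-c) :cancel)
        ;; in-c was closed (nil) or the message type is unknown
        (do (close! out-c) :stopped)))))

;; Buffered channels so the blocking puts below return immediately.
(def sim-in   (chan 8))
(def sim-out  (chan 8))
(def result-c (toy-echo sim-in sim-out))

(>!! sim-in {:typ :xnfun/to-callee :data "hi!"})
(println (<!! sim-out))   ; the echoed :xnfun/to-caller message

(>!! sim-in {:typ :xnfun/cancel :data nil})
(println (<!! result-c))  ; prints :cancel
```

Closing the output channel on exit lets a consumer loop on the caller side terminate cleanly, since a take from a closed channel returns nil.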

Register the function on a node, say n1:

(add-function n1 "echo-server" echo-server)

Now try calling it from node n0:

(require '[lotuc.xnfun.api :refer [call-function]])

(def r (call-function n0 "echo-server" "hello world"
                      {:req-meta {:timeout-ms 3600000
                                  :hb-interval-ms 3000
                                  :hb-lost-ratio 2}}))

;; retrieve channels from the response
(def in-c  (get-in r [:request :in-c]))
(def out-c (get-in r [:request :out-c]))
;; Get the promise waiting for function call result
(def resp (:res-promise r))

;; Set up a go-loop that waits for echo responses
(go-loop [{:as v :keys [typ data]} (<! out-c)]
  (if v (do (when (= typ :xnfun/to-caller)
              (println "!!!" data))
            (recur (<! out-c)))
      (println "quit")))

;; Send message to callee
(put! in-c {:typ :xnfun/to-callee :data "hi!"})

;; Stop the function
(put! in-c {:typ :xnfun/cancel :data nil})

;; And check the function call result
@resp