Home Resources

Clojure 实现 go 语言并发的一些例子

Table of Contents

辅助函数

chan-seq 将 channel 变成一个 lazy sequence,每试图取一个值,会阻塞 channel 以等待 1

(defn chan-seq [ch]
  (lazy-seq
   (if-some [v (<!! ch)]
     (cons v (chan-seq ch))
     [])))

另外,后续代码假定引入了以下依赖:

(require '[clojure.core.async
           :as async
           :refer [<! <!! >! alt! chan close! go go-loop timeout]])

ping-pong

该函数创建一个在给定 table 上打乒乓的 player2

(defn player
  [name table]
  (go-loop []
    (when-let [ball (<! table)]
      (let [{:as ball :keys [hits]} (update ball :hits inc)]
        (println name hits)
        (<! (timeout 500))
        (>! table ball))
      (recur))))

下面代码新建一个 table,将两个 player 放到该球桌上。

(let [table (chan)]
  (go (player "ping" table))
  (go (player "pong" table))
  (<!! (go
         (>! table {:hits 0})
         (<! (timeout 2000))
         (close! table))))

;; pong 1
;; ping 2
;; pong 3
;; ping 4

Pipeline

下面复刻了引文3中的两个 pipeline 函数。

(defn gen
  [start end]
  (let [out (chan 1)]
    (go []
        (doseq [i (range start end)]
          (>! out i))
        (close! out))
    out))

(defn sq
  [in]
  (let [out (chan)]
    (go-loop []
      (if-let [n (<! in)]
        (do (>! out (* n n))
            (recur))
        (close! out)))
    out))

执行一个简单的 pipeline:

(doall
 (chan-seq
  (sq
   (gen 0 4))))                         ; => (0 1 4 9)

使用 Clojure 的 Threading macro 更为直观

(->> (gen 0 4) sq chan-seq doall)       ; => (0 1 4 9)
(->> (gen 0 4) sq sq chan-seq doall)    ; => (0 1 16 81)

Fan-out, fan-in 也很直接。 由于 c1c2 两个 pipeline 并发执行, 最终执行结果的顺序每次可能不同。

(let [in (gen 0 4)
      c1 (sq in)
      c2 (sq in)]
    (->> (async/merge [c1 c2]) chan-seq doall)) ; => (1 0 4 9)

Pipeline 的退出

和 go 语言中类似,Clojure 中对于 channel 的关闭 close! 也可认为被当作一个发送给所有监听该 channel 的信号,所有监听者都将收到一个 nil 值(注意,Clojure core.async 中不允许向 channel 发送 nil)。

(defn gen'
  [done start end]
  (let [out (chan 1)]
    (go []
        (doseq [i (range start end)
                :while (alt!
                         done      nil
                         [[out i]] true)])
        (close! out))
    out))

(defn sq'
  [done in]
  (let [out (chan)]
    (go-loop []
      (if (when-let [n (alt! [done in]
                             ([v _ch] v))]
            (alt! done            nil
                  [[out (* n n)]] true))
        (recur)
        (close! out)))
    out))

为这些函数添加了一个 done channel 用于取消 pipeline 函数的执行,一个简单的 pipeline 现在如下:

(let [done (chan)]
  (->> (gen' done 0 4) (sq' done) (sq' done) chan-seq)) ; => (0 1 16 81)

现在可以中途取消

(let [done (chan)
      ;; 注意这边 gen 10 个值
      s    (->> (gen' done 0 10) (sq' done) (sq' done) chan-seq)
      ;; 取 4 个值
      s1   (doall (take 4 s))
      ;; 取消执行
      _    (close! done)
      ;; 取所有值,此时也总共只有 4 个值
      s2   (doall s)]
  [s1 s2])                              ; => [(0 1 16 81) (0 1 16 81)]

Fan-in, fan-out 也一样

(let [done (chan)
        in (gen' done 0 10)
        c1 (sq' done in)
        c2 (sq' done in)
        s  (chan-seq (async/merge [c1 c2]))

        s1 (doall (take 4 s))
        _  (close! done)
        s2 (doall s)]
    [s1 s2])                            ; => [(1 0 9 4) (1 0 9 4)]

Footnotes:

Author: lotuc, Published at: 2023-04-02 Sun 00:00, Modified At: 2023-04-02 Sun 16:35 (orgmode - Publishing)