Clojure 实现 go 语言并发的一些例子
Table of Contents
辅助函数
chan-seq
将 channel 变成一个 lazy
sequence,每试图取一个值,会阻塞 channel 以等待
1。
(defn chan-seq [ch]
(lazy-seq
(if-some [v (<!! ch)]
(cons v (chan-seq ch))
[])))
另外,后续代码假定引入了以下依赖:
(require '[clojure.core.async
:as async
:refer [<! <!! >! alt! chan close! go go-loop timeout]])
ping-pong
该函数创建一个在给定 table 上打乒乓的 player2。
(defn player
[name table]
(go-loop []
(when-let [ball (<! table)]
(let [{:as ball :keys [hits]} (update ball :hits inc)]
(println name hits)
(<! (timeout 500))
(>! table ball))
(recur))))
下面代码新建一个 table,将两个 player 放到该球桌上。
(let [table (chan)]
(go (player "ping" table))
(go (player "pong" table))
(<!! (go
(>! table {:hits 0})
(<! (timeout 2000))
(close! table))))
;; pong 1
;; ping 2
;; pong 3
;; ping 4
Pipeline
下面复刻了引文3中的两个 pipeline 函数。
(defn gen
[start end]
(let [out (chan 1)]
(go []
(doseq [i (range start end)]
(>! out i))
(close! out))
out))
(defn sq
[in]
(let [out (chan)]
(go-loop []
(if-let [n (<! in)]
(do (>! out (* n n))
(recur))
(close! out)))
out))
执行一个简单的 pipeline:
(doall
(chan-seq
(sq
(gen 0 4)))) ; => (0 1 4 9)
使用 Clojure 的 Threading macro 更为直观
(->> (gen 0 4) sq chan-seq doall) ; => (0 1 4 9)
(->> (gen 0 4) sq sq chan-seq doall) ; => (0 1 16 81)
Fan-out, fan-in 也很直接。 由于 c1
和
c2
两个 pipeline 并发执行,
最终执行结果的顺序每次可能不同。
(let [in (gen 0 4)
c1 (sq in)
c2 (sq in)]
(->> (async/merge [c1 c2]) chan-seq doall)) ; => (1 0 4 9)
Pipeline 的退出
和 go 语言中类似,Clojure 中对于 channel 的关闭
close!
也可认为被当作一个发送给所有监听该 channel
的信号,所有监听者都将收到一个 nil
值(注意,Clojure
core.async
中不允许向 channel 发送 nil)。
(defn gen'
[done start end]
(let [out (chan 1)]
(go []
(doseq [i (range start end)
:while (alt!
done nil
[[out i]] true)])
(close! out))
out))
(defn sq'
[done in]
(let [out (chan)]
(go-loop []
(if (when-let [n (alt! [done in]
([v _ch] v))]
(alt! done nil
[[out (* n n)]] true))
(recur)
(close! out)))
out))
为这些函数添加了一个 done channel 用于取消 pipeline 函数的执行,一个简单的 pipeline 现在如下:
(let [done (chan)]
(->> (gen' done 0 4) (sq' done) (sq' done) chan-seq)) ; => (0 1 16 81)
现在可以中途取消
(let [done (chan)
;; 注意这边 gen 10 个值
s (->> (gen' done 0 10) (sq' done) (sq' done) chan-seq)
;; 取 4 个值
s1 (doall (take 4 s))
;; 取消执行
_ (close! done)
;; 取所有值,此时也总共只有 4 个值
s2 (doall s)]
[s1 s2]) ; => [(0 1 16 81) (0 1 16 81)]
Fan-in, fan-out 也一样
(let [done (chan)
in (gen' done 0 10)
c1 (sq' done in)
c2 (sq' done in)
s (chan-seq (async/merge [c1 c2]))
s1 (doall (take 4 s))
_ (close! done)
s2 (doall s)]
[s1 s2]) ; => [(1 0 9 4) (1 0 9 4)]