Clojure 实现 go 语言并发的一些例子
Table of Contents
辅助函数
chan-seq 将 channel 变成一个 lazy
sequence,每试图取一个值,会阻塞 channel 以等待
1。
(defn chan-seq [ch](lazy-seq(if-some [v (<!! ch)](cons v (chan-seq ch))[])))
另外,后续代码假定引入了以下依赖:
(require '[clojure.core.async:as async:refer [<! <!! >! alt! chan close! go go-loop timeout]])
ping-pong
该函数创建一个在给定 table 上打乒乓的 player2。
(defn player[name table](go-loop [](when-let [ball (<! table)](let [{:as ball :keys [hits]} (update ball :hits inc)](println name hits)(<! (timeout 500))(>! table ball))(recur))))
下面代码新建一个 table,将两个 player 放到该球桌上。
(let [table (chan)](go (player "ping" table))(go (player "pong" table))(<!! (go(>! table {:hits 0})(<! (timeout 2000))(close! table))));; pong 1;; ping 2;; pong 3;; ping 4
Pipeline
下面复刻了引文3中的两个 pipeline 函数。
(defn gen[start end](let [out (chan 1)](go [](doseq [i (range start end)](>! out i))(close! out))out))(defn sq[in](let [out (chan)](go-loop [](if-let [n (<! in)](do (>! out (* n n))(recur))(close! out)))out))
执行一个简单的 pipeline:
(doall(chan-seq(sq(gen 0 4)))) ; => (0 1 4 9)
使用 Clojure 的 Threading macro 更为直观
(->> (gen 0 4) sq chan-seq doall) ; => (0 1 4 9)(->> (gen 0 4) sq sq chan-seq doall) ; => (0 1 16 81)
Fan-out, fan-in 也很直接。 由于 c1 和
c2 两个 pipeline 并发执行,
最终执行结果的顺序每次可能不同。
(let [in (gen 0 4)c1 (sq in)c2 (sq in)](->> (async/merge [c1 c2]) chan-seq doall)) ; => (1 0 4 9)
Pipeline 的退出
和 go 语言中类似,Clojure 中对于 channel 的关闭
close! 也可认为被当作一个发送给所有监听该 channel
的信号,所有监听者都将收到一个 nil 值(注意,Clojure
core.async 中不允许向 channel 发送 nil)。
(defn gen'[done start end](let [out (chan 1)](go [](doseq [i (range start end):while (alt!done nil[[out i]] true)])(close! out))out))(defn sq'[done in](let [out (chan)](go-loop [](if (when-let [n (alt! [done in]([v _ch] v))](alt! done nil[[out (* n n)]] true))(recur)(close! out)))out))
为这些函数添加了一个 done channel 用于取消 pipeline 函数的执行,一个简单的 pipeline 现在如下:
(let [done (chan)](->> (gen' done 0 4) (sq' done) (sq' done) chan-seq)) ; => (0 1 16 81)
现在可以中途取消
(let [done (chan);; 注意这边 gen 10 个值s (->> (gen' done 0 10) (sq' done) (sq' done) chan-seq);; 取 4 个值s1 (doall (take 4 s));; 取消执行_ (close! done);; 取所有值,此时也总共只有 4 个值s2 (doall s)][s1 s2]) ; => [(0 1 16 81) (0 1 16 81)]
Fan-in, fan-out 也一样
(let [done (chan)in (gen' done 0 10)c1 (sq' done in)c2 (sq' done in)s (chan-seq (async/merge [c1 c2]))s1 (doall (take 4 s))_ (close! done)s2 (doall s)][s1 s2]) ; => [(1 0 9 4) (1 0 9 4)]