intro to clojure's core.async

38
Intro to core.async #cljsyd, July 2013 Leonardo Borges @leonardo_borges www.leonardoborges.com www.thoughtworks.com Tuesday, 13 August 13

Upload: leonardo-borges

Post on 05-Dec-2014

7.496 views

Category:

Technology


1 download

DESCRIPTION

Presentation given at the Sydney Clojure User Group in July 2013

TRANSCRIPT

Page 1: Intro to Clojure's core.async

Intro to core.async#cljsyd, July 2013

Leonardo Borges@leonardo_borgeswww.leonardoborges.comwww.thoughtworks.com

Tuesday, 13 August 13

Page 2: Intro to Clojure's core.async

Background

• Nothing new• Based on Communicating Sequential Processes (CSP)• CSP was first described by Tony Hoare in 1978• You probably heard about it from the Go community• They love their channels and goroutines

Tuesday, 13 August 13

Page 3: Intro to Clojure's core.async

goroutines: lightweight processes

// doing some stuff...go myFunction("argument") //does stuff in the background...//continuing about my business...

kinda look like futures in this case.... but there’s more to it

Tuesday, 13 August 13

Page 4: Intro to Clojure's core.async

lightweight?

• goroutines don’t map 1-1 to threads• They get their own thread pool (number of cores + 2 in Clojure, uses the event loop in Clojurescript)• The runtime takes care of multiplexing them• Easy win due to language support

Tuesday, 13 August 13

Page 5: Intro to Clojure's core.async

Why?

• Looking for more ways to be efficient and achieve concurrency• A thread per client model can get expensive quickly• Threads spend most of their time waiting for things to happen• Put this idle time to good use!

Tuesday, 13 August 13

Page 6: Intro to Clojure's core.async

But goroutines aren’t terribly interesting on their own.They’re just the beginning.

Tuesday, 13 August 13

Page 7: Intro to Clojure's core.async

Channels

• Allow goroutines to talk to each other• First-class citizens• Can be thought of as concurrent blocking queues

Tuesday, 13 August 13

Page 8: Intro to Clojure's core.async

Channels

c := make(chan string)go func() { time.Sleep(time.Duration(5000) * time.Millisecond) c <- "Leo"}()fmt.Printf("Hello: %s\n", <-c) //this will block until the channel has something to give us

Tuesday, 13 August 13

Page 9: Intro to Clojure's core.async

But what about Clojure?Patience, young padawan, we’ll get there...

Tuesday, 13 August 13

Page 10: Intro to Clojure's core.async

Example 1

• We wish to implement a search service which is itself dependent on 3 other search services: web, images and video• Each individual service has unpredictable performance• Also, clients shouldn’t need to wait for slow services• Stolen from Rob Pike’s presentation, “Go Concurrency Patterns”[1]

[1] http://bit.ly/go-concurrency-patternsTuesday, 13 August 13

Page 11: Intro to Clojure's core.async

Example 1Video Service Image Service Web Service

Search service

Client

Tuesday, 13 August 13

Page 12: Intro to Clojure's core.async

Example 1: the servicevar ( Web = fakeSearch("web") Image = fakeSearch("image") Video = fakeSearch("video"))

type Search func(query string) Result

func fakeSearch(kind string) Search { return func(query string) Result { time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) return Result(fmt.Sprintf("%s result for %q\n", kind, query)) }}

Tuesday, 13 August 13

Page 13: Intro to Clojure's core.async

Example 1: the clientc := make(chan Result)go func() { c <- Web(query) } ()go func() { c <- Image(query) } ()go func() { c <- Video(query) } ()

timeout := time.After(80 * time.Millisecond)for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return }}return

Tuesday, 13 August 13

Page 14: Intro to Clojure's core.async

Example 1: the clientc := make(chan Result)go func() { c <- Web(query) } ()go func() { c <- Image(query) } ()go func() { c <- Video(query) } ()

timeout := time.After(80 * time.Millisecond)for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return }}return

Timeout channels: channels which close after msecs

Tuesday, 13 August 13

Page 15: Intro to Clojure's core.async

Example 1: the clientc := make(chan Result)go func() { c <- Web(query) } ()go func() { c <- Image(query) } ()go func() { c <- Video(query) } ()

timeout := time.After(80 * time.Millisecond)for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return }}return

Can be used in select blocks to “give up” on slow alternatives

Tuesday, 13 August 13

Page 16: Intro to Clojure's core.async

Yes. select/case can be thought of as switch/case statements for channels.

Tuesday, 13 August 13

Page 17: Intro to Clojure's core.async

select/case

• Makes a single choice from a set of channels• Immediately returns once any of the channels either responds or closes• In our example, if a service is too slow, the timeout channel closes first

Tuesday, 13 August 13

Page 18: Intro to Clojure's core.async

Enough Go. Let’s rewrite the code in Clojurescript!

Tuesday, 13 August 13

Page 19: Intro to Clojure's core.async

Example 1: the service

(defn fake-search [kind] (fn [query] (let [c (chan)] (go (<! (timeout (rand-int 100))) (>! c (str "<span>" kind " result for " query "</span>"))) c)))

(def web (fake-search "Web"))(def image (fake-search "Image"))(def video (fake-search "Video"))

Tuesday, 13 August 13

Page 20: Intro to Clojure's core.async

Example 1: the client

(defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v)))))))))

Tuesday, 13 August 13

Page 21: Intro to Clojure's core.async

Example 1: the client

(defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v)))))))))

Same deal: a timeout channel

Tuesday, 13 August 13

Page 22: Intro to Clojure's core.async

Example 1: the client

(defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v)))))))))

alt! - Clojure’s answer to Go’s select

Tuesday, 13 August 13

Page 23: Intro to Clojure's core.async

Demo

Tuesday, 13 August 13

Page 24: Intro to Clojure's core.async

Example 2

• From David Nolen’s CSP post [2]• In his words: “We will coordinate three independent processes running at three different speeds via a fourth process which shows the results of the coordination without any obvious use of mutation - only recursion”

[2] http://bit.ly/david-nolen-csp

• He also said this demo “should seem impossible for those familiar with JavaScript” - Challenge accepted!

Tuesday, 13 August 13

Page 25: Intro to Clojure's core.async

This time, demo first.

Tuesday, 13 August 13

Page 26: Intro to Clojure's core.async

Example 2: Clojurescript(def c (chan))

(defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>"))))

(go (while true (<! (async/timeout 250)) (>! c 1)))(go (while true (<! (async/timeout 1000)) (>! c 2)))(go (while true (<! (async/timeout 1500)) (>! c 3)))

(defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v))

(let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10))))))

Tuesday, 13 August 13

Page 27: Intro to Clojure's core.async

Example 2: Clojurescript(def c (chan))

(defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>"))))

(go (while true (<! (async/timeout 250)) (>! c 1)))(go (while true (<! (async/timeout 1000)) (>! c 2)))(go (while true (<! (async/timeout 1500)) (>! c 3)))

(defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v))

(let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10))))))

The three independent, different speed processes

Tuesday, 13 August 13

Page 28: Intro to Clojure's core.async

Example 2: Clojurescript(def c (chan))

(defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>"))))

(go (while true (<! (async/timeout 250)) (>! c 1)))(go (while true (<! (async/timeout 1000)) (>! c 2)))(go (while true (<! (async/timeout 1500)) (>! c 3)))

(defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v))

(let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10))))))

The fourth process, responsible for rendering

Tuesday, 13 August 13

Page 29: Intro to Clojure's core.async

Example 2: Javascript - part Ivar messageChannel = new MessageChannel();var tasks = [];

messageChannel.port1.onmessage = function(msg) { tasks.shift()();};

var c = [];

function publishValue(value, timeout) { setTimeout(function() { c.push(value); publishValue(value, timeout); }, timeout);}

publishValue(1, 250);publishValue(2, 1000);publishValue(3, 1500);

Tuesday, 13 August 13

Page 30: Intro to Clojure's core.async

Example 2: Javascript - part IIfunction renderValues(q) { tasks.push(function() { var v = c.shift(); if (v) { q.unshift(v); q = q.slice(0,10); var result = q.reduce(function(acc,p){ return acc+ "<div class='proc-" + p + "'>Process " + p + "</div>"; },""); document.getElementById("messages1").innerHTML = result; } renderValues(q); }); messageChannel.port2.postMessage(0);}

renderValues([]);

Tuesday, 13 August 13

Page 31: Intro to Clojure's core.async

Cljs vs. js - couldn’t resist it :)(def c (chan))

(defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>"))))

(go (while true (<! (async/timeout 250)) (>! c 1)))(go (while true (<! (async/timeout 1000)) (>! c 2)))(go (while true (<! (async/timeout 1500)) (>! c 3)))

(defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v))

(let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10))))))

var messageChannel = new MessageChannel();var tasks = [];

messageChannel.port1.onmessage = function(msg) { tasks.shift()();};

var c = [];

function publishValue(value, timeout) { setTimeout(function() { c.push(value); publishValue(value, timeout); }, timeout);}

publishValue(1, 250);publishValue(2, 1000);publishValue(3, 1500);

function renderValues(q) { tasks.push(function() { var v = c.shift(); if (v) { q.unshift(v); q = q.slice(0,10); var result = q.reduce(function(acc,p){ return acc+ "<div class='proc-" + p + "'>Process " + p + "</div>"; },""); document.getElementById("messages1").innerHTML = result; } renderValues(q); }); messageChannel.port2.postMessage(0);}

renderValues([]);

Tuesday, 13 August 13

Page 32: Intro to Clojure's core.async

Wait! MessageChannel?

Tuesday, 13 August 13

Page 33: Intro to Clojure's core.async

Under core.async’s hood

• core.async is composed of several fairly involved macros and functions• At the end of the day, dispatching go blocks is platform specific• JVM has threads whereas JS has one main thread and an event loop

Tuesday, 13 August 13

Page 34: Intro to Clojure's core.async

• the Javascript implementation dispatches like this:

(ns cljs.core.async.impl.dispatch)

...

(defn run [f] (cond (exists? js/MessageChannel) (queue-task f) (exists? js/setImmediate) (js/setImmediate f) :else (js/setTimeout f 0)))

Under core.async’s hood

Tuesday, 13 August 13

Page 35: Intro to Clojure's core.async

• The JVM on the other hand uses java.util.concurrent.Executors(ns ^{:skip-wiki true} clojure.core.async.impl.dispatch (:require [clojure.core.async.impl.protocols :as impl] [clojure.core.async.impl.exec.threadpool :as tp]))...

(def executor (delay (tp/thread-pool-executor)))

(defn run "Runs Runnable r in a thread pool thread" [^Runnable r] (impl/exec @executor r))

Under core.async’s hood

Tuesday, 13 August 13

Page 36: Intro to Clojure's core.async

Final thoughts

• core.async isn’t magic• if you’re using blocking API’s you’ll starve its thread pool• though async frameworks such as Netty and http-kit can benefit from it• huge gains in cljs - UI’s are inherently concurrent

Tuesday, 13 August 13

Page 37: Intro to Clojure's core.async

Questions?Leonardo Borges

@leonardo_borgeswww.leonardoborges.comwww.thoughtworks.com

Tuesday, 13 August 13

Page 38: Intro to Clojure's core.async

References

• http://www.leonardoborges.com/writings/2013/07/06/clojure-core-dot-async-lisp-advantage/

• http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html• http://swannodette.github.io/2013/07/12/communicating-sequential-processes/• http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/• http://bryangilbert.com/code/2013/07/19/escaping-callback-hell-with-core-async/• http://thinkrelevance.com/blog/2013/07/10/rich-hickey-and-core-async-podcast-

episode-035

Code: https://github.com/leonardoborges/core-async-intro

Tuesday, 13 August 13