groovy and concurrency
DESCRIPTION
Dr Paul King's presentation slides on "Groovy and Concurrency", presented at Strange Loop 2010 in St. Louis
TRANSCRIPT
© ASERT 2006-2010
Groovy and Concurrency
Dr Paul King, @paulk_asert
paulk at asert.com.au
Concurrent Programming in Groovy
• Java concurrent programming enhancements
– Normal OO methods
– Ability to have immutable types
– Some concurrency building blocks
– Annotations with baked in goodness
• Process/Thread ease of use
– AntBuilder and GDK methods
• Closures for greater flexibility
– Enabler for concurrency
– Closure is Runnable and Callable
• Third-party libraries
– GPars, Functional Java (Actors), Multiverse, JCSP
– Cascading.groovy subproject for Hadoop clusters
– Jetlang, JPPF, GridGain, Google Collections, Gruple
– Groovy actors: http://www.groovyactors.org
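The point that a Closure is both Runnable and Callable can be sketched with nothing but the JDK executors. This is a minimal illustration, not code from the talk; the variable names are made up for the example.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

def pool = Executors.newFixedThreadPool(2)
def answer
def seen = []
try {
    // 'as Callable' picks submit(Callable) so the Future carries the closure's result
    answer = pool.submit({ 6 * 7 } as Callable).get(5, TimeUnit.SECONDS)

    // The same kind of closure can be used wherever a Runnable is expected
    def t = new Thread({ seen << 'ran' } as Runnable)
    t.start()
    t.join()
} finally {
    pool.shutdown()
}
assert answer == 42
assert seen == ['ran']
```

The explicit coercion is only there to avoid ambiguity between the Runnable and Callable overloads of submit.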
Java Concurrency Features
• The early years
– Threads, synchronised and non-synchronised collections, synchronisation at the language level, Monitors (wait/notify), Locks, ThreadLocal, final, ...
• More recent enhancements
– java.util.concurrent: Executors, Thread Pools, Optimistic updates, Blocking queues, Synchronizers, Callables, Futures, Atomic operations, Deques, ...
• Emerging
– Fork/Join & others, Kilim, Phasers, PicoThreads ...
• Leverage related APIs/technologies
– Networking, real-time, GUIs, simulation, database, multimedia, operating systems, parallel processing, distribution, mobile agents, nio, ...
Topics
• Groovy Intro
• Useful Groovy features for Concurrency
• Related Concurrency Libraries & Tools
• Fibonacci Case Study
• GPars
• More Info
What is Groovy?
• “Groovy is like a super version of Java. It can leverage Java's enterprise capabilities but also has cool productivity features like closures, DSL support, builders and dynamic typing.”
Groovy = Java – boilerplate code + optional dynamic typing + closures + domain specific languages + builders + metaprogramming
Groovy Goodies Overview
• Fully object oriented
• Closures: reusable and assignable pieces of code
• Operators can be overloaded
• Multimethods
• Literal declaration for lists (arrays), maps, ranges and regular expressions
• GPath: efficient object navigation
• GroovyBeans
• grep and switch
• Templates, builder, swing, Ant, markup, XML, SQL, XML-RPC, Scriptom, Grails, tests, Mocks
Growing Acceptance …
A slow and steady start but now gaining in
momentum, maturity and mindshare
Now free
… Growing Acceptance …
What alternative JVM language are you using or intending to use
http://www.leonardoborges.com/writings
http://it-republik.de/jaxenter/quickvote/results/1/poll/44
(translated using http://babelfish.yahoo.com)
… Growing Acceptance …
Source: http://www.grailspodcast.com/
Source: http://www.micropoll.com/akira/mpresult/501697-116746
… Growing Acceptance …
http://www.java.net
http://www.jroller.com/scolebourne/entry/devoxx_2008_whiteboard_votes
… Growing Acceptance …
Groovy and Grails downloads: 70-90K per month and growing
Frequent topic at the popular conferences
… Growing Acceptance
The Landscape of JVM Languages
Java bytecode calls for static types
Dynamic features call for dynamic types
optional static types
The terms “Java Virtual Machine” and “JVM” mean a Virtual Machine for the Java™ platform.
Groovy Starter
System.out.println("Hello, World!");   // supports Java syntax
println 'Hello, World!'                // but can remove some syntax
String name = 'Guillaume'              // explicit typing/awareness
println "$name, I'll get the car."     // GString (interpolation)
def longer = """${name}, the car is in the next row."""   // multi-line, implicit type
assert 0.5 == 1/2                      // BigDecimal equals()
assert 0.1 + 0.2 == 0.3                // and arithmetic
def printSize(obj) {                   // implicit/duck typing
    print obj?.size()                  // safe dereferencing
}
def pets = ['ant', 'bee', 'cat']       // native list syntax
pets.each { pet ->                     // closure support
    assert pet < 'dog'                 // overloading '<' on String
}                                      // or: for (pet in pets)...
A Better Java...
import java.util.List;
import java.util.ArrayList;

class Erase {
    private List removeLongerThan(List strings, int length) {
        List result = new ArrayList();
        for (int i = 0; i < strings.size(); i++) {
            String s = (String) strings.get(i);
            if (s.length() <= length) {
                result.add(s);
            }
        }
        return result;
    }
    public static void main(String[] args) {
        List names = new ArrayList();
        names.add("Ted"); names.add("Fred");
        names.add("Jed"); names.add("Ned");
        System.out.println(names);
        Erase e = new Erase();
        List shortNames = e.removeLongerThan(names, 3);
        System.out.println(shortNames.size());
        for (int i = 0; i < shortNames.size(); i++) {
            String s = (String) shortNames.get(i);
            System.out.println(s);
        }
    }
}
This code is valid Java and valid Groovy
Based on an example by Jim Weirich & Ted Leung
...A Better Java...
Do the semicolons add anything?
And shouldn't we use more modern list notation?
Why not import common libraries?
...A Better Java...
class Erase {
    private List removeLongerThan(List strings, int length) {
        List result = new ArrayList()
        for (String s in strings) {
            if (s.length() <= length) {
                result.add(s)
            }
        }
        return result
    }
    public static void main(String[] args) {
        List names = new ArrayList()
        names.add("Ted"); names.add("Fred")
        names.add("Jed"); names.add("Ned")
        System.out.println(names)
        Erase e = new Erase()
        List shortNames = e.removeLongerThan(names, 3)
        System.out.println(shortNames.size())
        for (String s in shortNames) {
            System.out.println(s)
        }
    }
}
...A Better Java...
Do we need the static types?
Must we always have a main method and class definition?
How about improved consistency?
...A Better Java...
def removeLongerThan(strings, length) {
    def result = new ArrayList()
    for (s in strings) {
        if (s.size() <= length) {
            result.add(s)
        }
    }
    return result
}
names = new ArrayList()
names.add("Ted")
names.add("Fred")
names.add("Jed")
names.add("Ned")
System.out.println(names)
shortNames = removeLongerThan(names, 3)
System.out.println(shortNames.size())
for (s in shortNames) {
    System.out.println(s)
}
...A Better Java...
Shouldn't we have special notation for lists?
And special facilities for list processing?
Is 'return' needed at end?
...A Better Java...
def removeLongerThan(strings, length) {
    strings.findAll{ it.size() <= length }
}
names = ["Ted", "Fred", "Jed", "Ned"]
System.out.println(names)
shortNames = removeLongerThan(names, 3)
System.out.println(shortNames.size())
shortNames.each{ System.out.println(it) }
...A Better Java...
Is the method now needed?
Easier ways to use common methods?
Are brackets required here?
...A Better Java
names = ["Ted", "Fred", "Jed", "Ned"]
println names
shortNames = names.findAll{ it.size() <= 3 }
println shortNames.size()
shortNames.each{ println it }
Output:
[Ted, Fred, Jed, Ned]
3
Ted
Jed
Ned
Grapes / Grab: Google collections
@Grab('com.google.collections:google-collections:1.0')
import com.google.common.collect.HashBiMap

HashBiMap fruit = [grape:'purple', lemon:'yellow', lime:'green']
assert fruit.lemon == 'yellow'
assert fruit.inverse().yellow == 'lemon'
Better Design Patterns: Delegate…
import java.util.Date;

public class Event {
    private String title;
    private String url;
    private Date when;

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    // ...
    public Date getWhen() { return when; }
    public void setWhen(Date when) { this.when = when; }
    public boolean before(Date other) { return when.before(other); }
    public void setTime(long time) { when.setTime(time); }
    public long getTime() { return when.getTime(); }
    public boolean after(Date other) { return when.after(other); }
    // ...
}
…Better Design Patterns: Delegate…
boilerplate
…Better Design Patterns: Delegate
class Event {
    String title, url
    @Delegate Date when
}

def gr8conf = new Event(title: "GR8 Conference",
    url: "http://www.gr8conf.org",
    when: Date.parse("yyyy/MM/dd", "2009/05/18"))
def javaOne = new Event(title: "JavaOne",
    url: "http://java.sun.com/javaone/",
    when: Date.parse("yyyy/MM/dd", "2009/06/02"))
assert gr8conf.before(javaOne.when)
Why Groovy? Technical Answer
• Minimal learning curve
• Compiles to bytecode
• Java object model & integration
• Annotations
• "Optional" static typing
• Both run-time and compile-time
metaprogramming
Why Groovy? Adoption Assessment
• Innovators/Thought leaders
– Ideas, power, flexibility, novelty, thinking community
• Early adopters
– Productivity benefits and collegiate community
– Leverage JVM and potential for mainstream
• Mainstream
– Leverage existing Java skills, low learning curve
– Leverage JVM and production infrastructure
– Professional community
– Tools, tools, tools
"Andy giveth and Bill taketh away"
Source: Herb Sutter: http://www.gotw.ca/publications/concurrency-ddj.htm
Why is it hard?
• Many issues to deal with:
– Doing things in parallel, concurrently, asynchronously
• Processes, Threads, Co-routines, Events, Scheduling
– Sharing/Synchronization Mechanisms
• shared memory, locks, transactions, wait/notify, STM, message passing, actors, serializability, persistence, immutability
– Abstractions
• Shared memory on top of message passing
• Message passing on top of shared memory
• Dataflow, Selective Communication, Continuations
– Data Structures and Algorithms
• Queues, Heaps, Trees
• Sorting, Graph Algorithms
Topics
• Groovy Intro
• Useful Groovy features for Concurrency
• Related Concurrency Libraries & Tools
• Fibonacci Case Study
• GPars
• More Info
Thread Enhancements…
• Thread improvements
Thread.start{ sleep 1001; println 'one' }
Thread.start{ sleep 1000; println 'two' }

println 'one'
def t = Thread.start{ sleep 100; println 'three' }
println 'two'
t.join()
println 'four'
…Thread Enhancements…
• Thread improvements (Cont’d)
class Storage {
    List stack = []
    synchronized void leftShift(value) {
        stack << value
        println "push: $value"
        notifyAll()
    }
    synchronized Object pop() {
        while (!stack) {
            try { wait() }
            catch (InterruptedException e) {}
        }
        def value = stack.pop()
        println "pop : $value"
        return value
    }
}
...
…Thread Enhancements…
• Thread improvements (Cont’d)
push: 0
push: 1
push: 2
pop : 2
push: 3
push: 4
pop : 4
push: 5
push: 6
pop : 6
push: 7
push: 8
pop : 8
push: 9
pop : 9
pop : 7
pop : 5
pop : 3
pop : 1
pop : 0
...
storage = new Storage()
Thread.start {
    for (i in 0..9) {
        storage << i
        sleep 100
    }
}
Thread.start {
    10.times {
        sleep 200
        storage.pop()
    }
}
…Thread Enhancements…
• Thread improvements (Cont’d)
ROCK!
. 25
. 33
. 34
. 35
. 36
.. 131
.. 134
.. 137
.. 138
.. 139
... 232
... 234
... 237
... 238
... 239
.... 334
.... 336
.... 337
.... 338
.... 339
import java.util.concurrent.locks.ReentrantLock
import static System.currentTimeMillis as now

def startTime = now()
ReentrantLock.metaClass.withLock = { critical ->
    lock()
    try { critical() }
    finally { unlock() }
}
def lock = new ReentrantLock()
def worker = { threadNum ->
    4.times { count ->
        lock.withLock {
            print " " * threadNum
            print "." * (count + 1)
            println " ${now() - startTime}"
        }
        Thread.sleep 100
    }
}
5.times { Thread.start worker.curry(it) }
println "ROCK!"
Source: http://chrisbroadfoot.id.au/articles/2008/08/06/groovy-threads
… Thread Enhancements
• Thread Management meets AtomicInteger
thread loop 1
main loop 1
thread loop 2
thread loop 3
main loop 2
thread loop 4
thread loop 5
main loop 3
thread loop 6
main loop 4
thread loop 7
thread loop 8
import java.util.concurrent.atomic.AtomicInteger

def counter = new AtomicInteger()
synchronized out(message) {
    println(message)
}
def th = Thread.start {
    for( i in 1..8 ) {
        sleep 30
        out "thread loop $i"
        counter.incrementAndGet()
    }
}
for( j in 1..4 ) {
    sleep 50
    out "main loop $j"
    counter.incrementAndGet()
}
th.join()
assert counter.get() == 12
Process Enhancements…
• Using AntBuilder
def ant = new AntBuilder()
ant.parallel {
    10.times {
        echo "Message $it"
    }
}
[echo] Message 6
[echo] Message 9
[echo] Message 8
[echo] Message 0
[echo] Message 2
[echo] Message 4
[echo] Message 1
[echo] Message 7
[echo] Message 5
[echo] Message 3
…Process Enhancements…
• Using AntBuilder (Cont’d)
def ant = new AntBuilder()
ant.taskdef(name:'groovy', classname:'org.codehaus.groovy.ant.Groovy')
ant.parallel {
    10.times {
        echo "Ant message $it"
        groovy "println 'Groovy via ant message $it'"
        println "Groovy message $it"
        // or fork java, command line, thread ...
    }
}

def ant = new AntBuilder()
ant.exec(outputproperty:"cmdOut",
        errorproperty: "cmdErr",
        resultproperty:"cmdExit",
        failonerror: "true",
        executable: '/opt/myExecutable') {
    arg(line:'*"first with space"* second')
}
…Process Enhancements
• Process Management
def process = "ls -l".execute()
println "Found text ${process.text}"

def process = "ls -l".execute()
process.in.eachLine { line -> println line }

def sout = new StringBuffer()
def serr = new StringBuffer()
proc1 = 'gzip -c'.execute()
proc2 = 'gunzip -c'.execute()
proc2.consumeProcessOutput(sout, serr)
proc1 | proc2
proc1.consumeProcessErrorStream(serr)
proc1.withWriter { writer ->
    writer << 'test text'
}
proc2.waitForOrKill(1000)
println 'sout: ' + sout // => test text
println 'serr: ' + serr

proc1 = 'ls'.execute()
proc2 = 'tr -d o'.execute()
proc3 = 'tr -d e'.execute()
proc4 = 'tr -d i'.execute()
proc1 | proc2 | proc3 | proc4
proc4.waitFor()
if (proc4.exitValue())
    print proc4.err.text
else
    print proc4.text
Parallelize your arrays with JSR 166y
// Create a pool with size close to the number of processor cores
def pool = new ForkJoinPool(2)

def createParallelArray(pool, collection) {
    return ParallelArray.createFromCopy(
        collection.toArray(new Object[collection.size()]), pool)
}

// Enhance ArrayLists to find matching objects in parallel
ArrayList.metaClass.findAll = {Closure cl ->
    createParallelArray(pool, delegate).
        withFilter({cl(it)} as Predicate).all().asList()
}

def sites=['http://www.jroller.com', 'http://www.infoq.com', 'http://java.dzone.com']

def groovySites = sites.findAll {
    new URL(it).text.toLowerCase().contains('groovy')}
println "These sites talk about Groovy today: ${groovySites}"
Source: http://www.jroller.com/vaclav/date/20080923
Java Concurrency Best Practice?
• Java Concurrency in Practice:
– “If multiple threads access the same mutable state variable without appropriate synchronization, your program is broken”
– “When designing thread-safe classes,
good object-oriented techniques –
encapsulation, immutability, and clear
specification of invariants – are your
best friends”
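As a rough illustration of that advice, the sketch below keeps two related fields private, guards every access with the same lock, and hands out only immutable snapshots. The NumberRange class and its method names are invented for this example and do not appear in the talk.

```groovy
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical class for illustration: the invariant is lower <= upper
class NumberRange {
    private int lower = 0
    private int upper = 0

    // Both fields change under one lock, so no reader sees a half-updated pair
    synchronized void setBounds(int lo, int hi) {
        if (lo > hi) throw new IllegalArgumentException("lower $lo > upper $hi")
        lower = lo
        upper = hi
    }

    // Returns an immutable copy rather than exposing the mutable state
    synchronized List<Integer> snapshot() {
        [lower, upper].asImmutable()
    }
}

def range = new NumberRange()
def violations = new AtomicInteger()
def writers = (1..4).collect { w ->
    Thread.start { 1000.times { i -> range.setBounds(i, i + w) } }
}
def reader = Thread.start {
    1000.times {
        def (lo, hi) = range.snapshot()
        if (lo > hi) violations.incrementAndGet()
    }
}
(writers + reader)*.join()
assert violations.get() == 0
```

Removing either synchronized keyword would allow a reader to observe a lower bound from one update paired with an upper bound from another.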
Immutability options
• Built-in
• Google Collections
– Numerous improved immutable collection types
• Groovy run-time metaprogramming
• Groovy 1.6+ compile-time metaprogramming
– @Immutable can help us create such classes
import com.google.common.collect.*
List<String> animals = ImmutableList.of("cat", "dog", "horse")
animals << 'fish' // => java.lang.UnsupportedOperationException

def animals = ['cat', 'dog', 'horse'].asImmutable()
animals << 'fish' // => java.lang.UnsupportedOperationException

def animals = ['cat', 'dog', 'horse']
ArrayList.metaClass.leftShift = {
    throw new UnsupportedOperationException() }
animals << 'fish' // => java.lang.UnsupportedOperationException
@Immutable...
• Java Immutable Class – As per Joshua Bloch
Effective Java
public final class Punter {
    private final String first;
    private final String last;

    public String getFirst() { return first; }
    public String getLast() { return last; }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((first == null) ? 0 : first.hashCode());
        result = prime * result + ((last == null) ? 0 : last.hashCode());
        return result;
    }

    public Punter(String first, String last) {
        this.first = first;
        this.last = last;
    }
    // ...
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        Punter other = (Punter) obj;
        if (first == null) {
            if (other.first != null) return false;
        } else if (!first.equals(other.first)) return false;
        if (last == null) {
            if (other.last != null) return false;
        } else if (!last.equals(other.last)) return false;
        return true;
    }

    @Override
    public String toString() {
        return "Punter(first:" + first + ", last:" + last + ")";
    }
}
...@Immutable...
• Java Immutable Class – As per Joshua Bloch
Effective Java
boilerplate
...@Immutable
@Immutable class Punter { String first, last }
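A short usage sketch of the one-line class above. It assumes a Groovy version where the annotation lives in groovy.transform (Groovy 1.7 and later); the sample names are arbitrary.

```groovy
import groovy.transform.Immutable

@Immutable class Punter { String first, last }

def p1 = new Punter(first: 'Fred', last: 'Smith')   // named-argument constructor
def p2 = new Punter('Fred', 'Smith')                 // positional constructor

// equals() and hashCode() are generated from the properties
assert p1 == p2
assert p1.hashCode() == p2.hashCode()

// Attempts to mutate a property are rejected at run time
def mutationRejected = false
try {
    p1.first = 'Ted'
} catch (ReadOnlyPropertyException expected) {
    mutationRejected = true
}
assert mutationRejected
assert p1.first == 'Fred'
```

Because instances cannot change after construction, they can be shared between threads without locking.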
@Synchronized – Before Transform
class SynchronizedExample {
    private final myLock = new Object()

    @Synchronized
    static void greet() {
        println "world"
    }

    @Synchronized
    int answerToEverything() {
        return 42
    }

    @Synchronized("myLock")
    void foo() {
        println "bar"
    }
}
@Synchronized – After Transform
class SynchronizedExample {
    private static final $LOCK = new Object[0]
    private final $lock = new Object[0]
    private final myLock = new Object()

    static void greet() {
        synchronized ($LOCK) {
            println "world"
        }
    }

    int answerToEverything() {
        synchronized ($lock) {
            return 42
        }
    }

    void foo() {
        synchronized (myLock) {
            println "bar"
        }
    }
}
Inspired by Project Lombok
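To see @Synchronized doing its job, here is a small sketch with several threads updating one object through a named lock. The Ledger class and the ledgerLock field are hypothetical names chosen for this example; the annotation is assumed to be groovy.transform.Synchronized.

```groovy
import groovy.transform.Synchronized

class Ledger {
    private final Object ledgerLock = new Object()
    private int total = 0

    // Both methods synchronize on the same private lock object
    @Synchronized('ledgerLock')
    void add(int amount) { total += amount }

    @Synchronized('ledgerLock')
    int getTotal() { total }
}

def ledger = new Ledger()
def workers = (1..4).collect {
    Thread.start { 500.times { ledger.add(2) } }
}
workers*.join()
assert ledger.total == 4000   // 4 threads x 500 calls x 2
```

Locking on a private field rather than on this means outside code cannot interfere by synchronizing on the Ledger instance.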
@Lazy...
• Safe initialization idioms – Eager or fully synchronized
import net.jcip.annotations.ThreadSafe

@ThreadSafe
class EagerInitialization {
    static final resource = new Resource()
}

@ThreadSafe
class SafeLazyInitialization {
    private static ExpensiveResource resource
    synchronized static ExpensiveResource getInstance() {
        if (!resource) resource = new ExpensiveResource()
        resource
    }
}
...@Lazy...
• Java Concurrency in Practice – Race condition with lazy initialization
– No problems: just apply Double-checked locking pattern
import net.jcip.annotations.NotThreadSafe

@NotThreadSafe
class LazyInitRace {
    private ExpensiveResource instance = null
    ExpensiveResource getInstance() {
        if (!instance) instance = new ExpensiveResource()
        instance
    }
}
...@Lazy...
• Java Concurrency in Practice – Double checked locking anti-pattern
@NotThreadSafe
class DoubleCheckedLocking {
    private static ExpensiveResource instance = null
    static ExpensiveResource getInstance() {
        if (!instance) {
            synchronized (DoubleCheckedLocking) {
                if (!instance)
                    instance = new ExpensiveResource()
            }
        }
        instance
    }
}
...@Lazy...
• Java Concurrency in Practice – Double checked locking less broken-pattern
class DoubleCheckedLocking {
    private static volatile ExpensiveResource instance = null
    static ExpensiveResource getInstance() {
        if (!instance) {
            synchronized (DoubleCheckedLocking) {
                if (!instance)
                    instance = new ExpensiveResource()
            }
        }
        instance
    }
}
...@Lazy...
• Java Concurrency in Practice – Lazy initialization holder class idiom
import net.jcip.annotations.ThreadSafe
@ThreadSafe
class ResourceFactory {
private static class ResourceHolder {
static Resource resource = new Resource()
}
static Resource getResource() {
ResourceHolder.resource
}
}
...@Lazy
@Lazy Resource first
@Lazy volatile Resource second
@Lazy volatile Resource third = { new Resource(args) }()
@Lazy(soft = true) volatile Resource fourth
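A small sketch of what these declarations do at run time: nothing is constructed until the property is first read, and later reads return the same instance. The Resource and Holder classes here are stand-ins written for this example.

```groovy
// Stand-in resource that counts how many instances have been built
class Resource {
    static int created = 0
    Resource() { created++ }
}

class Holder {
    @Lazy Resource first
    @Lazy volatile Resource second   // volatile form for use across threads
}

def holder = new Holder()
assert Resource.created == 0                 // nothing built yet

assert holder.first.is(holder.first)         // first read builds it, second read reuses it
assert Resource.created == 1

assert holder.second != null
assert Resource.created == 2
```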
Topics
• Groovy Intro
• Useful Groovy features for Concurrency
• Related Concurrency Libraries & Tools
• Fibonacci Case Study
• GPars
• More Info
Lightweight threads: Jetlang
• Jetlang – A high performance java threading library
// JAVA
Fiber receiver = new ThreadFiber();
receiver.start();

// create java.util.concurrent.CountDownLatch to notify when message arrives
final CountDownLatch latch = new CountDownLatch(1);

// create channel to message between threads
Channel<String> channel = new MemoryChannel<String>();

Callback<String> onMsg = new Callback<String>() {
    public void onMessage(String message) {
        //open latch
        latch.countDown();
    }
};
//add subscription for message on receiver thread
channel.subscribe(receiver, onMsg);

//publish message to receive thread. the publish method is thread safe.
channel.publish("Hello");

//wait for receiving thread to receive message
latch.await(10, TimeUnit.SECONDS);

//shutdown thread
receiver.dispose();
Other High-Level Libraries: JPPF
– Open source Grid Computing platform
– http://www.jppf.org/
import org.jppf.client.*
import java.util.concurrent.Callable

class Task implements Callable, Serializable {
    private static final long serialVersionUID = 1162L;
    public Object call() {
        println 'Executing Groovy'
        "Hello JPPF from Groovy"
    }
}
def client = new JPPFClient()
def job = new JPPFJob()
def task = new Task()
job.addTask task
def results = client.submit(job)
for (t in results) {
    if (t.exception) throw t.exception
    println "Result: " + t.result
}
Other High-Level Libraries: Gruple...
– http://code.google.com/p/gruple
– Gruple aims to provide a simple abstraction to allow
programmers to coordinate and synchronize threads
with ease – based on Tuplespaces
• Tuplespaces provide the illusion of a shared memory on top
of a message passing system, along with a small set of
operations to greatly simplify parallel programming
– Example Tuple: [fname:"Vanessa", lname:"Williams", project:"Gruple"]
– Basic operations within a Tuplespace are:
• put - insert a tuple into the space
• get - read a tuple from the space (non-destructively)
• take - take a tuple from the space (a destructive read)
– Further reading: Eric Freeman, Susanne Hupfer, and Ken Arnold. JavaSpaces Principles, Patterns, and Practice, Addison Wesley, 1999
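The put/get/take operations can be pictured with a toy, in-memory space. This is only a sketch of the idea: ToySpace is a hypothetical class written for this example and is not Gruple's API. Templates match any tuple that contains all of the template's key/value pairs.

```groovy
// Hypothetical ToySpace: an illustration of tuplespace semantics, not Gruple itself
class ToySpace {
    private final List<Map> tuples = []

    // put: insert a tuple and wake any threads waiting for a match
    synchronized void put(Map tuple) {
        tuples << tuple
        notifyAll()
    }

    // get: non-destructive read, blocks until a matching tuple exists
    synchronized Map get(Map template) {
        awaitMatch(template)
    }

    // take: destructive read, blocks until a matching tuple exists
    synchronized Map take(Map template) {
        def match = awaitMatch(template)
        tuples.remove(match)
        match
    }

    // Only called from the synchronized methods above, so wait() holds the monitor
    private Map awaitMatch(Map template) {
        def match
        while ((match = tuples.find { t -> template.every { k, v -> t[k] == v } }) == null) {
            wait()
        }
        match
    }
}

def space = new ToySpace()
def results = Collections.synchronizedList([])

// The worker blocks in take() until the main thread puts a matching tuple
def worker = Thread.start { results << space.take(project: 'Gruple') }
space.put(fname: 'Vanessa', lname: 'Williams', project: 'Gruple')
worker.join()
assert results[0].fname == 'Vanessa'
```

The two threads never reference each other; they coordinate only through the shape of the tuples, which is the main attraction of the model.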
Other High-Level Libraries: ...Gruple...
– Mandelbrot example (included in Gruple download)
...
Space space = SpaceService.getSpace("mandelbrot")
Map template = createTaskTemplate()
Map task
String threadName = Thread.currentThread().name
while(true) {
    ArrayList points
    task = space.take(template)
    println "Worker $threadName got task ${task['start']} for job ${task['jobId']}"
    points = calculateMandelbrot(task)
    Map result = createResult(task['jobId'], task['start'], points)
    println "Worker $threadName writing result for task ${result['start']} for job ${result['jobId']}"
    space.put(result)
}
...
Other High-Level Libraries: ...Gruple
Other High-Level Libraries: Cascading.groovy
– API/DSL for executing tasks on a Hadoop cluster
– http://www.cascading.org/
def assembly = builder.assembly(name: "wordcount") {
    eachTuple(args: ["line"], results: ["word"]) {
        regexSplitGenerator(declared: ["word"], pattern: /[.,]*\s+/)
    }
    group(["word"])
    everyGroup(args: ["word"], results: ["word", "count"]) {
        count()
    }
    group(["count"], reverse: true)
}
def map = builder.map() {
    source(name: "wordcount") {
        hfs(input) { text(["line"]) }
    }
    sink(name: "wordcount") {
        hfs(output) { text() }
    }
}
def flow = builder.flow(name: "wordcount", map: map, assembly: assembly)
Other High-Level Libraries: GridGain
– Simple & productive to use grid computing platform
– http://www.gridgain.com/
class GridHelloWorldGroovyTask
        extends GridTaskSplitAdapter<String, Integer> {
    Collection split(int gridSize, Object phrase) throws GridException {
        // ...
    }
    Object reduce(List results) throws GridException {
        // ...
    }
}

import static GridFactory.*

start()
def grid = getGrid()
def future = grid.execute(GridHelloWorldGroovyTask, "Hello World")
def phraseLen = future.get()
stop(true)
Multiverse STM…
– http://www.multiverse.org/
import org.multiverse.api.GlobalStmInstance
import org.multiverse.api.Transaction
import org.multiverse.templates.TransactionTemplate
import org.multiverse.transactional.refs.LongRef

def from = new Account(10)
def to = new Account(10)
atomic {
    from.balance -= 5
    to.balance += 5
}
println "from $from.balance"
println "to $to.balance"
...
…Multiverse STM…
...
void atomic(Closure block) {
    atomic([:], block)
}
void atomic(Map args, Closure block) {
    boolean readonly = args['readonly'] ?: false
    // default to true only when the caller did not supply a value;
    // "args['trackreads'] ?: true" would also turn an explicit false into true
    boolean trackreads = args.containsKey('trackreads') ? args['trackreads'] : true
    def txFactory = GlobalStmInstance.globalStmInstance.
        transactionFactoryBuilder.
        setReadonly(readonly).
        setReadTrackingEnabled(trackreads).build()
    new TransactionTemplate(txFactory) {
        Object execute(Transaction transaction) {
            block.call()
            return null
        }
    }.execute()
}
...
…Multiverse STM
class Account {
    private final balance = new LongRef()
    Account(long initial) {
        balance.set initial
    }
    void setBalance(long newBalance) {
        if (newBalance < 0)
            throw new RuntimeException("not enough money")
        balance.set newBalance
    }
    long getBalance() {
        balance.get()
    }
}
Testing multi-threaded applications: ConTest...
• Advanced Testing for Multi-Threaded Applications
– Tool for testing, debugging, and coverage-measuring
of concurrent programs (collects runtime statistics)
– Systematically and transparently (using a java agent)
schedules the execution of program threads in ways
likely to reveal race conditions, deadlocks, and other
intermittent bugs (collectively called synchronization
problems) with higher than normal frequency
– The ConTest run-time engine adds heuristically
controlled conditional instructions (adjustable by a
preferences file) that force thread switches, thus
helping to reveal concurrent bugs. You can use
existing tests and run ConTest multiple times – by
default different heuristics used each time it is run
• http://www.alphaworks.ibm.com/tech/contest
...Testing multi-threaded applications: ConTest
NUM = 5
count = 0
def incThread = { n -> Thread.start{
sleep n*10
//synchronized(ParalInc) {
count++
//}
} }
def threads = (1..NUM).collect(incThread)
threads.each{ it.join() }
assert count == NUM
targetClasses = ParalInc
timeoutTampering = true
noiseFrequency = 500
strength = 10000
Exception in thread "main" Assertion failed:
assert count == NUM
| | |
4 | 5
false
> groovyc ParalInc.groovy
> java -javaagent:../../Lib/ConTest.jar -cp %GROOVY_JAR%;. ParalInc
ParalInc.groovy
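The failed assertion (count of 4 instead of 5) is a lost update: count++ is a read, an add and a write, and two threads can interleave between those steps. One possible repair, shown here as a sketch rather than as the talk's own fix, swaps the plain counter for an AtomicInteger; uncommenting the synchronized block in the original script is another.

```groovy
import java.util.concurrent.atomic.AtomicInteger

final int NUM = 5
def count = new AtomicInteger()

// Same shape as ParalInc, but the increment is now a single atomic operation
def incThread = { n ->
    Thread.start {
        sleep n * 10
        count.incrementAndGet()
    }
}
def threads = (1..NUM).collect(incThread)
threads.each { it.join() }
assert count.get() == NUM
```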
GContracts
@Grab('org.gcontracts:gcontracts:1.0.2')
import org.gcontracts.annotations.*

@Invariant({ first != null && last != null })
class Person {
    String first, last

    @Requires({ delimiter in ['.', ',', ' '] })
    @Ensures({ result == first + delimiter + last })
    String getName(String delimiter) {
        first + delimiter + last
    }
}

new Person(first: 'John', last: 'Smith').getName('.')
1.8+
Testing: Spock
class HelloSpock extends spock.lang.Specification {
    def "length of Spock's and his friends' names"() {
        expect:
        name.size() == length

        where:
        name     | length
        "Spock"  | 5
        "Kirk"   | 4
        "Scotty" | 6
    }
}
Topics
• Groovy Intro
• Useful Groovy features for Concurrency
• Related Concurrency Libraries & Tools
• Fibonacci Case Study
• GPars
• More Info
Fibonacci Case Study
Fibonacci…
START = 8
END = 16
fib = {n -> n < 2 ? n : fib(n - 1) + fib(n - 2) }
(START..END).each {num ->
    println "n:$num => ${fib(num)}"
}
Serial version
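Before reaching for threads, it can be worth checking how far caching gets the serial version. The sketch below assumes Groovy 1.8 or later, where closures gained a memoize() method; it is an aside rather than part of the case study itself.

```groovy
// Declared first so the closure body can refer to the memoized wrapper
def fib
fib = { n -> n < 2 ? n : fib(n - 1) + fib(n - 2) }.memoize()

// Each fib(k) is computed once; repeated calls are served from the cache
def results = (8..16).collect { fib(it) }
assert results == [21, 34, 55, 89, 144, 233, 377, 610, 987]
```

The recursive calls go through the memoized closure because the variable fib is only resolved when the body runs.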
…Fibonacci…
ConcurrentHashMap version:
import java.util.concurrent.*

THREADS = 4
START = 8
END = 16
QUIT = -1

class Fibonacci {
    def values = new ConcurrentHashMap()
    int calc(x) { x < 2 ? x : calc(x-1) + calc(x-2) }
    int calcWithCache(x) {
        def result = values[x]
        if (!result) {
            result = calc(x)
            values.putIfAbsent(x, result)
        }
        result
    }
}

println "Calculating Fibonacci sequence in parallel..."
def queue = new ArrayBlockingQueue(10)
…Fibonacci…
Thread.start('Producer') {
    int x = START
    while (x <= END) {
        sleep 200
        queue << x++
    }
    sleep 1000
    THREADS.times { queue << QUIT }
}

(1..THREADS).each {
    def name = "Consumer$it"
    Thread.start(name) {
        def done = false
        def fib = new Fibonacci()
        while (!done) {
            def n = queue.take()
            if (n == QUIT) done = true
            else println "$name n:$n => ${fib.calcWithCache(n)}"
        }
    }
}
…Fibonacci…
Executor version:
import java.util.concurrent.*

CUTOFF = 12    // not worth parallelizing for small n
THREADS = 100

println "Calculating Fibonacci sequence in parallel..."
serialFib = {n -> (n < 2) ? n : serialFib(n - 1) + serialFib(n - 2) }
pool = Executors.newFixedThreadPool(THREADS)
defer = {c -> pool.submit(c as Callable) }
fib = {n ->
    if (n < CUTOFF) return serialFib(n)
    def left = defer { fib(n - 1) }
    def right = defer { fib(n - 2) }
    left.get() + right.get()
}

(8..16).each {n -> println "n=$n => ${fib(n)}" }
pool.shutdown()
…Fibonacci…
Fork/Join version:
import EDU.oswego.cs.dl.util.concurrent.FJTask
import EDU.oswego.cs.dl.util.concurrent.FJTaskRunnerGroup

class Fib extends FJTask {
    static final CUTOFF = 12
    volatile int number

    int getAnswer() {
        if (!isDone()) throw new IllegalStateException()
        number
    }

    void run() {
        int n = number
        if (n <= CUTOFF) number = seqFib(n)
        else {
            def f1 = new Fib(number: n - 1)
            def f2 = new Fib(number: n - 2)
            coInvoke(f1, f2)
            number = f1.number + f2.number
        }
    }

    int seqFib(int n) {
        n < 2 ? n : seqFib(n - 1) + seqFib(n - 2)
    }
}
…Fibonacci…
def THREADS = 2
def group = new FJTaskRunnerGroup(THREADS)
def START = 8
def END = 16
(START..END).each {num ->
    def f = new Fib(number: num)
    group.invoke(f)
    println "n:$num => $f.answer"
}
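The FJTask classes used above come from Doug Lea's original util.concurrent library. As a rough sketch of the same divide-and-conquer idea on a JDK 7 or later runtime, assuming java.util.concurrent.ForkJoinPool and RecursiveTask are available (the class name FibTask is hypothetical and not from the slides):

```groovy
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

// Hypothetical JDK-based counterpart of the Fib task above.
class FibTask extends RecursiveTask<Integer> {
    static final int CUTOFF = 12      // below this, compute serially
    final int n

    FibTask(int n) { this.n = n }

    static int seqFib(int k) { k < 2 ? k : seqFib(k - 1) + seqFib(k - 2) }

    @Override
    protected Integer compute() {
        if (n <= CUTOFF) return seqFib(n)
        def left = new FibTask(n - 1)
        def right = new FibTask(n - 2)
        left.fork()                       // run the left half asynchronously
        right.compute() + left.join()     // compute the right half here, then combine
    }
}

def pool = new ForkJoinPool(2)
def results = (8..16).collect { pool.invoke(new FibTask(it)) }
pool.shutdown()
assert results == [21, 34, 55, 89, 144, 233, 377, 610, 987]
```

Forking one half and computing the other in the current worker is a common way to avoid leaving a thread idle while it waits.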
…Fibonacci…
FunctionalJava version:
import fj.*
import fj.control.parallel.Strategy
import static fj.Function.curry as fcurry
import static fj.P1.curry as pcurry
import static fj.P1.fmap
import static fj.control.parallel.Actor.actor
import static fj.control.parallel.Promise.*
import static fj.data.List.range
import static java.util.concurrent.Executors.*

CUTOFF = 12    // not worth parallelizing for small n
START = 8
END = 16
THREADS = 4

pool = newFixedThreadPool(THREADS)
su = Strategy.executorStrategy(pool)
spi = Strategy.executorStrategy(pool)
add = fcurry({a, b -> a + b } as F2)
nums = range(START, END + 1)

println "Calculating Fibonacci sequence in parallel..."
…Fibonacci…
serialFib = {n -> n < 2 ? n : serialFib(n - 1) + serialFib(n - 2) }

print = {results ->
    def n = START
    results.each { println "n=${n++} => $it" }
    pool.shutdown()
} as Effect

calc = {n ->
    n < CUTOFF ?
        promise(su, P.p(serialFib(n))) :
        calc.f(n - 1).bind(join(su, pcurry(calc).f(n - 2)), add)
} as F

out = actor(su, print)
join(su, fmap(sequence(su)).f(spi.parMapList(calc).f(nums))).to(out)
…Fibonacci…
Jetlang version:
import org.jetlang.core.Callback
import org.jetlang.channels.MemoryChannel
import org.jetlang.fibers.ThreadFiber
import java.util.concurrent.*

println "Calculating Fibonacci sequence with two cooperating fibers..."

class FibonacciCalc implements Callback {
    private channel, receiver
    def limit, name, latch

    void onMessage(inpair) {
        def next = inpair[0] + inpair[1]
        def outpair = [inpair[1], next]
        println "$name next:$next"
        channel.publish(outpair)
        if (next > limit) {
            latch.countDown()
            sleep 200
            receiver.dispose()
        }
    }
    // ...
…Fibonacci
    // ...
    void subscribe(other) {
        channel.subscribe(receiver, other)
    }

    FibonacciCalc() {
        channel = new MemoryChannel()
        receiver = new ThreadFiber()
        receiver.start()
    }
}

def seed = [0, 1]
def latch = new CountDownLatch(2)
def calcA = new FibonacciCalc(limit: 500, name: 'CalcA', latch: latch)
def calcB = new FibonacciCalc(limit: 500, name: 'CalcB', latch: latch)
calcA.subscribe calcB
calcB.subscribe calcA
calcA.onMessage seed
latch.await(10, TimeUnit.SECONDS)
Topics
• Groovy Intro
• Useful Groovy features for Concurrency
• Related Concurrency Libraries & Tools
• Fibonacci Case Study
• GPars
• More Info
GPars
• http://gpars.codehaus.org/
• Library classes and DSL sugar providing intuitive ways for Groovy developers to handle tasks concurrently. Logical parts:
– Actors provide a Groovy implementation of Scala-like actors, including "remote" actors on other machines
– Dataflow Concurrency supports a natural shared-memory concurrency model, using single-assignment variables
– Asynchronizer extends the Java 1.5 built-in support for executor services to enable multi-threaded collection and closure processing
– Parallelizer uses JSR-166y Parallel Arrays to enable multi-threaded collection processing
– Safe provides a non-blocking, thread-safe reference to mutable state, inspired by "agents" in Clojure
GPars: Parallel Collection Functions
Sequential:
def nums = 1..100000
def squares = nums
    .collect{ it ** 2 }
    .grep{ it % 7 == it % 5 }
    .grep{ it % 3 == 0 }
println squares[0..3] + "..." + squares[-3..-1]
assert squares[0..3] == [36, 144, 1089, 1296]

Parallel:
@Grab('org.codehaus.gpars:gpars:0.10')
import static groovyx.gpars.GParsPool.withPool

def nums = 1..100000
withPool(5) {
    def squares = nums.
        collectParallel{ it ** 2 }.
        grepParallel{ it % 7 == it % 5 }.
        grepParallel{ it % 3 == 0 }
    println squares[0..3] + "..." + squares[-3..-1]
    assert squares[0..3] == [36, 144, 1089, 1296]
}
GPars: Transparent Parallel Collections
• Applies some Groovy metaprogramming
import static groovyx.gpars.GParsPool.withPool

withPool(5) {
    def nums = 1..100000
    nums.makeTransparent()
    def squares = nums.
        collect{ it ** 2 }.
        grep{ it % 7 == it % 5 }.
        grep{ it % 3 == 0 }
    println squares[0..3] + "..." + squares[-3..-1]
    assert squares[0..3] == [36, 144, 1089, 1296]
}
GPars concurrency-aware methods
Transparent              Transitive?   Parallel
any { ... }                            anyParallel { ... }
collect { ... }          yes           collectParallel { ... }
count(filter)                          countParallel(filter)
each { ... }                           eachParallel { ... }
eachWithIndex { ... }                  eachWithIndexParallel { ... }
every { ... }                          everyParallel { ... }
find { ... }                           findParallel { ... }
findAll { ... }          yes           findAllParallel { ... }
findAny { ... }                        findAnyParallel { ... }
fold { ... }                           foldParallel { ... }
fold(seed) { ... }                     foldParallel(seed) { ... }
grep(filter)             yes           grepParallel(filter)
groupBy { ... }                        groupByParallel { ... }
max { ... }                            maxParallel { ... }
max()                                  maxParallel()
min { ... }                            minParallel { ... }
min()                                  minParallel()
split { ... }            yes           splitParallel { ... }
Source: ReGina
GPars: Map-Reduce...
import static groovyx.gpars.GParsPool.withPool

withPool(5) {
    def nums = 1..100000
    println nums.parallel.
        map{ it ** 2 }.
        filter{ it % 7 == it % 5 }.
        filter{ it % 3 == 0 }.
        collection
}
...GPars: Map-Reduce
import static groovyx.gpars.GParsPool.withPool

withPool(5) {
    def nums = 1..100000
    println nums.parallel.
        map{ it ** 2 }.
        filter{ it % 7 == it % 5 }.
        filter{ it % 3 == 0 }.
        reduce{ a, b -> a + b }
}
Parallel Collections vs Map-Reduce
Source: ReGina
GPars: Dataflows...
import groovyx.gpars.dataflow.DataFlows
import static groovyx.gpars.dataflow.DataFlow.task

final flow = new DataFlows()
task { flow.result = flow.x + flow.y }
task { flow.x = 10 }
task { flow.y = 5 }
assert 15 == flow.result

new DataFlows().with {
    task { result = x * y }
    task { x = 10 }
    task { y = 5 }
    assert 50 == result
}
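Dataflow variables are single-assignment: a reader blocks until the value has been bound, and a bound value never changes. As a loose analogy only, assuming a Java 8 or later runtime and using java.util.concurrent.CompletableFuture (this is not how GPars implements dataflow, and the variable names are illustrative), the first example can be sketched as:

```groovy
import java.util.concurrent.CompletableFuture

// Each future stands in for one single-assignment dataflow variable.
def x = new CompletableFuture<Integer>()
def y = new CompletableFuture<Integer>()

// result becomes available only once both x and y have been bound.
def result = x.thenCombine(y) { a, b -> a + b }

def tx = Thread.start { x.complete(10) }
def ty = Thread.start { y.complete(5) }

assert result.get() == 15          // blocks until both inputs are bound
[tx, ty]*.join()

// A second binding attempt is rejected and the first value is kept.
def reboundX = x.complete(99)
assert !reboundX
assert x.get() == 10
```

One difference worth noting: a rejected rebind here just returns false, whereas GPars raises an error, as a later slide shows.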
...GPars: Dataflows...
• Evaluating:
result = (a - b) * (a + b)

import groovyx.gpars.dataflow.DataFlows
import static groovyx.gpars.dataflow.DataFlow.task

final flow = new DataFlows()
task { flow.a = 10 }
task { flow.b = 5 }
task { flow.x = flow.a - flow.b }
task { flow.y = flow.a + flow.b }
task { flow.result = flow.x * flow.y }
assert flow.result == 75

[Figure: dataflow graph in which a = 10 and b = 5 feed the - and + operators to give x and y, which feed * to give result]
...GPars: Dataflows...
• Naive attempt for loops
import groovyx.gpars.dataflow.DataFlows
import static groovyx.gpars.dataflow.DataFlow.task

final flow = new DataFlows()
[10, 20].each { thisA ->
    [4, 5].each { thisB ->
        task { flow.a = thisA }
        task { flow.b = thisB }
        task { flow.x = flow.a - flow.b }
        task { flow.y = flow.a + flow.b }
        task { flow.result = flow.x * flow.y }
        println flow.result
    }
}
// => java.lang.IllegalStateException: A DataFlowVariable can only be assigned once.

The loop effectively attempts both of these bindings on the same variable:
... task { flow.a = 10 }
... task { flow.a = 20 }
...GPars: Dataflows...
import groovyx.gpars.dataflow.DataFlowStream
import static groovyx.gpars.dataflow.DataFlow.*

final streamA = new DataFlowStream()
final streamB = new DataFlowStream()
final streamX = new DataFlowStream()
final streamY = new DataFlowStream()
final results = new DataFlowStream()

operator(inputs: [streamA, streamB], outputs: [streamX, streamY]) { a, b ->
    streamX << a - b
    streamY << a + b
}
operator(inputs: [streamX, streamY], outputs: [results]) { x, y ->
    results << x * y
}

[[10, 20], [4, 5]].combinations().each{ thisA, thisB ->
    task { streamA << thisA }
    task { streamB << thisB }
}
4.times { println results.val }

[Figure: stream a = 10, 10, 20, 20 and stream b = 4, 5, 4, 5 feed the - and + operators, whose outputs feed *; results shown: 84, 75, 384, 375]
...GPars: Dataflows
• Amenable to static analysis
• Race conditions avoided
• Deadlocks “typically” become repeatable
import groovyx.gpars.dataflow.DataFlows
import static groovyx.gpars.dataflow.DataFlow.task

final flow = new DataFlows()
task { flow.x = flow.y }
task { flow.y = flow.x }
GPars: Dataflow Sieve
final int requestedPrimeNumberCount = 1000
final DataFlowStream initialChannel = new DataFlowStream()

task {
    (2..10000).each {
        initialChannel << it
    }
}

def filter(inChannel, int prime) {
    def outChannel = new DataFlowStream()
    operator([inputs: [inChannel], outputs: [outChannel]]) {
        if (it % prime != 0) {
            bindOutput it
        }
    }
    return outChannel
}

def currentOutput = initialChannel
requestedPrimeNumberCount.times {
    int prime = currentOutput.val
    println "Found: $prime"
    currentOutput = filter(currentOutput, prime)
}
Source: http://groovyconsole.appspot.com/script/235002
GPars: Actors...
• Predefined coordination with fork/join & map/filter/reduce
• Implicit coordination with dataflow
• Actors provide explicit coordination and enforce* no sharing of state, process a single activity/message at a time
* mostly
• Class with the following lifecycle & methods
– But also DSL sugar & augmentation
start()
stop()
act()
send(msg)
sendAndWait(msg)
loop { }
react { msg -> }
msg.reply(replyMsg)
receive()
join()
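To make the "single message at a time, no shared state" idea concrete, here is a deliberately tiny sketch built only on the JDK. MiniActor is a hypothetical class for illustration and is not the GPars implementation; it assumes one dedicated thread per actor and a blocking-queue mailbox.

```groovy
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical mini-actor: one thread drains one mailbox, one message at a time.
class MiniActor {
    static final STOP = new Object()          // sentinel that ends the loop
    private final mailbox = new LinkedBlockingQueue()
    private final Closure handler
    private Thread worker

    MiniActor(Closure handler) { this.handler = handler }

    MiniActor start() {
        worker = Thread.start {
            while (true) {
                def msg = mailbox.take()      // blocks until a message arrives
                if (msg.is(STOP)) break
                handler(msg)                  // only this thread runs the handler
            }
        }
        this
    }

    void send(msg) { mailbox.put(msg) }
    void stop()    { mailbox.put(STOP) }      // processed after earlier messages
    void join()    { worker.join() }
}

def seen = []
def decrypt = new MiniActor({ seen << it.reverse() }).start()
decrypt.send 'terces pot'
decrypt.send 'olleh'
decrypt.stop()
decrypt.join()
assert seen == ['top secret', 'hello']
```

Because the handler only ever runs on the actor's own thread, the list it appends to needs no locking, provided other threads read it only after join().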
…GPars: Actors...
import static groovyx.gpars.actor.Actors.*

def decrypt = reactor { code -> code.reverse() }
def audit = reactor { println it }
def main = actor {
    decrypt 'terces pot'
    react { plainText ->
        audit plainText
    }
}
main.join()
audit.stop()
audit.join()
Source: ReGina
…GPars: Actors...
final class FilterActor extends DynamicDispatchActor {
    private final int myPrime
    private def follower

    def FilterActor(final myPrime) { this.myPrime = myPrime; }

    def onMessage(int value) {
        if (value % myPrime != 0) {
            if (follower) follower value
            else {
                println "Found $value"
                follower = new FilterActor(value).start()
            }
        }
    }

    def onMessage(def poisson) {
        if (follower) {
            def sender = poisson.sender
            // Pass the poisson along and stop after a reply
            follower.sendAndContinue(poisson, {this.stop(); sender?.send('Done')})
        } else {
            // I am the last in the chain
            stop()
            reply 'Done'
        }
    }
}
Source: http://groovyconsole.appspot.com/script/242001
…GPars: Actors
(2..requestedPrimeNumberBoundary).each {
    firstFilter it
}
firstFilter.sendAndWait 'Poisson'
Source: http://groovyconsole.appspot.com/script/242001
GPars: Agents...
• Agents safeguard non-thread-safe objects
• Only the agent can update the underlying object
• "Code" to update the protected object is sent to the agent
• Can be used with other approaches
…GPars: Agents
@Grab('org.codehaus.gpars:gpars:0.10')
import groovyx.gpars.agent.Agent

def speakers = new Agent<List>(['Alex'], {it?.clone()}) // add Alex
speakers.send {updateValue it << 'Hilary'} // add Hilary
final Thread t1 = Thread.start {
    speakers.send {updateValue it << 'Ken'} // add Ken
}
final Thread t2 = Thread.start {
    speakers << {updateValue it << 'Guy'} // add Guy
    speakers << {updateValue it << 'Ralph'} // add Ralph
}
[t1, t2]*.join()
assert new HashSet(speakers.val) ==
    new HashSet(['Alex', 'Hilary', 'Ken', 'Guy', 'Ralph'])
Source: GPars examples
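The core idea, that update code is sent to the agent and applied by a single thread, can be approximated with a single-threaded executor. The MiniAgent class below is a hypothetical illustration under that assumption and is not how GPars implements Agent:

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical mini-agent: every update runs on one private thread,
// so the protected state is never touched concurrently.
class MiniAgent {
    private state
    private final executor = Executors.newSingleThreadExecutor()

    MiniAgent(initial) { state = initial }

    // Queue a closure that maps the old state to the new state.
    void send(Closure update) {
        executor.execute { state = update(state) }
    }

    // Read the state after all previously queued updates have run.
    def getVal() {
        executor.submit({ state } as Callable).get()
    }

    void shutdown() { executor.shutdown() }
}

def speakers = new MiniAgent(['Alex'])
def threads = ['Hilary', 'Ken', 'Guy'].collect { name ->
    Thread.start { speakers.send { it + name } }   // build a new list each time
}
threads*.join()
def finalSpeakers = speakers.val
speakers.shutdown()
assert new HashSet(finalSpeakers) == new HashSet(['Alex', 'Hilary', 'Ken', 'Guy'])
```

The order in which the three names arrive is not deterministic, which is why the check compares sets rather than lists.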
GPars for testing
@Grab('net.sourceforge.htmlunit:htmlunit:2.6')
import com.gargoylesoftware.htmlunit.WebClient
@Grab('org.codehaus.gpars:gpars:0.10')
import static groovyx.gpars.GParsPool.*

def testCases = [
    ['Home', 'Bart', 'Content 1'],
    ['Work', 'Homer', 'Content 2'],
    ['Travel', 'Marge', 'Content 3'],
    ['Food', 'Lisa', 'Content 4']
]

withPool(3) {
    testCases.eachParallel{ category, author, content ->
        postAndCheck category, author, content
    }
}

private postAndCheck(category, author, content) {
    ...
Guy Steele example in Groovy…
Sequential version:
def words = { s ->
    def result = []
    def word = ''
    s.each{ ch ->
        if (ch == ' ') {
            if (word) result += word
            word = ''
        } else word += ch
    }
    if (word) result += word
    result
}
assert words("This is a sample") == ['This', 'is', 'a', 'sample']
assert words(" Here is another sample ") == ['Here', 'is', 'another', 'sample']
assert words("JustOneWord") == ['JustOneWord']
assert words("Here is a sesquipedalian string of words") ==
    ['Here', 'is', 'a', 'sesquipedalian', 'string', 'of', 'words']
assert words(" ") == [] && words("") == []

Guy Steele's example from his keynote (slide 52 onwards, for several slides):
http://strangeloop2010.com/talk/presentation_file/14299/GuySteele-parallel.pdf
…Guy Steele example in Groovy…
Refactored sequential version:
@Immutable class Chunk {
    String s
    def plus(Chunk other) { new Chunk(s + other.s) }
    def plus(Segment other) { new Segment(s + other.l, other.m, other.r) }
}

@Immutable class Segment {
    String l; List m; String r
    def plus(Chunk other) { new Segment(l, m, r + other.s) }
    def plus(Segment other) { new Segment(l, m + maybeWord(r + other.l) + other.m, other.r) }
}

class Util {
    static processChar(ch) { ch == ' ' ? new Segment('', [], '') : new Chunk(ch) }
    static maybeWord(s) { s ? [s] : [] }
}
import static Util.*
...
…Guy Steele example in Groovy…
Refactored sequential version (continued):
def words = { s ->
    def result
    s.each{ ch ->
        if (!result) result = processChar(ch)
        else result += processChar(ch)
    }
    switch(result) {
        case Chunk: return maybeWord(result.s)
        case Segment: return result.with{ maybeWord(l) + m + maybeWord(r) }
        case null: return []
    }
}
assert words("This is a sample") == ['This', 'is', 'a', 'sample']
assert words(" Here is another sample ") == ['Here', 'is', 'another', 'sample']
assert words("JustOneWord") == ['JustOneWord']
assert words("Here is a sesquipedalian string of words") ==
    ['Here', 'is', 'a', 'sesquipedalian', 'string', 'of', 'words']
assert words(" ") == [] && words("") == []
…Guy Steele example in Groovy…
Roll your own threading with ConcurrentHashMap version:
def swords = { s ->
    def result
    s.each{ ch ->
        if (!result) result = processChar(ch)
        else result += processChar(ch)
    }
    result ?: new Chunk('')
}

THREADS = 4
def words = { s ->
    int n = (s.size() + THREADS - 1) / THREADS
    def map = new java.util.concurrent.ConcurrentHashMap()
    (0..<THREADS).collect { i ->
        Thread.start {
            def (min, max) = [[s.size(),i*n].min(), [s.size(),(i+1)*n].min()]
            map[i] = swords(s[min..<max])
        }
    }*.join()
    def result = map.entrySet().sort{ it.key }.sum{ it.value }
    switch(result) {
        case Chunk: return maybeWord(result.s)
        case Segment: return result.with{ maybeWord(l) + m + maybeWord(r) }
    }
}
…Guy Steele example in Groovy…
DataFlow version (partially hard-coded to 4 partitions for easier reading):
def words = { s ->
    int n = (s.size() + THREADS - 1) / THREADS
    def min = (0..<THREADS).collectEntries{ [it, [s.size(),it*n].min()] }
    def max = (0..<THREADS).collectEntries{ [it, [s.size(),(it+1)*n].min()] }
    def result = new DataFlows().with {
        task { a = swords(s[min[0]..<max[0]]) }
        task { b = swords(s[min[1]..<max[1]]) }
        task { c = swords(s[min[2]..<max[2]]) }
        task { d = swords(s[min[3]..<max[3]]) }
        task { sum1 = a + b }
        task { sum2 = c + d }
        task { sum = sum1 + sum2 }
        println 'Tasks ahoy!'
        sum
    }
    switch(result) {
        case Chunk: return maybeWord(result.s)
        case Segment: return result.with{ maybeWord(l) + m + maybeWord(r) }
    }
}
…Guy Steele example in Groovy…
Fork/Join version:
GRANULARITY_THRESHHOLD = 10
THREADS = 4

println GParsPool.withPool(THREADS) {
    def result = runForkJoin(0, input.size(), input){ first, last, s ->
        def size = last - first
        if (size <= GRANULARITY_THRESHHOLD) {
            swords(s[first..<last])
        } else { // divide and conquer
            def mid = first + ((last - first) >> 1)
            forkOffChild(first, mid, s)
            forkOffChild(mid, last, s)
            childrenResults.sum()
        }
    }
    switch(result) {
        case Chunk: return maybeWord(result.s)
        case Segment: return result.with{ maybeWord(l) + m + maybeWord(r) }
    }
}
…Guy Steele example in Groovy…
Map/Reduce version:
THRESHHOLD = 10

def split(raw) {
    raw.size() <= THRESHHOLD ? raw :
        [raw[0..<THRESHHOLD]] + split(raw.substring(THRESHHOLD))
}

println GParsPool.withPool(THREADS) {
    def ans = split(input).parallel.map(swords).reduce{ a,b -> a + b }
    switch(ans) {
        case Chunk: return maybeWord(ans.s)
        case Segment: return ans.with{ maybeWord(l) + m + maybeWord(r) }
    }
}
…Guy Steele example in Groovy
Just leveraging the algorithm's parallel nature:
println GParsPool.withPool(THREADS) {
    def ans = input.collectParallel{ processChar(it) }.sum()
    switch(ans) {
        case Chunk: return maybeWord(ans.s)
        case Segment: return ans.with{ maybeWord(l) + m + maybeWord(r) }
    }
}
Topics
• Groovy Intro
• Useful Groovy features for Concurrency
• Related Concurrency Libraries & Tools
• Fibonacci Case Study
• GPars
• More Info
More Information about Concurrency
• Web sites
– http://gpars.codehaus.org/
– http://g.oswego.edu/
Doug Lea's home page
– http://gee.cs.oswego.edu/dl/concurrency-interest/
– http://jcip.net/
Companion site for Java Concurrency in Practice
– http://www.eecs.usma.edu/webs/people/okasaki/pubs.html#cup98
Purely Functional Data Structures
– http://delicious.com/kragen/concurrency
Concurrency bookmark list
– http://www.gotw.ca/publications/concurrency-ddj.htm
The Free Lunch is Over, Herb Sutter
– http://manticore.cs.uchicago.edu/papers/damp07.pdf
– http://mitpress.mit.edu/catalog/item/default.asp?ttype=2&tid=10142
Concepts, Techniques, and Models of Computer Programming
More Information about Groovy
• Web sites
– http://groovy.codehaus.org
– http://grails.codehaus.org
– http://pleac.sourceforge.net/pleac_groovy (many examples)
– http://www.asert.com.au/training/java/GV110.htm (workshop)
• Mailing list for users
– [email protected]
• Information portals
– http://www.aboutgroovy.org
– http://www.groovyblogs.org
• Documentation (1000+ pages)
– Getting Started Guide, User Guide, Developer Guide, Testing Guide, Cookbook Examples, Advanced Usage Guide
• Books
– Several to choose from ...
More Information: Groovy in Action