groovy and concurrency

114
© ASERT 2006-2010 Groovy and Concurrency Dr Paul King, @paulk_asert paulk at asert.com.au

Upload: paul-king

Post on 27-Jan-2015

141 views

Category:

Technology


0 download

DESCRIPTION

Dr Paul King presentation slides on "Groovy and Concurrency" presented at StrangeLoop 2010 in St Loius

TRANSCRIPT

Page 1: groovy and concurrency

© A

SE

RT

2006-2

010

Groovy and Concurrency

Dr Paul King, @paulk_asert

paulk at asert.com.au

Page 2: groovy and concurrency

Concurrent Programming in Groovy • Java concurrent programming enhancements

– Normal OO methods

– Ability to have immutable types

– Some concurrency building blocks

– Annotations with baked in goodness

• Process/Thread ease of use – AntBuilder and GDK methods

• Closures for greater flexibility – Enabler for concurrency

– Closure is Runnable and Callable

• Third-party libraries – GPars, Functional Java (Actors), Multiverse, JCSP

– Cascading.groovy subproject for Hadoop clusters

– Jetlang, JPPF, GridGain, Google Collections, Gruple

– Groovy actors: http://www.groovyactors.org Concurrency - 2

Page 3: groovy and concurrency

Concurrency - 3

Java Concurrency Features

• The early years – Threads, synchronised and non-synchronised

collections, synchronisation at the language level,

Monitors (wait/notify), Locks, ThreadLocal, final, ...

• More recent enhancements – java.util.concurrent: Executors, Thread Pools,

Optimistic updates, Blocking queues, Synchronizers,

Callables, Futures, Atomic operations, Deques, ...

• Emerging – Fork/Join & others, Kilim, Phasers, PicoThreads ...

• Leverage related APIs/technologies – Networking, real-time, GUIs, simulation, database,

multimedia, operating systems, parallel processing,

distribution, mobile agents, nio, ...

Page 4: groovy and concurrency

Topics

Groovy Intro

• Useful Groovy features for Concurrency

• Related Concurrency Libraries & Tools

• Fibonacci Case Study

• GPars

• More Info

Concurrency - 4

© A

SE

RT

2006-2

010

Page 5: groovy and concurrency

What is Groovy?

Concurrency - 5

© A

SE

RT

2006-2

010

• “Groovy is like a super version

of Java. It can leverage Java's

enterprise capabilities but also

has cool productivity features like closures,

DSL support, builders and dynamic typing.”

Groovy = Java – boiler plate code + optional dynamic typing + closures + domain specific languages + builders + metaprogramming

Page 6: groovy and concurrency

Groovy Goodies Overview • Fully object oriented

• Closures: reusable

and assignable

pieces of code

• Operators can be

overloaded

• Multimethods

• Literal declaration

for lists (arrays),

maps, ranges and

regular expressions

• GPath: efficient

object navigation

• GroovyBeans

• grep and switch

• Templates, builder,

swing, Ant, markup, XML,

SQL, XML-RPC, Scriptom,

Grails, tests, Mocks

Concurrency - 6

© A

SE

RT

2006-2

010

Page 7: groovy and concurrency

Growing Acceptance …

A slow and steady start but now gaining in

momentum, maturity and mindshare

Now free

Page 8: groovy and concurrency

… Growing Acceptance …

Concurrency - 8

© A

SE

RT

2006-2

010

What alternative JVM language are you using or intending to use

http://www.leonardoborges.com/writings

http://it-republik.de/jaxenter/quickvote/results/1/poll/44

(translated using http://babelfish.yahoo.com)

Page 9: groovy and concurrency

… Growing Acceptance …

Concurrency - 9

© A

SE

RT

2006-2

010

Source: http://www.grailspodcast.com/

Source: http://www.micropoll.com/akira/mpresult/501697-116746

Page 10: groovy and concurrency

… Growing Acceptance …

Concurrency - 10

© A

SE

RT

2006-2

010

http://www.java.net

http://www.jroller.com/scolebourne/entry/devoxx_2008_whiteboard_votes

Page 11: groovy and concurrency

… Growing Acceptance …

Concurrency - 11

© A

SE

RT

2006-2

010

Groovy and Grails

downloads: 70-90K per

month and growing Frequent topic at the popular conferences

Page 12: groovy and concurrency

… Growing Acceptance

Concurrency - 12

© A

SE

RT

2006-2

010

Page 13: groovy and concurrency

The Landscape of JVM Languages

Concurrency - 13

© A

SE

RT

2006-2

010

Java bytecode calls

for static types

Dynamic features call

for dynamic types

optional

static

types

The terms “Java Virtual Machine” and “JVM” mean a Virtual Machine for the Java™ platform.

Page 14: groovy and concurrency

Groovy Starter

Concurrency - 14

© A

SE

RT

2006-2

010

System.out.println("Hello, World!"); // supports Java syntax println 'Hello, World!' // but can remove some syntax String name = 'Guillaume' // Explicit typing/awareness println "$name, I'll get the car." // Gstring (interpolation) def longer = """${name}, the car is in the next row.""" // multi-line, implicit type assert 0.5 == 1/2 // BigDecimal equals() assert 0.1 + 0.2 == 0.3 // and arithmetic def printSize(obj) { // implicit/duck typing print obj?.size() // safe dereferencing } def pets = ['ant', 'bee', 'cat'] // native list syntax pets.each { pet -> // closure support assert pet < 'dog' // overloading '<' on String } // or: for (pet in pets)...

Page 15: groovy and concurrency

A Better Java...

Concurrency - 15

© A

SE

RT

2006-2

010

import java.util.List; import java.util.ArrayList; class Erase { private List removeLongerThan(List strings, int length) { List result = new ArrayList(); for (int i = 0; i < strings.size(); i++) { String s = (String) strings.get(i); if (s.length() <= length) { result.add(s); } } return result; } public static void main(String[] args) { List names = new ArrayList(); names.add("Ted"); names.add("Fred"); names.add("Jed"); names.add("Ned"); System.out.println(names); Erase e = new Erase(); List shortNames = e.removeLongerThan(names, 3); System.out.println(shortNames.size()); for (int i = 0; i < shortNames.size(); i++) { String s = (String) shortNames.get(i); System.out.println(s); } } }

This code

is valid

Java and

valid Groovy

Based on an

example by

Jim Weirich

& Ted Leung

Page 16: groovy and concurrency

...A Better Java...

Concurrency - 16

© A

SE

RT

2006-2

010

import java.util.List; import java.util.ArrayList; class Erase { private List removeLongerThan(List strings, int length) { List result = new ArrayList(); for (int i = 0; i < strings.size(); i++) { String s = (String) strings.get(i); if (s.length() <= length) { result.add(s); } } return result; } public static void main(String[] args) { List names = new ArrayList(); names.add("Ted"); names.add("Fred"); names.add("Jed"); names.add("Ned"); System.out.println(names); Erase e = new Erase(); List shortNames = e.removeLongerThan(names, 3); System.out.println(shortNames.size()); for (int i = 0; i < shortNames.size(); i++) { String s = (String) shortNames.get(i); System.out.println(s); } } }

Do the

semicolons

add anything?

And shouldn‟t

we us more

modern list

notation?

Why not

import common

libraries?

Page 17: groovy and concurrency

...A Better Java...

Concurrency - 17

© A

SE

RT

2006-2

010

class Erase { private List removeLongerThan(List strings, int length) { List result = new ArrayList() for (String s in strings) { if (s.length() <= length) { result.add(s) } } return result } public static void main(String[] args) { List names = new ArrayList() names.add("Ted"); names.add("Fred") names.add("Jed"); names.add("Ned") System.out.println(names) Erase e = new Erase() List shortNames = e.removeLongerThan(names, 3) System.out.println(shortNames.size()) for (String s in shortNames) { System.out.println(s) } } }

Page 18: groovy and concurrency

...A Better Java...

Concurrency - 18

© A

SE

RT

2006-2

010

class Erase { private List removeLongerThan(List strings, int length) { List result = new ArrayList() for (String s in strings) { if (s.length() <= length) { result.add(s) } } return result } public static void main(String[] args) { List names = new ArrayList() names.add("Ted"); names.add("Fred") names.add("Jed"); names.add("Ned") System.out.println(names) Erase e = new Erase() List shortNames = e.removeLongerThan(names, 3) System.out.println(shortNames.size()) for (String s in shortNames) { System.out.println(s) } } }

Do we need

the static types?

Must we always

have a main

method and

class definition?

How about

improved

consistency?

Page 19: groovy and concurrency

...A Better Java...

Concurrency - 19

© A

SE

RT

2006-2

010

def removeLongerThan(strings, length) { def result = new ArrayList() for (s in strings) { if (s.size() <= length) { result.add(s) } } return result } names = new ArrayList() names.add("Ted") names.add("Fred") names.add("Jed") names.add("Ned") System.out.println(names) shortNames = removeLongerThan(names, 3) System.out.println(shortNames.size()) for (s in shortNames) { System.out.println(s) }

Page 20: groovy and concurrency

...A Better Java...

Concurrency - 20

© A

SE

RT

2006-2

010

def removeLongerThan(strings, length) { def result = new ArrayList() for (s in strings) { if (s.size() <= length) { result.add(s) } } return result } names = new ArrayList() names.add("Ted") names.add("Fred") names.add("Jed") names.add("Ned") System.out.println(names) shortNames = removeLongerThan(names, 3) System.out.println(shortNames.size()) for (s in shortNames) { System.out.println(s) }

Shouldn‟t we

have special

notation for lists?

And special

facilities for

list processing?

Is „return‟

needed at end?

Page 21: groovy and concurrency

...A Better Java...

Concurrency - 21

© A

SE

RT

2006-2

010

def removeLongerThan(strings, length) { strings.findAll{ it.size() <= length } } names = ["Ted", "Fred", "Jed", "Ned"] System.out.println(names) shortNames = removeLongerThan(names, 3) System.out.println(shortNames.size()) shortNames.each{ System.out.println(s) }

Page 22: groovy and concurrency

...A Better Java...

Concurrency - 22

© A

SE

RT

2006-2

010

def removeLongerThan(strings, length) { strings.findAll{ it.size() <= length } } names = ["Ted", "Fred", "Jed", "Ned"] System.out.println(names) shortNames = removeLongerThan(names, 3) System.out.println(shortNames.size()) shortNames.each{ System.out.println(s) }

Is the method

now needed?

Easier ways to

use common

methods?

Are brackets

required here?

Page 23: groovy and concurrency

...A Better Java

Concurrency - 23

© A

SE

RT

2006-2

010

names = ["Ted", "Fred", "Jed", "Ned"] println names shortNames = names.findAll{ it.size() <= 3 } println shortNames.size() shortNames.each{ println it }

["Ted", "Fred", "Jed", "Ned"] 3 Ted Jed Ned

Output:

Page 24: groovy and concurrency

Grapes / Grab: Google collections

Concurrency - 24

© A

SE

RT

2006-2

010

@Grab('com.google.collections:google-collections:1.0') import com.google.common.collect.HashBiMap HashBiMap fruit = [grape:'purple', lemon:'yellow', lime:'green'] assert fruit.lemon == 'yellow' assert fruit.inverse().yellow == 'lemon'

Page 25: groovy and concurrency

ESDC 2010 - 25

© A

SE

RT

2006-2

010

Better Design Patterns: Delegate…

import java.util.Date; public class Event { private String title; private String url; private Date when; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } // ...

public Date getWhen() { return when; } public void setWhen(Date when) { this.when = when; } public boolean before(Date other) { return when.before(other); } public void setTime(long time) { when.setTime(time); } public long getTime() { return when.getTime(); } public boolean after(Date other) { return when.after(other); } // ...

Page 26: groovy and concurrency

ESDC 2010 - 26

© A

SE

RT

2006-2

010

…Better Design Patterns: Delegate…

import java.util.Date; public class Event { private String title; private String url; private Date when; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } // ...

public Date getWhen() { return when; } public void setWhen(Date when) { this.when = when; } public boolean before(Date other) { return when.before(other); } public void setTime(long time) { when.setTime(time); } public long getTime() { return when.getTime(); } public boolean after(Date other) { return when.after(other); } // ...

boilerplate

Page 27: groovy and concurrency

ESDC 2010 - 27

© A

SE

RT

2006-2

010

…Better Design Patterns: Delegate

class Event { String title, url @Delegate Date when }

def gr8conf = new Event(title: "GR8 Conference", url: "http://www.gr8conf.org", when: Date.parse("yyyy/MM/dd", "2009/05/18")) def javaOne = new Event(title: "JavaOne", url: "http://java.sun.com/javaone/", when: Date.parse("yyyy/MM/dd", "2009/06/02")) assert gr8conf.before(javaOne.when)

Page 28: groovy and concurrency

Why Groovy? Technical Answer

• Minimal learning curve

• Compiles to bytecode

• Java object model & integration

• Annotations

• "Optional" static typing

• Both run-time and compile-time

metaprogramming

Concurrency - 28

Page 29: groovy and concurrency

Why Groovy? Adoption Assessment

• Innovators/Thought leaders – Ideas, power, flexibility, novelty, thinking community

• Early adopters – Productivity benefits and collegiate community

– Leverage JVM and potential for mainstream

• Mainstream – Leverage existing Java skills, low learning curve

– Leverage JVM and production infrastructure

– Professional community

– Tools, tools, tools

Page 30: groovy and concurrency

"Andy giveth and Bill taketh away"

Concurrency - 30

So

urc

e: H

erb

Su

tte

r: h

ttp

://w

ww

.gotw

.ca

/pub

lica

tion

s/c

on

cu

rre

ncy-d

dj.h

tm

Page 31: groovy and concurrency

Why is it hard?

• Many issues to deal with: – Doing things in parallel, concurrently,

asynchronously

• Processes, Threads, Co-routines, Events, Scheduling

– Sharing/Synchronization Mechanisms

• shared memory, locks, transactions, wait/notify, STM,

message passing, actors, serializability, persistence,

immutability

– Abstractions

• Shared memory on top of messaging passing

• Message passing on top of shared memory

• Dataflow, Selective Communication, Continuations

– Data Structures and Algorithms

• Queues, Heaps, Trees

• Sorting, Graph Algorithms

Concurrency - 31

Page 32: groovy and concurrency

Topics

• Groovy Intro

Useful Groovy features for Concurrency

• Related Concurrency Libraries & Tools

• Fibonacci Case Study

• GPars

• More Info

Concurrency - 32

© A

SE

RT

2006-2

010

Page 33: groovy and concurrency

Thread Enhancements…

• Thread improvements

Concurrency - 33

Thread.start{ sleep 1001; println 'one' } Thread.start{ sleep 1000; println 'two' }

println 'one' def t = Thread.start{ sleep 100; println 'three' } println 'two' t.join() println 'four'

Page 34: groovy and concurrency

…Thread Enhancements…

• Thread improvements (Cont’d)

Concurrency - 34

class Storage { List stack = [] synchronized void leftShift(value){ stack << value println "push: $value" notifyAll() } synchronized Object pop() { while (!stack) { try{ wait() } catch(InterruptedException e){} } def value = stack.pop() println "pop : $value" return value } } ...

Page 35: groovy and concurrency

…Thread Enhancements…

• Thread improvements (Cont’d)

Concurrency - 35

push: 0

push: 1

push: 2

pop : 2

push: 3

push: 4

pop : 4

push: 5

push: 6

pop : 6

push: 7

push: 8

pop : 8

push: 9

pop : 9

pop : 7

pop : 5

pop : 3

pop : 1

pop : 0

... storage = new Storage() Thread.start { for (i in 0..9) { storage << i sleep 100 } } Thread.start { 10.times { sleep 200 storage.pop() } }

Page 36: groovy and concurrency

…Thread Enhancements…

• Thread improvements (Cont’d)

Concurrency - 36

ROCK!

. 25

. 33

. 34

. 35

. 36

.. 131

.. 134

.. 137

.. 138

.. 139

... 232

... 234

... 237

... 238

... 239

.... 334

.... 336

.... 337

.... 338

.... 339

import java.util.concurrent.locks.ReentrantLock import static System.currentTimeMillis as now def startTime = now() ReentrantLock.metaClass.withLock = { critical -> lock() try { critical() } finally { unlock() } } def lock = new ReentrantLock() def worker = { threadNum -> 4.times { count -> lock.withLock { print " " * threadNum print "." * (count + 1) println " ${now() - startTime}" } Thread.sleep 100 } } 5.times { Thread.start worker.curry(it) } println "ROCK!"

Source: http://chrisbroadfoot.id.au/articles/2008/08/06/groovy-threads

Page 37: groovy and concurrency

… Thread Enhancements

• Thread Management meets AtomicInteger

Concurrency - 37

thread loop 1

main loop 1

thread loop 2

thread loop 3

main loop 2

thread loop 4

thread loop 5

main loop 3

thread loop 6

main loop 4

thread loop 7

thread loop 8

import java.util.concurrent.atomic.AtomicInteger def counter = new AtomicInteger() synchronized out(message) { println(message) } def th = Thread.start { for( i in 1..8 ) { sleep 30 out "thread loop $i" counter.incrementAndGet() } } for( j in 1..4 ) { sleep 50 out "main loop $j" counter.incrementAndGet() } th.join() assert counter.get() == 12

Page 38: groovy and concurrency

Process Enhancements…

• Using AntBuilder

Concurrency - 38

def ant = new AntBuilder() ant.parallel { 10.times { echo "Message $it" } }

[echo] Message 6

[echo] Message 9

[echo] Message 8

[echo] Message 0

[echo] Message 2

[echo] Message 4

[echo] Message 1

[echo] Message 7

[echo] Message 5

[echo] Message 3

Page 39: groovy and concurrency

…Process Enhancements…

• Using AntBuilder (Cont’d)

Concurrency - 39

def ant = new AntBuilder() ant.taskdef(name:'groovy', classname:'org.codehaus.groovy.ant.Groovy') ant.parallel { 10.times { echo "Ant message $it" groovy "println 'Groovy via ant message $it'" println "Groovy message $it" // or fork java, command line, thread ... } }

def ant = new AntBuilder() ant.exec(outputproperty:"cmdOut", errorproperty: "cmdErr", resultproperty:"cmdExit", failonerror: "true", executable: /opt/myExecutable') { arg(line:'*"first with space"* second') }

Page 40: groovy and concurrency

…Process Enhancements

• Process Management

Concurrency - 40

def process = "ls -l".execute() println "Found text ${process.text}"

def process = "ls -l".execute() process.in.eachLine { line -> println line }

def sout = new StringBuffer() def serr = new StringBuffer() proc1 = 'gzip -c'.execute() proc2 = 'gunzip -c'.execute() proc2.consumeProcessOutput(sout, serr) proc1 | proc2 proc1.consumeProcessErrorStream(serr) proc1.withWriter { writer -> writer << 'test text' } proc2.waitForOrKill(1000) println 'sout: ' + sout // => test text println 'serr: ' + serr

proc1 = 'ls'.execute() proc2 = 'tr -d o'.execute() proc3 = 'tr -d e'.execute() proc4 = 'tr -d i'.execute() proc1 | proc2 | proc3 | proc4 proc4.waitFor() if (proc4.exitValue()) print proc4.err.text else print proc4.text

Page 41: groovy and concurrency

Parallelize your arrays with JSR 166y

Concurrency - 41

//Create a pool with size close to the number of processor cores def pool = new ForkJoinPool(2)

def createParallelArray(pool, collection) { return ParallelArray.createFromCopy( collection.toArray(new Object[collection.size()]), pool) }

// Enhance ArrayLists to find matching objects in parallel ArrayList.metaClass.findAll = {Closure cl -> createParallelArray(pool, delegate). withFilter({cl(it)} as Predicate).all().asList() }

def sites=['http://www.jroller.com', 'http://www.infoq.com', 'http://java.dzone.com']

def groovySites = sites.findAll { new URL(it).text.toLowerCase().contains('groovy')} println "These sites talk about Groovy today: ${groovySites}"

Source: http://www.jroller.com/vaclav/date/20080923

Page 42: groovy and concurrency

Java Concurrency Best Practice?

• Java Concurrency in Practice:

– “If mutable threads access the

same mutable state variable

without appropriate

synchronization,

your program is broken”

– “When designing thread-safe classes,

good object-oriented techniques –

encapsulation, immutability, and clear

specification of invariants – are your

best friends”

Concurrency - 42

Page 43: groovy and concurrency

Immutability options

• Built-in

• Google Collections – Numerous improved immutable collection types

• Groovy run-time metaprogramming

• Groovy 1.6+ compile-time

metaprogramming – @Immutable can help us create such classes

Concurrency - 43

import com.google.common.collect.* List<String> animals = ImmutableList.of("cat", "dog", "horse") animals << 'fish' // => java.lang.UnsupportedOperationException

def animals = ['cat', 'dog', 'horse'].asImmutable() animals << 'fish' // => java.lang.UnsupportedOperationException

def animals = ['cat', 'dog', 'horse'] ArrayList.metaClass.leftShift = { throw new UnsupportedOperationException() } animals << 'fish' // => java.lang.UnsupportedOperationException

Page 44: groovy and concurrency

@Immutable...

• Java Immutable Class – As per Joshua Bloch

Effective Java

Concurrency - 44

© A

SE

RT

2006-2

010

public final class Punter { private final String first; private final String last; public String getFirst() { return first; } public String getLast() { return last; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((first == null) ? 0 : first.hashCode()); result = prime * result + ((last == null) ? 0 : last.hashCode()); return result; } public Punter(String first, String last) { this.first = first; this.last = last; } // ...

// ... @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Punter other = (Punter) obj; if (first == null) { if (other.first != null) return false; } else if (!first.equals(other.first)) return false; if (last == null) { if (other.last != null) return false; } else if (!last.equals(other.last)) return false; return true; } @Override public String toString() { return "Punter(first:" + first + ", last:" + last + ")"; } }

Page 45: groovy and concurrency

...@Immutable...

• Java Immutable Class – As per Joshua Bloch

Effective Java

Concurrency - 45

© A

SE

RT

2006-2

010

public final class Punter { private final String first; private final String last; public String getFirst() { return first; } public String getLast() { return last; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((first == null) ? 0 : first.hashCode()); result = prime * result + ((last == null) ? 0 : last.hashCode()); return result; } public Punter(String first, String last) { this.first = first; this.last = last; } // ...

// ... @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Punter other = (Punter) obj; if (first == null) { if (other.first != null) return false; } else if (!first.equals(other.first)) return false; if (last == null) { if (other.last != null) return false; } else if (!last.equals(other.last)) return false; return true; } @Override public String toString() { return "Punter(first:" + first + ", last:" + last + ")"; } }

boilerplate

Page 46: groovy and concurrency

...@Immutable

Concurrency - 46

© A

SE

RT

2006-2

010

@Immutable class Punter { String first, last }

Page 47: groovy and concurrency

@Synchronized – Before Transform

Concurrency - 47

© A

SE

RT

2006-2

010

class SynchronizedExample { private final myLock = new Object() @Synchronized static void greet() { println "world" } @Synchronized int answerToEverything() { return 42 } @Synchronized("myLock") void foo() { println "bar" } }

Page 48: groovy and concurrency

@Synchronized – After Transform

Concurrency - 48

© A

SE

RT

2006-2

010

class SynchronizedExample { private static final $LOCK = new Object[0] private final $lock = new Object[0] private final myLock = new Object()

static void greet() { synchronized ($LOCK) { println "world" } }

int answerToEverything() { synchronized ($lock) { return 42 } }

void foo() { synchronized (myLock) { println "bar" } } }

Inspired by

Project

Lombok

Page 49: groovy and concurrency

@Lazy...

• Safe initialization idioms – Eager or fully synchronized

Concurrency - 49

© A

SE

RT

2006-2

010

import net.jcip.annotations.ThreadSafe @ThreadSafe class EagerInitialization { static final resource = new Resource() }

@ThreadSafe class SafeLazyInitialization { private static ExpensiveResource resource synchronized static ExpensiveResource getInstance() { if (!resource) resource = new ExpensiveResource () resource } }

Page 50: groovy and concurrency

...@Lazy...

• Java Concurrency in Practice – Race condition with lazy initialization

– No problems: just apply Double-checked

locking pattern Concurrency - 50

© A

SE

RT

2006-2

010

import net.jcip.annotations.NotThreadSafe @NotThreadSafe class LazyInitRace { private ExpensiveResource instance = null ExpensiveResource getInstance() { if (!instance) instance = new ExpensiveResource() instance } }

Page 51: groovy and concurrency

...@Lazy...

• Java Concurrency in Practice – Double checked locking anti-pattern

Concurrency - 51

© A

SE

RT

2006-2

010

@NotThreadSafe class DoubleCheckedLocking { private static ExpensiveResource instance = null static ExpensiveResource getInstance() { if (!instance) { synchronized (DoubleCheckedLocking) { if (!instance) instance = new ExpensiveResource() } } instance } }

Page 52: groovy and concurrency

...@Lazy...

• Java Concurrency in Practice – Double checked locking less broken-pattern

Concurrency - 52

© A

SE

RT

2006-2

010

class DoubleCheckedLocking { private static volatile ExpensiveResource instance = null static ExpensiveResource getInstance() { if (!instance) { synchronized (DoubleCheckedLocking) { if (!instance) instance = new ExpensiveResource() } } instance } }

Page 53: groovy and concurrency

...@Lazy...

• Java Concurrency in Practice – Lazy initialization holder class idiom

Concurrency - 53

© A

SE

RT

2006-2

010

import net.jcip.annotations.ThreadSafe

@ThreadSafe

class ResourceFactory {

private static class ResourceHolder {

static Resource resource = new Resource()

}

static Resource getResource() {

ResourceHolder.resource

}

}

Page 54: groovy and concurrency

...@Lazy

Concurrency - 54

© A

SE

RT

2006-2

010

@Lazy volatile Resource second

@Lazy volatile Resource third = { new Resource(args) }()

@Lazy(soft = true) volatile Resource fourth

@Lazy Resource first

Page 55: groovy and concurrency

Topics

• Groovy Intro

• Useful Groovy features for Concurrency

Related Concurrency Libraries & Tools

• Fibonacci Case Study

• GPars

• More Info

Concurrency - 55

© A

SE

RT

2006-2

010

Page 56: groovy and concurrency

Lightweight threads: Jetlang

• Jetlang – A high performance java threading library

Concurrency - 56

Fiber receiver = new ThreadFiber(); receiver.start(); // create java.util.concurrent.CountDownLatch to notify when message arrives final CountDownLatch latch = new CountDownLatch(1); // create channel to message between threads Channel<String> channel = new MemoryChannel<String>(); Callback<String> onMsg = new Callback<String>() { public void onMessage(String message) { //open latch latch.countDown(); } }; //add subscription for message on receiver thread channel.subscribe(receiver, onMsg); //publish message to receive thread. the publish method is thread safe. channel.publish("Hello"); //wait for receiving thread to receive message latch.await(10, TimeUnit.SECONDS); //shutdown thread receiver.dispose();

// JAVA

Page 57: groovy and concurrency

Other High-Level Libraries: JPPF

– Open source Grid Computing platform

– http://www.jppf.org/

Concurrency - 57

import org.jppf.client.* import java.util.concurrent.Callable

class Task implements Callable, Serializable { private static final long serialVersionUID = 1162L; public Object call() { println 'Executing Groovy' "Hello JPPF from Groovy" } } def client = new JPPFClient() def job = new JPPFJob() def task = new Task() job.addTask task def results = client.submit(job) for (t in results) { if (t.exception) throw t.exception println "Result: " + t.result }

Page 58: groovy and concurrency

Other High-Level Libraries: Gruple...

– http://code.google.com/p/gruple

– Gruple aims to provide a simple abstraction to allow

programmers to coordinate and synchronize threads

with ease – based on Tuplespaces

• Tuplespaces provide the illusion of a shared memory on top

of a message passing system, along with a small set of

operations to greatly simplify parallel programming

– Example Tuple: [fname:"Vanessa", lname:"Williams", project:"Gruple"]

– Basic operations within a Tuplespace are:

• put - insert a tuple into the space

• get - read a tuple from the space (non-destructively)

• take - take a tuple from the space (a destructive read)

– Further reading: Eric Freeman, Susanne Hupfer, and

Ken Arnold. JavaSpaces Principles, Patterns, and

Practice, Addison Wesley, 1999 Concurrency - 58

Page 59: groovy and concurrency

Other High-Level Libraries: ...Gruple...

– Mandelbrot example (included in Gruple download)

Concurrency - 59

... Space space = SpaceService.getSpace("mandelbrot") Map template = createTaskTemplate() Map task String threadName = Thread.currentThread().name while(true) { ArrayList points task = space.take(template) println "Worker $threadName got task ${task['start']} for job ${task['jobId']}" points = calculateMandelbrot(task) Map result = createResult(task['jobId'], task['start'], points) println "Worker $threadName writing result for task ${result['start']} for job ${result['jobId']}" space.put(result) } ...

Page 60: groovy and concurrency

Other High-Level Libraries: ...Gruple

Concurrency - 60

Page 61: groovy and concurrency

Other High-Level Libraries: Cascading.groovy

– API/DSL for executing tasks on a Hadoop cluster

– http://www.cascading.org/

Concurrency - 61

def assembly = builder.assembly(name: "wordcount") { eachTuple(args: ["line"], results: ["word"]) { regexSplitGenerator(declared: ["word"], pattern: /[.,]*\s+/) } group(["word"]) everyGroup(args: ["word"], results: ["word", "count"]) { count() } group(["count"], reverse: true) } def map = builder.map() { source(name: "wordcount") { hfs(input) { text(["line"]) } } sink(name: "wordcount") { hfs(output) { text() } } } def flow = builder.flow(name: "wordcount", map: map, assembly: assembly)

Page 62: groovy and concurrency

Other High-Level Libraries: GridGain

– Simple & productive to use grid computing platform

– http://www.gridgain.com/

Concurrency - 62

class GridHelloWorldGroovyTask extends GridTaskSplitAdapter<String, Integer> { Collection split(int gridSize, Object phrase) throws GridException { // ... } Object reduce(List results) throws GridException { // ... } }

import static GridFactory.* start() def grid = getGrid() def future = grid.execute(GridHelloWorldGroovyTask, "Hello World") def phraseLen = future.get() stop(true)

Page 63: groovy and concurrency

Multiverse STM…

– http://www.multiverse.org/

Concurrency - 63

import org.multiverse.api.GlobalStmInstance import org.multiverse.api.Transaction import org.multiverse.templates.TransactionTemplate import org.multiverse.transactional.refs.LongRef def from = new Account(10) def to = new Account(10) atomic { from.balance -= 5 to.balance += 5 } println "from $from.balance" println "to $to.balance" ...

Page 64: groovy and concurrency

…Multiverse STM…

Concurrency - 64

... void atomic(Closure block) { atomic([:], block) } void atomic(Map args, Closure block) { boolean readonly = args['readonly'] ?: false boolean trackreads = args['trackreads'] ?: true def txFactory = GlobalStmInstance.globalStmInstance. transactionFactoryBuilder. setReadonly(readonly). setReadTrackingEnabled(trackreads).build() new TransactionTemplate(txFactory) { Object execute(Transaction transaction) { block.call() return null } }.execute() } ...

Page 65: groovy and concurrency

…Multiverse STM

Concurrency - 65

class Account { private final balance = new LongRef() Account(long initial) { balance.set initial } void setBalance(long newBalance) { if (newBalance < 0) throw new RuntimeException("not enough money") balance.set newBalance } long getBalance() { balance.get() } }

Page 66: groovy and concurrency

Testing multi-threaded applications: ConTest...

• Advanced Testing for Multi-Threaded Applications

– Tool for testing, debugging, and coverage-measuring

of concurrent programs (collects runtime statistics)

– Systematically and transparently (using a java agent)

schedules the execution of program threads in ways

likely to reveal race conditions, deadlocks, and other

intermittent bugs (collectively called synchronization

problems) with higher than normal frequency

– The ConTest run-time engine adds heuristically

controlled conditional instructions (adjustable by a

preferences file) that force thread switches, thus

helping to reveal concurrent bugs. You can use

existing tests and run ConTest multiple times – by

default different heuristics used each time it is run

• http://www.alphaworks.ibm.com/tech/contest

Concurrency - 66

Page 67: groovy and concurrency

...Testing multi-threaded applications: ConTest

Concurrency - 67

NUM = 5

count = 0

def incThread = { n -> Thread.start{

sleep n*10

//synchronized(ParalInc) {

count++

//}

} }

def threads = (1..NUM).collect(incThread)

threads.each{ it.join() }

assert count == NUM

targetClasses = ParalInc

timeoutTampering = true

noiseFrequency = 500

strength = 10000

Exception in thread "main" Assertion failed:

assert count == NUM

| | |

4 | 5

false

> groovyc ParalInc.groovy

> java -javaagent:../../Lib/ConTest.jar -cp %GROOVY_JAR%;. ParalInc

ParalInc.groovy

Page 68: groovy and concurrency

Concurrency - 68

GContracts

@Grab('org.gcontracts:gcontracts:1.0.2') import org.gcontracts.annotations.* @Invariant({ first != null && last != null }) class Person { String first, last @Requires({ delimiter in ['.', ',', ' '] }) @Ensures({ result == first + delimiter + last }) String getName(String delimiter) { first + delimiter + last } } new Person(first: 'John', last: 'Smith').getName('.')

1.8+

Page 69: groovy and concurrency

Testing: Spock

Concurrency - 69

class HelloSpock extends spock.lang.Specification { def "length of Spock's and his friends' names"() { expect: name.size() == length where: name | length "Spock" | 5 "Kirk" | 4 "Scotty" | 6 } }

Page 70: groovy and concurrency

Topics

• Groovy Intro

• Useful Groovy features for Concurrency

• Related Concurrency Libraries & Tools

Fibonacci Case Study

• GPars

• More Info

Concurrency - 70

© A

SE

RT

2006-2

010

Page 71: groovy and concurrency

Fibonacci Case Study

Concurrency - 71

Page 72: groovy and concurrency

Concurrency - 72

Fibonacci…

START = 8 END = 16 fib = {n -> n < 2 ? n : fib(n - 1) + fib(n - 2) } (START..END).each {num -> println "n:$num => ${fib(num)}" }

Serial

version

Page 73: groovy and concurrency

Concurrency - 73

…Fibonacci…

import java.util.concurrent.* THREADS = 4 START = 8 END = 16 QUIT = -1 class Fibonacci { def values = new ConcurrentHashMap() int calc(x) { x < 2 ? x : calc(x-1) + calc(x-2) } int calcWithCache(x) { def result = values[x] if (!result) { result = calc(x) values.putIfAbsent(x, result) } result } } println "Calculating Fibonacci sequence in parallel..." def queue = new ArrayBlockingQueue(10)

ConcurrentHashMap

version

Page 74: groovy and concurrency

Concurrency - 74

…Fibonacci…

Thread.start('Producer') { int x = START while (x <= END) { sleep 200 queue << x++ } sleep 1000 THREADS.times { queue << QUIT } } (1..THREADS).each { def name = "Consumer$it" Thread.start(name) { def done = false def fib = new Fibonacci() while (!done) { def n = queue.take() if (n == QUIT) done = true else println "$name n:$n => ${fib.calcWithCache(n)}" } } }

Page 75: groovy and concurrency

Concurrency - 75

…Fibonacci…

import java.util.concurrent.* CUTOFF = 12 // not worth parallelizing for small n THREADS = 100 println "Calculating Fibonacci sequence in parallel..." serialFib = {n -> (n < 2) ? n : serialFib(n - 1) + serialFib(n - 2) } pool = Executors.newFixedThreadPool(THREADS) defer = {c -> pool.submit(c as Callable) } fib = {n -> if (n < CUTOFF) return serialFib(n) def left = defer { fib(n - 1) } def right = defer { fib(n - 2) } left.get() + right.get() } (8..16).each {n -> println "n=$n => ${fib(n)}" } pool.shutdown()

Executor

version

Page 76: groovy and concurrency

Concurrency - 76

…Fibonacci… import EDU.oswego.cs.dl.util.concurrent.FJTask import EDU.oswego.cs.dl.util.concurrent.FJTaskRunnerGroup class Fib extends FJTask { static final CUTOFF = 12 volatile int number int getAnswer() { if (!isDone()) throw new IllegalStateException() number } void run() { int n = number if (n <= CUTOFF) number = seqFib(n) else { def f1 = new Fib(number: n - 1) def f2 = new Fib(number: n - 2) coInvoke(f1, f2) number = f1.number + f2.number } } int seqFib(int n) { n < 2 ? n : seqFib(n - 1) + seqFib(n - 2) } }

Fork/Join

version

Page 77: groovy and concurrency

Concurrency - 77

…Fibonacci…

def THREADS = 2 def group = new FJTaskRunnerGroup(THREADS) def START = 8 def END = 16 (START..END).each {num -> def f = new Fib(number: num) group.invoke(f) println "n:$num => $f.answer" }

Page 78: groovy and concurrency

Concurrency - 78

…Fibonacci…

import fj.* import fj.control.parallel.Strategy import static fj.Function.curry as fcurry import static fj.P1.curry as pcurry import static fj.P1.fmap import static fj.control.parallel.Actor.actor import static fj.control.parallel.Promise.* import static fj.data.List.range import static java.util.concurrent.Executors.* CUTOFF = 12 // not worth parallelizing for small n START = 8 END = 16 THREADS = 4 pool = newFixedThreadPool(THREADS) su = Strategy.executorStrategy(pool) spi = Strategy.executorStrategy(pool) add = fcurry({a, b -> a + b } as F2) nums = range(START, END + 1) println "Calculating Fibonacci sequence in parallel..."

FunctionalJava

version

Page 79: groovy and concurrency

Concurrency - 79

…Fibonacci…

serialFib = {n -> n < 2 ? n : serialFib(n - 1) + serialFib(n - 2) } print = {results -> def n = START results.each { println "n=${n++} => $it" } pool.shutdown() } as Effect calc = {n -> n < CUTOFF ? promise(su, P.p(serialFib(n))) : calc.f(n - 1).bind(join(su, pcurry(calc).f(n - 2)), add) } as F out = actor(su, print) join(su, fmap(sequence(su)).f(spi.parMapList(calc).f(nums))).to(out)

Page 80: groovy and concurrency

Concurrency - 80

…Fibonacci…

import org.jetlang.core.Callback import org.jetlang.channels.MemoryChannel import org.jetlang.fibers.ThreadFiber import java.util.concurrent.* println "Calculating Fibonacci sequence with two cooperating fibers..." class FibonacciCalc implements Callback { private channel, receiver def limit, name, latch void onMessage(inpair) { def next = inpair[0] + inpair[1] def outpair = [inpair[1], next] println "$name next:$next" channel.publish(outpair) if (next > limit) { latch.countDown() sleep 200 receiver.dispose() } } // ...

Jetlang

version

Page 81: groovy and concurrency

Concurrency - 81

…Fibonacci

// ... void subscribe(other) { channel.subscribe(receiver, other) } FibonacciCalc() { channel = new MemoryChannel() receiver = new ThreadFiber() receiver.start() } } def seed = [0, 1] def latch = new CountDownLatch(2) def calcA = new FibonacciCalc(limit: 500, name: 'CalcA', latch: latch) def calcB = new FibonacciCalc(limit: 500, name: 'CalcB', latch: latch) calcA.subscribe calcB calcB.subscribe calcA calcA.onMessage seed latch.await(10, TimeUnit.SECONDS)

Page 82: groovy and concurrency

Topics

• Groovy Intro

• Useful Groovy features for Concurrency

• Related Concurrency Libraries & Tools

• Fibonacci Case Study

GPars

• More Info

Concurrency - 82

© A

SE

RT

2006-2

010

Page 83: groovy and concurrency

GPars • http://gpars.codehaus.org/

• Library classes and DSL sugar providing

intuitive ways for Groovy developers to

handle tasks concurrently. Logical parts:

– Actors provide a Groovy implementation of Scala-like

actors including "remote" actors on other machines

– Dataflow Concurrency supports natural shared-memory

concurrency model, using single-assignment variables

– Asynchronizer extends the Java 1.5 built-in support for

executor services to enable multi-threaded collection and

closure processing

– Parallelizer uses JSR-166y Parallel Arrays to enable

multi-threaded collection processing

– Safe a non-blocking mt-safe reference to mutable state

that is inspired by "agents" in Clojure

Concurrency - 83

© A

SE

RT

2006-2

010

Page 84: groovy and concurrency

GPars: Parallel Collection Functions

Concurrency - 85

© A

SE

RT

2006-2

010

def nums = 1..100000 def squares = nums .collect{ it ** 2 } .grep{ it % 7 == it % 5 } .grep{ it % 3 == 0 } println squares[0..3] + "..." + squares[-3..-1] assert squares[0..3] == [36, 144, 1089, 1296]

@Grab('org.codehaus.gpars:gpars:0.10') import static groovyx.gpars.GParsPool.withPool

def nums = 1..100000 withPool(5) { def squares = nums. collectParallel{ it ** 2 }. grepParallel{ it % 7 == it % 5 }. grepParallel{ it % 3 == 0 } println squares[0..3] + "..." + squares[-3..-1] assert squares[0..3] == [36, 144, 1089, 1296] }

Page 85: groovy and concurrency

GPars: Transparent Parallel Collections

• Applies some Groovy metaprogramming

Concurrency - 86

© A

SE

RT

2006-2

010

import static groovyx.gpars.GParsPool.withPool withPool(5) { def nums = 1..100000 nums.makeTransparent() def squares = nums. collect{ it ** 2 }. grep{ it % 7 == it % 5 }. grep{ it % 3 == 0 } println squares[0..3] + "..." + squares[-3..-1] assert squares[0..3] == [36, 144, 1089, 1296] }

Page 86: groovy and concurrency

Gpars concurrency-aware methods

Transparent Transitive? Parallel

any { ... } anyParallel { ... }

collect { ... } yes collectParallel { ... }

count(filter) countParallel(filter)

each { ... } eachParallel { ... }

eachWithIndex { ... } eachWithIndexParallel { ... }

every { ... } everyParallel { ... }

find { ... } findParallel { ... }

findAll { ... } yes findAllParallel { ... }

findAny { ... } findAnyParallel { ... }

fold { ... } foldParallel { ... }

fold(seed) { ... } foldParallel(seed) { ... }

grep(filter) yes grepParallel(filter)

groupBy { ... } groupByParallel { ... }

max { ... } maxParallel { ... }

max() maxParallel()

min { ... } minParallel { ... }

min() minParallel()

split { ... } yes splitParallel { ... }

Concurrency - 87 Source: ReGina

Page 87: groovy and concurrency

GPars: Map-Reduce...

Concurrency - 88

© A

SE

RT

2006-2

010

import static groovyx.gpars.GParsPool.withPool withPool(5) { def nums = 1..100000 println nums.parallel. map{ it ** 2 }. filter{ it % 7 == it % 5 }. filter{ it % 3 == 0 }. collection }

Page 88: groovy and concurrency

...GPars: Map-Reduce

Concurrency - 89

© A

SE

RT

2006-2

010

import static groovyx.gpars.GParsPool.withPool withPool(5) { def nums = 1..100000 println nums.parallel. map{ it ** 2 }. filter{ it % 7 == it % 5 }. filter{ it % 3 == 0 }. reduce{ a, b -> a + b } }

Page 89: groovy and concurrency

Parallel Collections vs Map-Reduce

Concurrency - 90 Source: ReGina

Page 90: groovy and concurrency

GPars: Dataflows...

Concurrency - 91

© A

SE

RT

2006-2

010

import groovyx.gpars.dataflow.DataFlows import static groovyx.gpars.dataflow.DataFlow.task final flow = new DataFlows() task { flow.result = flow.x + flow.y } task { flow.x = 10 } task { flow.y = 5 } assert 15 == flow.result

new DataFlows().with { task { result = x * y } task { x = 10 } task { y = 5 } assert 50 == result }

Page 91: groovy and concurrency

...GPars: Dataflows...

• Evaluating:

Concurrency - 92

© A

SE

RT

2006-2

010

import groovyx.gpars.dataflow.DataFlows import static groovyx.gpars.dataflow.DataFlow.task final flow = new DataFlows() task { flow.a = 10 } task { flow.b = 5 } task { flow.x = flow.a - flow.b } task { flow.y = flow.a + flow.b } task { flow.result = flow.x * flow.y } assert flow.result == 75

b

10 5

a

+ -

*

result = (a – b) * (a + b)

x y

Page 92: groovy and concurrency

...GPars: Dataflows...

• Naive attempt for loops

Concurrency - 93

© A

SE

RT

2006-2

010

import groovyx.gpars.dataflow.DataFlows import static groovyx.gpars.dataflow.DataFlow.task final flow = new DataFlows() [10, 20].each { thisA -> [4, 5].each { thisB -> task { flow.a = thisA } task { flow.b = thisB } task { flow.x = flow.a - flow.b } task { flow.y = flow.a + flow.b } task { flow.result = flow.x * flow.y } println flow.result } } // => java.lang.IllegalStateException: A DataFlowVariable can only be assigned once.

... task { flow.a = 10 } ... task { flow.a = 20 }

Page 93: groovy and concurrency

...GPars: Dataflows...

Concurrency - 94

© A

SE

RT

2006-2

010

import groovyx.gpars.dataflow.DataFlowStream import static groovyx.gpars.dataflow.DataFlow.* final streamA = new DataFlowStream() final streamB = new DataFlowStream() final streamX = new DataFlowStream() final streamY = new DataFlowStream() final results = new DataFlowStream() operator(inputs: [streamA, streamB], outputs: [streamX, streamY]) { a, b -> streamX << a - b; streamY << a + b } operator(inputs: [streamX, streamY], outputs: [results]) { x, y -> results << x * y } [[10, 20], [4, 5]].combinations().each{ thisA, thisB -> task { streamA << thisA } task { streamB << thisB } } 4.times { println results.val }

b

10

10

20

20

4

5

4

5

a

+ -

*

84

75

384

375

Page 94: groovy and concurrency

...GPars: Dataflows

• Amenable to static analysis

• Race conditions avoided

• Deadlocks “typically” become repeatable

Concurrency - 95

© A

SE

RT

2006-2

010

import groovyx.gpars.dataflow.DataFlows import static groovyx.gpars.dataflow.DataFlow.task final flow = new DataFlows() task { flow.x = flow.y } task { flow.y = flow.x }

Page 95: groovy and concurrency

GPars: Dataflow Sieve

Concurrency - 96

© A

SE

RT

2006-2

010

final int requestedPrimeNumberCount = 1000 final DataFlowStream initialChannel = new DataFlowStream() task { (2..10000).each { initialChannel << it } } def filter(inChannel, int prime) { def outChannel = new DataFlowStream() operator([inputs: [inChannel], outputs: [outChannel]]) { if (it % prime != 0) { bindOutput it } } return outChannel } def currentOutput = initialChannel requestedPrimeNumberCount.times { int prime = currentOutput.val println "Found: $prime" currentOutput = filter(currentOutput, prime) }

Source: http://groovyconsole.appspot.com/script/235002

Page 96: groovy and concurrency

GPars: Actors... • Predefined coordination

with fork/join &

map/filter/reduce

• Implicit coordination

with dataflow

• Actors provide explicit

coordination and

enforce* no sharing of

state, process a single

activity/message at a

time

* mostly

• Class with the

following lifecycle &

methods

– But also DSL sugar &

augmentation

Concurrency - 97

© A

SE

RT

2006-2

010

start() stop() act() send(msg) sendAndWait(msg) loop { } react { msg -> } msg.reply(replyMsg) receive() join()

Page 97: groovy and concurrency

…GPars: Actors...

Concurrency - 98

© A

SE

RT

2006-2

010

import static groovyx.gpars.actor.Actors.* def decrypt = reactor { code -> code.reverse() } def audit = reactor { println it } def main = actor { decrypt 'terces pot' react { plainText -> audit plainText } } main.join() audit.stop() audit.join()

Source: ReGina

Page 98: groovy and concurrency

…GPars: Actors...

Concurrency - 99

© A

SE

RT

2006-2

010

final class FilterActor extends DynamicDispatchActor { private final int myPrime private def follower def FilterActor(final myPrime) { this.myPrime = myPrime; } def onMessage(int value) { if (value % myPrime != 0) { if (follower) follower value else { println "Found $value" follower = new FilterActor(value).start() } } } def onMessage(def poisson) { if (follower) { def sender = poisson.sender follower.sendAndContinue(poisson, {this.stop(); sender?.send('Done')}) //Pass the poisson along and stop after a reply } else { //I am the last in the chain stop() reply 'Done' } } }

Source: http://groovyconsole.appspot.com/script/242001

Page 99: groovy and concurrency

…GPars: Actors

Concurrency - 100

© A

SE

RT

2006-2

010

(2..requestedPrimeNumberBoundary).each { firstFilter it } firstFilter.sendAndWait 'Poisson'

Source: http://groovyconsole.appspot.com/script/242001

Page 100: groovy and concurrency

GPars: Agents...

• Agents safeguard non-thread safe objects

• Only the agent can update the underlying

object

• “Code” to update the protected object is

sent to the agent

• Can be used with other approaches

Concurrency - 101

© A

SE

RT

2006-2

010

Page 101: groovy and concurrency

…GPars: Agents

Concurrency - 102

© A

SE

RT

2006-2

010

@Grab('org.codehaus.gpars:gpars:0.10')

import groovyx.gpars.agent.Agent

def speakers = new Agent<List>(['Alex'], {it?.clone()}) // add Alex

speakers.send {updateValue it << 'Hilary'} // add Hilary

final Thread t1 = Thread.start {

speakers.send {updateValue it << 'Ken'} // add Ken

}

final Thread t2 = Thread.start {

speakers << {updateValue it << 'Guy'} // add Guy

speakers << {updateValue it << 'Ralph'} // add Ralph

}

[t1, t2]*.join()

assert new HashSet(speakers.val) ==

new HashSet(['Alex', 'Hilary', 'Ken', 'Guy', 'Ralph'])

Source: Gpars examples

Page 102: groovy and concurrency

GPars for testing

Concurrency - 103

© A

SE

RT

2006-2

010

@Grab('net.sourceforge.htmlunit:htmlunit:2.6') import com.gargoylesoftware.htmlunit.WebClient @Grab('org.codehaus.gpars:gpars:0.10') import static groovyx.gpars.GParsPool.*

def testCases = [ ['Home', 'Bart', 'Content 1'], ['Work', 'Homer', 'Content 2'], ['Travel', 'Marge', 'Content 3'], ['Food', 'Lisa', 'Content 4'] ]

withPool(3) { testCases.eachParallel{ category, author, content -> postAndCheck category, author, content } }

private postAndCheck(category, author, content) { ...

Page 103: groovy and concurrency

Guy Steele example in Groovy…

Concurrency - 104

© A

SE

RT

2006-2

010

def words = { s -> def result = [] def word = '' s.each{ ch -> if (ch == ' ') { if (word) result += word word = '' } else word += ch } if (word) result += word result } assert words("This is a sample") == ['This', 'is', 'a', 'sample'] assert words(" Here is another sample ") == ['Here', 'is', 'another', 'sample'] assert words("JustOneWord") == ['JustOneWord'] assert words("Here is a sesquipedalian string of words") == ['Here', 'is', 'a', 'sesquipedalian', 'string', 'of', 'words'] assert words(" ") == [] && words("") == []

Sequential version

Guy Steele‟s example from keynote (from slide 52 onwards for several slides):

http://strangeloop2010.com/talk/presentation_file/14299/GuySteele-parallel.pdf

Page 104: groovy and concurrency

…Guy Steele example in Groovy…

Concurrency - 105

© A

SE

RT

2006-2

010

@Immutable class Chunk { String s def plus(Chunk other) { new Chunk(s + other.s) } def plus(Segment other) { new Segment(s + other.l, other.m, other.r) } }

@Immutable class Segment { String l; List m; String r def plus(Chunk other) { new Segment(l, m, r + other.s) } def plus(Segment other) { new Segment(l, m + maybeWord(r + other.l) + other.m, other.r) } }

class Util { static processChar(ch) { ch == ' ' ? new Segment('', [], '') : new Chunk(ch) } static maybeWord(s) { s ? [s] : [] } } import static Util.* ...

Refactored sequential version

Page 105: groovy and concurrency

…Guy Steele example in Groovy…

Concurrency - 106

© A

SE

RT

2006-2

010

def words = { s -> def result s.each{ ch -> if (!result) result = processChar(ch) else result += processChar(ch) } switch(result) { case Chunk: return maybeWord(result.s) case Segment: return result.with{ maybeWord(l) + m + maybeWord(r) } case null: return [] } } assert words("This is a sample") == ['This', 'is', 'a', 'sample'] assert words(" Here is another sample ") == ['Here', 'is', 'another', 'sample'] assert words("JustOneWord") == ['JustOneWord'] assert words("Here is a sesquipedalian string of words") == ['Here', 'is', 'a', 'sesquipedalian', 'string', 'of', 'words'] assert words(" ") == [] && words("") == []

Refactored sequential version

Page 106: groovy and concurrency

…Guy Steele example in Groovy…

Concurrency - 107

© A

SE

RT

2006-2

010

def swords = { s -> def result s.each{ ch -> if (!result) result = processChar(ch) else result += processChar(ch) } result ?: new Chunk('') } THREADS = 4 def words = { s -> int n = (s.size() + THREADS - 1) / THREADS def map = new java.util.concurrent.ConcurrentHashMap() (0..<THREADS).collect { i -> Thread.start { def (min, max) = [[s.size(),i*n].min(), [s.size(),(i+1)*n].min()] map[i] = swords(s[min..<max]) }}*.join() def result = map.entrySet().sort{ it.key }.sum{ it.value } switch(result) { case Chunk: return maybeWord(result.s) case Segment: return result.with{ maybeWord(l) + m + maybeWord(r) } } }

Roll your own threading with ConcurrentHashMap version

Page 107: groovy and concurrency

…Guy Steele example in Groovy…

Concurrency - 108

© A

SE

RT

2006-2

010

def words = { s -> int n = (s.size() + THREADS - 1) / THREADS def min = (0..<THREADS).collectEntries{ [it, [s.size(),it*n].min()] } def max = (0..<THREADS).collectEntries{ [it, [s.size(),(it+1)*n].min()] } def result = new DataFlows().with { task { a = swords(s[min[0]..<max[0]]) } task { b = swords(s[min[1]..<max[1]]) } task { c = swords(s[min[2]..<max[2]]) } task { d = swords(s[min[3]..<max[3]]) } task { sum1 = a + b } task { sum2 = c + d } task { sum = sum1 + sum2 } println 'Tasks ahoy!' sum } switch(result) { case Chunk: return maybeWord(result.s) case Segment: return result.with{ maybeWord(l) + m + maybeWord(r) } } }

DataFlow version: partially hard-coded to 4 partitions for easier reading

Page 108: groovy and concurrency

…Guy Steele example in Groovy…

Concurrency - 109

© A

SE

RT

2006-2

010

GRANULARITY_THRESHHOLD = 10 THREADS = 4 println GParsPool.withPool(THREADS) { def result = runForkJoin(0, input.size(), input){ first, last, s -> def size = last - first if (size <= GRANULARITY_THRESHHOLD) { swords(s[first..<last]) } else { // divide and conquer def mid = first + ((last - first) >> 1) forkOffChild(first, mid, s) forkOffChild(mid, last, s) childrenResults.sum() } } switch(result) { case Chunk: return maybeWord(result.s) case Segment: return result.with{ maybeWord(l) + m + maybeWord(r) } } }

Fork/Join version

Page 109: groovy and concurrency

…Guy Steele example in Groovy…

Concurrency - 110

© A

SE

RT

2006-2

010

THRESHHOLD = 10 def split(raw) { raw.size() <= THRESHHOLD ? raw : [raw[0..<THRESHHOLD]] + split(raw.substring(THRESHHOLD)) } println GParsPool.withPool(THREADS) { def ans = split(input).parallel.map(swords).reduce{ a,b -> a + b } switch(ans) { case Chunk: return maybeWord(ans.s) case Segment: return ans.with{ maybeWord(l) + m + maybeWord(r) } } }

Map/Reduce version

Page 110: groovy and concurrency

…Guy Steele example in Groovy

Concurrency - 111

© A

SE

RT

2006-2

010

println GParsPool.withPool(THREADS) { def ans = input.collectParallel{ processChar(it) }.sum() switch(ans) { case Chunk: return maybeWord(ans.s) case Segment: return ans.with{ maybeWord(l) + m + maybeWord(r) } } }

Just leveraging the algorithm‟s parallel nature

Page 111: groovy and concurrency

Topics

• Groovy Intro

• Useful Groovy features for Concurrency

• Related Concurrency Libraries & Tools

• Fibonacci Case Study

• GPars

More Info

Concurrency - 112

© A

SE

RT

2006-2

010

Page 112: groovy and concurrency

More Information about Concurrency

• Web sites – http://gpars.codehaus.org/

– http://g.oswego.edu/

Doug Lea's home page

– http://gee.cs.oswego.edu/dl/concurrency-interest/

– http://jcip.net/

Companion site for Java Concurrency in Practice

– http://www.eecs.usma.edu/webs/people/okasaki/pubs.html#cup98

Purely Functional Data Structures

– http://delicious.com/kragen/concurrency

Concurrency bookmark list

– http://www.gotw.ca/publications/concurrency-ddj.htm

The Free Lunch is Over, Herb Sutter

– http://manticore.cs.uchicago.edu/papers/damp07.pdf

– http://mitpress.mit.edu/catalog/item/default.asp?ttype=2&tid=10142

Concepts, Techniques, and Models of Computer Programming

Concurrency - 113

Page 113: groovy and concurrency

More Information about Groovy

• Web sites – http://groovy.codehaus.org

– http://grails.codehaus.org

– http://pleac.sourceforge.net/pleac_groovy (many examples)

– http://www.asert.com.au/training/java/GV110.htm (workshop)

• Mailing list for users – [email protected]

• Information portals – http://www.aboutgroovy.org

– http://www.groovyblogs.org

• Documentation (1000+ pages) – Getting Started Guide, User Guide, Developer Guide, Testing

Guide, Cookbook Examples, Advanced Usage Guide

• Books – Several to choose from ...

Concurrency - 114

Page 114: groovy and concurrency

More Information: Groovy in Action

Concurrency - 115