
Jython Concurrency
Leveraging the Java Platform with Python
Jim Baker

[email protected]
Ubuntu Server team, working on Juju
Jython committer, working on concurrency, compilation, …
jimbaker on freenode (#jython, #juju, #ubuntu-server), twitter
Pythoneering blog - zyasoft.com/pythoneering

Talk Overview
• Concurrency Debate
• Deadlock detection
• Python memory model
• Computation concurrency and the GIL
• Executors and tasks
• Resources

Jython Concurrency

© 2012, Jim Baker - 2 - CC-SA 3.0

Jython 2.7
• Alpha 2 available at http://jython.org
• Running trunk on my laptop
• Almost there for a beta and a final release later this year

Concurrency Horror
• Argument in a nutshell: concurrency is just too hard
• Subtler/not nice variant: concurrency is for experts, not for you
• Implication: let’s do something else…

Concurrency Reality
• Concurrency is everywhere, and we are used to it
• Multicore is the reality
• Some/most techniques don’t work at scale. Don’t do that!
• Must always judge concurrency by its results:
    • Depends on what you’re doing…
    • But taking advantage of cores, being linear in scaling, that’s always nice
• Likely to see better paradigms become available in Python, such as transactional memory in PyPy
• But Jython can readily take advantage of a strong platform for concurrency now

Threads for Concurrency
• Jython supports standard Python threads: threading.Thread
• All Python threads directly mapped to Java threads
• Standard API: threading.enumerate, join, etc
• Usually want to work at a higher level (thread pool), but foundational
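A minimal sketch of the standard threading API named above (Thread, enumerate, join). The function names and thread names are made up for illustration, and the code is written so that it should run unchanged on Jython 2.7 and on CPython.

```python
from __future__ import print_function
import threading


def worker(results, index):
    # Each thread writes only to its own slot, so no lock is needed here.
    results[index] = threading.current_thread().name


def run_workers(count=3):
    results = [None] * count
    threads = [
        threading.Thread(name="worker-{}".format(i),
                         target=worker, args=(results, i))
        for i in range(count)
    ]
    for thread in threads:
        thread.start()
    # threading.enumerate() lists the threads that are alive right now,
    # including the main thread.
    print("live threads:", len(threading.enumerate()))
    for thread in threads:
        thread.join()
    return results
```

Calling run_workers(3) returns the three thread names in slot order. On Jython each of these threads is backed by a Java thread, as the slide states.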


Threading Code
• Don’t do this!

from threading import Thread
import time

def run_threads():
    counter = [0]
    started = time.time()

    def be_crazy():
        for i in xrange(100000):
            # Increment is not atomic!!!
            counter[0] += 1

    threads = [Thread(target=be_crazy) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print "total time", time.time() - started
    assert counter[0] == 100000 * 5, "Counter is {}".format(counter[0])
    print "success (truly unexpected!)"

if __name__ == '__main__':
    run_threads()

Locks
• Standard support for mutual exclusion
• Jython locks are always reentrant (maps to java.util.concurrent.ReentrantLock)
• No extra memory model implications for Jython, unlike underlying Java
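A small sketch of what reentrancy means in practice. Since the slide says only Jython's Lock is reentrant, the example assumes portable code and uses threading.RLock, which is reentrant on CPython as well. The Account class is hypothetical.

```python
import threading


class Account(object):
    def __init__(self):
        # RLock is reentrant everywhere; a plain Lock is reentrant only
        # on Jython, per the slide above.
        self._lock = threading.RLock()
        self.balance = 0

    def deposit(self, amount):
        with self._lock:
            self.balance += amount

    def deposit_twice(self, amount):
        with self._lock:
            # The same thread re-acquires the lock it already holds.
            # With a non-reentrant lock this call would block forever.
            self.deposit(amount)
            self.deposit(amount)
```

For example, Account().deposit_twice(5) leaves a balance of 10 without blocking.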


Lock Code

from threading import Lock, Thread
import time

DO_PRETEND_WORK = 1000000

def run_threads():
    counter = [0]
    lock = Lock()
    started = time.time()

    def increment_counter():
        for i in xrange(DO_PRETEND_WORK):
            lock.acquire()
            try:
                counter[0] += 1
            finally:
                lock.release()

    threads = [Thread(target=increment_counter) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    assert counter[0] == DO_PRETEND_WORK * 5
    print "success", time.time() - started

if __name__ == '__main__':
    run_threads()

with Statement Variant
• Better performance on Jython, possibility of lock eliding
• Nicer structure:

with lock:
    # be protected

with Code

from threading import Lock, Thread
import time

DO_PRETEND_WORK = 1000000

def run_threads():
    counter = [0]
    lock = Lock()
    started = time.time()

    def increment_counter():
        for i in xrange(DO_PRETEND_WORK):
            with lock:
                counter[0] += 1

    threads = [Thread(target=increment_counter) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    assert counter[0] == DO_PRETEND_WORK * 5
    print "success!", time.time() - started

if __name__ == '__main__':
    run_threads()

AtomicInteger
• Let’s use functionality from Java
• AtomicInteger provides atomic increment (and add…) methods
• Simply import this class from java.util.concurrent.atomic

AtomicInteger Code

from threading import Thread
from java.util.concurrent.atomic import AtomicInteger
import time

DO_PRETEND_WORK = 1000000

def run_threads():
    counter = AtomicInteger()
    started = time.time()

    def increment_counter():
        for i in xrange(DO_PRETEND_WORK):
            counter.getAndIncrement()

    threads = [Thread(target=increment_counter) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    assert counter.longValue() == DO_PRETEND_WORK * 5
    print "success!", time.time() - started

if __name__ == '__main__':
    run_threads()

Deadlock Detection
• Implementing concurrent code raises the possibility of deadlocks
• In the wild, code will just stop working…
• JVM provides support for diagnosing and possibly managing these issues

But First, Deadlock Setup
• Two threads
• Two resources, held exclusively, but in opposite order
• No you go first; no you, sir

Deadlock Code

from threading import Thread, Lock
import threading

def cause_deadlock():
    lock_one = Lock()
    lock_two = Lock()
    counter = [0] # Shared resource for both locks
    Thread(name="thread #1", target=acquire_locks,
           args=(counter, lock_one, lock_two)).start()
    Thread(name="thread #2 (reversed)", target=acquire_locks,
           args=(counter, lock_two, lock_one)).start()

def acquire_locks(counter, lock1, lock2):
    # Will eventually deadlock if locks are acquired in different order
    name = threading.currentThread().getName()
    thread_count = 0
    while True:
        with lock1:
            with lock2:
                counter[0] += 1
                thread_count += 1
                print "name={}, total count={}, thread count={}".\
                    format(name, counter[0], thread_count)

if __name__ == '__main__':
    cause_deadlock()
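For contrast, a sketch of the usual way to avoid this particular deadlock: make every thread take the two locks in the same order, no matter how they were passed in. This is not from the talk. Ordering by id() and the function names are assumptions chosen for illustration.

```python
import threading


def acquire_in_fixed_order(counter, lock_a, lock_b, iterations):
    # Sort the two locks by id() so every thread acquires them in the
    # same global order, whatever order the caller passed them in.
    first, second = sorted((lock_a, lock_b), key=id)
    for _ in range(iterations):
        with first:
            with second:
                counter[0] += 1


def run_without_deadlock(iterations=10000):
    lock_one = threading.Lock()
    lock_two = threading.Lock()
    counter = [0]  # Shared resource protected by both locks
    threads = [
        threading.Thread(target=acquire_in_fixed_order,
                         args=(counter, lock_one, lock_two, iterations)),
        # Arguments reversed, as in the deadlocking version above.
        threading.Thread(target=acquire_in_fixed_order,
                         args=(counter, lock_two, lock_one, iterations)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return counter[0]
```

Both threads finish, and the returned count is exactly twice the iteration count because every increment happens while both locks are held.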


Deadlock Detection
• JMX as of Java 6 supports findDeadlockedThreads
• Not perfect: it’s possible for it to miss complex chains that cause deadlock
• External vs internal monitoring:
    • External uses a JMX agent that can be installed on the fly
    • Internal directly uses the JMX API

Deadlock Detection Code

from contextlib import closing
from com.sun.tools.attach import VirtualMachine
from javax.management.remote import JMXServiceURL, JMXConnectorFactory
from java.lang.management import ManagementFactory
from java.lang import Class

import os
import os.path
import signal

my_pid = os.getpid()
vm_descs = VirtualMachine.list()
for desc in vm_descs:
    pid = int(desc.id())
    if pid == my_pid:
        # Skip this monitoring process before attaching to anything
        continue
    vm = VirtualMachine.attach(desc)
    agent_props = vm.getAgentProperties()
    system_props = vm.getSystemProperties()
    # getProperty returns None when the key is absent
    connector_addr = agent_props.getProperty(
        "com.sun.management.jmxremote.localConnectorAddress")
    if connector_addr is None:
        agent = os.path.join(
            system_props.getProperty("java.home"),
            "lib", "management-agent.jar")
        print "Loading management agent={} for pid={}...".format(agent, pid)
        vm.loadAgent(agent)
        # Get the agent properties again
        agent_props = vm.getAgentProperties()
        connector_addr = agent_props.getProperty(
            "com.sun.management.jmxremote.localConnectorAddress")
    url = JMXServiceURL(connector_addr)
    with closing(JMXConnectorFactory.connect(url)) as connector:
        mbean = connector.getMBeanServerConnection()
        thread_mxbean = ManagementFactory.getPlatformMXBean(
            mbean, Class.forName("java.lang.management.ThreadMXBean"))
        thread_ids = thread_mxbean.findDeadlockedThreads()
        if thread_ids:
            print "Deadlock detected, shutting down pid={}, threads={}".\
                format(pid, list(thread_ids))
            os.kill(pid, signal.SIGINT)

Daemon Threads
• Simple lifecycle management
• Thread is set to be a daemon thread before it is started
• Daemon status inherited by child threads
• Upon JVM shutdown, daemon threads are simply terminated
    • Never hold any external resources (database connections, file handles)
    • Daemon thread should never make an import attempt
• Use case - strictly used to work with in-memory objects (caches, indexes, computation in general)
• Fork-join executors use daemon threads
• But convenient - never have to worry about pool shutdown

Daemon Threads Code

from threading import Thread, Lock
import sys, time, threading
from java.lang.management import ManagementFactory

def cause_deadlock():
    lock_one = Lock()
    lock_two = Lock()
    counter = [0] # Shared resource for both locks
    threads = [
        Thread(name="thread #1", target=acquire_locks,
               args=(counter, lock_one, lock_two)),
        Thread(name="thread #2 (reversed)", target=acquire_locks,
               args=(counter, lock_two, lock_one))]
    for thread in threads:
        thread.setDaemon(True)
        thread.start()

    thread_mxbean = ManagementFactory.getThreadMXBean()
    while True:
        time.sleep(1)
        print "monitoring thread", counter[0]
        thread_ids = thread_mxbean.findDeadlockedThreads()
        if thread_ids:
            print "monitoring thread: deadlock detected, shutting down", \
                list(thread_ids)
            sys.exit(1)

def acquire_locks(counter, lock1, lock2):
    # Will eventually deadlock if locks are acquired in different order
    name = threading.currentThread().getName()
    thread_count = 0
    while True:
        with lock1:
            with lock2:
                counter[0] += 1
                thread_count += 1
                print "name={}, total count={}, thread count={}".\
                    format(name, counter[0], thread_count)

if __name__ == '__main__':
    cause_deadlock()

Computation vs I/O Concurrency
• Threads allow for concurrent activity. But of what?
• I/O concurrency
• Computation concurrency
• Always a mix of course in real work

I/O Concurrency Code

from threading import Thread
import time

def run_threads():
    def sleepily_wait():
        for i in xrange(50):
            time.sleep(0.1)

    started = time.time()
    threads = [Thread(target=sleepily_wait) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print "ok", time.time() - started

if __name__ == '__main__':
    run_threads()

CPython and the Global Interpreter Lock
• GIL protects mutable collection structures - dict, list, set
• GIL supports reference counting GC system
• But it also rules out writing concurrent compute-intensive code in CPython without using processes

Concurrent Computation Code

from threading import Thread
import time

def run_threads():
    def busily_wait():
        j = 0
        for i in xrange(10000000):
            j += i
        print j

    started = time.time()
    threads = [Thread(target=busily_wait) for i in xrange(10)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print "total time", time.time() - started

if __name__ == '__main__':
    run_threads()

CPython’s GIL and its Defenders
• False argument: the GIL is good for you
• Forces everything to be in a separate process
• But being in a separate process simply makes sharing potentially more expensive

Jython and Concurrent Computation Code

Python Memory Model
• Reading or replacing a single instance attribute
• Reading or replacing a single global variable
• Fetching an item from a list
• Modifying a list in place (e.g. adding an item using append)
• Fetching an item from a dict
• Modifying a dict in place (e.g. adding an item, or calling the clear method)

Immutable Types in Python
• str, unicode
• int, float, Decimal
• frozenset
• tuple
• datetime.datetime, etc
• and many more, including your own

Thread Safety Concerns
• Can the interaction of two or more threads corrupt a mutable object?
    • Dangerous for mutable collections (list, dict)
    • Such as infinite loop condition when traversing collection (java.util.HashMap)
• Can an update get lost?
    • Incrementing a counter, appending to a list, etc

Thread Confinement
• Probably don’t need to share a large percentage of the mutable objects used in your code
• Very simply put, if you don’t share, then thread safety issues go away
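One way to picture thread confinement, as a rough sketch: each thread accumulates into a local variable that no other thread can reach, and publishes a single result into its own slot at the end. The function names and the chunking scheme are illustrative assumptions.

```python
import threading


def confined_sum(chunk, results, index):
    # 'total' is local to this call, so only this thread ever touches it.
    total = 0
    for value in chunk:
        total += value
    # Publish once, into a slot that no other thread writes to.
    results[index] = total


def parallel_sum(values, workers=4):
    # Give each worker its own slice of the input.
    chunks = [values[i::workers] for i in range(workers)]
    results = [0] * workers
    threads = [
        threading.Thread(target=confined_sum, args=(chunk, results, i))
        for i, chunk in enumerate(chunks)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # Only the main thread reads the results, and only after join().
    return sum(results)
```

No lock appears anywhere, because no mutable object is written by more than one thread.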


GIL Easter Egg

>>> from __future__ import GIL
  File "<stdin>", line 1
SyntaxError: Never going to happen!

No GIL
• No need for a GIL:
    • Jython uses standard Java garbage collection support
    • Protects mutable collection structures so they are threadsafe
• Threads for compute-intensive tasks can be written in Python

Jython and Thread Safety
• Updates still might get lost in a data race. Depends on what is guaranteed. May need to synchronize. Be careful.
• Other Java collection objects likely don’t have no-corruption guarantees
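To make the lost-update point concrete, here is a hedged sketch of a read-modify-write on a shared dict. Each dict operation may be safe on its own, but the get-then-set pair is not a single step, so the example guards it with a lock. HitCounter and its methods are hypothetical names.

```python
import threading


class HitCounter(object):
    def __init__(self):
        self._hits = {}
        self._lock = threading.Lock()

    def record(self, key):
        # Reading the old value and storing the new one are two separate
        # dict operations; the lock makes the pair behave as one step.
        with self._lock:
            self._hits[key] = self._hits.get(key, 0) + 1

    def count(self, key):
        with self._lock:
            return self._hits.get(key, 0)


def hammer(counter, key, times):
    for _ in range(times):
        counter.record(key)


def run(thread_count=4, times=5000):
    counter = HitCounter()
    threads = [
        threading.Thread(target=hammer, args=(counter, "home", times))
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return counter.count("home")
```

With the lock in place, run(4, 5000) always returns 20000. Without it, some increments could be overwritten by a concurrent writer.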


JVM Issues
• Need memory fences, with some nice bracketing symmetry:
    • synchronized:
        • acquire - fresh reads into cache
        • release - writes cache, visible to all processors
    • volatile:
        • reads value, no caching
        • writes, flushes that value from cache, all processors see it

Translating to JVM: Volatile and Memory Fences
• Setting any attribute in Python is a volatile write
• Getting any attribute is a volatile read
• Implementation:
    • Python attributes are stored in dictionaries
    • For Jython: follows the semantics of the backing ConcurrentHashMap

Interruption
• Volatility is key
• Works in CPython or Jython - Python memory model is same

One Way to Interrupt

from java.lang import Runnable

class DoSomething(Runnable):
    def __init__(self):
        # Must be an instance attribute so run() and other threads see it
        self.cancelled = False

    def run(self):
        while not self.cancelled:
            do_stuff()  # placeholder for the actual work

Python Memory Model Implications
• Again: Python variables are always volatile, unlike Java
• No problems with using a cancelled flag like this
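The same cancelled-flag idea, sketched with only the standard threading module so that it should run on both CPython and Jython. Poller, its iterations counter, and start_and_cancel are illustrative names, not part of the talk.

```python
import threading
import time


class Poller(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.cancelled = False
        self.iterations = 0

    def run(self):
        # The flag is re-read on every pass, so a write from another
        # thread is noticed on the next iteration.
        while not self.cancelled:
            self.iterations += 1
            time.sleep(0.01)


def start_and_cancel(run_for=0.1):
    poller = Poller()
    poller.start()
    time.sleep(run_for)
    poller.cancelled = True   # plain attribute write from the main thread
    poller.join(5)            # bounded wait, in case something goes wrong
    return poller
```

After start_and_cancel() returns, the thread has left its loop. No lock or explicit volatile marker is involved, which is the point the slide makes about Python attributes.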


Memory Model Implications for Jython
• Python code has sequential consistency
• No surprises: execution follows the statement order
• Safe publication is trivial
    • Safe publication simply means assigning a name to an object
    • Just need to ensure that the object itself is built in a thread-safe fashion; then publish
• It is possible to defeat this: start with sys._getframe() …
• Or use arrays or anything directly going against Java
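A sketch of the build-then-publish pattern described above, under the guarantees the slide states. The Registry class and the settings it holds are invented for the example.

```python
import threading


class Registry(object):
    def __init__(self):
        self.settings = None  # nothing published yet

    def load(self):
        # Build the object completely in a local variable first...
        fresh = {}
        fresh["host"] = "localhost"
        fresh["port"] = 8080
        # ...then publish it with a single attribute assignment.
        self.settings = fresh


def reader(registry, seen):
    snapshot = registry.settings
    # Because the dict is assigned only after it is fully built, a reader
    # gets either None or the complete dict.
    if snapshot is not None:
        seen.append(sorted(snapshot))


def publish_and_read(reader_count=4):
    registry = Registry()
    seen = []
    threads = [threading.Thread(target=reader, args=(registry, seen))
               for _ in range(reader_count)]
    threads.append(threading.Thread(target=registry.load))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return registry, seen
```

The anti-pattern would be assigning an empty dict to self.settings first and filling it in afterwards, which lets readers observe a half-built object.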


Java Interruption
• Can use standard Java API for threads
• Thread.interrupt
• Note the alternate calling form here
• Works because we can strip off the proxy that Jython uses

Thread Interruption
• Thread interruption allows for even more responsive cancellation
• If a thread is waiting on most synchronizers (condition variable, file I/O), this action will cause the waited-on method to exit with an InterruptedException.
• Lock acquisition, except by using lockInterruptibly on the underlying Java lock, is not interruptible.

Interrupting Code

from threading import Condition, Lock, Thread
from java.lang import Thread as JThread, InterruptedException
import time, threading

def be_unfair():
    unfair_condition = Condition()
    threads = [
        Thread(
            name="thread #{}".format(i),
            target=wait_until_interrupted,
            args=(unfair_condition,))
        for i in xrange(5)]
    for thread in threads:
        thread.start()
    time.sleep(5)

    # instead of notifying, we will interrupt the threads
    for thread in threads:
        JThread.interrupt(thread)
    for thread in threads:
        thread.join()

def wait_until_interrupted(cv):
    name = threading.currentThread().getName()
    with cv:
        while not JThread.currentThread().isInterrupted():
            try:
                print "{} Waiting pointlessly".format(name)
                cv.wait()
            except InterruptedException, e:
                print "{} Got exception: {}".format(name, str(e))
                break
    print "{} Finished".format(name)

if __name__ == '__main__':
    be_unfair()

Module Import Lock
• Defined by the Python specification
• Always acquired on every import!
• Don’t write code like this in a hot loop, especially when threaded!

def slow_things_way_down():
    from foo import bar, baz # etc

• Ensures safe publication of module-level names
    • Globals being initialized
    • Functions being decorated
    • Classes being built
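The hot-loop warning can be illustrated with a small before/after sketch. It uses math.sqrt purely as a stand-in for whatever the loop really needs, and the function names are hypothetical.

```python
from math import sqrt  # imported once, when this module is loaded


def fast_norms(points):
    # No import statement runs inside the loop.
    return [sqrt(x * x + y * y) for x, y in points]


def slow_norms(points):
    result = []
    for x, y in points:
        # Anti-pattern from the slide: the import statement is executed
        # again on every iteration of the loop.
        from math import sqrt as local_sqrt
        result.append(local_sqrt(x * x + y * y))
    return result
```

Both functions return the same values. The difference is only how often the import machinery is entered, which per the slide means taking the import lock each time.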


Tasks vs Threads
• Avoid haphazard threading
    • Heterogeneous threads are problematic/evil
    • Dependencies managed through a variety of channels
• Use tasks instead to build a simple, scalable system
    • Encapsulates business logic
    • Explicit wait-on dependencies and time scheduling (if necessary)
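A rough pure-Python sketch of the task idea: the business logic lives in small callable objects, and a fixed set of identical worker threads pulls them from a queue. This is a stand-in for illustration, not the Java executor API used elsewhere in the talk. SquareTask and run_tasks are made-up names.

```python
import threading

try:
    import queue            # Python 3 name
except ImportError:
    import Queue as queue   # Python 2 / Jython 2.7 name


class SquareTask(object):
    """Encapsulates one unit of business logic."""

    def __init__(self, value):
        self.value = value

    def __call__(self):
        return self.value * self.value


def run_tasks(tasks, workers=3):
    pending = queue.Queue()
    done = queue.Queue()
    for task in tasks:
        pending.put(task)

    def work():
        # Every worker runs the same loop; only the tasks differ.
        while True:
            try:
                task = pending.get_nowait()
            except queue.Empty:
                return  # all tasks were queued up front, so empty means done
            done.put((task.value, task()))

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    results = {}
    while not done.empty():
        value, result = done.get()
        results[value] = result
    return results
```

For example, run_tasks([SquareTask(n) for n in range(5)]) maps each input to its square, regardless of which worker ran which task.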


Downloader

from java.util.concurrent import Callable
import urllib2

class Downloader(Callable):
    def __init__(self, url):
        self.url = url
        # Set up front so callers can always inspect both attributes
        self.result = None
        self.exception = None

    def call(self):
        try:
            self.result = urllib2.urlopen(self.url).read()
        except Exception, ex:
            self.exception = ex
        return self

Execution Models for Executors
• Variety of execution models
    • Thread pools
    • 1 thread per task (so no pooling)
    • 1 thread (so sequential)
    • Fork-join with work stealing in Java 7 (also available in Java 6)
• Combined with blocking queues using ExecutorCompletionService

ExecutorCompletionService Code

from downloader import Downloader
from shutdown import shutdown_and_await_termination
from java.util.concurrent import Executors, ExecutorCompletionService
import os
import hashlib

MAX_CONCURRENT = 3
SITES = [
    "http://www.cnn.com/",
    "http://www.nytimes.com/",
    "http://www.washingtonpost.com/",
    "http://www.dailycamera.com/",
    "http://www.timescall.com/",
    # generate a random web site name that is very, very unlikely to exist
    "http://" + hashlib.md5(
        "unlikely-web-site-" + os.urandom(4)).hexdigest() + ".com",
    ]

pool = Executors.newFixedThreadPool(MAX_CONCURRENT)
ecs = ExecutorCompletionService(pool)

# this function could spider the links from these roots;
# for now just schedule these roots directly
def scheduler(roots):
    for site in roots:
        yield site

# submit tasks indefinitely
for site in scheduler(SITES):
    ecs.submit(Downloader(site))

# work with results as soon as they become available
submitted = len(SITES)
while submitted > 0:
    result = ecs.take().get()
    # here we just do something unimaginative with the result;
    # consider parsing it with tools like BeautifulSoup or JSoup
    print result
    submitted -= 1

print "shutting pool down..."
shutdown_and_await_termination(pool, 5)
print "done"

Google Guava’s LoadingCache
• Part of Guava, kick-butt library for doing stuff concurrently
• Weak keys, weak values, LRU support, …
• Used by Jython itself
• load method blocks upon computation
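To show what "blocks upon computation" means, here is a deliberately simplified pure-Python analogue. It is not Guava's API and has none of its eviction or weak-reference features. The class name, the per-key lock scheme, and the demo helper are all assumptions made for the sketch.

```python
import threading
import time


class LoadingCache(object):
    """Illustrative analogue only; this is not Guava's LoadingCache."""

    def __init__(self, loader):
        self._loader = loader
        self._values = {}
        self._key_locks = {}
        self._guard = threading.Lock()

    def get(self, key):
        with self._guard:
            if key in self._values:
                return self._values[key]
            key_lock = self._key_locks.setdefault(key, threading.Lock())
        # Threads asking for the same key wait here while one of them loads.
        with key_lock:
            with self._guard:
                if key in self._values:
                    return self._values[key]
            value = self._loader(key)
            with self._guard:
                self._values[key] = value
            return value


def demo(thread_count=5):
    calls = []

    def slow_square(key):
        calls.append(key)
        time.sleep(0.05)  # simulate an expensive computation
        return key * key

    cache = LoadingCache(slow_square)
    results = []
    threads = [
        threading.Thread(target=lambda: results.append(cache.get(7)))
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return calls, results
```

In demo(), five threads request the same key, the loader runs once, and the other four block until the value is available.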


Java or Python APIs - Recommendations
• Port Python code that uses the threading module
    • Such code can still interoperate with Java, because Jython threads are always mapped to Java threads
• Use Python types concurrently - Jython implements dict and set with ConcurrentHashMap
• Use java.util.concurrent collections or third-party Java libraries like Guava
• Use higher-level primitives from Java instead of creating your own

Concurrency Resources
• Jython Book - http://www.jython.org/jythonbook/en/1.0/Concurrency.html
• Java Memory Model resources - http://www.cs.umd.edu/~pugh/java/memoryModel
• Guava docs - http://code.google.com/p/guava-libraries/wiki/GuavaExplained
• Java concurrency mailing list - http://g.oswego.edu/dl/concurrency-interest