Jython Concurrency
Leveraging the Java Platform with Python
Jim Baker
[email protected]
Server team, working on Juju
Jython committer, working on concurrency, compilation, …
jimbaker on freenode (#jython, #juju, #ubuntu-server), twitter
Pythoneering blog - zyasoft.com/pythoneering
Talk Overview
• Concurrency Debate
• Deadlock detection
• Python memory model
• Computation concurrency and the GIL
• Executors and tasks
• Resources
Jython Concurrency
© 2012, Jim Baker - 2 - CC-SA 3.0
Jython 2.7
• Alpha 2 available at http://jython.org
• Running trunk on my laptop
• Almost there for a beta, with a final release later this year
Concurrency Horror
• Argument in a nutshell: concurrency is just too hard
• Subtler, less nice variant: concurrency is for experts, not for you
• Implication: let’s do something else…
Concurrency Reality
• Concurrency is everywhere, and we are used to it
• Multicore is the reality
• Some (most) techniques don’t work at scale. Don’t do that!
• Must always judge concurrency by its results:
  • Depends on what you’re doing…
  • But taking advantage of all cores and scaling linearly, that’s always nice
• Likely to see better paradigms become available in Python, such as transactional memory in PyPy
• But Jython can readily take advantage of a strong platform for concurrency now
Threads for Concurrency
• Jython supports standard Python threads: threading.Thread
• All Python threads are directly mapped to Java threads
• Standard API: threading.enumerate, join, etc.
• Usually want to work at a higher level (thread pool), but threads are foundational
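A minimal sketch of the standard threading API mentioned above (start, join, enumerate). It uses only the standard library, so it should behave the same on Jython 2.7 and CPython; the worker function and the thread names are illustrative.

```python
import threading

def worker(out, n):
    out.append(n * n)   # list.append is atomic, so no lock is needed here

results = []
threads = [threading.Thread(target=worker, args=(results, i), name="worker-%d" % i)
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()   # block until each worker has finished

# After join(), none of the workers should still be alive
alive = [t for t in threads if t.is_alive()]
# threading.enumerate() still reports at least the calling (main) thread
main_listed = threading.current_thread() in threading.enumerate()
```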
Threading Code
• Don’t do this!

from threading import Thread
import time

def run_threads():
    counter = [0]
    started = time.time()

    def be_crazy():
        for i in xrange(100000):
            # Increment is not atomic!!!
            counter[0] += 1

    threads = [Thread(target=be_crazy) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print "total time", time.time() - started
    assert counter[0] == 100000 * 5, "Counter is {}".format(counter[0])
    print "success (truly unexpected!)"

if __name__ == '__main__':
    run_threads()
Locks
• Standard support for mutual exclusion
• Jython locks are always reentrant (they map to java.util.concurrent.locks.ReentrantLock)
• No extra memory model implications for Jython, unlike the underlying Java
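Because Jython's Lock is always reentrant while CPython's threading.Lock is not, code that re-acquires a lock it already holds is only portable if it asks for threading.RLock explicitly. A minimal sketch; the Account class is a hypothetical example.

```python
import threading

class Account(object):
    """Hypothetical example: deposit() is called directly and from transfer_in()."""

    def __init__(self):
        self.balance = 0
        # RLock lets the owning thread acquire it again without blocking;
        # a plain CPython Lock would deadlock inside transfer_in()
        self.lock = threading.RLock()

    def deposit(self, amount):
        with self.lock:
            self.balance += amount

    def transfer_in(self, amounts):
        with self.lock:                # first acquisition
            for amount in amounts:
                self.deposit(amount)   # re-acquires the same lock

account = Account()
account.transfer_in([10, 20, 30])
```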
Lock Code

from threading import Lock, Thread
import time

DO_PRETEND_WORK = 1000000

def run_threads():
    counter = [0]
    lock = Lock()
    started = time.time()

    def increment_counter():
        for i in xrange(DO_PRETEND_WORK):
            lock.acquire()
            try:
                counter[0] += 1
            finally:
                lock.release()

    threads = [Thread(target=increment_counter) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    assert counter[0] == DO_PRETEND_WORK * 5
    print "success", time.time() - started

if __name__ == '__main__':
    run_threads()
with Statement Variant
• Better performance on Jython, possibility of lock elision
• Nicer structure:

with lock:
    pass  # be protected
with Code

from threading import Lock, Thread
import time

DO_PRETEND_WORK = 1000000

def run_threads():
    counter = [0]
    lock = Lock()
    started = time.time()

    def increment_counter():
        for i in xrange(DO_PRETEND_WORK):
            with lock:
                counter[0] += 1

    threads = [Thread(target=increment_counter) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    assert counter[0] == DO_PRETEND_WORK * 5
    print "success!", time.time() - started

if __name__ == '__main__':
    run_threads()
AtomicInteger
• Let’s use functionality from Java
• AtomicInteger provides atomic increment (and add, …) methods
• Simply import it from the java.util.concurrent.atomic package
AtomicInteger Code

from threading import Thread
from java.util.concurrent.atomic import AtomicInteger
import time

DO_PRETEND_WORK = 1000000

def run_threads():
    counter = AtomicInteger()
    started = time.time()

    def increment_counter():
        for i in xrange(DO_PRETEND_WORK):
            # Atomic read-modify-write: no Python lock needed
            counter.getAndIncrement()

    threads = [Thread(target=increment_counter) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    assert counter.get() == DO_PRETEND_WORK * 5
    print "success!", time.time() - started

if __name__ == '__main__':
    run_threads()
Deadlock Detection
• Implementing concurrent code raises the possibility of deadlock
• In the wild, code will just stop working…
• The JVM provides support for diagnosing and possibly managing these issues
But First, Deadlock Setup
• Two threads
• Two resources, each held exclusively, but acquired in opposite order
• No, you go first; no, you, sir
Deadlock Code

from threading import Thread, Lock
import threading

def cause_deadlock():
    lock_one = Lock()
    lock_two = Lock()
    counter = [0]  # Shared resource for both locks
    Thread(name="thread #1", target=acquire_locks,
           args=(counter, lock_one, lock_two)).start()
    Thread(name="thread #2 (reversed)", target=acquire_locks,
           args=(counter, lock_two, lock_one)).start()

def acquire_locks(counter, lock1, lock2):
    # Will eventually deadlock if locks are acquired in different order
    name = threading.currentThread().getName()
    thread_count = 0
    while True:
        with lock1:
            with lock2:
                counter[0] += 1
                thread_count += 1
                print "name={}, total count={}, thread count={}".\
                    format(name, counter[0], thread_count)

if __name__ == '__main__':
    cause_deadlock()
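A standard remedy for this setup (not part of the talk) is to make every thread acquire the locks in one consistent global order. A minimal sketch, assuming id() of each lock as an arbitrary but stable ordering key; the bounded iteration count is also an assumption so the example terminates.

```python
import threading

def acquire_in_order(*locks):
    # Sort by id() so every thread takes the locks in the same order,
    # no matter which order the caller passed them in
    return sorted(locks, key=id)

def acquire_locks(counter, lock1, lock2, iterations):
    first, second = acquire_in_order(lock1, lock2)
    for _ in range(iterations):
        with first:
            with second:
                counter[0] += 1

lock_one, lock_two = threading.Lock(), threading.Lock()
counter = [0]
threads = [
    threading.Thread(target=acquire_locks, args=(counter, lock_one, lock_two, 10000)),
    # Arguments reversed, as in the deadlock example; the ordering prevents deadlock
    threading.Thread(target=acquire_locks, args=(counter, lock_two, lock_one, 10000)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```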
Deadlock Detection
• JMX as of Java 6 supports findDeadlockedThreads
• Not perfect: it can miss complex chains that cause deadlock
• External vs internal monitoring:
  • External uses a JMX agent that can be installed on the fly
  • Internal directly uses the JMX API
Deadlock Detection Code

from contextlib import closing
from com.sun.tools.attach import VirtualMachine
from javax.management.remote import JMXServiceURL, JMXConnectorFactory
from java.lang.management import ManagementFactory
from java.lang import Class

import os
import os.path
import signal

CONNECTOR_ADDRESS = "com.sun.management.jmxremote.localConnectorAddress"

my_pid = os.getpid()
vm_descs = VirtualMachine.list()
for desc in vm_descs:
    pid = int(desc.id())
    if pid == my_pid:
        continue  # never attach to (or kill) this monitoring process
    vm = VirtualMachine.attach(desc)
    try:
        agent_props = vm.getAgentProperties()
        system_props = vm.getSystemProperties()
        # getProperty returns None (Java null) if no JMX agent is running yet
        connector_addr = agent_props.getProperty(CONNECTOR_ADDRESS)
        if connector_addr is None:
            agent = os.path.join(
                system_props.getProperty("java.home"), "lib",
                "management-agent.jar")
            print "Loading management agent={} for pid={}...".format(agent, pid)
            vm.loadAgent(agent)
            # Get the agent properties again
            agent_props = vm.getAgentProperties()
            connector_addr = agent_props.getProperty(CONNECTOR_ADDRESS)
    finally:
        vm.detach()
    url = JMXServiceURL(connector_addr)
    with closing(JMXConnectorFactory.connect(url)) as connector:
        mbean = connector.getMBeanServerConnection()
        # getPlatformMXBean(connection, interface) requires Java 7 or later
        thread_mxbean = ManagementFactory.getPlatformMXBean(
            mbean, Class.forName("java.lang.management.ThreadMXBean"))
        thread_ids = thread_mxbean.findDeadlockedThreads()
        if thread_ids:
            print "Deadlock detected, shutting down pid={}, threads={}".\
                format(pid, list(thread_ids))
            os.kill(pid, signal.SIGINT)
Daemon Threads
• Simple lifecycle management
• Thread is set to be a daemon thread before it is started
• Daemon status is inherited by child threads
• Upon JVM shutdown, daemon threads are simply terminated
  • Never hold any external resources (database connections, file handles)
  • A daemon thread should never attempt an import
• Use case: strictly for working with in-memory objects (caches, indexes, computation in general)
• Fork-join executors use daemon threads
  • But convenient: never have to worry about pool shutdown
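A small sketch of the inheritance rule: a new thread's daemon flag defaults to that of the thread creating it (CPython documents exactly this rule). It uses the daemon attribute rather than setDaemon; the parent and child functions are illustrative.

```python
import threading

observed = {}

def child():
    pass  # nothing to do; we only inspect the flag

def parent():
    # Created inside a daemon thread, so its daemon flag defaults to True
    c = threading.Thread(target=child)
    observed["child_daemon"] = c.daemon
    c.start()
    c.join()

p = threading.Thread(target=parent)
p.daemon = True   # must be set before start()
p.start()
p.join()
```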
Daemon Threads Code

from threading import Thread, Lock
import sys, time, threading
from java.lang.management import ManagementFactory

def cause_deadlock():
    lock_one = Lock()
    lock_two = Lock()
    counter = [0]  # Shared resource for both locks
    threads = [
        Thread(name="thread #1", target=acquire_locks,
               args=(counter, lock_one, lock_two)),
        Thread(name="thread #2 (reversed)", target=acquire_locks,
               args=(counter, lock_two, lock_one))]
    for thread in threads:
        thread.setDaemon(True)
        thread.start()

    thread_mxbean = ManagementFactory.getThreadMXBean()
    while True:
        time.sleep(1)
        print "monitoring thread", counter[0]
        thread_ids = thread_mxbean.findDeadlockedThreads()
        if thread_ids:
            print "monitoring thread: deadlock detected, shutting down", \
                list(thread_ids)
            sys.exit(1)

def acquire_locks(counter, lock1, lock2):
    # Will eventually deadlock if locks are acquired in different order
    name = threading.currentThread().getName()
    thread_count = 0
    while True:
        with lock1:
            with lock2:
                counter[0] += 1
                thread_count += 1
                print "name={}, total count={}, thread count={}".\
                    format(name, counter[0], thread_count)

if __name__ == '__main__':
    cause_deadlock()
Computation vs I/O Concurrency
• Threads allow for concurrent activity. But of what?
  • I/O concurrency
  • Computation concurrency
  • Always a mix, of course, in real work
I/O Concurrency Code

from threading import Thread
import time

def run_threads():
    def sleepily_wait():
        for i in xrange(50):
            time.sleep(0.1)

    started = time.time()
    threads = [Thread(target=sleepily_wait) for i in xrange(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print "ok", time.time() - started

if __name__ == '__main__':
    run_threads()
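Blocking calls such as time.sleep release CPython's GIL, which is why I/O concurrency works on both implementations. A scaled-down sketch of the same experiment that checks the threads really overlapped; the delays and the 0.6 tolerance factor are arbitrary choices.

```python
import threading
import time

N_THREADS, STEPS, DELAY = 5, 10, 0.02

def sleepily_wait():
    for _ in range(STEPS):
        time.sleep(DELAY)   # blocking call: other threads keep running meanwhile

started = time.time()
threads = [threading.Thread(target=sleepily_wait) for _ in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - started

per_thread = STEPS * DELAY            # about 0.2 s of sleeping per thread
sequential = N_THREADS * per_thread   # about 1.0 s if the threads ran one by one
```

If the sleeps overlap, elapsed should be close to per_thread rather than to sequential.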
CPython and the Global Interpreter Lock
• GIL protects mutable collection structures: dict, list, set
• GIL supports the reference counting GC system
• But it also rules out writing concurrent, compute-intensive code in CPython, short of using multiple processes
Concurrent Computation Code

from threading import Thread
import time

def run_threads():
    def busily_wait():
        j = 0
        for i in xrange(10000000):
            j += i
        print j

    started = time.time()
    threads = [Thread(target=busily_wait) for i in xrange(10)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print "total time", time.time() - started

if __name__ == '__main__':
    run_threads()
CPython’s GIL and its Defenders
• False argument: the GIL is good for you
• Forces everything to be in a separate process
• But being in a separate process simply makes sharing potentially more expensive
Jython and Concurrent Computation Code
Python Memory Model
• These operations are atomic:
  • Reading or replacing a single instance attribute
  • Reading or replacing a single global variable
  • Fetching an item from a list
  • Modifying a list in place (e.g. adding an item using append)
  • Fetching an item from a dict
  • Modifying a dict in place (e.g. adding an item, or calling the clear method)
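A quick check of one guarantee from this list: list.append is atomic, so concurrent appends never lose items even though no lock is used. The thread and item counts are arbitrary.

```python
import threading

ITEMS_PER_THREAD, N_THREADS = 10000, 4
shared = []

def appender(tag):
    for i in range(ITEMS_PER_THREAD):
        shared.append((tag, i))   # a single append is atomic: no lock needed

threads = [threading.Thread(target=appender, args=(n,)) for n in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

By contrast, counter[0] += 1 is a fetch followed by a separate replace, which is why the earlier counter example can lose updates.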
Immutable Types in Python
• str, unicode
• int, float, Decimal
• frozenset
• tuple
• datetime.datetime, etc.
• and many more, including your own
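One way to define your own immutable type is to subclass a collections.namedtuple with empty __slots__; instances can then be shared between threads without locking. A minimal sketch with a hypothetical Point type.

```python
from collections import namedtuple

class Point(namedtuple("Point", "x y")):
    __slots__ = ()   # no per-instance __dict__, so no new attributes either

    def moved(self, dx, dy):
        # "Mutation" returns a new object; the original is never changed
        return self._replace(x=self.x + dx, y=self.y + dy)

origin = Point(0, 0)
shifted = origin.moved(3, 4)

try:
    origin.x = 10          # field assignment is rejected
    assignment_blocked = False
except AttributeError:
    assignment_blocked = True
```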
Thread Safety Concerns
• Can the interaction of two or more threads corrupt a mutable object?
  • Dangerous for mutable collections (list, dict)
  • Such as an infinite loop when traversing a collection (java.util.HashMap)
• Can an update get lost?
  • Incrementing a counter, appending to a list, etc.
Thread Confinement
• Probably don’t need to share a large percentage of the mutable objects used in your code
• Very simply put, if you don’t share, then thread safety issues go away
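A sketch of thread confinement: each worker counts words into its own Counter, which no other thread touches while it is being mutated, and the partial results are merged only after join(). The word lists are made-up input.

```python
import threading
from collections import Counter

chunks = [["a", "b", "a"], ["b", "c"], ["a", "c", "c"]]
partials = [None] * len(chunks)

def count_words(index, words):
    local = Counter()          # confined to this thread while it is mutated
    for word in words:
        local[word] += 1
    partials[index] = local    # each thread writes only its own slot

threads = [threading.Thread(target=count_words, args=(i, c))
           for i, c in enumerate(chunks)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = Counter()
for partial in partials:       # merging happens in one thread, after join()
    total.update(partial)
```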
GIL Easter Egg
>>> from __future__ import GIL
  File "<stdin>", line 1
SyntaxError: Never going to happen!
No GIL
• No need for a GIL:
  • Jython uses standard Java garbage collection support
  • Protects mutable collection structures so they are thread-safe
• Threads for compute-intensive tasks can be written in Python
Jython and Thread Safety
• Updates still might get lost in a data race. It depends on what is guaranteed. May need to synchronize. Be careful.
• Other Java collection objects likely lack such no-corruption guarantees
JVM Issues
• Need memory fences, with some nice bracketing symmetry:
  • synchronized:
    • acquire: fresh reads into cache
    • release: writes cache, visible to all processors
  • volatile:
    • read: reads value, no caching
    • write: flushes that value from cache, all processors see it
Translating to JVM: Volatile and Memory Fences
• Setting any attribute in Python is a volatile write
• Getting any attribute is a volatile read
• Implementation:
  • Python attributes are stored in dictionaries
  • For Jython: follows the semantics of the backing ConcurrentHashMap
Interruption
• Volatility is key
• Works in CPython or Jython: the Python memory model is the same
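One Way to Interrupt

from java.lang import Runnable

def do_stuff():
    pass  # placeholder for one unit of real work

class DoSomething(Runnable):
    def __init__(self):
        self.cancelled = False  # set to True from another thread to stop run()

    def run(self):
        while not self.cancelled:
            do_stuff()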
Python Memory Model Implications
• Again: Python variables are always volatile, unlike Java
• No problems with using a cancelled flag like this
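The same cancelled-flag pattern, sketched with the threading module instead of Runnable so it also runs on CPython. It relies only on the attribute visibility described above; the Worker class and its loop body are hypothetical stand-ins.

```python
import threading
import time

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.cancelled = False   # written by another thread, read in run()
        self.iterations = 0

    def run(self):
        while not self.cancelled:
            self.iterations += 1   # stand-in for do_stuff()
            time.sleep(0.001)

    def cancel(self):
        self.cancelled = True

worker = Worker()
worker.start()
time.sleep(0.05)
worker.cancel()
worker.join(5)   # the loop sees the flag on its next check and exits
```

The standard library's threading.Event packages the same idea (set, is_set, and a wait with timeout).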
Memory Model Implications for Jython
• Python code has sequential consistency
• No surprises: execution follows the statement order
• Safe publication is trivial
  • Safe publication simply means assigning a name to an object
  • Just need to ensure that the object itself is built in a thread-safe fashion; then publish
• It is possible to defeat this: start with sys._getframe() …
• Or use arrays or anything else directly going against Java
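A sketch of "build, then publish": the new settings dict is fully populated in a local variable and only then bound to the shared attribute, so a reader sees either the old object or the complete new one. The Settings holder and its keys are hypothetical.

```python
import threading

class Settings(object):
    current = {"timeout": 10, "retries": 3}   # shared; treated as read-only once published

def reload_settings(timeout, retries):
    new = {}                 # built privately: no other thread can see `new` yet
    new["timeout"] = timeout
    new["retries"] = retries
    Settings.current = new   # publish with a single attribute rebind

seen = []

def reader():
    for _ in range(1000):
        snapshot = Settings.current   # read the reference once, then use only it
        seen.append((snapshot["timeout"], snapshot["retries"]))

t = threading.Thread(target=reader)
t.start()
for i in range(100):
    reload_settings(i, i)
t.join()
```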
Java Interruption
• Can use the standard Java API for threads
• Thread.interrupt
• Note the alternate calling form in the interrupting code: JThread.interrupt(thread), called on a Python thread object
• Works because we can strip off the proxy that Jython uses
Thread Interruption
• Thread interruption allows for even more responsive cancellation
• If a thread is waiting on most synchronizers (e.g. a condition variable), interruption causes the waiting method to exit with an InterruptedException
  • Blocking file I/O differs: interruptible NIO channels are closed and raise ClosedByInterruptException, while classic java.io streams are generally not interruptible
• Lock acquisition, except by using lockInterruptibly on the underlying Java lock, is not interruptible
Interrupting Code

from threading import Condition, Thread
from java.lang import Thread as JThread, InterruptedException
import time, threading

def be_unfair():
    unfair_condition = Condition()
    threads = [
        Thread(
            name="thread #{}".format(i),
            target=wait_until_interrupted,
            args=(unfair_condition,))
        for i in xrange(5)]
    for thread in threads:
        thread.start()
    time.sleep(5)

    # instead of notifying, we will interrupt the threads
    for thread in threads:
        JThread.interrupt(thread)
    for thread in threads:
        thread.join()

def wait_until_interrupted(cv):
    name = threading.currentThread().getName()
    with cv:
        while not JThread.currentThread().isInterrupted():
            try:
                print "{} Waiting pointlessly".format(name)
                cv.wait()
            except InterruptedException, e:
                print "{} Got exception: {}".format(name, str(e))
                break
    print "{} Finished".format(name)

if __name__ == '__main__':
    be_unfair()
Module Import Lock
• Defined by the Python specification
• Always acquired on every import!
• Don’t write code like this in a hot loop, especially when threaded!

def slow_things_way_down():
    from foo import bar, baz
    # etc

• Ensures safe publication of module-level names
  * Globals being initialized
  * Functions being decorated
  * Classes being built
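One way to keep import statements out of a hot loop is to import once at module scope and reuse the bound name. A minimal sketch, using the standard json module as a stand-in for the slide's hypothetical foo; both function names are illustrative.

```python
from json import dumps   # import once, at module load time

def encode_all_slow(records):
    out = []
    for record in records:
        # Executes an import statement on every iteration
        from json import dumps as slow_dumps
        out.append(slow_dumps(record))
    return out

def encode_all_fast(records):
    # Uses the module-level binding: no import inside the loop
    return [dumps(record) for record in records]
```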
Tasks vs Threads
• Avoid haphazard threading
  • Heterogeneous threads are problematic/evil
  • Dependencies managed through a variety of channels
• Use tasks instead to build a simple, scalable system
  * Encapsulates business logic
  * Explicit wait-on dependencies and time scheduling (if necessary)
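Downloader

import urllib2
from java.util.concurrent import Callable

class Downloader(Callable):
    def __init__(self, url):
        self.url = url
        self.result = None     # page contents, if the download succeeds
        self.exception = None  # the error raised, if it fails

    def call(self):
        try:
            self.result = urllib2.urlopen(self.url).read()
        except Exception, ex:
            self.exception = ex
        return self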
Execution Models for Executors
• Variety of execution models
  • Thread pools
  • 1 thread per task (so no pooling)
  • 1 thread (so sequential)
  • Fork-join with work stealing in Java 7 (also available for Java 6 via the jsr166y library)
• Combined with blocking queues using ExecutorCompletionService
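For comparison, CPython 3's standard library offers the same task/executor split in concurrent.futures, where as_completed plays roughly the role of ExecutorCompletionService. It is not part of Jython 2.7's standard library, so treat this as a CPython-side sketch; fake_download is a hypothetical stand-in for a real network fetch.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_CONCURRENT = 3

def fake_download(url, delay):
    # Hypothetical stand-in for urlopen(url).read(): sleeps, then returns a result
    time.sleep(delay)
    if "unlikely" in url:
        raise IOError("no such host: " + url)
    return "<html>%s</html>" % url

jobs = {"http://a.example/": 0.03, "http://b.example/": 0.01,
        "http://unlikely.example/": 0.02}

results, errors = {}, {}
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:  # shuts down on exit
    futures = {pool.submit(fake_download, url, d): url for url, d in jobs.items()}
    for future in as_completed(futures):   # yields each future as soon as it finishes
        url = futures[future]
        try:
            results[url] = future.result()
        except IOError as ex:
            errors[url] = ex
```

Leaving the with block calls pool.shutdown(wait=True), which covers the same ground as shutting the pool down and awaiting termination.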
ExecutorCompletionService Code

from downloader import Downloader
from shutdown import shutdown_and_await_termination
from java.util.concurrent import Executors, ExecutorCompletionService
import os
import hashlib

MAX_CONCURRENT = 3
SITES = [
    "http://www.cnn.com/",
    "http://www.nytimes.com/",
    "http://www.washingtonpost.com/",
    "http://www.dailycamera.com/",
    "http://www.timescall.com/",
    # generate a random web site name that is very, very unlikely to exist
    "http://" + hashlib.md5(
        "unlikely-web-site-" + os.urandom(4)).hexdigest() + ".com",
    ]

pool = Executors.newFixedThreadPool(MAX_CONCURRENT)
ecs = ExecutorCompletionService(pool)

# this function could spider the links from these roots;
# for now just schedule these roots directly
def scheduler(roots):
    for site in roots:
        yield site

# submit tasks indefinitely, counting each submission
# (a spidering scheduler may yield more sites than SITES)
submitted = 0
for site in scheduler(SITES):
    ecs.submit(Downloader(site))
    submitted += 1

# work with results as soon as they become available
while submitted > 0:
    result = ecs.take().get()
    # here we just do something unimaginative with the result;
    # consider parsing it with tools like BeautifulSoup or JSoup
    print result
    submitted -= 1

print "shutting pool down..."
shutdown_and_await_termination(pool, 5)
print "done"
Google Guava’s LoadingCache
• Part of Guava, a kick-butt library for doing stuff concurrently
• Weak keys, weak values, LRU support, …
• Used by Jython itself
• get blocks while the CacheLoader's load method computes a missing value
Java or Python APIs - Recommendations
• Port Python code that uses the threading module
  • Such code can still interoperate with Java, because Jython threads are always mapped to Java threads
• Use Python types concurrently: Jython implements dict and set with ConcurrentHashMap
• Use java.util.concurrent collections or third-party Java libraries like Guava
• Use higher-level primitives from Java instead of creating your own
Concurrency Resources
• Jython Book - http://www.jython.org/jythonbook/en/1.0/Concurrency.html
• Java Memory Model resources - http://www.cs.umd.edu/~pugh/java/memoryModel
• Guava docs - http://code.google.com/p/guava-libraries/wiki/GuavaExplained
• Java concurrency mailing list - http://g.oswego.edu/dl/concurrency-interest