java concurrency and asynchronous
DESCRIPTION
Introduce Java concurrency and asynchronous programmingTRANSCRIPT
Agenda
● Fundamentals: JMM, volatile, sychronized,
wait and notify
● JUC: Thread Pool, Concurrent Collections,
Sychronizers
● Cancellation & Shutdown
● Java 6, 7 and 8
● Asynchronous Programming
Preliminary
● Junior programmers think Java concurrency
programming is difficult.
● Intermediate programmers think Java concurrency
programming is easy.
● Senior programmers think Java concurrecy is difficult.
Why we need concurrency programming?
1. Multi-core processors
2. IO
3. Quicker response
Agenda
● Fundamentals: JMM, volatile, sychronized,
wait and notify
● JUC: Thread Pool, Concurrent Collections,
Sychronizers
● Cancellation & Shutdown
● Java 6, 7 and 8
● Asynchronous Programming
Java Memory Model
An example
class DmaStatusChecker implements Runnable {
public void run() {
if (requestDmaStatusApiSuccess()) {
dmaManager.dmaRunning = true;
} else {
dmaManager.dmaRunning = false;
}
}
}
class DmaManager {
private boolean dmaRunning;
public boolean isDmaRunnin() {
return this.dmaRunning;
}
}
Any problems?
volatile version
class DmaManager {
private volatile boolean dmaRunning;
public boolean isDmaRunnin() {
return this.dmaRunning;
}
}
Why “volatile” is necessary?
1. Memory visibility
2. Instrument reordering
Reorder
class ReorderExample {
int a = 0;
boolean flag = false;
public void writer() {
a = 1; //1
flag = true; //2
}
Public void reader() {
if (flag) { //3
int i = a * a; //4
// ......
}
}
}
The Java Memory Model
Atomic
For example, “i++”. It is a composite operation. “volatile”
keywords cannot make “i++” to be atomic.
i++ => 1. read
2. plus
3. write
Synchronized
Full synchronizedSynchronizedFactorizer.java
Partial synchronizedCachedFactorizer.java
Thread cooperate: wait, notify and notifyAll
● Question: What are differences between notify and
notifyAll?
● Quesiton: In the producer/consumer model, could both
producers and consumers are waiting?
● Question: When should use notify? When should use
notifyAll?
Agenda
● Fundamentals: JMM, volatile, sychronized,
wait and notify
● JUC: Thread Pool, Concurrent Collections,
Sychronizers
● Cancellation & Shutdown
● Java 6, 7 and 8
● Asynchronous Programming
Shortcomings before Java 5
● Only ONE condition queue, no matter how many
condition predicates.
● No thread pool, create a thread is resource consumed.
● Synchronized collections are bad performanced.
● Lack concurrent collections and tools with more
features.
Thread Pool
The constructor of ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue)
So is the following sample to create a Thread Pool correct?
new ThreadPoolExecutor(1, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
Concurrent Collection
Sychronizers
CountDownLatch: countDown(), await()
CyclicBarrier: await()
Phaser: register(), arriveAndAwaitAdvance(),
arriveAndDeregister()
Semaphore: acquire()
Exchanger: exchange(V x)
CAS
CPU instrument: Compare and Swap
Compare and Set:int old;
int new;
do {
old = value.get();
new = doSomeCalcBasedOn(old)
while (value.compareAndSwap(old, new));
AbstractQueuedSynchronizer
Agenda
● Fundamentals: JMM, volatile, sychronized,
wait and notify
● JUC: Thread Pool, Concurrent Collections,
Sychronizers
● Cancellation & Shutdown
● Java 6, 7 and 8
● Asynchronous Programming
Interruption
The most common thing we know about the
interruption is
InterruptedException
About InterruptedException
try {
new Object().wait();
Thread.sleep(1000);
Object element = blockingQueue.take();
// Other blocked methods.
} catch(InterruptedException e) {
// This means the current thread has been interrupted.
// Sometimes we do not need to handle it.
}
So when we do not invoke a blocked method, how to know the current
thread is interrupted?
Two methods
Class “Thread” has two similar methods to know whether the current
thread is interrupted:
Modifier and Type Class and Description
static boolean interrupted()
Tests whether the current thread has been interrupted.
boolean isInterrupted()
Tests whether this thread has been interrupted.
Propogate the interruption
Why?
class AService {
public void aBizMethod() {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
// You think do nothing is ok.
}
}
}
class MyRunnable implements Runnable {
public void run() {
new AService().aBizMethod();
try {
Object item = blockingQueue.take()
// Process item
} catch(InterruptedException e) {
// do nothing
}
}
}
How?
1. Re-throw InterruptionException
2. Recover the interruption state with
Thread.currentThread().interrupt()
Cancel a Task in the Thread Pool
Cancel with Future:
Modifier and Type Method and Description
boolean cancel(boolean mayInterruptIfRunning)
Attempts to cancel execution of this task.
Shutdown a Thread Pool
Modifier and Type Method and Description
void shutdown()
Initiates an orderly shutdown in which previously submitted
tasks are executed, but no new tasks will be accepted.
List<Runnable> shutdownNow()
Attempts to stop all actively executing tasks, halts the
processing of waiting tasks, and returns a list of the tasks
that were awaiting execution.
boolean awaitTermination(long timeout, TimeUnit unit)
Blocks until all tasks have completed execution after a
shutdown request, or the timeout occurs, or the current
thread is interrupted, whichever happens first.
Agenda
● Fundamentals: JMM, volatile, sychronized,
wait and notify
● JUC: Thread Pool, Concurrent Collections,
Sychronizers
● Cancellation & Shutdown
● Java 6, 7 and 8
● Asynchronous Programming
Java 6, 7
Java 6ConcurrentSkipListMap
Java 7ForkJoinPool, RecursiveAction, RecursiveTask
LinkedTransferQueue
Java 8
Class CompletableFuture<T>: A Future that may be explicitly completed (setting its value and status), and may be
used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.
Class CountedCompleter<T>: A ForkJoinTask with a completion action performed when triggered and there are no
remaining pending actions.
DoubleAccumulator: One or more variables that together maintain a running double value updated using a supplied
function.
DoubleAdder: One or more variables that together maintain an initially zero double sum.
LongAccumulator: One or more variables that together maintain a running long value updated using a supplied
function.
LongAdder: One or more variables that together maintain an initially zero long sum.
A new StampedLock class adds a capability-based lock with three modes for controlling read/write access (writing,
reading, and optimistic reading). This class also supports methods that conditionally provide conversions across the
three modes.
Agenda
● Fundamentals: JMM, volatile, sychronized,
wait and notify
● JUC: Thread Pool, Concurrent Collections,
Sychronizers
● Cancellation & Shutdown
● Java 6, 7 and 8
● Asynchronous Programming
IO: Block, Reactor and Proactor
Block: Take a thread -> Connect -> Read -> Process
Reactor: Connect -> When connected, notify to take a thread -> Read -> Process
Proactor: Connect -> Read -> When finish read, notify to take a thread -> Process
Asynchronous Programming
Synchronous Style
function getUserEvents(request, response) {
var facebook_id = request.param('facebook_id');
try {
var user = db.users.findOne({fb_id:facebook_id});
var events = db.events.find({user_id:user.id});
response.write(events);
} catch(err) {
response.status(500).send(err);
}
}
Asynchronous Style (Callback)
function getUserEvents(request,response) {
var returnEvents = function(err, events) {
if (err)
respone.status(500).send(err);
response.write(events);
});
var givenUserFindAndReturnEvents = function(err,user) {
if (err) respone.status(500).send(err);
db.events.find({user_id:user.id}, returnEvents);
};
var findUserAndReturnEvents = function() {
var facebook_id = request.param('facebook_id');
db.users.findOne({fb_id:facebook_id}, givenUserFindAndReturnEvents);
}
findUserAndReturnEvents();
}
Synchronous Call
Reactive Programming
Iterator<T> (Pull) Observer<T> (Push)
Next T next() void onNext(T t)
Success boolean hasNext() void onCompleted()
Error Throw an exception on next()
void onError(Throwable e)
Stacks: .Net Reactive Extension, Netflix RxJava
Asynchronous Call
Callback Troubles
Akka
Based “Actor model”, written with Scala
public class Greeter extends UntypedActor {
public void onReceive(Object message) {
String greeting = "";
if (message instanceof WhoToGreet)
greeting = "hello, " + ((WhoToGreet) message).who;
else if (message instanceof Greet)
// Send the current greeting back to the sender
getSender().tell(new Greeting(greeting), getSelf());
else unhandled(message);
}
}
Spring Reactor
final Environment env = new Environment();
// Reactors are created using a ReactorSpec obtained via factory method
Reactor reactor = Reactors.reactor().env(env).get();
// Register a processor on an event.
reactor.on($("topic"), new Consumer<Event<Message>>() { ... });
// if you don't like the $, use the `object()` method
reactor.on(Selectors.object("topic"), new Consumer<Event<Message>>() { ... });
// Fire the event.
Message msg = msgService.nextMessage();
reactor.notify("topic", Event.wrap(msg));
Node.js
Servlet Async
@WebServlet(urlPatterns={"/asyncservlet"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) {
response.setContentType("text/html;charset=UTF-8");
final AsyncContext acontext = request.startAsync();
acontext.start(() -> {
String param = acontext.getRequest().getParameter("param");
String result = resource.process(param);
HttpServletResponse response = acontext.getResponse();
/* ... print to the response ... */
acontext.complete();
});
}
}
Spring MVC Async
@Controller
@RequestMapping("/async/callable")
public class CallableController {
@RequestMapping("/response-body")
public @ResponseBody Callable<String> callable() {
return new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "Callable result";
}
};
}
}
Q & A
Thanks