Async and non-blocking IO with JRuby
TRANSCRIPT
BLOCKING IS BAD
▸ Each thread consumes resources
▸ Memory, Stack frames
▸ OS context switching
▸ GC has to walk the stack frames
▸ More $$$
PROMISES
promise = Blocking.get do # execution segment
end
promise.then do |result| # execution segment
end
PROMISES
promise = Blocking.get do # execution segment
end
promise.then do |result| # execution segment
end
EVENT LOOP
SYNCHRONOUS EXAMPLE
results = terms.map do |item| url = rest_url(item) response = Net::HTTP.get(url) JSON.parse(response)["Item"] end.flatten
render(results)
ASYNCHRONOUS EXAMPLE
results = [] terms.each do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url).then do |response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end end
Promise.value(results).then { |r| render(r) }
ASYNCHRONOUS EXAMPLE
results = [] terms.each do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url).then do |response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end end
Promise.value(results).then { |r| render(r) }
Promise
ASYNCHRONOUS EXAMPLE
results = [] terms.each do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url).then do |response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end end
Promise.value(results).then { |r| render(r) }
ASYNCHRONOUS EXAMPLE
results = [] terms.each do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url).then do |response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end end
Promise.value(results).then { |r| render(r) }
promises = terms.map do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url) end
batch = ParallelBatch.of(promises)
results = Collections.synchronized_list([]) operation = batch.for_each do |i, response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end
operation.then { render(results) }
promises = terms.map do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url) end
batch = ParallelBatch.of(promises)
results = Collections.synchronized_list([]) operation = batch.for_each do |i, response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end
operation.then { render(results) }
Promise
promises = terms.map do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url) end
batch = ParallelBatch.of(promises)
results = Collections.synchronized_list([])
operation = batch.for_each do |i, response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end
operation.then { render(results) }
promises = terms.map do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url) end
batch = ParallelBatch.of(promises)
results = Collections.synchronized_list([]) operation = batch.for_each do |i, response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end
operation.then { render(results) }
promises = terms.map do |item| url = rest_url(item) http_client = ctx.get(HttpClient.java_class) http_client.get(url) end
batch = ParallelBatch.of(promises)
results = Collections.synchronized_list([]) operation = batch.for_each do |i, response| body = response.get_body.get_text results << JSON.parse(body)["Item"] end
operation.then { render(results) }
Gemfile
source 'https://rubygems.org'
ruby '2.3.3', engine: 'jruby', engine_version: '9.1.12.0'
gem 'jbundler'
INSTALL DEPENDENCIES
$ bundle install
$ jbundle install
...
jbundler runtime classpath:
------------------------
.../ratpack-core-1.4.6.jar
.../netty-common-4.1.6.Final.jar
...
jbundle complete !
lib/server.rb
require 'java' require 'bundler/setup' Bundler.require
java_import 'ratpack.server.RatpackServer'
RatpackServer.start do |b| b.handlers do |chain| chain.get("async") do |ctx| # async request handling end
chain.get("sync") do |ctx| # sync request handling end end end
RUN THE APP
$ bundle exec ruby lib/server.rb
[main] INFO ratpack.server.RatpackServer - Starting server ...
[main] INFO ratpack.server.RatpackServer - Building registry ...
[main] INFO ratpack.server.RatpackServer - Ratpack started ...
RATPACK INTEGRATES WITH JAVA LIBRARIES
▸ RxJava
▸ compose & transform asynchronous operations
▸ Hystrix
▸ fault tolerance
▸ caching
▸ de-duping
▸ batching
Jarfile
jar 'io.ratpack:ratpack-core', '1.4.6'
jar 'io.ratpack:ratpack-rx', '1.4.6'
jar 'io.ratpack:ratpack-hystrix', '1.4.6'
jar 'org.slf4j:slf4j-simple', '1.7.25'
lib/server.rb
RatpackServer.start do |b| book_service = BookService.new
b.handlers do |chain| chain.get do |ctx| book_service.all(ctx).subscribe do |books| render_erb(ctx, "index.html.erb", binding) end end
lib/server.rb
RatpackServer.start do |b| book_service = BookService.new
b.handlers do |chain| chain.get do |ctx| book_service.all(ctx).subscribe do |books| render_erb(ctx, "index.html.erb", binding) end end
Observable Subscription
lib/server.rb
RatpackServer.start do |b| book_service = BookService.new
b.handlers do |chain| chain.get do |ctx| book_service.all(ctx).subscribe do |books| render_erb(ctx, "index.html.erb", binding) end end
lib/book_service.rb
class BookService def initialize @db = BookDbCommands.new @isbn_db = IsbnDbCommands.new end
def all(ctx) @db.all.flat_map do |row| @isbn_db.get_book(ctx, row[:isbn]).map do |json| row.merge(JSON.parse(json)) end end.to_list end
lib/book_service.rb
class BookService def initialize @db = BookDbCommands.new @isbn_db = IsbnDbCommands.new end
def all(ctx) @db.all.flat_map do |row| @isbn_db.get_book(ctx, row[:isbn]).map do |json| row.merge(JSON.parse(json)) end end.to_list end
Observable
lib/book_service.rb
class BookService def initialize @db = BookDbCommands.new @isbn_db = IsbnDbCommands.new end
def all(ctx) @db.all.flat_map do |row| @isbn_db.get_book(ctx, row[:isbn]).map do |json| row.merge(JSON.parse(json)) end end.to_list end
Observable
lib/book_service.rb
class BookService def initialize @db = BookDbCommands.new @isbn_db = IsbnDbCommands.new end
def all(ctx) @db.all.flat_map do |row| @isbn_db.get_book(ctx, row[:isbn]).map do |json| row.merge(JSON.parse(json)) end end.to_list end
Observable
lib/book_service.rb
class BookService def initialize @db = BookDbCommands.new @isbn_db = IsbnDbCommands.new end
def all(ctx) @db.all.flat_map do |row| @isbn_db.get_book(ctx, row[:isbn]).map do |json| row.merge(JSON.parse(json)) end end.to_list end
Observable
lib/book_service.rb
class BookService def initialize @db = BookDbCommands.new @isbn_db = IsbnDbCommands.new end
def all(ctx) @db.all.flat_map do |row| @isbn_db.get_book(ctx, row[:isbn]).map do |json| row.merge(JSON.parse(json)) end end.to_list end
Observable
OBSERVABLES
▸ all: Observable that will emit items
▸ flat_map: Applies a function to each item, and returns an Observable that emits items
▸ to_list: Returns an Observable that emits an Array of items
▸ subscribe: Returns a Subscription to the Observable
lib/book_db_commands.rb
class BookDbCommands
def all(ctx) s = HystrixObservableCommand::Setter.with_group_key(GROUP_KEY).and_command_key(HystrixCommandKey::Factory.as_key("getAll"))
Class.new(HystrixObservableCommand) do def construct RxRatpack.observe_each(Blocking.get { DB["select isbn, quantity, price from books order by isbn"].all }) end
def get_cache_key "db-bookdb-all" end end.new(s).to_observable end
lib/book_db_commands.rb
class BookDbCommands
def all(ctx) s = HystrixObservableCommand::Setter.with_group_key(GROUP_KEY).and_command_key(HystrixCommandKey::Factory.as_key("getAll"))
Class.new(HystrixObservableCommand) do def construct RxRatpack.observe_each(Blocking.get { DB["select isbn, quantity, price from books order by isbn"].all }) end
def get_cache_key "db-bookdb-all" end end.new(s).to_observable end
lib/book_db_commands.rb
class BookDbCommands
def all(ctx) s = HystrixObservableCommand::Setter.with_group_key(GROUP_KEY).and_command_key(HystrixCommandKey::Factory.as_key("getAll"))
Class.new(HystrixObservableCommand) do def construct RxRatpack.observe_each(Blocking.get { DB["select isbn, quantity, price from books order by isbn"].all }) end
def get_cache_key "db-bookdb-all" end end.new(s).to_observable end
lib/book_db_commands.rb
class BookDbCommands
def all(ctx) s = HystrixObservableCommand::Setter.with_group_key(GROUP_KEY).and_command_key(HystrixCommandKey::Factory.as_key("getAll"))
Class.new(HystrixObservableCommand) do def construct RxRatpack.observe_each(Blocking.get { DB["select isbn, quantity, price from books order by isbn"].all }) end
def get_cache_key "db-bookdb-all" end end.new(s).to_observable end
IN RUBY…
WAYS TO DO ASYNC
▸ Netty (via Ratpack)
▸ Servlet 3.1 Async (via Warbler)
▸ Vert.x
▸ EventMachine
TEXT
WHY NOT ASYNC?
▸ Bottleneck is often the DB
▸ Async is hard
▸ Non-deterministic
▸ Callback hell
▸ Error handling/propagation
LINKS
▸ http://jruby.org
▸ https://ratpack.io
▸ https://netty.io
▸ http://reactivex.io
▸ https://github.com/jkutner/jruby-ratpack-async-demo
▸ https://github.com/jkutner/jruby-ratpack-books-example