Gor Nishanov, C++ Coroutines: a negative overhead abstraction
TRANSCRIPT
C++ Russia 2016 Coroutines 2
What is this talk about?
• C++ Coroutines
• Lightweight, customizable coroutines
• C++17 (maybe)
• Experimental implementation in MSVC 2015, Clang in progress, EDG

2012 - N3328
2013 - N3564
2013 - N3650
2013 - N3722
2014 - N3858
2014 - N3977
2014 - N4134 EWG direction approved
2014 - N4286
2015 - N4403 EWG accepted, sent to Core WG
2015 - P0057R0 Core & LEWG review (co_xxx)
2016 - P0057R2 more Core & LEWG review
C++ in two lines
• Direct mapping to hardware
• Zero-overhead abstractions

From Bjarne Stroustrup's lecture "The Essence of C++"

[Diagram: C++ draws on two lineages: Assembler, BCPL, and C for direct mapping to hardware, and Simula for general-purpose abstractions; C++ continues through C++11 and C++14.]
000100 IDENTIFICATION DIVISION.
000200 PROGRAM-ID. HELLOWORLD.
000300*
000400 ENVIRONMENT DIVISION.
000500 CONFIGURATION SECTION.
000600 SOURCE-COMPUTER. RM-COBOL.
000700 OBJECT-COMPUTER. RM-COBOL.
000800
001000 DATA DIVISION.
001100 FILE SECTION.
001200
100000 PROCEDURE DIVISION.
100100
100200 MAIN-LOGIC SECTION.
100300 BEGIN.
100400     DISPLAY " " LINE 1 POSITION 1 ERASE EOS.
100500     DISPLAY "Hello world!" LINE 15 POSITION 10.
100600     STOP RUN.
100700 MAIN-LOGIC-EXIT.
100800     EXIT.
Joel Erdwinn, Melvin Conway
Image credits: Wikimedia Commons; Communications of the ACM, Vol. 6, No. 7, July 1963
[Diagram: Subroutine vs. coroutine, after Conway's 1963 paper. A Basic Symbol Reducer and a Basic Name Reducer pass tokens to each other ("read token", "output token", until EOF), and the result is written to tape; the structure is drawn once with subroutines and once with coroutines.]
Trivial if synchronous

int tcp_reader(int total)
{
    char buf[4 * 1024];
    auto conn = Tcp::Connect("127.0.0.1", 1337);
    for (;;) {
        auto bytesRead = conn.Read(buf, sizeof(buf));
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return total;
    }
}
std::future<T> and std::promise<T>

shared_state<T>
    atomic<long> refCnt;
    mutex lock;
    variant<empty, T, exception_ptr> value;
    condition_variable ready;

future<T>
    intrusive_ptr<shared_state<T>>
    wait()
    T get()

promise<T>
    intrusive_ptr<shared_state<T>>
    set_value(T)
    set_exception(exception_ptr)
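For comparison, the standard std::promise/std::future pair can be exercised directly. In this sketch (deliver_value and deliver_error are hypothetical helpers) the promise stores either a value or an exception in the shared state, and get() on the future waits for it and then returns the value or rethrows. The reference count, lock, and ready signal listed above are what make this hand-off safe across threads.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>

// Producer and consumer share one state object: the promise writes into it,
// the future reads from it.
int deliver_value(int value) {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::thread producer([&p, value] { p.set_value(value); });
    int result = f.get();  // blocks until the producer has stored a value
    producer.join();
    return result;
}

// Failures travel through the same shared state as an exception_ptr,
// and get() rethrows them on the consumer side.
std::string deliver_error() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    p.set_exception(std::make_exception_ptr(std::runtime_error("connect failed")));
    try {
        f.get();
        return "no error";
    } catch (const std::runtime_error& e) {
        return e.what();
    }
}
```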
future<int> tcp_reader(int64_t total) {
    struct State {
        char buf[4 * 1024];
        int64_t total;
        Tcp::Connection conn;
        explicit State(int64_t total) : total(total) {}
    };
    auto state = make_shared<State>(total);
    return Tcp::Connect("127.0.0.1", 1337).then(
        [state](future<Tcp::Connection> conn) {
            state->conn = std::move(conn.get());
            return do_while([state]()->future<bool> {
                if (state->total <= 0) return make_ready_future(false);
                return state->conn.read(state->buf, sizeof(state->buf)).then(
                    [state](future<int> nBytesFut) {
                        auto nBytes = nBytesFut.get();
                        if (nBytes == 0) return make_ready_future(false);
                        state->total -= nBytes;
                        return make_ready_future(true);
                    });
            });
        });
}

.then comes from N4399, Working Draft, Technical Specification for C++ Extensions for Concurrency.

future<void> do_while(function<future<bool>()> body) {
    return body().then([=](future<bool> notDone) {
        return notDone.get() ? do_while(body) : make_ready_future();
    });
}
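std::future in the C++ standard has no .then, so here is the same looping idea in plain continuation-passing style. This is a sketch, not the Concurrency TS API: the Step alias and the callback-based do_while signature are assumptions made for illustration.

```cpp
#include <cassert>
#include <functional>

// One asynchronous loop step: it receives a continuation and eventually
// calls it with true (run another iteration) or false (stop).
using Step = std::function<void(std::function<void(bool)>)>;

// Continuation-passing analogue of the future-based do_while.
void do_while(Step body, std::function<void()> done) {
    body([body, done](bool notDone) {
        if (notDone)
            do_while(body, done);  // start the next iteration
        else
            done();                // loop finished
    });
}
```

If each step calls its continuation immediately, every iteration nests one level deeper on the stack; the same hazard reappears with synchronous I/O completions.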
Forgot something

int tcp_reader(int total)
{
    char buf[4 * 1024];
    auto conn = Tcp::Connect("127.0.0.1", 1337);
    for (;;) {
        auto bytesRead = conn.Read(buf, sizeof(buf));
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return total;
    }
}
future<int> tcp_reader(int64_t total) {
    struct State {
        char buf[4 * 1024];
        int64_t total;
        Tcp::Connection conn;
        explicit State(int64_t total) : total(total) {}
    };
    auto state = make_shared<State>(total);
    return Tcp::Connect("127.0.0.1", 1337).then(
        [state](future<Tcp::Connection> conn) {
            state->conn = std::move(conn.get());
            return do_while([state]()->future<bool> {
                if (state->total <= 0) return make_ready_future(false);
                return state->conn.read(state->buf, sizeof(state->buf)).then(
                    [state](future<int> nBytesFut) {
                        auto nBytes = nBytesFut.get();
                        if (nBytes == 0) return make_ready_future(false);
                        state->total -= nBytes;
                        return make_ready_future(true);
                    }); // read
            }); // do_while
        }); // Tcp::Connect
}
future<int> tcp_reader(int64_t total) {
    struct State {
        char buf[4 * 1024];
        int64_t total;
        Tcp::Connection conn;
        explicit State(int64_t total) : total(total) {}
    };
    auto state = make_shared<State>(total);
    return Tcp::Connect("127.0.0.1", 1337).then(
        [state](future<Tcp::Connection> conn) {
            state->conn = std::move(conn.get());
            return do_while([state]()->future<bool> {
                if (state->total <= 0) return make_ready_future(false);
                return state->conn.read(state->buf, sizeof(state->buf)).then(
                    [state](future<int> nBytesFut) {
                        auto nBytes = nBytesFut.get();
                        if (nBytes == 0) return make_ready_future(false);
                        state->total -= nBytes;
                        return make_ready_future(true);
                    }); // read
            }); // do_while
        }).then([state](future<void>) { return make_ready_future(state->total); });
}
Hand-crafted async state machine (1/3)

class tcp_reader {
    char buf[64 * 1024];
    Tcp::Connection conn;
    promise<int> done;
    int total;

    explicit tcp_reader(int total) : total(total) {}

    void OnConnect(error_code ec, Tcp::Connection newCon);
    void OnRead(error_code ec, int bytesRead);
    void OnError(error_code ec);
    void OnComplete();

public:
    static future<int> start(int total);
};

int main() { cout << tcp_reader::start(1000 * 1000 * 1000).get(); }

[State diagram: Connecting, Reading, Completed, and Failed, with transitions numbered ① to ⑤]
Hand-crafted async state machine (2/3)

future<int> tcp_reader::start(int total) {
    // The constructor is private, so make_unique cannot be used here.
    auto p = unique_ptr<tcp_reader>(new tcp_reader(total));
    auto result = p->done.get_future();
    Tcp::Connect("127.0.0.1", 1337, [raw = p.get()](auto ec, auto newConn) {
        raw->OnConnect(ec, std::move(newConn));
    });
    p.release();
    return result;
}

void tcp_reader::OnConnect(error_code ec, Tcp::Connection newCon) {
    if (ec) return OnError(ec);
    conn = std::move(newCon);
    conn.Read(buf, sizeof(buf), [this](error_code ec, int bytesRead) {
        OnRead(ec, bytesRead);
    });
}
Hand-crafted async state machine (3/3)

void tcp_reader::OnRead(error_code ec, int bytesRead) {
    if (ec) return OnError(ec);
    total -= bytesRead;
    if (total <= 0 || bytesRead == 0) return OnComplete();
    conn.Read(buf, sizeof(buf), [this](error_code ec, int bytesRead) {
        OnRead(ec, bytesRead);
    });
}

void tcp_reader::OnError(error_code ec) {
    auto cleanMe = unique_ptr<tcp_reader>(this);
    done.set_exception(make_exception_ptr(system_error(ec)));
}

void tcp_reader::OnComplete() {
    auto cleanMe = unique_ptr<tcp_reader>(this);
    done.set_value(total);
}
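To watch the state machine run without sockets, this sketch replaces the network with a hypothetical single-threaded completion queue (g_completions, run_completions, and FakeConnection are assumptions, as is the two-argument start). The connect step is skipped; reading, completion, and self-deletion follow the three slides.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <system_error>

// Hypothetical single-threaded completion queue standing in for the OS.
std::deque<std::function<void()>> g_completions;

void run_completions() {
    while (!g_completions.empty()) {
        auto next = std::move(g_completions.front());
        g_completions.pop_front();
        next();
    }
}

// Hypothetical connection: a Read "receives" up to len of the bytes the peer
// still has and posts the completion, rather than calling back inline.
struct FakeConnection {
    int remaining = 0;
    template <class Cb>
    void Read(char*, int len, Cb cb) {
        int n = std::min(len, remaining);
        remaining -= n;
        g_completions.push_back([cb, n] { cb(std::error_code{}, n); });
    }
};

class tcp_reader {
    char buf[4 * 1024];
    FakeConnection conn;
    std::promise<int> done;
    int total;

    tcp_reader(int total, int available) : total(total) { conn.remaining = available; }

    void IssueRead() {
        conn.Read(buf, sizeof(buf), [this](std::error_code ec, int n) { OnRead(ec, n); });
    }
    void OnRead(std::error_code ec, int bytesRead) {
        if (ec) return OnError(ec);
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return OnComplete();
        IssueRead();
    }
    void OnError(std::error_code ec) {
        std::unique_ptr<tcp_reader> cleanMe(this);  // the reader deletes itself
        done.set_exception(std::make_exception_ptr(std::system_error(ec)));
    }
    void OnComplete() {
        std::unique_ptr<tcp_reader> cleanMe(this);
        done.set_value(total);
    }

public:
    // The connect step is omitted; reading starts immediately.
    static std::future<int> start(int total, int available) {
        std::unique_ptr<tcp_reader> p(new tcp_reader(total, available));
        auto result = p->done.get_future();
        p->IssueRead();
        p.release();  // ownership passes to the pending operation
        return result;
    }
};
```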
Trivial

auto tcp_reader(int total) -> int
{
    char buf[4 * 1024];
    auto conn = Tcp::Connect("127.0.0.1", 1337);
    for (;;) {
        auto bytesRead = conn.Read(buf, sizeof(buf));
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return total;
    }
}
Trivial

auto tcp_reader(int total) -> future<int>
{
    char buf[4 * 1024];
    auto conn = await Tcp::Connect("127.0.0.1", 1337);
    for (;;) {
        auto bytesRead = await conn.Read(buf, sizeof(buf));
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return total;
    }
}
What about perf?

Visual C++ 2015 RTM, measured on a Lenovo W540 laptop, transmitting and receiving 1 GB over the loopback IP address.

                       Hand-crafted    Coroutines
Throughput (MB/s)      380             495 (1.3x)
Binary size (KB)       30              25 (0.85x)

For reference, a Hello world binary, int main() { printf("Hello, world\n"); }, is 9 KB.
Coroutines are closer to the metal

[Diagram: layers from Hardware up through OS / low-level libraries. Handcrafted state machines sit on callback-based I/O abstractions; coroutines sit on awaitable-based I/O abstractions.]
How do we map a high-level call onto the OS API?

template <class Cb> void Read(void* buf, size_t bytes, Cb && cb);

conn.Read(buf, sizeof(buf), [this](error_code ec, int bytesRead) { OnRead(ec, bytesRead); });

Windows: WSARecv(fd, ..., OVERLAPPED*)
POSIX AIO: aio_read(aiocb*), where the aiocb also carries the fd

[Diagram: one memory block holding the OS context (OVERLAPPED on Windows, aiocb on POSIX) followed by the function object]
struct OverlappedBase : os_async_context {
    virtual void Invoke(std::error_code, int bytes) = 0;
    virtual ~OverlappedBase() {}

    static void io_complete_callback(CompletionPacket& p) {
        auto me = unique_ptr<OverlappedBase>(static_cast<OverlappedBase*>(p.overlapped));
        me->Invoke(p.error, p.byteTransferred);
    }
};

template <typename Fn>
unique_ptr<OverlappedBase> make_handler_with_count(Fn && fn) {
    return std::make_unique<CompletionWithCount<std::decay_t<Fn>>>(std::forward<Fn>(fn));
}

[Diagram: os_async_ctx (OVERLAPPED/aiocbp) followed by the function object]

After opening the socket, associate its handle with the thread pool and the completion callback:

ThreadPool::AssociateHandle(sock.native_handle(), &OverlappedBase::io_complete_callback);

template <typename Fn>
struct CompletionWithCount : OverlappedBase, private Fn {
    CompletionWithCount(Fn fn) : Fn(std::move(fn)) {}

    void Invoke(std::error_code ec, int count) override { Fn::operator()(ec, count); }
};
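A self-contained sketch of the type-erasure trick: os_async_context and CompletionPacket are hypothetical stand-ins for OVERLAPPED/aiocb and the thread pool's completion record. The completion routine sees only an OS-context pointer, yet it recovers the full object, runs the stored function object, and frees the block.

```cpp
#include <cassert>
#include <memory>
#include <system_error>
#include <type_traits>
#include <utility>

// Hypothetical stand-ins for the OS types (OVERLAPPED / aiocb in real code).
struct os_async_context {};
struct CompletionPacket {
    os_async_context* overlapped;
    std::error_code error;
    int byteTransferred;
};

struct OverlappedBase : os_async_context {
    virtual void Invoke(std::error_code, int bytes) = 0;
    virtual ~OverlappedBase() {}

    // Recover the full object from the OS context, run it once, then free it.
    static void io_complete_callback(CompletionPacket& p) {
        std::unique_ptr<OverlappedBase> me(static_cast<OverlappedBase*>(p.overlapped));
        me->Invoke(p.error, p.byteTransferred);
    }
};

// The function object lives in the same heap block as the OS context.
template <typename Fn>
struct CompletionWithCount : OverlappedBase, private Fn {
    explicit CompletionWithCount(Fn fn) : Fn(std::move(fn)) {}
    void Invoke(std::error_code ec, int count) override { Fn::operator()(ec, count); }
};

template <typename Fn>
std::unique_ptr<OverlappedBase> make_handler_with_count(Fn&& fn) {
    return std::make_unique<CompletionWithCount<std::decay_t<Fn>>>(std::forward<Fn>(fn));
}
```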
template <typename F>
void Read(void* buf, int len, F && cb) {
    return Read(buf, len, make_handler_with_count(std::forward<F>(cb)));
}

void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o) {
    auto error = sock.Receive(buf, len, o.get());
    if (error) {
        if (error.value() != kIoPending) {
            o->Invoke(error, 0);
            return;
        }
    }
    o.release();
}

conn.Read(buf, sizeof(buf), [this](error_code ec, int bytesRead) { OnRead(ec, bytesRead); });
Awaitable: the concept of the Future<T>

.await_ready()        F<T> → bool
.await_suspend(cb)    F<T> × Fn → void
.await_resume()       F<T> → T

await expr-of-awaitable-type
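await <expr> expands into an expression equivalent to:

{
    auto && tmp = operator await(opt) <expr>;  // operator await is applied only if defined
    if (!tmp.await_ready()) {
        tmp.await_suspend(<coroutine-handle>);
        // <suspend>: control returns to whoever called or resumed the coroutine
    }
    // <resume>: execution continues here once the coroutine is resumed
    return tmp.await_resume();
}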
Overlapped base from before (remember this?)

struct OverlappedBase : os_async_context {
    virtual void Invoke(std::error_code, int bytes) = 0;
    virtual ~OverlappedBase() {}

    static void io_complete_callback(CompletionPacket& p) {
        auto me = static_cast<OverlappedBase*>(p.overlapped);
        auto cleanMe = unique_ptr<OverlappedBase>(me);
        me->Invoke(p.error, p.byteTransferred);
    }
};
Overlapped base for awaitable

struct AwaiterBase : os_async_context {
    coroutine_handle<> resume;
    std::error_code err;
    int bytes;

    static void io_complete_callback(CompletionPacket& p) {
        auto me = static_cast<AwaiterBase*>(p.overlapped);
        me->err = p.error;
        me->bytes = p.byteTransferred;
        me->resume();
    }
};

• me->resume() boils down to: mov rcx, [rcx] / jmp [rcx]
• coroutine_handle<> is sizeof(void*) and has no destructor
auto Connection::Read(void* buf, int len) {
    struct awaiter : AwaiterBase {
        Connection* me;
        void* buf;
        awaiter(Connection* me, void* buf, int len) : me(me), buf(buf) { bytes = len; }

        bool await_ready() { return false; }

        void await_suspend(coroutine_handle<> h) {
            this->resume = h;
            auto error = me->sock.Receive(buf, bytes, this);
            // Success and "I/O pending" both mean a completion packet will arrive;
            // any other error means the operation failed immediately.
            if (error && error.value() != kIoPending) throw system_error(error);
        }

        int await_resume() {
            if (this->err) throw system_error(err);
            return bytes;
        }
    };
    return awaiter{ this, buf, len };
}
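Can we make it better?
• 50% of I/O completes synchronously
• 50% of I/O returns the I/O-pending error

SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);

With this mode, an operation that completes synchronously no longer posts a completion packet, so the caller has to handle the result itself.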
Take advantage of synchronous completions

void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o) {
    auto error = sock.Receive(buf, len, o.get());
    if (error) {
        if (error.value() != kIoPending) {
            o->Invoke(error, 0);
            return;
        }
    }
    o.release();
}

SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
Take advantage of synchronous completions

void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o) {
    auto error = sock.Receive(buf, len, o.get());
    if (error.value() != kIoPending) {
        o->Invoke(error, len);
        return;
    }
    o.release();
}

SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
SuperLean.exe!improved::tcp_reader::OnRead(std::error_code ec, int bytesRead) Line 254
SuperLean.exe!improved::detail::CompletionWithSizeT<<lambda_ee38b7a750c7f550b4ee1dd60c2450c1> >::Invoke(std::error_code ec, int count) Line 31
SuperLean.exe!improved::tcp_reader::OnRead(std::error_code ec, int bytesRead) Line 254
SuperLean.exe!improved::detail::CompletionWithSizeT<<lambda_ee38b7a750c7f550b4ee1dd60c2450c1> >::Invoke(std::error_code ec, int count) Line 31
... (the OnRead / Invoke pair repeats once per synchronously completed read)
SuperLean.exe!improved::detail::io_complete_callback(CompletionPacket & p) Line 22
SuperLean.exe!CompletionQueue::ThreadProc(void * lpParameter) Line 112 C++

Stack overflow!
Synchronous completion has to be handled on the use side

void tcp_reader::OnRead(std::error_code ec, int bytesRead) {
    if (ec) return OnError(ec);
    total -= (int)bytesRead;
    if (total <= 0 || bytesRead == 0) return OnComplete();
    bytesRead = sizeof(buf);
    conn.Read(buf, bytesRead, [this](std::error_code ec, int bytesRead) {
        OnRead(ec, bytesRead);
    });
}
Now handling synchronous completion

void tcp_reader::OnRead(std::error_code ec, int bytesRead) {
    do {
        if (ec) return OnError(ec);
        total -= (int)bytesRead;
        if (total <= 0 || bytesRead == 0) return OnComplete();
        bytesRead = sizeof(buf);
    } while (conn.Read(buf, bytesRead, [this](std::error_code ec, int bytesRead) {
        OnRead(ec, bytesRead);
    }));  // here Read returns true when the I/O completed synchronously
}
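A small simulation of why the callback version overflowed and the loop does not. SyncConnection, its two Read flavours, and the depth counters are hypothetical; every read completes synchronously with 1 KB, the worst case for the callback design.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>

// Hypothetical connection whose reads always complete synchronously.
struct SyncConnection {
    int chunk = 1024;
    // Callback flavour: the completion handler runs inline, on our stack.
    void ReadInline(const std::function<void(int)>& cb) { cb(chunk); }
    // Loop flavour: report synchronous completion through the return value.
    bool ReadSync(int& bytesRead) { bytesRead = chunk; return true; }
};

int g_depth = 0, g_maxDepth = 0;  // live / maximum nested OnRead frames

struct RecursiveReader {
    SyncConnection conn;
    int total;
    void Start() { conn.ReadInline([this](int n) { OnRead(n); }); }
    void OnRead(int bytesRead) {
        g_maxDepth = std::max(g_maxDepth, ++g_depth);
        total -= bytesRead;
        if (total > 0 && bytesRead != 0)
            conn.ReadInline([this](int n) { OnRead(n); });  // one level deeper
        --g_depth;
    }
};

struct LoopingReader {
    SyncConnection conn;
    int total;
    void Start() { int n; if (conn.ReadSync(n)) OnRead(n); }
    void OnRead(int bytesRead) {
        g_maxDepth = std::max(g_maxDepth, ++g_depth);
        do {
            total -= bytesRead;
            if (total <= 0 || bytesRead == 0) break;
        } while (conn.ReadSync(bytesRead));  // synchronous: iterate, don't recurse
        --g_depth;
    }
};
```

With 100 KB to read, the recursive reader ends up 100 OnRead frames deep, while the looping reader never goes deeper than one.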
Let's measure the improvement (handwritten)

                                 MB/s                       Executable size (KB)
                                 Handcrafted   Coroutine    Handcrafted   Coroutine
Original                         380           495          30            25
Synchronous completion opt.      485           -            30            -
auto Connection::Read(void* buf, int len) {
    struct awaiter : AwaiterBase {
        Connection* me;
        void* buf;
        awaiter(Connection* me, void* buf, int len) : me(me), buf(buf) { bytes = len; }

        bool await_ready() { return false; }

        void await_suspend(coroutine_handle<> h) {
            this->resume = h;
            auto error = me->sock.Receive(buf, bytes, this);
            if (error.value() == kIoPending) return;
            if (error) throw system_error(error);
            return;  // synchronous success: no completion packet will come, yet we stay suspended
        }

        int await_resume() {
            if (this->err) throw system_error(err);
            return bytes;
        }
    };
    return awaiter{ this, buf, len };
}

SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
auto Connection::Read(void* buf, int len) {
    struct awaiter : AwaiterBase {
        Connection* me;
        void* buf;
        awaiter(Connection* me, void* buf, int len) : me(me), buf(buf) { bytes = len; }

        bool await_ready() { return false; }

        bool await_suspend(coroutine_handle<> h) {
            this->resume = h;
            auto error = me->sock.Receive(buf, bytes, this);
            if (error.value() == kIoPending) return true;  // suspend; the completion resumes us
            if (error) throw system_error(error);
            return false;                                   // completed synchronously: do not suspend
        }

        int await_resume() {
            if (this->err) throw system_error(err);
            return bytes;
        }
    };
    return awaiter{ this, buf, len };
}
await <expr> expands into an expression equivalent to:

{
    auto && tmp = operator co_await <expr>;
    if (!tmp.await_ready()) {
        tmp.await_suspend(<coroutine-handle>);
        // <suspend>
    }
    // <resume>
    return tmp.await_resume();
}
await <expr> expands into an expression equivalent to:

{
    auto && tmp = operator await(opt) <expr>;
    if (!tmp.await_ready() && tmp.await_suspend(<coroutine-handle>)) {
        // <suspend>: only when await_suspend returned true
    }
    // <resume>
    return tmp.await_resume();
}
Let's measure the improvement (coroutine)

                                 MB/s                       Executable size (KB)
                                 Handcrafted   Coroutine    Handcrafted   Coroutine
Original                         380           495          30            25
Synchronous completion opt.      485           1028         30            25
Getting rid of the allocations

class tcp_reader {
    std::unique_ptr<detail::OverlappedBase> wo;
    …
    tcp_reader(int64_t total) : total(total) {
        wo = detail::make_handler_with_count(
            [this](auto ec, int nBytes) { OnRead(ec, nBytes); });
        …
    }

    void OnRead(std::error_code ec, int bytesRead) {
        if (ec) return OnError(ec);
        do {
            total -= (int)bytesRead;
            if (total <= 0 || bytesRead == 0) return OnComplete();
            bytesRead = sizeof(buf);
        } while (conn.Read(buf, bytesRead, wo.get()));
    }
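The same idea in isolation: the handler is allocated once, in the constructor, and only lent out for each read. Handler, HandlerImpl, make_handler, and CompleteRead are hypothetical simplifications of OverlappedBase and the OS completion path.

```cpp
#include <cassert>
#include <memory>
#include <system_error>
#include <utility>

int g_handlerAllocations = 0;  // how many handler objects were created

// Hypothetical, simplified version of OverlappedBase (no OS context).
struct Handler {
    virtual void Invoke(std::error_code, int) = 0;
    virtual ~Handler() = default;
};

template <typename Fn>
struct HandlerImpl : Handler {
    Fn fn;
    explicit HandlerImpl(Fn f) : fn(std::move(f)) { ++g_handlerAllocations; }
    void Invoke(std::error_code ec, int n) override { fn(ec, n); }
};

template <typename Fn>
std::unique_ptr<Handler> make_handler(Fn fn) {
    return std::make_unique<HandlerImpl<Fn>>(std::move(fn));
}

// The handler is built once and only lent out (as a raw pointer) per read.
struct Reader {
    int total;
    int reads = 0;
    std::unique_ptr<Handler> wo;

    explicit Reader(int total) : total(total) {
        wo = make_handler([this](std::error_code, int n) { OnRead(n); });
    }
    void OnRead(int n) { ++reads; total -= n; }

    // Hypothetical OS side: completes one read through the borrowed handler
    // and, unlike io_complete_callback, does not take ownership of it.
    void CompleteRead(int n) {
        Handler* h = wo.get();
        h->Invoke(std::error_code{}, n);
    }
};
```

This only works if the completion path no longer deletes the handler; the io_complete_callback shown earlier takes ownership through a unique_ptr and would have to change.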
Let's measure the improvement (handcrafted)

                                 MB/s                       Executable size (KB)
                                 Handcrafted   Coroutine    Handcrafted   Coroutine
Original                         380           495          30            25
Synchronous completion opt.      485           1028         30            25
Preallocated handler             690           1028         28            25
Coroutines are popular!

Python (PEP 0492):
async def abinary(n):
    if n <= 0:
        return 1
    l = await abinary(n - 1)
    r = await abinary(n - 1)
    return l + 1 + r

Hack (programming language):
async function gen1(): Awaitable<int> {
    $x = await Batcher::fetch(1);
    $y = await Batcher::fetch(2);
    return $x + $y;
}

Dart 1.9:
Future<int> getPage(t) async {
    var c = new http.Client();
    try {
        var r = await c.get('http://url/search?q=$t');
        print(r);
        return r.length();
    } finally {
        await c.close();
    }
}

C#:
async Task<string> WaitAsynchronouslyAsync() {
    await Task.Delay(10000);
    return "Finished";
}

C++17:
future<string> WaitAsynchronouslyAsync() {
    await sleep_for(10ms);
    return "Finished"s;
}
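Generalized Function

[Diagram: a generalized function seen by the Compiler, the User, and the Coroutine Designer. Coroutine kinds shown: Generator (yield), Task (await), AsyncGenerator (await + yield), Monadic* (await = suspend). Other labels: POF (plain old function), "does not care".]

Image credits: "Three Heroes and Zmey Gorynych" (Три богатыря и змей горыныч)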
Design Principles
• Scalable (to billions of concurrent coroutines)
• Efficient (resume and suspend operations comparable in cost to a function call)
• Seamless interaction with existing facilities with no overhead
• Open-ended coroutine machinery allowing library designers to develop coroutine libraries exposing various high-level semantics, such as generators, goroutines, tasks and more
• Usable in environments where exceptions are forbidden or not available
[Diagram: Normal functions. Thread 1's stack holds one activation record (return address, parameters, locals) each for F, G, and H; the stack pointer advances as F calls G and G calls H.]
[Diagram: Coroutines using fibers (first call). F runs on thread 1's stack; the coroutine G runs on its own fiber stack, which holds G's activation record, the fiber start routine, saved registers, and a fiber context recording the old stack top. A switch saves the thread context: IP, RSP, RAX, RCX, RDX, ..., RDI, etc.]
[Diagram: Coroutines using fibers (suspend). The fiber's registers are saved and execution switches back to thread 1's stack.]
[Diagram: Coroutines using fibers (resume). The fiber is resumed from thread 2, where Z is running; G continues on its own fiber stack from the saved registers.]
Boost.Context context switch in assembly (x86-64, MS PE, MASM), shown over two slides:
https://github.com/mirror/boost/blob/master/libs/context/src/asm/jump_x86_64_ms_pe_masm.asm
Mitigating Memory Footprint
• Fiber state with 1 MB of stack
• Chained stack: a list of 4 KB stacklets
• Reallocate and copy: start with a small stack (1 KB, 2 KB, 4 KB, 8 KB, 16 KB, ...) and copy into a larger one as it grows
Compiler based coroutines

generator<int> f() {
    for (int i = 0; i < 5; ++i) {
        yield i;
    }
}

generator<int> f() {
    f$state *mem = __coro_elide() ? alloca(f$state) : new f$state;
    mem->__resume_fn = &f$resume;
    mem->__destroy_fn = &f$destroy;
    return {mem};
}

struct f$state {
    void* __resume_fn;
    void* __destroy_fn;
    int __resume_index = 0;
    int i;
};

void f$resume(f$state* s) {
    switch (s->__resume_index) {
    case 0: s->i = 0; s->__resume_index = 1; break;
    case 1: if (++s->i == 5) s->__resume_fn = nullptr; break;  // nullptr marks "done"
    }
}

void f$destroy(f$state* s) {
    if (!__coro_elide()) delete s;
}

int main() {
    for (int v : f())
        printf("%d\n", v);
}

After inlining and heap-allocation elision, main can become:

int main() {
    printf("%d\n", 0);
    printf("%d\n", 1);
    printf("%d\n", 2);
    printf("%d\n", 3);
    printf("%d\n", 4);
}
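The $-names above are illustrative and not valid standard C++. A compilable approximation of the same lowering, with hypothetical names: the coroutine's state becomes a struct, and the body becomes a resume function that switches on the saved suspension index. It sketches the technique, not what any particular compiler emits.

```cpp
#include <cassert>
#include <vector>

// Hand-lowered form of:
//   generator<int> f() { for (int i = 0; i < 5; ++i) yield i; }
struct f_state {
    int resume_index = 0;  // suspension point to continue from
    int i = 0;             // local that must survive suspensions
    int current = 0;       // value produced by the last yield
    bool done = false;     // set when execution falls off the end
};

// Runs the body until the next yield (or the end) and records where to continue.
void f_resume(f_state* s) {
    switch (s->resume_index) {
    case 0:                // first entry: loop initialisation
        s->i = 0;
        break;
    case 1:                // resumed after "yield i": loop increment
        ++s->i;
        break;
    }
    if (s->i < 5) {        // loop condition
        s->current = s->i; // yield i
        s->resume_index = 1;
    } else {
        s->done = true;
    }
}

std::vector<int> collect() {
    f_state s;
    std::vector<int> out;
    for (f_resume(&s); !s.done; f_resume(&s))
        out.push_back(s.current);
    return out;
}
```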
[Diagram: Compiler-based coroutines. F, G, and H have ordinary activation records on thread 1's stack; because G is a coroutine, what it needs across suspensions lives in a separate coroutine state:]

struct G$state {
    void* __resume_fn;
    void* __destroy_fn;
    int __resume_index;
    // locals and temporaries that need to preserve values across suspend points
};

[Diagram: Compiler-based coroutines (suspend). Same stack layout; G's coroutine state survives the suspension.]
[Diagram: Compiler-based coroutines (resume). On thread 2, X calls G$resume, which gets an ordinary activation record on thread 2's stack, continues from G's coroutine state, and calls H as usual.]
2 x 2 x 2
• Two new keywords
  • await
  • yield (syntactic sugar for: await $p.yield_value(expr))
• Two new concepts
  • Awaitable
  • Coroutine Promise
• Two library types
  • coroutine_handle
  • coroutine_traits

After Kona 2015: co_await, co_yield, co_return
Trivial Awaitable #1

struct _____blank____ {
    bool await_ready() { return false; }
    template <typename F> void await_suspend(F) {}
    void await_resume() {}
};
Trivial Awaitable #1

struct suspend_always {
    bool await_ready() { return false; }
    template <typename F> void await_suspend(F) {}
    void await_resume() {}
};

await suspend_always {};
Trivial Awaitable #2

struct suspend_never {
    bool await_ready() { return true; }
    template <typename F> void await_suspend(F) {}
    void await_resume() {}
};
Simple Awaitable #1

std::future<void> DoSomething(mutex& m) {
    unique_lock<mutex> lock = await lock_or_suspend{m};
    // ...
}

struct lock_or_suspend {
    std::unique_lock<std::mutex> lock;
    lock_or_suspend(std::mutex & mut) : lock(mut, std::try_to_lock) {}

    bool await_ready() { return lock.owns_lock(); }

    template <typename F>
    void await_suspend(F cb) {
        std::thread t([this, cb]{ lock.lock(); cb(); });
        t.detach();
    }

    auto await_resume() { return std::move(lock); }
};

Do not use! For illustration only!
coroutine_handle

template <typename Promise = void> struct coroutine_handle;

template <> struct coroutine_handle<void> {
    void resume();
    void destroy();
    bool done() const;
    void * address();
    static coroutine_handle from_address(void*);
    void operator()(); // same as resume()
    …
};

Comparison operators: == != < > <= >=
Simple Awaitable #2: Raw OS APIs

await 10ms;

class awaiter {
    static void CALLBACK TimerCallback(PTP_CALLBACK_INSTANCE, void *Context, PTP_TIMER) {
        std::experimental::coroutine_handle<>::from_address(Context).resume();
    }
    PTP_TIMER timer = nullptr;
    std::chrono::system_clock::duration duration;
public:
    explicit awaiter(std::chrono::system_clock::duration d) : duration(d) {}
    bool await_ready() const { return duration.count() <= 0; }
    void await_suspend(std::experimental::coroutine_handle<> resume_cb) {
        timer = CreateThreadpoolTimer(TimerCallback, resume_cb.address(), nullptr);
        if (!timer) throw std::bad_alloc();
        // Negative means relative time, in FILETIME's 100 ns units (MSVC's system_clock tick).
        int64_t relative_count = -duration.count();
        SetThreadpoolTimer(timer, (PFILETIME)&relative_count, 0, 0);
    }
    void await_resume() {}
    ~awaiter() { if (timer) CloseThreadpoolTimer(timer); }
};

auto operator await(std::chrono::system_clock::duration duration) {
    return awaiter{duration};
}
coroutine_traits

template <typename R, typename... Ts>
struct coroutine_traits {
    using promise_type = typename R::promise_type;
};

For generator<int> fib(int n), the compiler consults std::coroutine_traits<generator<int>, int>.
Compiler vs Coroutine Promise

yield <expr>             →  await <Promise>.yield_value(<expr>)
return <expr>            →  <Promise>.return_value(<expr>); goto <end>
<after-first-curly>      →  await <Promise>.initial_suspend()
<before-last-curly>      →  await <Promise>.final_suspend()
<unhandled-exception>    →  <Promise>.set_exception(std::current_exception())
<get-return-object>      →  <Promise>.get_return_object()
<allocate coro-state>    →  <Promise>.operator new (or global)
<free coro-state>        →  <Promise>.operator delete (or global)
await <expr>             →  spent the last hour talking about it
Defining Coroutine Promise for boost::future

namespace std {
    template <typename T, typename... anything>
    struct coroutine_traits<boost::unique_future<T>, anything...> {
        struct promise_type {
            boost::promise<T> promise;
            auto get_return_object() { return promise.get_future(); }

            template <class U> void return_value(U && value) {
                promise.set_value(std::forward<U>(value));
            }

            void set_exception(std::exception_ptr e) {
                promise.set_exception(std::move(e));
            }
            std::suspend_never initial_suspend() { return {}; }
            std::suspend_never final_suspend() { return {}; }
        };
    };
}
coroutine_handle<Promise>

template <typename Promise>
struct coroutine_handle : coroutine_handle<void> {
    Promise & promise();
    static coroutine_handle from_promise(Promise&);
};
Defining Generator From Scratch

struct int_generator {
    bool move_next();
    int current_value();
    …
};

int_generator f() {
    for (int i = 0; i < 5; i++) {
        yield i;
    }
}

int main() {
    auto g = f();
    while (g.move_next()) {
        printf("%d\n", g.current_value());
    }
}
Defining Generator From Scratch

struct int_generator {
    struct promise_type {
        int current_value;
        std::suspend_always yield_value(int value) {
            this->current_value = value;
            return {};
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() { return {}; }
        int_generator get_return_object() { return int_generator{ this }; }
    };
    bool move_next() { p.resume(); return !p.done(); }
    int current_value() { return p.promise().current_value; }
    ~int_generator() { p.destroy(); }
private:
    explicit int_generator(promise_type *p)
        : p(std::coroutine_handle<promise_type>::from_promise(*p)) {}

    std::coroutine_handle<promise_type> p;
};

yield <expr>  →  await <Promise>.yield_value(<expr>)
"STL looks like the machine language macro library of an anally retentive assembly language programmer"
Pamela Seymour, Leiden University
C++ Coroutines: Layered complexity
• Everybody
  • Safe by default, novice friendly
  • Use coroutines and awaitables defined by the standard library, Boost, and other high-quality libraries
• Power Users
  • Define new awaitables to customize await for their environment using existing coroutine types
• Experts
  • Define new coroutine types
Thank you!

Kavya Kotacherry, Daveed Vandevoorde, Richard Smith, Jens Maurer, Lewis Baker, Kirk Shoop, Hartmut Kaiser, Kenny Kerr, Artur Laksberg, Jim Radigan, Chandler Carruth, Gabriel Dos Reis, Deon Brewis, Jonathan Caves, James McNellis, Stephan T. Lavavej, Herb Sutter, Pablo Halpern, Robert Schumacher, Viktor Tong, Geoffrey Romer, Michael Wong, Niklas Gustafsson, Nick Maliwacki, Vladimir Petter, Shahms King, Slava Kuznetsov, Tongari J, Lawrence Crowl, Valentin Isac, and many more who contributed
Coroutines: a negative overhead abstraction
• Proposal is working through the C++ standardization committee (C++17?)
• Experimental implementation in VS 2015 RTM
• Clang implementation is in progress
• More details: http://www.open-std.org/JTC1/SC22/WG21/docs/papers/2016/P0057R2.pdf