Gor Nishanov, C++ Coroutines – a negative overhead abstraction


C++ Russia 2016 Coroutines 2

What is this talk about?

• C++ Coroutines
• Lightweight, customizable coroutines
• C++17 (maybe)
• Experimental implementation in MSVC 2015, Clang in progress, EDG

2012 - N3328
2013 - N3564
2013 - N3650
2013 - N3722
2014 - N3858
2014 - N3977
2014 - N4134 EWG direction approved
2014 - N4286
2015 - N4403 EWG accepted, sent to Core WG
2015 - P0057R0 Core & LEWG review (co_xxx)
2016 - P0057R2 more Core & LEWG review

C++ in two lines
• Direct mapping to hardware
• Zero-overhead abstractions

From Bjarne Stroustrup's lecture "The Essence of C++"

[Diagram: language lineage. Assembler, BCPL, and C contribute direct mapping to hardware; Simula contributes general-purpose abstractions; both feed into C++, followed by C++11 and C++14.]

000100 IDENTIFICATION DIVISION.
000200 PROGRAM-ID. HELLOWORLD.
000300*
000400 ENVIRONMENT DIVISION.
000500 CONFIGURATION SECTION.
000600 SOURCE-COMPUTER. RM-COBOL.
000700 OBJECT-COMPUTER. RM-COBOL.
000800
001000 DATA DIVISION.
001100 FILE SECTION.
001200
100000 PROCEDURE DIVISION.
100100
100200 MAIN-LOGIC SECTION.
100300 BEGIN.
100400     DISPLAY " " LINE 1 POSITION 1 ERASE EOS.
100500     DISPLAY "Hello world!" LINE 15 POSITION 10.
100600     STOP RUN.
100700 MAIN-LOGIC-EXIT.
100800     EXIT.

Joel Erdwinn, Melvin Conway

image credits: Wikipedia Commons, Communications of the ACM, vol. 6, no. 7, July 1963

[Diagram: subroutine versus coroutine linkage between the Basic Symbol Reducer and the Basic Name Reducer. Labels on the diagram: Write to Tape, read token, output token, (EOF), Subroutine, Coroutine.]

100 cards per minute!

1958  1965  1975  1985  1995  2005  2016

Async state machine

[State diagram with four states: Connecting, Reading, Completed, Failed.]

Trivial if synchronous

int tcp_reader(int total)
{
    char buf[4 * 1024];
    auto conn = Tcp::Connect("127.0.0.1", 1337);
    for (;;)
    {
        auto bytesRead = conn.Read(buf, sizeof(buf));
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return total;
    }
}

std::future<T> and std::promise<T>

shared_state<T>
    atomic<long> refCnt;
    mutex lock;
    variant<empty, T, exception_ptr> value;
    condition_variable ready;

future<T>
    intrusive_ptr<shared_state<T>>
    wait()
    T get()

promise<T>
    intrusive_ptr<shared_state<T>>
    set_value(T)
    set_exception(exception_ptr)

.then (N4399 Working Draft, Technical Specification for C++ Extensions for Concurrency)

future<int> tcp_reader(int64_t total) {
    struct State {
        char buf[4 * 1024];
        int64_t total;
        Tcp::Connection conn;
        explicit State(int64_t total) : total(total) {}
    };
    auto state = make_shared<State>(total);
    return Tcp::Connect("127.0.0.1", 1337).then(
        [state](future<Tcp::Connection> conn) {
            state->conn = std::move(conn.get());
            return do_while([state]() -> future<bool> {
                if (state->total <= 0) return make_ready_future(false);
                return state->conn.read(state->buf, sizeof(state->buf)).then(
                    [state](future<int> nBytesFut) {
                        auto nBytes = nBytesFut.get();
                        if (nBytes == 0) return make_ready_future(false);
                        state->total -= nBytes;
                        return make_ready_future(true);
                    });
            });
        });
}

future<void> do_while(function<future<bool>()> body) {
    return body().then([=](future<bool> notDone) {
        return notDone.get() ? do_while(body) : make_ready_future();
    });
}

Forgot something

int tcp_reader(int total)
{
    char buf[4 * 1024];
    auto conn = Tcp::Connect("127.0.0.1", 1337);
    for (;;)
    {
        auto bytesRead = conn.Read(buf, sizeof(buf));
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return total;
    }
}

.then

future<int> tcp_reader(int64_t total) {
    struct State {
        char buf[4 * 1024];
        int64_t total;
        Tcp::Connection conn;
        explicit State(int64_t total) : total(total) {}
    };
    auto state = make_shared<State>(total);
    return Tcp::Connect("127.0.0.1", 1337).then(
        [state](future<Tcp::Connection> conn) {
            state->conn = std::move(conn.get());
            return do_while([state]() -> future<bool> {
                if (state->total <= 0) return make_ready_future(false);
                return state->conn.read(state->buf, sizeof(state->buf)).then(
                    [state](future<int> nBytesFut) {
                        auto nBytes = nBytesFut.get();
                        if (nBytes == 0) return make_ready_future(false);
                        state->total -= nBytes;
                        return make_ready_future(true);
                    }); // read
            }); // do_while
        }).then([state](future<void>) { return make_ready_future(state->total); });
}

Hand-crafted async state machine (1/3)

class tcp_reader
{
    char buf[64 * 1024];
    Tcp::Connection conn;
    promise<int> done;
    int total;

    explicit tcp_reader(int total): total(total) {}

    void OnConnect(error_code ec, Tcp::Connection newCon);
    void OnRead(error_code ec, int bytesRead);
    void OnError(error_code ec);
    void OnComplete();

public:
    static future<int> start(int total);
};

int main() { cout << tcp_reader::start(1000 * 1000 * 1000).get(); }

[State diagram: Connecting, Reading, Completed, Failed, with transitions numbered ①, ②, ③.]

Hand-crafted async state machine (2/3)

future<int> tcp_reader::start(int total) {
    // The constructor is private, so make_unique cannot be used here.
    auto p = unique_ptr<tcp_reader>(new tcp_reader(total));
    auto result = p->done.get_future();
    Tcp::Connect("127.0.0.1", 1337,
        [raw = p.get()](auto ec, auto newConn) {
            raw->OnConnect(ec, std::move(newConn));
        });
    p.release();
    return result;
}

void tcp_reader::OnConnect(error_code ec, Tcp::Connection newCon) {
    if (ec) return OnError(ec);
    conn = std::move(newCon);
    conn.Read(buf, sizeof(buf),
        [this](error_code ec, int bytesRead)
        { OnRead(ec, bytesRead); });
}

Hand-crafted async state machine (3/3)

void tcp_reader::OnRead(error_code ec, int bytesRead) {
    if (ec) return OnError(ec);
    total -= bytesRead;
    if (total <= 0 || bytesRead == 0) return OnComplete();
    conn.Read(buf, sizeof(buf),
        [this](error_code ec, int bytesRead)
        { OnRead(ec, bytesRead); });
}

void tcp_reader::OnError(error_code ec) {
    auto cleanMe = unique_ptr<tcp_reader>(this);
    done.set_exception(make_exception_ptr(system_error(ec)));
}

void tcp_reader::OnComplete() {
    auto cleanMe = unique_ptr<tcp_reader>(this);
    done.set_value(total);
}

Trivial

auto tcp_reader(int total) -> int
{
    char buf[4 * 1024];
    auto conn = Tcp::Connect("127.0.0.1", 1337);
    for (;;)
    {
        auto bytesRead = conn.Read(buf, sizeof(buf));
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return total;
    }
}

Trivial

auto tcp_reader(int total) -> future<int>
{
    char buf[4 * 1024];
    auto conn = await Tcp::Connect("127.0.0.1", 1337);
    for (;;)
    {
        auto bytesRead = await conn.Read(buf, sizeof(buf));
        total -= bytesRead;
        if (total <= 0 || bytesRead == 0) return total;
    }
}

What about perf?

                       Coroutines    Hand-Crafted   Hello
MB/s                   495 (1.3x)    380            0
Binary size (Kbytes)   25 (0.85x)    30             9

Hello: int main() { printf("Hello, world\n"); }

Visual C++ 2015 RTM. Measured on a Lenovo W540 laptop. Transmitting and receiving 1 GB over the loopback IP address.

Coroutines are closer to the metal

[Layer diagram, bottom to top: Hardware; OS / Low Level Libraries; then two parallel stacks. One stack: I/O Abstractions (Callback based) with Handcrafted State Machines on top. The other: I/O Abstraction (Awaitable based) with Coroutines on top.]

How to map high level call to OS API?

template <class Cb>
void Read(void* buf, size_t bytes, Cb && cb);

conn.Read(buf, sizeof(buf),
    [this](error_code ec, int bytesRead)
    { OnRead(ec, bytesRead); });

Windows:   WSARecv(fd, ..., OVERLAPPED*)
Posix aio: aio_read(fd, ..., aiocbp*)

[Diagram: the function object is laid out together with the OVERLAPPED / aiocbp structure that is passed to the OS.]

struct OverlappedBase : os_async_context {
    virtual void Invoke(std::error_code, int bytes) = 0;
    virtual ~OverlappedBase() {}

    static void io_complete_callback(CompletionPacket& p) {
        auto me = unique_ptr<OverlappedBase>(static_cast<OverlappedBase*>(p.overlapped));
        me->Invoke(p.error, p.byteTransferred);
    }
};

template <typename Fn>
struct CompletionWithCount : OverlappedBase, private Fn
{
    CompletionWithCount(Fn fn) : Fn(std::move(fn)) {}

    void Invoke(std::error_code ec, int count) override { Fn::operator()(ec, count); }
};

template <typename Fn>
unique_ptr<OverlappedBase> make_handler_with_count(Fn && fn) {
    return std::make_unique<CompletionWithCount<std::decay_t<Fn>>>(std::forward<Fn>(fn));
}

[Diagram: os_async_ctx (OVERLAPPED / aiocbp) followed by the function object.]

After open, associate the socket handle with a thread pool and a callback:

ThreadPool::AssociateHandle(sock.native_handle(), &OverlappedBase::io_complete_callback);
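The handler wrapper above can be exercised without any real I/O. The following is a minimal sketch, not the talk's implementation: `os_async_context`, `CompletionPacket`, and `simulate_completion` are hypothetical stand-ins invented here so that the ownership hand-off (release to the "OS", reclaim in the callback) can be followed end to end.

```cpp
#include <cassert>
#include <memory>
#include <system_error>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for OVERLAPPED / aiocb: the block the OS hands back.
struct os_async_context { void* reserved = nullptr; };

// Hypothetical completion record, modelled on the slide's CompletionPacket.
struct CompletionPacket {
    std::error_code error;
    int byteTransferred;
    os_async_context* overlapped;
};

struct OverlappedBase : os_async_context {
    virtual void Invoke(std::error_code, int bytes) = 0;
    virtual ~OverlappedBase() {}

    // Takes ownership back from the "OS", runs the handler, then frees it.
    static void io_complete_callback(CompletionPacket& p) {
        std::unique_ptr<OverlappedBase> me(static_cast<OverlappedBase*>(p.overlapped));
        me->Invoke(p.error, p.byteTransferred);
    }
};

// Stores the callable as a private base so an empty lambda adds no size.
template <typename Fn>
struct CompletionWithCount : OverlappedBase, private Fn {
    explicit CompletionWithCount(Fn fn) : Fn(std::move(fn)) {}
    void Invoke(std::error_code ec, int count) override { Fn::operator()(ec, count); }
};

template <typename Fn>
std::unique_ptr<OverlappedBase> make_handler_with_count(Fn&& fn) {
    return std::make_unique<CompletionWithCount<std::decay_t<Fn>>>(std::forward<Fn>(fn));
}

// Simulates one round trip: allocate the handler, release it as if an
// operation were pending, then deliver a completion carrying `bytes`.
inline int simulate_completion(int bytes) {
    int seen = -1;
    auto handler = make_handler_with_count(
        [&seen](std::error_code ec, int n) { seen = ec ? -1 : n; });
    CompletionPacket p{std::error_code{}, bytes, handler.release()};
    OverlappedBase::io_complete_callback(p);
    return seen;
}
```

Each call to `simulate_completion` performs one heap allocation and one virtual call, which is the per-operation cost the later slides set out to remove.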

template <typename F>
void Read(void* buf, int len, F && cb) {
    return Read(buf, len, make_handler_with_count(std::forward<F>(cb)));
}

void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o)
{
    auto error = sock.Receive(buf, len, o.get());
    if (error) {
        if (error.value() != kIoPending) {
            o->Invoke(error, 0);
            return;
        }
    }
    o.release();
}

conn.Read(buf, sizeof(buf),
    [this](error_code ec, int bytesRead)
    { OnRead(ec, bytesRead); });

await conn.Read(buf, sizeof(buf));

?

Awaitable – Concept of the Future<T>

.await_ready()       F<T> → bool
.await_suspend(cb)   F<T> x Fn → void
.await_resume()      F<T> → T

await expr-of-awaitable-type

await <expr>

Expands into an expression equivalent to

{
    auto && tmp = operator await(opt) <expr>;
    if (!tmp.await_ready()) {
        tmp.await_suspend(<coroutine-handle>);
        <suspend>
    }
    <resume>
    return tmp.await_resume();
}

OverlappedBase from before (remember this?)

struct OverlappedBase : os_async_context {
    virtual void Invoke(std::error_code, int bytes) = 0;
    virtual ~OverlappedBase() {}

    static void io_complete_callback(CompletionPacket& p) {
        auto me = static_cast<OverlappedBase*>(p.overlapped);
        auto cleanMe = unique_ptr<OverlappedBase>(me);

        me->Invoke(p.error, p.byteTransferred);
    }
};

Overlapped base for awaitable

struct AwaiterBase : os_async_context
{
    coroutine_handle<> resume;
    std::error_code err;
    int bytes;

    static void io_complete_callback(CompletionPacket& p) {
        auto me = static_cast<AwaiterBase*>(p.overlapped);
        me->err = p.error;
        me->bytes = p.byteTransferred;
        me->resume();
    }
};

Slide annotations: sizeof(void*), no dtor

    mov rcx, [rcx]
    jmp [rcx]

auto Connection::Read(void* buf, int len) {
    struct awaiter : AwaiterBase {
        Connection* me;
        void* buf;
        awaiter(Connection* me, void* buf, int len) : me(me), buf(buf) { bytes = len; }

        bool await_ready() { return false; }

        void await_suspend(coroutine_handle<> h) {
            this->resume = h;
            auto error = me->sock.Receive(buf, bytes, this);
            // Report the code returned by Receive; err is only set on completion.
            if (error.value() != kIoPending)
                throw system_error(error);
        }

        int await_resume() {
            if (this->err) throw system_error(err);
            return bytes;
        }
    };
    return awaiter{ this, buf, len };
}

Can we make it better?

50% of I/O completes synchronously
50% of I/O returns with an I/O pending error

SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);

Take advantage of synchronous completions

void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o)
{
    auto error = sock.Receive(buf, len, o.get());
    if (error) {
        if (error.value() != kIoPending) {
            o->Invoke(error, 0);
            return;
        }
    }
    o.release();
}

SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);

Take advantage of synchronous completions

void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o)
{
    auto error = sock.Receive(buf, len, o.get());

    if (error.value() != kIoPending) {
        o->Invoke(error, len);
        return;
    }

    o.release();
}

SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);

Take advantage of synchronous completions

SuperLean.exe!improved::tcp_reader::OnRead(std::error_code ec, int bytesRead) Line 254
SuperLean.exe!improved::detail::CompletionWithSizeT<<lambda_ee38b7a750c7f550b4ee1dd60c2450c1> >::Invoke(std::error_code ec, int count) Line 31
[the two frames above repeat many times]
SuperLean.exe!improved::detail::io_complete_callback(CompletionPacket & p) Line 22
SuperLean.exe!CompletionQueue::ThreadProc(void * lpParameter) Line 112

Stack overflow

Need to implement it on the use side

void tcp_reader::OnRead(std::error_code ec, int bytesRead) {
    if (ec) return OnError(ec);
    total -= (int)bytesRead;
    if (total <= 0 || bytesRead == 0) return OnComplete();
    bytesRead = sizeof(buf);
    conn.Read(buf, bytesRead,
        [this](std::error_code ec, int bytesRead) {
            OnRead(ec, bytesRead); });
}

Now handling synchronous completion

void tcp_reader::OnRead(std::error_code ec, int bytesRead) {
    do {
        if (ec) return OnError(ec);
        total -= (int)bytesRead;
        if (total <= 0 || bytesRead == 0) return OnComplete();
        bytesRead = sizeof(buf);
    } while (
        conn.Read(buf, bytesRead,
            [this](std::error_code ec, int bytesRead) {
                OnRead(ec, bytesRead); }));
}
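The do/while above turns recursion into iteration whenever a read finishes on the spot. Below is a minimal sketch of that idea with the I/O faked out. It assumes, as the slide's loop condition suggests, that `Read` returns true when the operation completed synchronously and the handler was not invoked; `FakeConnection`, `LoopingReader`, and `run_reader` are hypothetical names used only here.

```cpp
#include <cassert>

// Hypothetical connection: every Read completes synchronously and reports
// that by returning true instead of invoking the handler.
struct FakeConnection {
    int reads = 0;
    template <typename Handler>
    bool Read(char*, int, Handler&&) { ++reads; return true; }
};

struct LoopingReader {
    FakeConnection conn;
    char buf[64];
    int total;
    int depth = 0;       // current nesting of OnRead calls
    int maxDepth = 0;    // deepest nesting observed
    bool completed = false;

    explicit LoopingReader(int total) : total(total) {}

    void OnRead(int bytesRead) {
        ++depth;
        if (depth > maxDepth) maxDepth = depth;
        do {
            total -= bytesRead;
            if (total <= 0 || bytesRead == 0) { completed = true; break; }
            bytesRead = static_cast<int>(sizeof(buf));
            // A synchronous completion loops here instead of re-entering OnRead.
        } while (conn.Read(buf, bytesRead, [this](int n) { OnRead(n); }));
        --depth;
    }
};

// Drives a reader as if the first 64-byte read had just completed.
inline LoopingReader run_reader(int total) {
    LoopingReader r(total);
    r.OnRead(64);
    return r;
}
```

With `run_reader(64 * 1000)` the reader issues 999 further reads, yet `maxDepth` stays at 1, so the stack no longer grows with the number of synchronous completions.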

Let's measure the improvement (handwritten)

                          MB/s                       Executable size
                          Handcrafted   Coroutine    Handcrafted   Coroutine
Original                  380           495          30            25
Synchr. completion opt.   485                        30

auto Connection::Read(void* buf, int len) {
    struct awaiter : AwaiterBase {
        Connection* me;
        void* buf;
        awaiter(Connection* me, void* buf, int len) : me(me), buf(buf) { bytes = len; }

        bool await_ready() { return false; }

        void await_suspend(coroutine_handle<> h) {
            this->resume = h;
            auto error = me->sock.Receive(buf, bytes, this);
            if (error.value() == kIoPending) return;
            // Report the code returned by Receive; err is only set on completion.
            if (error) throw system_error(error);
            return;
        }

        int await_resume() {
            if (this->err) throw system_error(err);
            return bytes;
        }
    };
    return awaiter{ this, buf, len };
}

SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);

auto Connection::Read(void* buf, int len) {
    struct awaiter : AwaiterBase {
        Connection* me;
        void* buf;
        awaiter(Connection* me, void* buf, int len) : me(me), buf(buf) { bytes = len; }

        bool await_ready() { return false; }

        bool await_suspend(coroutine_handle<> h) {
            this->resume = h;
            auto error = me->sock.Receive(buf, bytes, this);
            if (error.value() == kIoPending) return true;
            // Report the code returned by Receive; err is only set on completion.
            if (error) throw system_error(error);
            return false;
        }

        int await_resume() {
            if (this->err) throw system_error(err);
            return bytes;
        }
    };
    return awaiter{ this, buf, len };
}

await <expr>

Expands into an expression equivalent to

{
    auto && tmp = operator co_await <expr>;
    if (!tmp.await_ready()) {
        tmp.await_suspend(<coroutine-handle>);
        <suspend>
    }
    <resume>
    return tmp.await_resume();
}

await <expr>

Expands into an expression equivalent to

{
    auto && tmp = operator await(opt) <expr>;
    if (!tmp.await_ready() && tmp.await_suspend(<coroutine-handle>)) {
        <suspend>
    }
    <resume>
    return tmp.await_resume();
}

Let's measure the improvement (coroutine)

                          MB/s                       Executable size
                          Handcrafted   Coroutine    Handcrafted   Coroutine
Original                  380           495          30            25
Synchr. completion opt.   485           1028         30            25

Can we make it better?

Getting rid of the allocations

class tcp_reader {
    std::unique_ptr<detail::OverlappedBase> wo;
    …

    tcp_reader(int64_t total) : total(total) {
        wo = detail::make_handler_with_count(
            [this](auto ec, int nBytes) { OnRead(ec, nBytes); });
        …
    }

    void OnRead(std::error_code ec, int bytesRead) {
        if (ec) return OnError(ec);
        do {
            total -= (int)bytesRead;
            if (total <= 0 || bytesRead == 0) return OnComplete();
            bytesRead = sizeof(buf);
        } while (conn.Read(buf, bytesRead, wo.get()));
    }
};

Let's measure the improvement (handcrafted)

                          MB/s                       Executable size
                          Handcrafted   Coroutine    Handcrafted   Coroutine
Original                  380           495          30            25
Synchr. completion opt.   485           1028         30            25
Prealloc handler          690           1028         28            25

Coroutines are popular!

Python: PEP 0492

async def abinary(n):
    if n <= 0:
        return 1
    l = await abinary(n - 1)
    r = await abinary(n - 1)
    return l + 1 + r

HACK (programming language)

async function gen1(): Awaitable<int> {
    $x = await Batcher::fetch(1);
    $y = await Batcher::fetch(2);
    return $x + $y;
}

DART 1.9

Future<int> getPage(t) async {
    var c = new http.Client();
    try {
        var r = await c.get('http://url/search?q=$t');
        print(r);
        return r.length();
    } finally {
        await c.close();
    }
}

C#

async Task<string> WaitAsynchronouslyAsync() {
    await Task.Delay(10000);
    return "Finished";
}

C++17

future<string> WaitAsynchronouslyAsync() {
    await sleep_for(10ms);
    return "Finished"s;
}

Cosmetics (Nov 2015, keyword change)

co_await
co_yield
co_return

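Generalized Function

[Diagram: a generalized function seen by three parties: Compiler, User, and Coroutine Designer. Caption on the diagram: "does not care".]

• POF
• Monadic*: await - suspend
• Task: await
• Generator: yield
• AsyncGenerator: await + yield

image credits: "Три богатыря и Змей Горыныч" (Three Bogatyrs and Zmey Gorynych)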

Design Principles

• Scalable (to billions of concurrent coroutines)
• Efficient (resume and suspend operations comparable in cost to a function call overhead)
• Seamless interaction with existing facilities with no overhead
• Open-ended coroutine machinery allowing library designers to develop coroutine libraries exposing various high-level semantics, such as generators, goroutines, tasks and more
• Usable in environments where exceptions are forbidden or not available

Coroutine implementation strategies

Normal Functions

[Diagram: one thread stack holding the activation records of F, G, and H. Each record contains the function's parameters, return address, and locals; the stack pointer is shown at the end of each record in turn.]

Coroutines using Fibers (first call)

[Diagram: Thread 1 stack with F's activation record; a separate fiber stack with the fiber start routine, G's parameters, return address, and locals, and H's activation record. A fiber context block holds the old stack top and saved registers. Thread context: IP, RSP, RAX, RCX, RDX, …, RDI, etc.]

Coroutines using Fibers (Suspend)

[Diagram: same layout. Registers (RDX, …, RDI, RSI, etc.) are saved and the stack pointer is back on the Thread 1 stack.]

Coroutines using Fibers (Resume)

[Diagram: Thread 2 stack with Z's activation record. Z's registers and return address are saved and the stack pointer moves to the fiber stack, which still holds G's and H's data.]

[Assembly listing shown across two slides:]
https://github.com/mirror/boost/blob/master/libs/context/src/asm/jump_x86_64_ms_pe_masm.asm

Mitigating Memory Footprint

Fiber state: 1 meg of stack

• Chained stack: 4k stacklet, 4k stacklet, 4k stacklet, 4k stacklet, 4k stacklet
• Reallocate and copy: 1k stack, 2k stack, 4k stack, 8k stack, 16k stack

Compiler based coroutines

generator<int> f() {
    for (int i = 0; i < 5; ++i) {
        yield i;
    }
}

int main() {
    for (int v : f())
        printf("%d\n", v);
}

Generated by the compiler (pseudocode):

struct f$state {
    void* __resume_fn;
    void* __destroy_fn;
    int __resume_index = 0;
    int i;
};

generator<int> f() {
    f$state* mem = __coro_elide() ? alloca(sizeof(f$state)) : new f$state;
    mem->__resume_fn = &f$resume;
    mem->__destroy_fn = &f$destroy;
    return {mem};
}

void f$resume(f$state* s) {
    switch (s->__resume_index) {
    case 0: s->i = 0; s->__resume_index = 1; break;
    case 1: if (++s->i == 5) s->__resume_fn = nullptr; break;
    }
}

void f$destroy(f$state* s) {
    if (!__coro_elide()) delete s;
}

After optimization:

int main() {
    printf("%d\n", 0);
    printf("%d\n", 1);
    printf("%d\n", 2);
    printf("%d\n", 3);
    printf("%d\n", 4);
}
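The lowering shown on this slide can be written by hand in ordinary C++. A minimal sketch, with no coroutine support required: `counter_state`, `counter_resume`, and `collect` are hypothetical names, and the layout only mirrors the idea of a resume index plus the locals that live across a yield, not any compiler's actual frame format.

```cpp
#include <cassert>
#include <vector>

// Hand-written equivalent of the coroutine state for:
//   for (int i = 0; i < 5; ++i) yield i;
struct counter_state {
    int resume_index = 0;  // which suspend point to continue from
    int i = 0;             // local that must survive across suspends
    bool done = false;
};

// Runs the body up to the next yield; returns false once the loop has finished.
inline bool counter_resume(counter_state& s) {
    switch (s.resume_index) {
    case 0:                // first entry: initialise the loop variable
        s.i = 0;
        s.resume_index = 1;
        break;
    case 1:                // re-entry after a yield: advance the loop
        ++s.i;
        break;
    }
    if (s.i >= 5) s.done = true;
    return !s.done;
}

// Drains the generator the way a range-for over generator<int> would.
inline std::vector<int> collect() {
    std::vector<int> out;
    counter_state s;
    while (counter_resume(s)) out.push_back(s.i);
    return out;
}
```

`collect()` yields 0, 1, 2, 3, 4. Because the state is a plain struct and resuming is a plain function call, an optimizer can inline everything and keep the state on the caller's stack, which is how the loop in `main` can collapse into five `printf` calls.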

Compiler Based Coroutines

[Diagram: Thread 1 stack with activation records for F, G (a coroutine), and H, each with parameters, return address, and locals. Next to the stack sits G's coroutine state:]

struct G$state {
    void* __resume_fn;
    void* __destroy_fn;
    int __resume_index;
    locals, temporaries that need to preserve values across suspend points
};

Compiler Based Coroutines (Suspend)

[Diagram: same Thread 1 stack and G's coroutine state, with the stack pointer positions after the suspend.]

Compiler Based Coroutines (Resume)

[Diagram: Thread 2 stack with activation records for X, g$resume, and H. g$resume has an ordinary activation record with its own parameters, return address, and locals, alongside G's coroutine state.]

2 x 2 x 2

• Two new keywords
    • await
    • yield (syntactic sugar for: await $p.yield_value(expr))
• Two new concepts
    • Awaitable
    • Coroutine Promise
• Two library types
    • coroutine_handle
    • coroutine_traits

After Kona 2015: co_await, co_yield, co_return

Trivial Awaitable #1

struct suspend_always {
    bool await_ready() { return false; }
    template <typename F>
    void await_suspend(F) {}
    void await_resume() {}
};

await suspend_always{};

Trivial Awaitable #2

struct suspend_never {
    bool await_ready() { return true; }
    template <typename F>
    void await_suspend(F) {}
    void await_resume() {}
};

Simple Awaitable #1

Do not use! For illustration only!

std::future<void> DoSomething(mutex& m) {
    unique_lock<mutex> lock = await lock_or_suspend{m};
    // ...
}

struct lock_or_suspend {
    std::unique_lock<std::mutex> lock;
    lock_or_suspend(std::mutex & mut) : lock(mut, std::try_to_lock) {}

    bool await_ready() { return lock.owns_lock(); }

    template <typename F>
    void await_suspend(F cb)
    {
        std::thread t([this, cb]{ lock.lock(); cb(); });
        t.detach();
    }

    auto await_resume() { return std::move(lock); }
};

Awaitable: Interacting with C APIs

coroutine_handle

template <typename Promise = void> struct coroutine_handle;

template <> struct coroutine_handle<void> {
    void resume();
    void destroy();
    bool done() const;
    void* address();
    static coroutine_handle from_address(void*);
    void operator()(); // same as resume()
    …
};

Comparison operators: == != < > <= >=

C++ Russia 2016 Coroutines 87

Simple Awaitable #2: Raw OS APIs

await 10ms;

class awaiter {
    static void CALLBACK TimerCallback(PTP_CALLBACK_INSTANCE, void *Context, PTP_TIMER) {
        std::experimental::coroutine_handle<>::from_address(Context).resume();
    }
    PTP_TIMER timer = nullptr;
    std::chrono::system_clock::duration duration;
public:
    explicit awaiter(std::chrono::system_clock::duration d) : duration(d) {}
    bool await_ready() const { return duration.count() <= 0; }
    void await_suspend(std::experimental::coroutine_handle<> resume_cb) {
        timer = CreateThreadpoolTimer(TimerCallback, resume_cb.address(), nullptr);
        if (!timer) throw std::bad_alloc();
        int64_t relative_count = -duration.count();
        SetThreadpoolTimer(timer, (PFILETIME)&relative_count, 0, 0);
    }
    void await_resume() {}
    ~awaiter() { if (timer) CloseThreadpoolTimer(timer); }
};

auto operator await(std::chrono::system_clock::duration duration) {
    return awaiter{duration};
}
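The timer awaiter works because a coroutine handle fits into the single void* context pointer that C callback APIs offer: address() flattens it, from_address() rebuilds it inside the callback. The portable sketch below shows only that round trip. Everything in it is a hypothetical stand-in: `toy_frame` and `toy_handle` imitate a suspended coroutine and its handle, and `c_timer_create` / `c_timer_fire` imitate a C timer API. None of these are real library or OS calls.

```cpp
#include <cassert>

// Hypothetical stand-in for the state of a suspended coroutine.
struct toy_frame {
    int resumed = 0;  // counts how many times the "coroutine" was resumed
};

// Hypothetical pointer-sized handle, modeled on coroutine_handle<>.
class toy_handle {
    toy_frame* frame_;
    explicit toy_handle(toy_frame* f) : frame_(f) {}
public:
    static toy_handle from_frame(toy_frame& f) { return toy_handle(&f); }
    static toy_handle from_address(void* p) { return toy_handle(static_cast<toy_frame*>(p)); }
    void* address() const { return frame_; }
    void resume() const { ++frame_->resumed; }
};

// Hypothetical C-style API: it knows nothing about C++ types and only carries
// a function pointer plus an opaque context pointer.
typedef void (*c_callback)(void* context);
struct c_timer {
    c_callback fn;
    void* context;
};
inline c_timer c_timer_create(c_callback fn, void* context) { return c_timer{fn, context}; }
inline void c_timer_fire(const c_timer& t) { t.fn(t.context); }

// Trampoline with the C signature: rebuild the handle and resume.
inline void on_timer(void* context) { toy_handle::from_address(context).resume(); }

// Usage: register with the flattened handle, then let the "timer" fire.
inline int demo_c_callback_roundtrip() {
    toy_frame frame;
    toy_handle h = toy_handle::from_frame(frame);
    c_timer t = c_timer_create(on_timer, h.address());
    assert(frame.resumed == 0);  // nothing has fired yet
    c_timer_fire(t);
    return frame.resumed;        // resumed exactly once
}
```

Because the handle is a single pointer, no extra allocation is needed to carry the continuation through the C API.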

coroutine_traits

template <typename R, typename... Ts>
struct coroutine_traits {
    using promise_type = typename R::promise_type;
};

For a coroutine declared as generator<int> fib(int n), the promise type is looked up through std::coroutine_traits<generator<int>, int>.
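The lookup can be tried out without any coroutine support in the compiler, since it is ordinary template specialization. The sketch below re-creates the primary template in a private namespace and adds a non-intrusive specialization for a type that has no nested promise_type. All names in namespace `sketch` are assumptions made for the illustration; `generator` here is a bare stand-in, not the generator type from the talk.

```cpp
#include <type_traits>

namespace sketch {

// Primary template, as on the slide: ask the return type for its promise.
template <typename R, typename... Ts>
struct coroutine_traits {
    using promise_type = typename R::promise_type;
};

// Stand-in return type that nests its promise (intrusive customization).
template <typename T>
struct generator {
    struct promise_type { T current_value; };
};

// Hypothetical third-party types that cannot be edited.
struct legacy_future {};
struct legacy_promise {};

// Non-intrusive customization: specialize the traits for the foreign type,
// whatever the coroutine's parameter types are.
template <typename... Ts>
struct coroutine_traits<legacy_future, Ts...> {
    using promise_type = legacy_promise;
};

}  // namespace sketch

// generator<int> fib(int n)  ->  coroutine_traits<generator<int>, int>
static_assert(std::is_same<
                  sketch::coroutine_traits<sketch::generator<int>, int>::promise_type,
                  sketch::generator<int>::promise_type>::value,
              "primary template forwards to the nested promise_type");

// legacy_future g(int, char)  ->  the specialization picks legacy_promise
static_assert(std::is_same<
                  sketch::coroutine_traits<sketch::legacy_future, int, char>::promise_type,
                  sketch::legacy_promise>::value,
              "specialization supplies a promise for a type we do not own");
```

The parameter types are part of the lookup, so a specialization can also choose a different promise depending on the coroutine's arguments.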

Compiler vs Coroutine Promise

yield <expr>              await <Promise>.yield_value(<expr>)
return <expr>             <Promise>.return_value(<expr>); goto <end>
await <expr>              Spent the last hour talking about it
<after-first-curly>       await <Promise>.initial_suspend()
<before-last-curly>       await <Promise>.final_suspend()
<unhandled-exception>     <Promise>.set_exception(std::current_exception())
<get-return-object>       <Promise>.get_return_object()
<allocate coro-state>     <Promise>.operator new (or global)
<free coro-state>         <Promise>.operator delete (or global)

Defining Coroutine Promise for boost::future

namespace std {
    template <typename T, typename... anything>
    struct coroutine_traits<boost::unique_future<T>, anything...> {
        struct promise_type {
            boost::promise<T> promise;
            auto get_return_object() { return promise.get_future(); }

            template <class U>
            void return_value(U && value) { promise.set_value(std::forward<U>(value)); }

            void set_exception(std::exception_ptr e) { promise.set_exception(std::move(e)); }
            std::suspend_never initial_suspend() { return {}; }
            std::suspend_never final_suspend() { return {}; }
        };
    };
}

coroutine_handle<promise>

template <typename Promise = void> struct coroutine_handle;

template <> struct coroutine_handle<void> {
    void resume();
    void destroy();
    bool done() const;
    void * address();
    static coroutine_handle from_address(void*);
    void operator()(); // same as resume()
    …
};

template <typename Promise>
struct coroutine_handle : coroutine_handle<void> {
    Promise & promise();
    static coroutine_handle from_promise(Promise&);
};

Comparison operators: == != < > <= >=

Defining Generator From Scratch

struct int_generator {
    bool move_next();
    int current_value();
    …
};

int_generator f() {
    for (int i = 0; i < 5; i++) {
        yield i;
    }
}

int main() {
    auto g = f();
    while (g.move_next()) {
        printf("%d\n", g.current_value());
    }
}

Defining Generator From Scratch

struct int_generator {
    struct promise_type {
        int current_value;
        std::suspend_always yield_value(int value) {
            this->current_value = value;
            return {};
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() { return {}; }
        int_generator get_return_object() { return int_generator{ this }; }
    };
    bool move_next() { p.resume(); return !p.done(); }
    int current_value() { return p.promise().current_value; }
    ~int_generator() { p.destroy(); }
private:
    explicit int_generator(promise_type *p)
        : p(std::coroutine_handle<promise_type>::from_promise(*p)) {}

    std::coroutine_handle<promise_type> p;
};

yield <expr> is syntactic sugar for await <Promise>.yield_value(<expr>)
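To see what the promise and the handle are doing for f(), it can help to write the same generator by hand as an explicit state machine. The sketch below is plain C++ with no coroutine support required. It is one possible hand translation, not the code any particular compiler generates, and `int_generator_by_hand` and `collect_all` are hypothetical names.

```cpp
#include <cassert>
#include <vector>

// Hand-written equivalent of:
//   int_generator f() { for (int i = 0; i < 5; i++) { yield i; } }
class int_generator_by_hand {
    enum class state { initial, in_loop, done };
    state st = state::initial;  // where to continue on the next resume
    int i = 0;                  // local that must survive across suspensions
    int current = 0;            // plays the role of promise.current_value
public:
    // Mirrors `p.resume(); return !p.done();`
    bool move_next() {
        switch (st) {
        case state::initial:    // first resume after initial_suspend
            i = 0;
            break;
        case state::in_loop:    // resume right after `yield i`
            ++i;
            break;
        case state::done:       // this sketch tolerates extra calls
            return false;
        }
        if (i < 5) {
            current = i;        // yield_value(i), then suspend
            st = state::in_loop;
            return true;
        }
        st = state::done;       // fell off the loop: final_suspend
        return false;
    }
    int current_value() const { return current; }
};

// Usage: the same consumer loop as in main() on the earlier slide.
inline std::vector<int> collect_all() {
    std::vector<int> out;
    int_generator_by_hand g;
    while (g.move_next()) out.push_back(g.current_value());
    assert(!g.move_next());     // a finished generator stays finished
    return out;                 // 0, 1, 2, 3, 4
}
```

The members `st` and `i` correspond to what the coroutine state holds for the compiler-generated version; with coroutines the author writes only the loop.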

STL looks like the machine language macro library of an anally retentive assembly language programmer

Pamela Seymour, Leiden University

C++ Coroutines: Layered complexity

• Everybody
  • Safe by default, novice friendly
  • Use coroutines and awaitables defined by standard library, boost and other high quality libraries
• Power Users
  • Define new awaitables to customize await for their environment using existing coroutine types
• Experts
  • Define new coroutine types

Thank you!

Kavya Kotacherry, Daveed Vandevoorde, Richard Smith, Jens Maurer, Lewis Baker, Kirk Shoop, Hartmut Kaiser, Kenny Kerr, Artur Laksberg, Jim Radigan, Chandler Carruth, Gabriel Dos Reis, Deon Brewis, Jonathan Caves, James McNellis, Stephan T. Lavavej, Herb Sutter, Pablo Halpern, Robert Schumacher, Viktor Tong, Geoffrey Romer, Michael Wong, Niklas Gustafsson, Nick Maliwacki, Vladimir Petter, Shahms King, Slava Kuznetsov, Tongari J, Lawrence Crowl, Valentin Isac and many more who contributed

Coroutines – a negative overhead abstraction

• Proposal is working through C++ standardization committee (C++17?)
• Experimental implementation in VS 2015 RTM
• Clang implementation is in progress
• More details: http://www.open-std.org/JTC1/SC22/WG21/docs/papers/2016/P0057R2.pdf

Questions?