HPX: C++11 runtime system for parallel and distributed...


Uploaded by: platonov-sergey

Posted on 15-Apr-2017


TRANSCRIPT

Page 1: HPX: C++11 runtime system for parallel and distributed computing

HPX

C++11 runtime system for parallel and distributed computing

Page 2: HPX

HPX — Runtime System for Parallel and Distributed Computing

• Theoretical foundation: ParalleX
• C++-conformant API
• Asynchronous
• Unified syntax for remote and local operations

• https://github.com/stellar-group/hpx

Page 3: What is a future?

• Enables transparent synchronization with the producer
• Hides the notion of a thread
• Makes asynchrony manageable
• Allows composition of several asynchronous operations (proposed for C++17 via the Concurrency TS)
• Turns concurrency into parallelism

[Diagram: a future<T> is in one of three states: empty, holding a value, or holding an exception.]
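
The three states in the diagram can be reproduced with the standard library's std::promise/std::future pair. This is a sketch using std types rather than HPX so that it builds with any C++11 compiler; `produce` and `describe` are hypothetical helpers. The producer stores either a value or an exception, and get() on the consumer side either returns the value or rethrows the exception.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

// Producer side (hypothetical helper): fills the shared state with either
// a value or an exception. Until then the future's state is "empty".
std::future<int> produce(bool fail)
{
    std::promise<int> p;
    std::future<int> f = p.get_future();
    if (fail)
        p.set_exception(std::make_exception_ptr(std::runtime_error("no result")));
    else
        p.set_value(42);
    return f;  // destroying a satisfied promise keeps the stored result
}

// Consumer side (hypothetical helper): get() blocks until the state is
// ready, then returns the value or rethrows the stored exception.
std::string describe(std::future<int> fut)
{
    try {
        return "value " + std::to_string(fut.get());
    } catch (const std::exception& e) {
        return std::string("exception: ") + e.what();
    }
}
```

For example, describe(produce(true)) yields "exception: no result", while describe(produce(false)) yields "value 42".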

Page 4: What is a future?

[Diagram: the consumer calls fut = async(…), which starts the producer executing on another thread. When the consumer calls fut.get() before the result is ready, the consumer is suspended; once the producer has produced and returned its result, the consumer is resumed with that result.]
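
The consumer/producer interaction in the diagram, sketched with std::async (the function names are hypothetical; std::launch::async is requested explicitly so the producer really runs on another thread). One difference worth keeping in mind: here get() blocks an OS thread, whereas HPX runs tasks as lightweight user-level threads, so a suspended consumer does not tie up a worker thread.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Producer: simulates some work before returning its result.
int produce_result()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return 7;
}

// Consumer: starts the producer on another thread, does its own work,
// then calls get(), which waits until the producer's result is available.
int consume()
{
    std::future<int> fut = std::async(std::launch::async, produce_result);
    int local_work = 35;            // overlaps with the producer
    return local_work + fut.get();  // blocks here if the producer is not done
}
```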

Page 5: hpx::future & hpx::async

• Lightweight tasks
  - user-level context switching
  - each task has its own stack

• Task scheduling
  - work stealing between cores
  - user-defined task queues (FIFO, LIFO, etc.)
  - enables the use of executors

Page 6: Extending the future (N4538)

• Future initialization:

    template <class T>
    future<decay_t<T>> make_ready_future(T&& value);

• Result availability:

    bool future<T>::is_ready() const;
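
The standard std::future has neither make_ready_future nor is_ready(). A minimal sketch of both on top of <future>; the `_sketch` names are hypothetical. One caveat: for a future returned by std::async with std::launch::deferred, wait_for reports future_status::deferred, so this is_ready stand-in returns false for such a future until get() or wait() runs it.

```cpp
#include <chrono>
#include <future>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for N4538's make_ready_future: the returned future
// already holds the value, so get() never blocks.
template <class T>
std::future<typename std::decay<T>::type> make_ready_future_sketch(T&& value)
{
    std::promise<typename std::decay<T>::type> p;
    p.set_value(std::forward<T>(value));
    return p.get_future();
}

// Hypothetical stand-in for future<T>::is_ready(): a zero-length wait
// reports whether the shared state already holds a value or an exception.
template <class T>
bool is_ready_sketch(const std::future<T>& f)
{
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```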

Page 7: Extending the future (N4538)

• Sequential composition:

    template <class Cont>
    future<result_of_t<Cont(future<T>)>> future<T>::then(Cont&&);

“Effects:
  - The function creates a shared state that is associated with the returned future object. Additionally, when the object's shared state is ready, the continuation is called on an unspecified thread of execution…
  - Any value returned from the continuation is stored as the result in the shared state of the resulting future.”
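
To make the quoted Effects concrete, a minimal sketch of then() as a free function over std::future (the name `then_sketch` is hypothetical). It follows the N4538 shape in that the continuation receives the ready antecedent future and its return value becomes the result of the new future, but it simplifies the rest: it occupies a whole thread waiting for the antecedent, and it does not unwrap a continuation that itself returns a future.

```cpp
#include <future>
#include <utility>

// Sequential composition sketch: runs `cont` once `antecedent` is ready and
// stores cont's return value in the returned future.
template <class T, class Cont>
auto then_sketch(std::future<T> antecedent, Cont cont)
    -> std::future<decltype(cont(std::move(antecedent)))>
{
    return std::async(std::launch::async,
        [](std::future<T> f, Cont c) {
            f.wait();                // antecedent is ready before c runs
            return c(std::move(f));  // continuation receives the future itself
        },
        std::move(antecedent), std::move(cont));
}
```

Because the continuation receives the future rather than the value, it can also observe an exception stored in the antecedent by calling get().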

Page 8: Extending the future (N4538)

• Sequential composition, HPX extensions (launch policy or executor):

    template <class Cont>
    future<result_of_t<Cont(future<T>)>>
    future<T>::then(hpx::launch::policy, Cont&&);

    template <class Exec, class Cont>
    future<result_of_t<Cont(future<T>)>>
    future<T>::then(Exec&, Cont&&);

Page 9: Extending the future (N4538)

• Parallel composition:

    template <class InputIt>
    future<vector<future<T>>> when_all(InputIt first, InputIt last);

    template <class... Futures>
    future<tuple<decay_t<Futures>...>> when_all(Futures&&... futures);
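
A sketch of the iterator form of when_all with standard futures (the name `when_all_sketch` is hypothetical). As in N4538, the result holds the input futures themselves, already ready, so each value or exception is still retrieved individually with get(). The waiting is done by one extra thread; a real implementation attaches continuations instead.

```cpp
#include <future>
#include <iterator>
#include <utility>
#include <vector>

// Returns a future that becomes ready once every input future is ready.
// The input futures are moved out of [first, last).
template <class InputIt>
auto when_all_sketch(InputIt first, InputIt last)
    -> std::future<std::vector<typename std::iterator_traits<InputIt>::value_type>>
{
    using Future = typename std::iterator_traits<InputIt>::value_type;
    std::vector<Future> futures;
    for (; first != last; ++first)
        futures.push_back(std::move(*first));

    return std::async(std::launch::async, [](std::vector<Future> fs) {
        for (auto& f : fs)
            f.wait();  // wait() rather than get(): exceptions stay stored
        return fs;
    }, std::move(futures));
}
```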

Page 10: Extending the future (N4538)

• Parallel composition:

    template <class InputIt>
    future<when_any_result<vector<future<T>>>>
    when_any(InputIt first, InputIt last);

    template <class... Futures>
    future<when_any_result<tuple<decay_t<Futures>...>>>
    when_any(Futures&&... futures);
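
A polling sketch of when_any for a vector of standard futures, returning a struct shaped like N4538's when_any_result (the index of a ready future plus all inputs). Names ending in `_sketch` are hypothetical; the code assumes a non-empty vector of futures that are not deferred (a deferred std::async future never reports ready, so the loop would spin forever). A production implementation would register callbacks instead of polling.

```cpp
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>
#include <utility>
#include <vector>

// Same shape as N4538's when_any_result: which input is ready, plus all inputs.
template <class Sequence>
struct when_any_result_sketch {
    std::size_t index;
    Sequence futures;
};

template <class T>
std::future<when_any_result_sketch<std::vector<std::future<T>>>>
when_any_sketch(std::vector<std::future<T>> futures)
{
    return std::async(std::launch::async, [](std::vector<std::future<T>> fs) {
        for (;;) {
            for (std::size_t i = 0; i != fs.size(); ++i) {
                // Zero-length wait: has future i become ready yet?
                if (fs[i].wait_for(std::chrono::seconds(0)) == std::future_status::ready)
                    return when_any_result_sketch<std::vector<std::future<T>>>{i, std::move(fs)};
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }, std::move(futures));
}
```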

Page 11: Extending the future (N4538)

• Parallel composition, HPX extension:

    template <class InputIt>
    future<when_some_result<vector<future<T>>>>
    when_some(size_t n, InputIt f, InputIt l);

    template <class... Futures>
    future<when_some_result<tuple<future<Futures>...>>>
    when_some(size_t n, Futures&&... futures);

Page 12: Futurization?

• Delay direct execution in order to avoid synchronization
• Code no longer computes the result directly; instead it generates an execution tree representing the original algorithm

    T foo(…) { return rvalue; }
    T res = foo(…);

Futurized:

    future<T> foo(…) { return make_ready_future(rvalue); }
    future<T> res = async(foo, …);

Page 13: Example: recursive digital filter

$$y[n] = a_0 x[n] + a_1 x[n-1] + a_2 x[n-2] + a_3 x[n-3] + \dots + b_1 y[n-1] + b_2 y[n-2] + b_3 y[n-3] + \dots$$

• Generic recursive filter

Page 14: Example: recursive digital filter

$$y[n] = a_0 x[n] + a_1 x[n-1] + a_2 x[n-2] + a_3 x[n-3] + \dots + b_1 y[n-1] + b_2 y[n-2] + b_3 y[n-3] + \dots$$

• Generic recursive filter
• Single-pole high-pass filter:

$$y[n] = a_0 x[n] + a_1 x[n-1] + b_1 y[n-1], \qquad a_0 = 0.93,\; a_1 = -0.93,\; b_1 = 0.86$$

Page 15: Example: single-pole recursive filter

    #include <cstddef>
    #include <vector>

    const double a0 = 0.93, a1 = -0.93, b1 = 0.86;  // from the previous slide

    // y(n) = b(1)*y(n-1) + a(0)*x(n) + a(1)*x(n-1)
    double filter(const std::vector<double>& x, std::size_t n)
    {
        double yn_1 = n ? filter(x, n - 1) : 0.;
        double xn_1 = n ? x[n - 1] : 0.;  // avoid reading x[-1] when n == 0
        return (b1 * yn_1) + (a0 * x[n]) + (a1 * xn_1);
    }

Page 16: Example: futurized single-pole recursive filter

    // y(n) = b(1)*y(n-1) + a(0)*x(n) + a(1)*x(n-1)
    future<double> filter(const std::vector<double>& x, size_t n)
    {
        future<double> yn_1 = n ? async(filter, std::ref(x), n - 1)
                                : make_ready_future(0.);

        return yn_1.then([&x, n](future<double>&& yn_1) {
            double xn_1 = n ? x[n - 1] : 0.;  // avoid reading x[-1] when n == 0
            return (b1 * yn_1.get()) + (a0 * x[n]) + (a1 * xn_1);
        });
    }

Page 17: Example: narrow band-pass filter

$$y(n) = a_0 x(n) + a_1 x(n-1) + a_2 x(n-2) + b_1 y(n-1) + b_2 y(n-2)$$

BW = 0.033 (bandwidth), f = 0.2 (center frequency)

$$a_0 = 0.092,\; a_1 = 0.004,\; a_2 = -0.096,\; b_1 = 0.556,\; b_2 = -0.811$$

Page 18: Example: narrow band-pass filter

    // y(n) = b(1)*y(n-1) + b(2)*y(n-2) +
    //        a(0)*x(n) + a(1)*x(n-1) + a(2)*x(n-2)
    double filter(const std::vector<double>& x, size_t n)
    {
        double yn_1 = n > 0 ? filter(x, n - 1) : 0.;
        double yn_2 = n > 1 ? filter(x, n - 2) : 0.;
        double xn_1 = n > 0 ? x[n - 1] : 0.;  // samples before x(0) are 0
        double xn_2 = n > 1 ? x[n - 2] : 0.;

        return (b1 * yn_1) + (b2 * yn_2) + (a0 * x[n]) + (a1 * xn_1) + (a2 * xn_2);
    }

Page 19: Example: futurized narrow band-pass filter

    // y(n) = b(1)*y(n-1) + b(2)*y(n-2) +
    //        a(0)*x(n) + a(1)*x(n-1) + a(2)*x(n-2)
    future<double> filter(const std::vector<double>& x, size_t n)
    {
        future<double> yn_1 = n > 0 ? async(filter, std::ref(x), n - 1)
                                    : make_ready_future(0.);
        future<double> yn_2 = n > 1 ? filter(x, n - 2)
                                    : make_ready_future(0.);

        return when_all(std::move(yn_1), std::move(yn_2)).then(...);
    }

Page 20: Example: futurized narrow band-pass filter

    future<double> yn_1 = ...
    future<double> yn_2 = ...

    return when_all(std::move(yn_1), std::move(yn_2)).then(
        [&x, n](future<tuple<future<double>, future<double>>> val) {
            auto unwrapped = val.get();
            auto yn_1 = get<0>(unwrapped).get();
            auto yn_2 = get<1>(unwrapped).get();
            double xn_1 = n > 0 ? x[n - 1] : 0.;
            double xn_2 = n > 1 ? x[n - 2] : 0.;

            return (b1 * yn_1) + (b2 * yn_2) +
                   (a0 * x[n]) + (a1 * xn_1) + (a2 * xn_2);
        });

Page 21: Example: futurized narrow band-pass filter

    future<double> yn_1 = ...
    future<double> yn_2 = ...

    return async(
        [&x, n](future<double> yn_1, future<double> yn_2) {
            double xn_1 = n > 0 ? x[n - 1] : 0.;
            double xn_2 = n > 1 ? x[n - 2] : 0.;
            return (b1 * yn_1.get()) + (b2 * yn_2.get()) +
                   (a0 * x[n]) + (a1 * xn_1) + (a2 * xn_2);
        },
        std::move(yn_1), std::move(yn_2));

Page 22: Example: futurized narrow band-pass filter

    future<double> yn_1 = ...
    future<double> yn_2 = ...

    return dataflow(
        [&x, n](future<double> yn_1, future<double> yn_2) {
            double xn_1 = n > 0 ? x[n - 1] : 0.;
            double xn_2 = n > 1 ? x[n - 2] : 0.;
            return (b1 * yn_1.get()) + (b2 * yn_2.get()) +
                   (a0 * x[n]) + (a1 * xn_1) + (a2 * xn_2);
        },
        std::move(yn_1), std::move(yn_2));
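
What separates the async version from dataflow is when the function runs: with async the task may start at once and then block inside get(), whereas dataflow invokes the function only after all of its future arguments are ready. A standard-library sketch of that scheduling rule (the name `dataflow_sketch` is hypothetical, and unlike HPX it spends one thread on the waiting):

```cpp
#include <future>
#include <utility>

// Calls f(futures...) only after every argument future is ready; the
// function receives the ready futures, as with dataflow on the slide.
template <class F, class... Ts>
auto dataflow_sketch(F f, std::future<Ts>... futures)
    -> std::future<decltype(f(std::move(futures)...))>
{
    return std::async(std::launch::async,
        [](F fn, std::future<Ts>... fs) {
            // Wait on each argument in turn (C++11 pack-expansion idiom).
            int dummy[] = {0, (fs.wait(), 0)...};
            (void)dummy;
            return fn(std::move(fs)...);
        },
        std::move(f), std::move(futures)...);
}
```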

Page 23: Example: futurized narrow band-pass filter

    future<double> yn_1 = ...
    future<double> yn_2 = ...
    double xn_1 = n > 0 ? x[n - 1] : 0.;
    double xn_2 = n > 1 ? x[n - 2] : 0.;

    return (b1 * await yn_1) + (b2 * await yn_2) +
           (a0 * x[n]) + (a1 * xn_1) + (a2 * xn_2);

(await as in the coroutine proposal of the time; C++20 spells it co_await, and the function then uses co_return instead of return.)

Page 24: Example: filter execution time for y(35)

filter_serial: 1.42561
filter_futurized: 54.9641
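
One plausible reading of these numbers: the band-pass recursion calls itself twice, so the number of calls grows like the Fibonacci numbers. The sketch below counts them for a recursion in which y(n) calls y(n-1) when n > 0 and y(n-2) when n > 1 (an assumption about the exact base cases; the function name is hypothetical). For y(35) that is 39,088,168 calls, so even an extra per-call cost on the order of a microsecond, which is plausible when every call creates futures and tasks, would add tens of seconds.

```cpp
#include <cstdint>

// Number of calls (nodes in the call tree) made by a naive evaluation of
// y(n), computed iteratively from the recurrence
//   calls(n) = 1 + calls(n-1) + calls(n-2),  calls(0) = 1, calls(1) = 2.
std::uint64_t recursive_call_count(unsigned n)
{
    std::uint64_t prev2 = 1;  // calls(0)
    if (n == 0)
        return prev2;
    std::uint64_t prev1 = 2;  // calls(1)
    for (unsigned i = 2; i <= n; ++i) {
        std::uint64_t cur = 1 + prev1 + prev2;
        prev2 = prev1;
        prev1 = cur;
    }
    return prev1;
}
```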

Page 25: Example: narrow band-pass filter

    future<double> filter(const std::vector<double>& x, size_t n)
    {
        // below the threshold, run the serial version as a single task
        if (n < threshold)
            return make_ready_future(filter_serial(x, n));

        future<double> yn_1 = n > 0 ? async(filter, std::ref(x), n - 1)
                                    : make_ready_future(0.);
        future<double> yn_2 = n > 1 ? filter(x, n - 2)
                                    : make_ready_future(0.);

        return dataflow(...);
    }
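
The same granularity control can be tried with plain std::async. This is a sketch under stated assumptions: the coefficients are the band-pass values from the earlier slide, samples and outputs before index 0 are taken as zero, and `filter_parallel` is a hypothetical name. Because std::async with std::launch::async typically creates an OS thread per task, which is far more expensive than an HPX task, a useful threshold here would be considerably larger than with HPX.

```cpp
#include <cstddef>
#include <functional>
#include <future>
#include <vector>

// Narrow band-pass coefficients from the earlier slide.
const double a0 = 0.092, a1 = 0.004, a2 = -0.096, b1 = 0.556, b2 = -0.811;

// Serial recursion; values before index 0 are treated as 0.
double filter_serial(const std::vector<double>& x, std::size_t n)
{
    double yn_1 = n > 0 ? filter_serial(x, n - 1) : 0.;
    double yn_2 = n > 1 ? filter_serial(x, n - 2) : 0.;
    double xn_1 = n > 0 ? x[n - 1] : 0.;
    double xn_2 = n > 1 ? x[n - 2] : 0.;
    return (b1 * yn_1) + (b2 * yn_2) + (a0 * x[n]) + (a1 * xn_1) + (a2 * xn_2);
}

// Below `threshold` the work stays serial; above it, y(n-1) is computed by
// a separate std::async task while the current thread computes y(n-2).
double filter_parallel(const std::vector<double>& x, std::size_t n,
                       std::size_t threshold)
{
    if (n < threshold || n < 2)
        return filter_serial(x, n);

    std::future<double> yn_1 = std::async(std::launch::async, filter_parallel,
                                          std::cref(x), n - 1, threshold);
    double yn_2 = filter_parallel(x, n - 2, threshold);

    return (b1 * yn_1.get()) + (b2 * yn_2) + (a0 * x[n]) +
           (a1 * x[n - 1]) + (a2 * x[n - 2]);
}
```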

Page 26: Example: futurized narrow band-pass filter

[Chart: relative execution time (log scale, 0.01 to 100) of the futurized and serial filters computing y(35), plotted against the threshold.]

Page 27: Futures on distributed systems

Page 28: Futures on distributed systems

    int calculate();

    void foo()
    {
        std::future<int> result = std::async(calculate);
        ...
        std::cout << result.get() << std::endl;
        ...
    }

Page 29: Futures on distributed systems

    int calculate();

    void foo()
    {
        hpx::future<int> result = hpx::async(calculate);
        ...
        std::cout << result.get() << std::endl;
        ...
    }

Page 30: Futures on distributed systems

    int calculate();
    HPX_PLAIN_ACTION(calculate, calculate_action);

    void foo()
    {
        hpx::future<int> result = hpx::async(calculate);
        ...
        std::cout << result.get() << std::endl;
        ...
    }

Page 31: Futures on distributed systems

    int calculate();
    HPX_PLAIN_ACTION(calculate, calculate_action);

    void foo()
    {
        hpx::id_type where = hpx::find_remote_localities()[0];
        hpx::future<int> result = hpx::async(calculate);
        ...
        std::cout << result.get() << std::endl;
        ...
    }

Page 32: Futures on distributed systems

    int calculate();
    HPX_PLAIN_ACTION(calculate, calculate_action);

    void foo()
    {
        hpx::id_type where = hpx::find_remote_localities()[0];
        hpx::future<int> result = hpx::async(calculate_action{}, where);
        ...
        std::cout << result.get() << std::endl;
        ...
    }

Page 33: Futures on distributed systems

[Diagram: locality 1 calls hpx::async(…), which runs the action on locality 2 and immediately returns a future to locality 1; future.get() on locality 1 waits until locality 2 has delivered the result.]

Page 34: Futures on distributed systems

    namespace boost { namespace math {
        template <class T1, class T2>
        some_result_type cyl_bessel_j(T1 v, T2 x);
    }}

$$J_v(x) = \left(\tfrac{1}{2}x\right)^{v} \sum_{k=0}^{\infty} \frac{\left(-\tfrac{1}{4}x^{2}\right)^{k}}{k!\,\Gamma(v+k+1)}$$
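
A direct evaluation of the truncated series, as a sketch for sanity-checking results (the name `bessel_j_series` is hypothetical; it is adequate for moderate x, and boost::math::cyl_bessel_j remains the function to use in practice). Consecutive terms are computed from their ratio, which avoids evaluating large factorials and Gamma values separately.

```cpp
#include <cmath>

// J_v(x) = (x/2)^v * sum_{k>=0} (-x^2/4)^k / (k! * Gamma(v + k + 1)),
// truncated after `terms` terms.
double bessel_j_series(double v, double x, int terms = 40)
{
    double term = 1.0 / std::tgamma(v + 1.0);  // k = 0 term of the sum
    double sum = term;
    const double q = -x * x / 4.0;
    for (int k = 0; k + 1 < terms; ++k) {
        // term_{k+1} / term_k = q / ((k + 1) * (v + k + 1))
        term *= q / ((k + 1) * (v + k + 1));
        sum += term;
    }
    return std::pow(x / 2.0, v) * sum;
}
```

For instance, bessel_j_series(2.0, 3.0) evaluates to approximately 0.486091, the value of J_2(3).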

Page 35: Futures on distributed systems

    namespace boost { namespace math {
        template <class T1, class T2>
        some_result_type cyl_bessel_j(T1 v, T2 x);
    }}

    namespace boost { namespace math {
        template <class T1, class T2>
        struct cyl_bessel_j_action
          : hpx::actions::make_action<
                some_result_type (*)(T1, T2),
                &cyl_bessel_j<T1, T2>,
                cyl_bessel_j_action<T1, T2>
            >
        {};
    }}

Page 36: Futures on distributed systems

    int main()
    {
        boost::math::cyl_bessel_j_action<double, double> bessel_action;

        std::vector<hpx::future<double>> res;

        // invoke the action once on every locality
        for (const auto& loc : hpx::find_all_localities())
            res.push_back(hpx::async(bessel_action, loc, 2., 3.));
    }

Page 37: HPX task invocation overview

| R f(p…)   | Synchronous (returns R)        | Asynchronous (returns future<R>)      | Fire & forget (returns void)          |
|-----------|--------------------------------|---------------------------------------|---------------------------------------|
| Functions | f(p…);                         | async(f, p…);                         | apply(f, p…);                         |
| Actions   | HPX_ACTION(f, a); a{}(id, p…); | HPX_ACTION(f, a); async(a{}, id, p…); | HPX_ACTION(f, a); apply(a{}, id, p…); |

Legend (cells were color-coded by origin): C++, C++ stdlib, HPX.

Page 38: Writing an HPX component

    struct remote_object
    {
        void apply_call();
    };

    int main()
    {
        remote_object obj{some_locality};
        obj.apply_call();
    }

Page 39: Writing an HPX component

    struct remote_object_component
      : hpx::components::simple_component_base<remote_object_component>
    {
        void call() const { std::cout << "hey" << std::endl; }

        HPX_DEFINE_COMPONENT_ACTION(remote_object_component, call, call_action);
    };

Page 40: Writing an HPX component

    struct remote_object_component
      : hpx::components::simple_component_base<remote_object_component>
    {
        void call() const { std::cout << "hey" << std::endl; }

        HPX_DEFINE_COMPONENT_ACTION(remote_object_component, call, call_action);
    };

    HPX_REGISTER_COMPONENT(remote_object_component);
    HPX_REGISTER_ACTION(remote_object_component::call_action);

Page 41: Writing an HPX component

    struct remote_object_component;

    int main()
    {
        hpx::id_type where = hpx::find_remote_localities()[0];

        hpx::future<hpx::id_type> remote =
            hpx::new_<remote_object_component>(where);

        // prints "hey" on the second locality
        hpx::apply(remote_object_component::call_action{}, remote.get());
    }

Page 42: Writing an HPX client for component

    struct remote_object
      : hpx::components::client_base<remote_object, remote_object_component>
    {
        using base_type = ...;

        remote_object(hpx::id_type where)
          : base_type{hpx::new_<remote_object_component>(where)}
        {}

        void apply_call() const
        {
            hpx::apply(remote_object_component::call_action{}, get_id());
        }
    };

Page 43: Writing an HPX client for component

    int main()
    {
        hpx::id_type where = hpx::find_remote_localities()[0];

        remote_object obj{where};
        obj.apply_call();

        return 0;
    }

Page 44: Writing an HPX client for component

[Diagram: the client object (struct remote_object: client_base<…>) on locality 1 refers, through the global address space, to the component instance (struct remote_object_component: simple_component_base<…>) on locality 2.]

Page 45: Writing multiple HPX clients

    int main()
    {
        std::vector<hpx::id_type> locs = hpx::find_all_localities();

        // one client (and one remote component) per locality
        std::vector<remote_object> objs{locs.cbegin(), locs.cend()};

        for (const auto& obj : objs)
            obj.apply_call();
    }

Page 46: Writing multiple HPX clients

[Diagram: localities 1 through N, each hosting a component instance, all reachable through the global address space.]

Page 47: HPX: distributed point of view

Page 48: HPX parallel algorithms

Page 49: HPX parallel algorithms

    template <class ExecutionPolicy, class InputIterator, class Function>
    void for_each(ExecutionPolicy&& exec,
                  InputIterator first, InputIterator last, Function f);

• Execution policies:
  - sequential_execution_policy: hpx(std)::parallel::seq
  - parallel_execution_policy: hpx(std)::parallel::par
  - parallel_vector_execution_policy: hpx(std)::parallel::par_vec

(In the Parallelism TS these live in std::experimental::parallel; C++17 later standardized them as std::execution::seq, par and par_unseq.)

Page 50: HPX parallel algorithms

    template <class ExecutionPolicy, class InputIterator, class Function>
    void for_each(ExecutionPolicy&& exec,
                  InputIterator first, InputIterator last, Function f);

• Execution policies:
  - sequential_execution_policy: hpx(std)::parallel::seq
  - parallel_execution_policy: hpx(std)::parallel::par
  - parallel_vector_execution_policy: hpx(std)::parallel::par_vec
  - sequential_task_execution_policy: hpx::parallel::seq(task) (HPX only)
  - parallel_task_execution_policy: hpx::parallel::par(task) (HPX only)

With the task policies, the algorithm returns a future instead of blocking until completion.

Page 51: HPX map reduce algorithm example

    template <class T, class Mapper, class Reducer>
    T map_reduce(const std::vector<T>& input, Mapper mapper, Reducer reducer)
    {
        // ???
    }

Page 52: HPX map reduce algorithm example

    #include <algorithm>
    #include <numeric>
    #include <vector>

    template <class T, class Mapper, class Reducer>
    T map_reduce(const std::vector<T>& input, Mapper mapper, Reducer reducer)
    {
        std::vector<T> temp(input.size());
        std::transform(std::begin(input), std::end(input), std::begin(temp), mapper);

        return std::accumulate(std::begin(temp), std::end(temp), T{}, reducer);
    }

Page 53: HPX map reduce algorithm example

    template <class T, class Mapper, class Reducer>
    future<T> map_reduce(const std::vector<T>& input, Mapper mapper, Reducer reducer)
    {
        // temp must outlive the asynchronous transform, so it is shared
        auto temp = std::make_shared<std::vector<T>>(input.size());

        auto mapped = transform(par(task), std::begin(input), std::end(input),
                                std::begin(*temp), mapper);

        return mapped.then([temp, reducer](auto) {
            return reduce(par(task), std::begin(*temp), std::end(*temp), T{}, reducer);
        });
    }

Page 54: HPX map reduce algorithm example

    template <class T, class Mapper, class Reducer>
    future<T> map_reduce(const std::vector<T>& input, Mapper mapper, Reducer reducer)
    {
        using namespace hpx::parallel;

        return transform_reduce(par(task), std::begin(input), std::end(input),
                                mapper, T{}, reducer);
    }
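
For comparison, C++17 standardized the same fused operation as std::transform_reduce in <numeric>. A sketch (the name `map_reduce_std` is hypothetical). Note the standard argument order, in which the initial value and the reducer precede the mapper, and that the reducer should be associative and commutative, because transform_reduce may regroup and reorder the operations. An execution policy such as std::execution::par can be passed as the first argument where the toolchain supports <execution>.

```cpp
#include <functional>
#include <numeric>
#include <vector>

// Applies `mapper` to every element and combines the results with `reducer`,
// starting from T{}. Standard order: (first, last, init, reduce, transform).
template <class T, class Mapper, class Reducer>
T map_reduce_std(const std::vector<T>& input, Mapper mapper, Reducer reducer)
{
    return std::transform_reduce(input.begin(), input.end(), T{}, reducer, mapper);
}
```

For example, map_reduce_std(std::vector<int>{1, 2, 3, 4}, square, std::plus<int>()) with a squaring mapper yields 30.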

Page 55: Thank you for your attention!

• https://github.com/stellar-group/hpx