fastr+apache flink
Post on 23-Jan-2018
990 Views
Preview:
TRANSCRIPT
FastR + Apache FlinkEnabling distributed data
processing in R
<juan.fumero@ed.ac.uk>
Disclaimer
This is a work in progress
Outline
1. Introduction and motivation○ Flink and R○ Oracle Truffle + Graal
2. FastR and Flink○ Execution Model○ Memory Management
3. Preliminary Results4. Conclusions and Future Work5. [DEMO] within a distributed configuration
Introduction and motivation
Why R?
http://spectrum.ieee.org
R is one of the most popular languages for data analytics
FunctionalLazyObject OrientedDynamic
R is:
But, R is slow
Benchmark
Slo
wdo
wn
com
pare
d to
C
Problem
It is neither fast nor distributed
Problem
It is neither fast nor distributed
Apache Flink● Framework for distributed stream and batch data processing ● Custom scheduler● Custom optimizer for operations● Hadoop/Amazon plugins ● YARN connector for cluster execution● It runs on laptops, clusters and supercomputers with the same code● High level APIs: Java, Scala and Python
Flink Example, Word Countpublic class LineSplit … { public void flatMap(...) { for (String token : value.split("\\W+")) if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1); }}… DataSet<String> textInput = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts= textInput.flatMap(new LineSplit())
.groupBy(0)
.sum(1);
JobManager
TaskManager
TaskManager
Flink Client And Optimizer
We propose a compilation approach built within Truffle/Graal for distributed computing on top of Apache Flink.
Our SolutionHow can we enable distributed data processing in R exploiting Flink?
FastR on Top of Flink
* Flink Material
FastR
Truffle/Graal Overview
How to Implement Your Own Language?
Oracle Labs material
Truffle/Graal Infrastructure
Oracle Labs material
Truffle node specialization
Oracle Labs material
Deoptimization and rewriting
Oracle Labs material
More Details about Truffle/Graalhttps://wiki.openjdk.java.net/display/Graal/Publications+and+Presentations
[Thomas Würthinger, Christian Wimmer, Andreas Wöß, Lukas Stadler, Gilles Duboscq, Christian Humer, Gregor Richards, Doug Simon, Mario Wolczko]
One VM to Rule Them All
In Proceedings of Onward!, 2013.
OpenJDK Graal - New JIT for Java ● Graal Compiler is a JIT compiler for Java which is written in Java● The Graal VM is a modification of the HotSpot VM -> It replaces the client and server compilers with
the Graal compiler.● Open Source: http://openjdk.java.net/projects/graal/
FastR + Flink: implementation
Our Solution: FastR - Flink Compiler● Our goal:
○ Run R data processing applications in a distributed system as easy as possible
● Approach○ Custom FastR-Flink compiler builtins (library) ○ Minimal R/Flink setup and configuration ○ “Offload” hard computation if the cluster is available
FastR-Flink API Example - Word Count # Word Count in R
bigText <- flink.readTextFile("hdfs://192.168.1.10/user/juan/quijote.txt")
createTuples <- function(text) {
words <- strsplit(text, " ")[[1]]
tuples = list()
for (w in words) {
tuples[[length(tuples)+1]] = list(w, 1)
}
return (tuples)
}
splitText <- flink.flatMap(bigText, createTuples)
groupBy <- flink.groupBy(splitText, 0)
count <- flink.sum(groupBy, 1)
hdfsPath <- "hdfs://192.168.1.10/user/juan/newQUIJOTE.txt"
flink.writeAsText(count, hdfsPath)
Supported Operations Operation Description
flink.sapply Local and remove apply (blocking)
flink.execute Execute previous operation (blocking)
flink.collect Execute the operations (blocking)
flink.map Apply local/remove map (non-blocking)
flink.sum Sum arrays (non-blocking)
flink.groupBy Group tuples (non-blocking)
Supported Operations Operation Description
flink.reduce Reduction operation (blocking and non-blocking versions)
flink.readTextFile Read from HDFS (non-blocking)
flink.filter Filter data according to the function (blocking and non-blocking)
flink.arrayMap Map with bigger chunks per Flink thread
flink.connectToJobManager Establish the connection with the main server
flink.setParallelism Set the parallelism degree for future Flink operations
Assumptions ● Distributed code with no side effects allowed → better for parallelism ● R is mostly a functional programming language
alpha <- 0.01652g <- function(x) { …
if (x < 10) {alpha <- 123.46543
} … }
Restrictions● Apply R function which change the data type
f: Integer → (String | Double)
f <- function(x) {if (x < 10) {
return("YES")}else {
return(0.123)}
}
> f(2)[1] “YES” # String> f(100)[1] 0.123 # Double
In FastR + Flink we infered the data type based on the first execution.
We can not change the type with Flink
FastR + Flink : Execution model
Cool, so how does it work?
Where R you?
ipJobManager <- "192.168.1.10"
flink.connectToJobManager(ipJobManager)
flink.setParallelism(2048)
userFunction <- function(x) {
x * x
}
result <- flink.sapply(1:10000, userFunction)
VMs cluster organization
FastR+ GraalVM
TaskManager
TaskManager
FastR + GraalVM
FastR + GraalVM
R Source Code
R Source Code
JobManager
FastR-Flink Execution Workflow
1a) Identify the skeleton
map < − f l i n k . sapply ( input , f )
switch (builtin) { … case "flink.sapply": return new FlinkFastRMap(); … }
R User Program:
FastR-Flink Compiler:
Parallel Map Node
1b ) Create the JAR File
public static ArrayList<String> compileDefaultUserFunctions() {
for (String javaSource : ClassesToCompile) { JavaCompileUtils.compileJavaClass(...) ; String jarFileName = ... String jarPath = FastRFlinkUtils.buildTmpPathForJar(jarFileName); jarFiles.add(jarPath); JavaCompileUtils.createJarFile(...); } jarFiles.add("fastr.jar"); return jarFiles; }
Jar compilation for User Defined Function:
2-3. Data Type Inference and Flink Objects
private static <T> MapOperator<?, ?> remoteApply(Object input, RFunction function, RAbstractVector[] args, String[] scopeVars, String returns) {
Object[] argsPackage = FastRFlinkUtils.getArgsPackage(nArgs, function, (RAbstractVector) input, args, argsName);firstOutput = function.getTarget().call(argsPackage);
returnsType = inferOutput(firstOutput);ArrayList<T> arrayInput = prepareArrayList(returnsType, input, args);
MapOperator<?, ?> mapOperator = computation(arrayInput, codeSource, args, argsName, returnsType, scopeVars, frame);
MetadataPipeline mapPending = new MetadataPipeline(input, firstOutput, returns);mapPending.setIntermediateReturnType(returnsType);FlinkPromises.insertOperation(Operation.REMOTE_MAP, mapPending);return mapOperator;
}
4. Code Distribution
ArrayList<String> compileUDF = FlinkUDFCompilation.compileDefaultUserFunctions();
RemoteEnvironment fastREnv = new RemoteEnvironment();fastREnv.createRemoteEnvironment(FlinkJobManager.jobManagerServer, FLINK_PORT, jarFiles);
Send the JAR files:
The Flink UDF Class receives also the String which represents the R User code:
inputSet.map(new FastRFlinkMap(codeSource)).setParallelism(parallelism)).returns(returns);
5. Data Transformation
RVector Flink DataSets
Flink DataSets preparation
RVector in FastR are not serializable → Boxing and Unboxing are needed
Paper: “Runtime Code Generation and Data Management for Heterogeneous Computing in Java”
Creating a view for the data:
6. FastR-Flink Execution, JIT Compilation
@Override public R map(T t) throws Exception {
if (!setup) { setup(); }
Object[] argsFunction = composeArgsForRFunction(t); Object[] argumentsPackage = RArguments.create(userFunction, null, null, 0, argsFunction, rArguments, null); Object result = target.call(argumentsPackage); return (R) new Tuple2<>(t.getField(0), result);}
6. FastR-Flink Execution, JIT Compilation ● Each Flink execution thread has a copy of
the R code● It builds the AST and start the
interpretation using Truffle● After a while it will reach the compiled
code (JIT) ● The language context (RContext) is
shared among the threads● Scope variables are passed by broadcast
7. Data Transformation - Get the result
RVectorFlink DataSets
Build RVectors:
The R user gets the final results within the right R data representation
Flink Web Interface ● It works with FastR!!!
● Track Jobs● Check history● Check configuration
But, Java trace.
● Maybe not very useful for R users.
Memory Management
And Memory Management on the TaskManager?
70% of the heap for hashing by default for Apache Flink
Solution: taskmanager.memory.fraction:0.5, in the config file.
User Space: ● Graal Compilation Threads● Truffle Compilation Threads● R Interpreter● R memory space
Some Questions ● Is there any heuristic to know the ideal heap distribution? ● Is there any way to format the name of the operations for the Flink Web
Interface? R functions names? Custom names for easy track● Is there any way to attach custom initialization code into TaskManager?
Preliminary Results
Evaluation Setup● Shared memory:
○ Big server machine ■ Xeon(R) 32 real cores■ Heap: 12GB■ FastR compiled with OpenJDK 1.8_60
● 4 Benchmarks:○ PI approximation○ Binary Trees○ Binomial○ Nested-Loops (high computation)
Preliminary Results
Preliminary Results
And Distributed Memory?● It works (I will show you in demo in a bit) ● Some performance issues concerning:
○ Multi-thread context issues. ○ Each Truffle language has a context initialized within the application○ It will need pre-initialization within the Flink TaskManager
● This is work in progress
Conclusions and Future work
Conclusions
● Prototype: R implementation with + some Flink operations included in the compiler
● FastR-Flink in the compiler● Easy installation and easy programmability● Good speedups in shared memory for high computation● Investigating distributed memory to increase speedups
Future Work● A world to explore:
○ Solve distribute performance issues○ More realistic benchmarks (big data)○ Distributed benchmarks○ Support more Flink operations ○ Support Flink for other Truffle languages using same approach○ Define a stable API for high level programming languages○ Optimisations (caching) ○ GPU support based on our previous work [1]
[1] Juan Fumero, Toomas Remmelg, Michel Steuwer and Christophe Dubach. Runtime Code Generation and Data Management for Heterogeneous Computing in Java. 2015 International Conference on Principles and Practices of Programming on the Java Platform
Check it out! It is Open SourceClone and compile:
$ mx sclone https://bitbucket.org/allr/fastr-flink
$ hg update -r rflink-0.1
$ mx build
Run:
$ mx R # start the interpreter with Flink local environment
Thanks for your attention
> flink.ask(questions)
AND DEMO !
Contact: Juan Fumero <juan.fumero@ed.ac.uk>
Thanks to Oracle Labs, TU Berlin and The University of Edinburgh
top related