
Introduction to Stratosphere

Aljoscha Krettek, DIMA / TU Berlin

What is this?

Distributed data processing system

DAG (Directed acyclic graph) of sources, sinks, and operators: data flow

Handles distribution, fault-tolerance, network transfer

source → map: split words → reduce: count words → sink

Google: Search results, Spam filter
Amazon: Recommendations
Soundcloud: Recommendations
Spotify: Recommendations
Youtube: Recommendations, Adverts
Netflix: Recommendations (compare to Maxdome :D)
Twitter: Just everything :D
Facebook: Adverts, GraphSearch, Friend suggestions, Filtering (for annoying friends)
Instagram: They have lots of data, there's gotta be something
Bioinformatics: DNA, 1 TB per genome, 1000 genomes

Why would I use this?

Automatic parallelization / Because you are told to

source → map: split words → reduce: count words → sink
source → map: split words → reduce: count words → sink
source → map: split words → reduce: count words → sink

(the same data flow, run as three parallel instances)


So how do I use this?
(from Java)

How is data represented in the system?

How do I create data flows?

Which types of operators are there?

How do I write operators?

How do I run the whole shebang?

How do I move my data?

Data is stored in fields of a PactRecord

Basic data types: PactString, PactInteger, PactDouble, PactFloat, PactBoolean, ...

New data types must implement Value interface
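
By way of illustration, a custom type might look like the sketch below. PactPoint is a made-up example, and the read/write pair is an assumption modeled on the usual Writable-style serialization contract; check the Value interface in the Stratosphere source for the exact methods.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical custom data type: a 2D point.
public class PactPoint implements Value {
    private double x;
    private double y;

    // Nullary constructor so the system can instantiate the type before deserializing into it.
    public PactPoint() {}

    public PactPoint(double x, double y) {
        this.x = x;
        this.y = y;
    }

    // Write the fields in a fixed order...
    public void write(DataOutput out) throws IOException {
        out.writeDouble(x);
        out.writeDouble(y);
    }

    // ...and read them back in the same order.
    public void read(DataInput in) throws IOException {
        x = in.readDouble();
        y = in.readDouble();
    }
}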

PactRecord

PactRecord rec = ...
PactInteger foo = rec.getField(0, PactInteger.class);
int i = foo.getValue();

PactInteger foo2 = new PactInteger(3);
rec.setField(1, foo2);

Creating Data Flows

Create one or several sources

Create operators: input is/are the preceding operator(s)

Specify a class/object with the operator implementation

Create one or several sinks: input is some operator

WordCount Example Data Flow

FileDataSource source = new FileDataSource(TextInputFormat.class, dataInput, "Input Lines");

MapContract mapper = MapContract.builder(TokenizeLine.class)
    .input(source)
    .name("Tokenize Lines")
    .build();

ReduceContract reducer = ReduceContract.builder(CountWords.class, PactString.class, 0)
    .input(mapper)
    .name("Count Words")
    .build();

FileDataSink out = new FileDataSink(RecordOutputFormat.class, output, reducer, "Word Counts");
RecordOutputFormat.configureRecordFormat(out)
    .recordDelimiter('\n')
    .fieldDelimiter(' ')
    .field(PactString.class, 0)
    .field(PactInteger.class, 1);

Plan plan = new Plan(out, "WordCount Example");

Operator Types

We call them second-order functions (SOF)

Code inside the operator is the first-order function, or user-defined function (UDF)

Currently five SOFs: map, reduce, match, cogroup, cross

SOF describes how PactRecords are handed to the UDF

Map Operator

User code receives one record at a time (per call to user code function)

Not really a functional map since all operators can output an arbitrary number of records

Map Operator Example

public static class TokenizeLine extends MapStub {
    private final AsciiUtils.WhitespaceTokenizer tokenizer = new AsciiUtils.WhitespaceTokenizer();
    private final PactRecord outputRecord = new PactRecord();
    private final PactString word = new PactString();
    private final PactInteger one = new PactInteger(1);

    @Override
    public void map(PactRecord record, Collector collector) {
        PactString line = record.getField(0, PactString.class);
        this.tokenizer.setStringToTokenize(line);
        while (tokenizer.next(word)) {
            outputRecord.setField(0, word);
            outputRecord.setField(1, one);
            collector.collect(outputRecord);
        }
    }
}

Reduce Operator

User code receives a group of records with the same key

Must specify which fields of a record are the key

Reduce Operator Example

public static class CountWords extends ReduceStub {
    private final PactInteger cnt = new PactInteger();

    @Override
    public void reduce(Iterator<PactRecord> records, Collector out) throws Exception {
        PactRecord element = null;
        int sum = 0;
        while (records.hasNext()) {
            element = records.next();
            PactInteger i = element.getField(1, PactInteger.class);
            sum += i.getValue();
        }
        cnt.setValue(sum);
        element.setField(1, cnt);
        out.collect(element);
    }
}

Specifying the Key Fields

ReduceContract reducer = ReduceContract.builder(Foo.class, PactString.class, 0)
    .input(mapper)
    .keyField(PactInteger.class, 1)
    .name("Count Words")
    .build();

Cross Operator

Two-input operator

Cartesian product: every record from the left combined with every record from the right

One record from the left and one record from the right per user-code call

Implement CrossStub
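
A minimal sketch, by analogy with the MapStub example above; the exact cross(...) signature is an assumption, and the class name and field layout are hypothetical:

// Called once for every (left, right) pair of the Cartesian product.
public static class ConcatPair extends CrossStub {
    private final PactRecord result = new PactRecord();

    @Override
    public void cross(PactRecord left, PactRecord right, Collector out) {
        // Hypothetical combination: the first field of each input, side by side.
        result.setField(0, left.getField(0, PactString.class));
        result.setField(1, right.getField(0, PactString.class));
        out.collect(result);
    }
}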

Match Operator

Two-input operator with keys

Join: each record from the left is combined with every record from the right that has the same key

Implement MatchStub
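
Again a hedged sketch (the match(...) signature is assumed by analogy with the other stubs; JoinNames and the field indices are made up): the UDF is called once per pair of records whose key fields are equal.

// Called once for each left/right pair with matching keys.
public static class JoinNames extends MatchStub {
    @Override
    public void match(PactRecord left, PactRecord right, Collector out) {
        // Hypothetical join: copy a field from the right record into the left, then emit.
        left.setField(2, right.getField(1, PactString.class));
        out.collect(left);
    }
}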

CoGroup Operator

Two-input operator with keys

Records from the left are combined with all records from the right that have the same key

User code gets one iterator each for the left and the right records

Implement CoGroupStub
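
A sketch under the same caveat (the coGroup(...) signature is assumed, and CountBothSides is a made-up example): the UDF receives one iterator per input side, holding all records for the current key.

public static class CountBothSides extends CoGroupStub {
    private final PactRecord result = new PactRecord();
    private final PactInteger leftCnt = new PactInteger();
    private final PactInteger rightCnt = new PactInteger();

    @Override
    public void coGroup(Iterator<PactRecord> left, Iterator<PactRecord> right, Collector out) {
        // Hypothetical aggregation: count the records on each side of the key.
        int l = 0, r = 0;
        while (left.hasNext()) { left.next(); l++; }
        while (right.hasNext()) { right.next(); r++; }
        leftCnt.setValue(l);
        rightCnt.setValue(r);
        result.setField(0, leftCnt);
        result.setField(1, rightCnt);
        out.collect(result);
    }
}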

How to execute a data flow plan

Either use the LocalExecutor:

LocalExecutor.execute(plan)

Or implement PlanAssembler.getPlan(String... args) and run on a local or a proper cluster (see the sketch below)
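
As a rough sketch, assuming getPlan simply wires together the contracts from the WordCount slides above; check the quickstart linked below for the exact interface:

public class WordCount implements PlanAssembler {
    @Override
    public Plan getPlan(String... args) {
        String dataInput = args[0];  // input path
        String output = args[1];     // output path

        // Same flow as on the earlier slides: source -> map -> reduce -> sink.
        FileDataSource source = new FileDataSource(TextInputFormat.class, dataInput, "Input Lines");
        MapContract mapper = MapContract.builder(TokenizeLine.class)
            .input(source).name("Tokenize Lines").build();
        ReduceContract reducer = ReduceContract.builder(CountWords.class, PactString.class, 0)
            .input(mapper).name("Count Words").build();
        FileDataSink out = new FileDataSink(RecordOutputFormat.class, output, reducer, "Word Counts");

        return new Plan(out, "WordCount Example");
    }
}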

See: http://stratosphere.eu/quickstart/ and http://stratosphere.eu/docs/gettingstarted.html

Getting Started

https://github.com/stratosphere/stratosphere
https://github.com/stratosphere/stratosphere-quickstart

And Now for Something Completely Different

val input = TextFile(textInput)

val words = input
  .flatMap { _.split(" ") map { (_, 1) } }

val counts = words
  .groupBy { case (word, _) => word }
  .reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }

val output = counts
  .write(wordsOutput, CsvOutputFormat())

val plan = new ScalaPlan(Seq(output))

(Very) Short Introduction to Scala

Anatomy of a Scala Class

package foo.bar

import something.else

class Job(arg1: Int) {
  def map(in: Int): String = {
    val i: Int = in + 2
    var a = "Hello"
    i.toString
  }
}

Singletons

Similar to Java singletons and/or static methods

object Job {
  def main(args: Array[String]) {
    println("Hello World")
  }
}

Collections

val a = Seq(1, 2, 4)
List("Hallo", 2)
Array(2, 3)
Map(1 -> 1, 2 -> 2)

val b = a map { x => x + 2 }
val c = a map { _ + 2 }
val c = a.map({ _ + 2 })

Generics and Tuples

val a: Seq[Int] = Seq(1, 2, 4)

val tup = (3, a)
val tup2: (Int, String) = (3, "a")

Stratosphere Scala Front End

Skeleton of a Stratosphere Program

Input: a text file, JDBC source, CSV, etc., loaded into the internal representation: the DataSet

Transformations on the DataSet: map, reduce, join, etc.

Output: program results go to a DataSink: text file, JDBC, CSV, etc.

The Almighty DataSet

Operations are methods on DataSet[A]

Working with DataSet[A] feels like working with Scala collections

DataSet[A] is not an actual collection but represents computation on a collection

Stringing together operations creates a data flow graph that can be executed

An Important Difference

Immediately executed:

val input: List[String] = ...
val mapped = input.map { s => (s, 1) }

Executed only when the data flow is executed:

val input: DataSet[String] = ...
val mapped = input.map { s => (s, 1) }
val result = mapped.write(file, ...)
val plan = new Plan(result)
execute(plan)

Usable Data Types

Primitive types

Tuples

Case classes

Custom data types that implement the Value interface

Creating Data Sources

val input = TextFile("file://")

val input: DataSet[(Int, String)] = DataSource("hdfs://", CsvInputFormat[(Int, String)]())

def parseInput(line: String): (Int, Int) = { ... }

val input = DataSource("hdfs://", DelimitedInputFormat(parseInput))

Interlude: Anonymous Functions

var fun: ((Int, String)) => String = ...

fun = { t => t._2 }

fun = { _._2 }

fun = { case (i, w) => w }

Map

val input: DataSet[(Int, String)] = ...

val mapper = input
  .map { case (a, b) => (a + 2, b) }

val mapper2 = input
  .flatMap { _._2.split(" ") }

val filtered = input
  .filter { case (a, b) => a > 3 }

Reduce

val input: DataSet[(String, Int)] = ...

val reducer = input
  .groupBy { case (w, _) => w }
  .groupReduce { _.minBy { ... } }

val reducer2 = input
  .groupBy { case (w, _) => w }
  .reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }

Cross

val left: DataSet[(String, Int)] = ...
val right: DataSet[(String, Int)] = ...

val cross = left cross right
  .map { (l, r) => ... }

val cross = left cross right
  .flatMap { (l, r) => ... }

Join (Match)

val counts: DataSet[(String, Int)] = ...
val names: DataSet[(Int, String)] = ...

val join = counts.join(names)
  .where { case (_, c) => c }
  .isEqualTo { case (n, _) => n }
  .map { (l, r) => (l._1, r._2) }

val join = counts.join(names)
  .where { case (_, c) => c }
  .isEqualTo { case (n, _) => n }
  .flatMap { (l, r) => ... }

CoGroup

val counts: DataSet[(String, Int)] = ...
val names: DataSet[(Int, String)] = ...

val cogroup = counts.cogroup(names)
  .where { case (_, c) => c }
  .isEqualTo { case (n, _) => n }
  .map { (l, r) => (l.minBy { ... }, r.minBy { ... }) }

val cogroup = counts.cogroup(names)
  .where { case (_, c) => c }
  .isEqualTo { case (n, _) => n }
  .flatMap { (l, r) => ... }

Creating Data Sinks

val counts: DataSet[(String, Int)] = ...

val sink = counts.write(..., CsvOutputFormat())

def formatOutput(a: (String, Int)): String = {
  "Word " + a._1 + " count " + a._2
}

val sink = counts.write(..., DelimitedOutputFormat(formatOutput))

Word Count example

val input = TextFile(textInput)

val words = input
  .flatMap { _.split(" ") map { (_, 1) } }

val counts = words
  .groupBy { case (word, _) => word }
  .reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }

val output = counts
  .write(wordsOutput, CsvOutputFormat())

val plan = new ScalaPlan(Seq(output))

Things not mentioned

There is support for iterations (both in Java and Scala)

Many more data source/sink formats

Look at the examples in the Stratosphere source

Don't be afraid to write to the mailing list or on GitHub: http://stratosphere.eu/quickstart/scala.html

Or come directly to us

End.