using spark 1.2 with java 8 and cassandra

Post on 14-Jul-2015

324 Views

Category:

Software

9 Downloads

Preview:

Click to see full reader

TRANSCRIPT

Spark overviewUsing Spark 1.2 with Java 8 and Cassandra

by Denis Dus

Spark

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Components

1. Driver programOur main program, which connects to Spark cluster through SparkContext object, submits transformations and actions on RDD

2. Cluster managerAllocates resources across applications (e.g. standalone manager, Mesos, YARN)

3. Worker nodeExecutor - A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them.

Task - A unit of work that will be sent to one executor

Spark RDD

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a HadoopInputFormat.

RDD Operations

Spark Stages

Shared variables in Spark

Spark provides two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

• Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

• Accumulators

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel.

Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark’s UI. This can be useful for understanding the progress of running stages.

Spark application workflow

Building a simple Spark application

SparkConf sparkConf = new SparkConf().setAppName("SparkApplication").setMaster("local[*]");JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

JavaRDD<String> file = sparkContext.textFile("hdfs://...");

JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String s) {

return Arrays.asList(s.split(" ")); }

});

JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {public Tuple2<String, Integer> call(String s) {

return new Tuple2<String, Integer>(s, 1); }

});

JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {public Integer call(Integer a, Integer b) {

return a + b;}

});

counts.saveAsTextFile("hdfs://...");

sparkContext.close();

Java 8 + Spark 1.2 + Cassandra for BI:Driver program skeleton

SparkConf sparkConf = new SparkConf()

.setAppName("SparkCassandraTest")

.setMaster("local[*]")

.set("spark.cassandra.connection.host", "127.0.0.1");

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

CassandraLoader<UserEvent> cassandraLoader = new CassandraLoader<>(sparkContext, "dataanalytics", "user_events", UserEvent.class);

JavaRDD<UserEvent> rdd = cassandraLoader.fetchAndUnion(venues, startDate, endDate);

… Events processing here …

sparkContext.close();

Java 8 + Spark 1.2 + Cassandra for BI:Load events from Cassandra

public class CassandraLoader<T> {private JavaSparkContext sparkContext;private String keySpace;private String tableName;private Class<T> clazz;

…private CassandraJavaRDD<T> fetchForVenueAndDateShard (String venueId, String dateShard) {

RowReaderFactory<T> mapper = CassandraJavaUtil.mapRowTo(clazz);

return CassandraJavaUtil.javaFunctions(sparkContext). // SparkContextJavaFunctions appears herecassandraTable(keySpace, tableName, mapper). // CassandraJavaRDD appears herewhere("venue_id=? AND date_shard=?", venueId, dateShard);

}…}

CassandraJavaUtilThe main entry point to Spark Cassandra Connector Java API. Builds useful wrappers around Spark Context, Streaming Context, RDD.

SparkContextJavaFunctions -> CassandraJavaRDD<T> cassandraTable (String keyspace, String table, RowReaderFactory<T> rrf)Returns a view of a Cassandra table. With this method, each row is converted to a object of type T by a specified row reader factory.

CassandraJavaUtil -> RowReaderFactory<T> mapRowTo(Class<T> targetClass, Pair<String, String>... columnMappings)Constructs a row reader factory which maps an entire row to an object of a specified type (JavaBean style convention).The default mapping of attributes to column names can be changed by providing a custom map of attribute-column mappings for the pairs which do not follow the general convention.

CassandraJavaRDDCassandraJavaRDD<R> select(String... columnNames) CassandraJavaRDD<R> where(String cqlWhereClause, Object... args)

Java 8 + Spark 1.2 + Cassandra for BI:Load events from Cassandra

public Map<String, JavaRDD<T>> fetchByVenue(List<String> venueIds, Date startDate, Date endDate) {Map<String, JavaRDD<T>> result = new HashMap<>();List<String> dateShards = ShardingUtils.generateDailyShards(startDate, endDate);

List<CassandraJavaRDD<T>> dailyRddList = new LinkedList<>();venueIds.stream().forEach(venueId -> {

dailyRddList.clear();

dateShards.stream().forEach(dateShard -> {CassandraJavaRDD<T> rdd = fetchForVenueAndDateShard(venueId, dateShard);dailyRddList.add(rdd);

});

result.put(venueId, unionRddCollection(dailyRddList));});

return result;}

private JavaRDD<T> unionRddCollection(Collection<? extends JavaRDD<T>> rddCollection) {JavaRDD<T> result = null;for (JavaRDD<T> rdd : rddCollection) {

result = (result == null) ? rdd : result.union(rdd);}return result;

}

public JavaRDD<T> fetchAndUnion(List<String> venueIds, Date startDate, Date endDate) {Map<String, JavaRDD<T>> data = fetchByVenue(venueIds, startDate, endDate);return unionRddCollection(data.values());

}

Java 8 + Spark 1.2 + Cassandra for BI:Some processing

JavaPairRDD<String, Iterable<UserEvent>> groupedRdd = rdd.filter(event -> {

boolean result = false;

boolean isSessionEvent = TYPE_SESSION.equals(event.getEvent_type());

if (isSessionEvent) {

Map<String, String> payload = event.getPayload();

String action = payload.get(PAYLOAD_ACTION_KEY);

if (StringUtils.isNotEmpty(action)) {

result = ACTION_SESSION_START.equals(action) || ACTION_SESSION_STOP.equals(action);

}

}

return result;

}).groupBy(event -> event.getUser_id());

Java 8 + Spark 1.2 + Cassandra for BI:Some processing

JavaRDD<SessionReport> reportsRdd = groupedRdd.map(pair -> {String sessionId = pair._1();Iterable<UserEvent> events = pair._2();

Date sessionStart = null;Date sessionEnd = null;

for (UserEvent event : events) {Date eventDate = event.getDate();

if (eventDate != null) {String action = event.getPayload().get(PAYLOAD_ACTION_KEY);

if (ACTION_SESSION_START.equals(action)) {if (sessionStart == null || eventDate.before(sessionStart))

sessionStart = eventDate;}

if (ACTION_SESSION_STOP.equals(action)) {if (sessionEnd == null || endDate.after(sessionEnd))

sessionEnd = eventDate;}

}}

String sessionType = ((sessionStart != null) && (sessionEnd != null)) ? SessionReport.TYPE_CLOSED : SessionReport.TYPE_ACTIVE;

return new SessionReport(sessionId, sessionType, sessionStart, sessionEnd);});

Java 8 + Spark 1.2 + Cassandra for BI:Get result to Driver Program

List<SessionReport> reportsList = reportsRdd.collect(); // Returns RDD as a List to driver program, be aware of OOM

reportsList.forEach(Main::printReport);

….SessionReport{sessionId='36a39b8e-27b9-4560-a1c5-9bfa77679930', sessionType='closed', sessionStart=2014-08-13 21:37:38, sessionEnd=2014-08-13 21:39:12}

SessionReport{sessionId='aee19a86-e060-42fb-b34f-76cd698e483e', sessionType='closed', sessionStart=2014-07-28 17:17:21, sessionEnd=2014-07-28 19:58:12}

SessionReport{sessionId='cecc03eb-f2fb-4ed4-9354-76ec8a965d8d', sessionType='closed', sessionStart=2014-09-04 19:46:51, sessionEnd=2014-09-04 21:12:43}

SessionReport{sessionId='1bd85e46-3fe2-4d46-acc5-2fe69735c453', sessionType='closed', sessionStart=2014-08-24 15:56:54, sessionEnd=2014-08-24 15:57:55}

SessionReport{sessionId='0d4e4b9f-fbd0-4eaf-a815-4f46693dbb2b', sessionType='closed', sessionStart=2014-09-09 13:39:39, sessionEnd=2014-09-09 13:46:08}

SessionReport{sessionId='32e822a6-5835-4001-bd95-ede38746e3bd', sessionType='closed', sessionStart=2014-08-27 21:24:03, sessionEnd=2014-08-28 01:21:11}

SessionReport{sessionId='cd35f911-29f4-496a-92f0-a9f5b51b0298', sessionType='closed', sessionStart=2014-09-09 20:14:49, sessionEnd=2014-09-10 01:07:17}

SessionReport{sessionId='8941e14f-9278-4a42-b000-1a228244cbc9', sessionType='active', sessionStart=2014-09-15 16:58:39, sessionEnd=UNKNOWN}

SessionReport{sessionId='c5bf123a-2e34-4c85-a25f-a705a2d408fa', sessionType='closed', sessionStart=2014-09-10 21:20:15, sessionEnd=2014-09-10 23:58:42}

SessionReport{sessionId='4252c7fd-90c0-4a34-8ddb-8db47d68c5a6', sessionType='closed', sessionStart=2014-07-09 08:32:35, sessionEnd=2014-07-09 08:34:23}

SessionReport{sessionId='f6441966-8d6d-4f1c-801c-29201fa75fe6', sessionType='active', sessionStart=2014-08-05 20:47:14, sessionEnd=UNKNOWN}

….

The End! =)http://spark.apache.org/docs/1.2.0/index.html

top related