big data analytics with hadoop and spark at osc...1 shameema oottikkal data application engineer...
TRANSCRIPT
1
Shameema OottikkalData Application Engineer
Ohio SuperComputer Centeremail:[email protected]
Big Data Analytics with Hadoop and Spark at OSC
09/28/2017SUG
2
Data Analytics at OSCIntroduction: Data Analytical nodes OSC Ondemand
Applications: R Spark Hadoop
Howto: Rstudio on Ondemand Launch spark and hadoop clusters
3
Data Analytical nodesOwens’ data analytics environment is comprised of 16 nodes, each with 48 CPU cores, 1.5TB of RAM and 24TB of local disk.
$HOME: 500GBBacked up dailyPermanent storage
Local disk:$TMPDIR1.5TB or 24TBNot backed upTemporary storage
/fs/project:Upon request1-5TBBacked up daily1-3 years
/fs/scratch:1200TBNot backed upTemporary storage
4
OSC OnDemand ondemand.osc.edu
• 1: User Interface– Web based
• Usable from computers, tablets, smartphones
• Zero installation
– Single point of entry • User needs three things
– ondemand.osc.edu – OSC Username – OSC Password
• Connected to all resources at OSC
• 2: Interactive Services– File Access– Job Management– Visualization Apps
• Desktop access• Single-click apps
(Abaqus, Ansys, Comsol, Paraview)
– Terminal AccessTutorial available at osc.edu/ondemand
5
6
7
Python: A popular general-purpose, high-level programming language with numerous mathematical and scientific packages available for data analytics
R: A programming language for statistical and machine learning applications with very strong graphical capabilities
MATLAB: A full featured data analysis toolkit with many advanced algorithms readily available
Spark and Hadoop: Big data Frameworks based on distributed storage
Intel Compilers: Compilers for generating optimized code for Intel CPUs.
Intel MKL: The Math Kernel Library provides optimized subroutines for common computation tasks such as matrix-matrix calculations.
Statistical software: Octave, Stata, FFTW, ScaLAPACK, MINPACK, sprng2
Data Analytical Applications
8
R and RstudioR is a language and environment for statistical computing and graphics. R provides a wide variety of statistical and graphical techniques and is highly extensible.
Availability:
9
Batch Usage
Running R interactively
10
Rstudio on Ondemand
11
12
13
14
Apache Spark is an open source cluster computing framework originally developed in the AMPLab at University of California, Berkeley but was later donated to the Apache Software Foundation where it remains today. In contrast to Hadoop's disk-based analytics paradigm, Spark has multi-stage in-memory analytics.
Apache Spark
15
Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
Requires cluster managers which allocate resources across applications.
Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.
Next, it sends your application code (defined by JAR or Python files passed toSparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.
Spark workflow
16
RDD (Resilient Distributed Dataset) is the main logical data unit in Spark. They are
Distributed and partitioned Stored in memory Immutable Partitions recomputed on failure
Transformations are executed on demand. That means they are computed lazily.Eg: filter, join, sort
Actions return final results of RDD computations. Actions triggers execution usinglineage graph to load the data into original RDD, carry out all intermediatetransformations and return final results to Driver program or write it out to file system.Eg: collect(), count(), take()
RDD- Resilient Distributed Datasets
RDD- Transformations and Actions
17
RDD Operations
18
$SPARK_HOME/bin/pyspark # Opens SparkContext
>>> data = sc.textFile("README.md")
>>>linesWithSpark = data.filter(lambda line: "Spark" in line)
>>> linesWithSpark.count() # Number of items in this RDD12
>>> data.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"?12
Interactive Analysis with the Spark Shell
1. Create a RDD
2. Transformation of RDD
4. Combining Transformation and Actions
3. Action on RDD
19
20
Map: One element in input gets mapped to only one element in output.Flatmap: One element in input maps to zero or more elements in the output.
FlatmapMap
Word count Example
21
>>>wordCounts = data.flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda a, b: a+b)
>>> wordCounts.collect()
Word count Example
22
https://www.osc.edu/resources/available_software/software_list/spark
Spark documentation at OSC
23
Running Spark interactively in batch
24
Running Spark non-interactively
25
from pyspark import SparkContextimport urllibf = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz","kddcup.data.gz")
data_file = "./kddcup.data.gz"sc = SparkContext(appName="Stati")raw_data = sc.textFile(data_file)
import numpy as np
def parse_interaction(line):line_split = line.split(",")symbolic_indexes = [1,2,3,41]clean_line_split=[item for i, item in enumerate(line_split) if i not in symbolic_indexes]return np.array([float(x) for x in clean_line_split])
vector_data=raw_data.map(parse_interaction)
from pyspark.mllib.stat import Statisticsfrom math import sqrt
summary = Statistics.colStats(vector_data)
print ("Duration Statistics:")print (" Mean %f" % (round(summary.mean()[0],3)))print ("St. deviation : %f"%(round(sqrt(summary.variance()[0]),3)))print (" Max value: %f"%(round(summary.max()[0],3)))print (" Min value: %f"%(round(summary.min()[0],3)))
1. Create an App in python: stati.pyRunning Spark using PBS script
26
2. Create a PBS script: stati.pbs#PBS -N spark-statistics#PBS -l nodes=18:ppn=28#PBS -l walltime=00:10:00module load spark/2.0.0cp stati.py $TMPDIRcd $TMPDIRpbs-spark-submit stati.py > stati.logcp * $PBS_O_WORKDIR
3. Run Spark jobqsub stati.pbs
4. Output: stati.logsync from spark://n0381.ten.osc.edu:7077starting org.apache.spark.deploy.master.Master, logging to /nfs/15/soottikkal/spark/kdd/spark-soottikkal-org.apache.spark.deploy.master.Master-1-n0381.ten.osc.edu.outfailed to launch org.apache.spark.deploy.master.Master:full log in /nfs/15/soottikkal/spark/kdd/spark-soottikkal-org.apache.spark.deploy.master.Master-1-n0381.ten.osc.edu.out
Duration Statistics:Mean 48.342000St. deviation : 723.330000Max value: 58329.000000Min value: 0.000000Total value count: 4898431.000000Number of non-zero values: 118939.000000
SPARK_MASTER=spark://n0381.ten.osc.edu:7077
27
CASE STUDYData mining of historical jobs records of OSC’s clustersAim: To understand client utilizations of OSC recourses. Data: Historical records of every Job that ran on any OSC clusters that includes information's such as number of nodes, software, CPU time and timestamp.
Import to Spark
Data till 2016
Save as parquet file
AnalysisReload to
SparkNewerData
Append to parquet file
Import to Spark
DATA on MYSQL DB
28
#importing datadf=sqlContext.read.parquet(“pbsacct/Jobs.parquet")df.show(5)
#Which types of queue is mostly useddf.select("jobid",”queue").groupBy("queue").count().show()
#Which software is used most?df.select("jobid","sw_app").groupBy("sw_app").count().sort(col("count").desc()) .show()
#who uses gaussian software most?df.registerTempTable(”Jobs”) sqlContext.sql(" SELECT username FROM Jobs WHERE sw_app='gaussian’ " ).show()
Pyspark code for data analysis
29
Statistics MYSQL SPARKJob vs CPU 1 hour 5 sec
CPU vs Account 1.25 hour 5 secWalltime vsuser
1.40 hour 5 sec
Results
30
Running Hadoop at OSCA Hadoop cluster can be launched within the HPC environment, but managed by the PBSjob scheduler using Myhadoop framework developed by San Diego SupercomputerCenter. (Please see http://www.sdsc.edu/~allans/MyHadoop.pdf)
31
Using Hadoop: Sample PBS Script
32
Using Hadoop: Sample PBS Script
33
Upcoming Events
go.osu.edu/dataanalyticsmonthBig Data at OSCOct. 26; 1-4 p.m.; Ohio Supercomputer Center, 1224 Kinnear Road.
XSEDE Big Data workshopDec. 5-6; 10-5 p.m.; Ohio Supercomputer Center, 1224 Kinnear Road.
34
https://spark.apache.org/docs/2.0.0/programming-guide.html-Programming with Scala, Java and Python
http://www.cs.berkeley.edu/~rxin/ampcamp-ecnu/data-exploration-using-spark.html
1. Spark Programming Guide
2. Data Exploration with Spark
3. Hadoop
http://hadoop.apache.org/
References
https://www.osc.edu/documentation/software_list/sparkhttps://www.osc.edu/resources/available_software/software_list/hadoop
4. OSC Documentation
35
Thank you!
• Questions or comments: [email protected]
• General questions about OSC service: [email protected]
36
#Copy necessary filescp -r ~soottikkal/workshop/July17-Bigdata ./
#check filescd July17-Bigdata lscat instructions
#open another terminal# request 1 interactive node
qsub -I -l nodes=1:ppn=28 -l walltime=04:00:00 -A PZS0557
#check filescd July-Bigdata lscd spark
#launch sparkmodule load spark/2.0.0pyspark --executor-memory 10G --driver-memory 10G
Spark ExerciseConnect to Owens cluster through putty terminal:
ssh [email protected] password
37
#Example 1: Unstructured Data
#create a RDD>>> data = sc.textFile("README.md")
#count number of lines>>> data.count()99
#see the content of the RDD>>> data.take(3)[u'# Apache Spark', u'', u'Spark is a fast and general cluster computing system for Big Data. It provides']>>> data.collect()
#check data type>>> type(data)<class 'pyspark.rdd.RDD'>
#transformation of RDD>>> linesWithSpark = data.filter(lambda line: "Spark" in line)#action on RDD>>> linesWithSpark.count()19
##combining transformation and actions>>> data.filter(lambda line: "Spark" in line).count() 19
38
#Example 2: Structured Data
#About the data: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99
#load data and run basic operations >>> raw_data=sc.textFile("data.gz")>>> raw_data.count()494021
>>> raw_data.take(1)[u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.']
>>> raw_data.take(3)[u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.', u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.', u'0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.’]
39
#SparkSQL>>> from pyspark.sql import SQLContext>>> sqlContext = SQLContext(sc)>>> from pyspark.sql import Row
#transform to csv>>> csv_data=raw_data.map(lambda l: l.split(","))>>> selected_data=csv_data.map(lambda p: Row(
duration=int(p[0]),protocal_type=p[1],service=p[2],flag=p[3],src_bytes=int(p[4]),dst_bytes=int(p[5]))
)
>>> interactions_df = sqlContext.createDataFrame(selected_data)>>> interactions_df.registerTempTable("interactions")
>>> interactions_df.printSchema()root|-- dst_bytes: long (nullable = true)|-- duration: long (nullable = true)|-- flag: string (nullable = true)|-- protocal_type: string (nullable = true)|-- service: string (nullable = true)|-- src_bytes: long (nullable = true)
40
>>> interactions_df.show(5)
>>> interactions_df.select("dst_bytes","flag").show(5)
>>> interactions_df.filter(interactions_df.flag!="SF").show(5)
41
# Select tcp network interactions with more than 1 second duration and no transfer from destination>>> tcp_interactions = sqlContext.sql(“””
SELECT duration, dst_bytes FROM interactions WHERE protocal_type = 'tcp' AND duration > 1000 AND dst_bytes = 0“”")tcp_interactions.show(5)
>>> interactions_df.select("protocal_type", "duration", "dst_bytes").groupBy("protocal_type").count().show()
>>> interactions_df.select("protocal_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy("protocal_type").count().show()
#exit from the interactive pyspark shell>>> exit()
#exit from the compute nodeexit
42
Submitting Spark and Hadoop job non-interactively
cd sparklsqsub stati.pbsqstatqstat | grep `whoami`lsqsub sql.pbs
cd hadoopqsub sub-wordcount.pbsqsub sub-grep.pbs
43
https://spark.apache.org/docs/2.0.0/programming-guide.html-Programming with Scala, Java and Python
http://www.cs.berkeley.edu/~rxin/ampcamp-ecnu/data-exploration-using-spark.html
1. Spark Programming Guide
2. Data Exploration with Spark
3. Hadoop
http://hadoop.apache.org/
References
https://www.osc.edu/documentation/software_list/spark_documentationhttps://www.osc.edu/resources/available_software/software_list/hadoop
4. OSC Documentation
44
Thank you!
• Questions or comments: [email protected]
• General questions about OSC service: [email protected]