

Developer Training for Apache Spark and Hadoop: Hands-On Exercises

Contents

General Notes
Hands-On Exercise: Query Hadoop Data with Apache Impala
Hands-On Exercise: Access HDFS with the Command Line and Hue
Hands-On Exercise: Run a YARN Job
Hands-On Exercise: Import Data from MySQL Using Apache Sqoop
Hands-On Exercise: Explore RDDs Using the Spark Shell
Hands-On Exercise: Process Data Files with Apache Spark
Hands-On Exercise: Use Pair RDDs to Join Two Datasets
Hands-On Exercise: Write and Run an Apache Spark Application

201611


Copyright © 2010-2016 Cloudera, Inc. All rights reserved. Not to be reproduced or shared without prior written consent from Cloudera.


Hands-On Exercise: Configure an Apache Spark Application
Hands-On Exercise: View Jobs and Stages in the Spark Application UI
Hands-On Exercise: Persist an RDD
Hands-On Exercise: Implement an Iterative Algorithm with Apache Spark
Hands-On Exercise: Use Apache Spark SQL for ETL
Hands-On Exercise: Produce and Consume Apache Kafka Messages
Hands-On Exercise: Collect Web Server Logs with Apache Flume
Hands-On Exercise: Send Web Server Log Messages from Apache Flume to Apache Kafka
Hands-On Exercise: Write an Apache Spark Streaming Application
Hands-On Exercise: Process Multiple Batches with Apache Spark Streaming
Hands-On Exercise: Process Apache Kafka Messages with Apache Spark Streaming
Appendix A: Enabling Jupyter Notebook for PySpark
Appendix B: Managing Services on the Course Virtual Machine


General Notes

Cloudera's training courses use a Virtual Machine running the CentOS Linux distribution. This VM has CDH installed in pseudo-distributed mode. Pseudo-distributed mode is a method of running Hadoop whereby all Hadoop daemons run on the same machine. It is, essentially, a cluster consisting of a single machine. It works just like a larger Hadoop cluster; the only key difference is that the HDFS block replication factor is set to 1, since there is only a single DataNode available.
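On a single-node setup like this, the replication factor is typically lowered in hdfs-site.xml. The following is a representative entry, not necessarily the exact contents of the course VM's configuration file:

```xml
<!-- hdfs-site.xml: pseudo-distributed mode keeps one copy of each block,
     since there is only one DataNode to hold replicas. -->
<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>
```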

Points to Note while Working in the Virtual Machine

• The Virtual Machine (VM) is set to log in as the user training automatically. If you log out, you can log back in as the user training with the password training.

• If you need it, the root password is training. You may be prompted for this if, for example, you want to change the keyboard layout. In general, you should not need this password since the training user has unlimited sudo privileges.

• In some command-line steps in the exercises, you will see lines like this:

$ hdfs dfs -put united_states_census_data_2010 \
/user/training/example

The dollar sign ($) at the beginning of each line indicates the Linux shell prompt. The actual prompt will include additional information (for example, [training@localhost training_materials]$) but this is omitted from these instructions for brevity.

The backslash (\) at the end of the first line signifies that the command is not complete and continues on the next line. You can enter the code exactly as shown (on two lines), or you can enter it on a single line. If you do the latter, you should not type the backslash.
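You can see that the two forms are equivalent with a plain echo command (a minimal illustration; echo stands in for the longer hdfs commands used in the exercises):

```shell
# A command split across two lines with a trailing backslash...
echo united_states_census_data_2010 \
  /user/training/example

# ...behaves exactly like the same command typed on one line
# without the backslash:
echo united_states_census_data_2010 /user/training/example
```

Both invocations print the same single line of output.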

• Although most students are comfortable using UNIX text editors like vi or emacs, some might prefer a graphical text editor. To invoke the graphical editor from the command line, type gedit followed by the path of the file you wish to edit.


Appending & to the command allows you to type additional commands while the editor is still open. Here is an example of how to edit a file named myfile.txt:

$ gedit myfile.txt &

Points to Note during the Exercises

Directories

• The main directory for the exercises is ~/training_materials/devsh/exercises. Each directory under that one corresponds to an exercise or set of exercises; this is referred to in the instructions as "the exercise directory." Any scripts or files required for the exercise (other than data) are in the exercise directory.

• Within each exercise directory you may find the following subdirectories:

• solution: contains solution code for each exercise.

• stubs: a few of the exercises depend on provided starter files containing skeleton code.

• Maven project directories: for exercises for which you must write Scala classes, you have been provided with preconfigured Maven project directories. Within these projects are two packages: stubs, where you will do your work using starter skeleton classes; and solution, containing the solution class.

• Data files used in the exercises are in ~/training_materials/data. Usually you will upload the files to HDFS before working with them.

• The VM defines a few environment variables that are used in place of longer paths in the instructions. Since each variable is automatically replaced with its corresponding value when you run commands in the terminal, this makes it easier and faster for you to enter a command.

• The two environment variables for this course are $DEVSH and $DEVDATA. Under $DEVSH you can find exercises, examples, and scripts.

• You can always use the echo command if you would like to see the value of an environment variable:


$ echo $DEVSH

Step-by-Step Instructions

As the exercises progress, and you gain more familiarity with the tools and environment, we provide fewer step-by-step instructions; as in the real world, we merely give you a requirement and it's up to you to solve the problem! You should feel free to refer to the hints or solutions provided, ask your instructor for assistance, or consult with your fellow students.

Bonus Exercises

There are additional challenges for some of the hands-on exercises. If you finish the main exercise, please attempt the additional steps.

Catch-Up Script

If you are unable to complete an exercise, we have provided a script to catch you up automatically. Each exercise has instructions for running the catch-up script, where applicable.

$ $DEVSH/scripts/catchup.sh

The script will prompt for which exercise you are starting; it will set up all the required data as if you had completed all the previous exercises.

Warning: If you run the catch-up script, you may lose your work. (For example, all data will be deleted from HDFS.)


Hands-On Exercise: Query Hadoop Data with Apache Impala

Files and Data Used in This Exercise:

Impala/Hive table: device

In this exercise, you will use the Hue Impala Query Editor to explore data in a Hadoop cluster.

This exercise is intended to let you begin to familiarize yourself with the course Virtual Machine as well as Hue. You will also briefly explore the Impala Query Editor.

Using the Hue Impala Query Editor

1. Start Firefox on the VM using the shortcut provided on the main menu panel at the top of the screen.

2. Click the Hue bookmark, or visit http://localhost:8888/.

3. Because this is the first time anyone has logged in to Hue on this server, you will be prompted to create a new user account. Enter username training and password training, and then click Create account. (If prompted, you may click Remember.)

• Note: When you first log in to Hue you may see a misconfiguration warning. This is because not all the services Hue depends on are installed on the course VM, in order to save space. You can disregard the message.

4. Open the Impala query editor from the Query Editors menu.


5. In the left panel Assist tab, select the device table. This will display the table's column definitions.

• Note: There are four columns defined. If you do not see all four, try resizing the Firefox window.

6. Hover your pointer over the device table to reveal the associated control icons, then click the Preview Sample data icon as shown below:

A popup with the first several rows of data in the table will appear. When you are done viewing the data, click the X in the upper right corner of the popup to close the preview.

7. In the main panel in the query text box, enter a SQL query like the one below:

SELECT * FROM device WHERE name LIKE 'Me%';

8. Click the Execute button to execute the command.

9. To see results, view the Results tab below the query area.

10. Optional: If you have extra time, continue exploring the Impala Query Editor on your own. For instance, try selecting other tabs after viewing the results.

This is the end of the exercise


Hands-On Exercise: Access HDFS with the Command Line and Hue

Files and Data Used in This Exercise:

Data files (local): $DEVDATA/kb/*

$DEVDATA/base_stations.tsv

In this exercise, you will practice working with HDFS, the Hadoop Distributed File System.

You will use the HDFS command line tool and the Hue File Browser web-based interface to manipulate files in HDFS.

Exploring the HDFS Command Line Interface

The simplest way to interact with HDFS is by using the hdfs command. To execute filesystem commands within HDFS, use the hdfs dfs command.

1. Open a terminal window by double-clicking the Terminal icon on the desktop.

2. Enter:

$ hdfs dfs -ls /

This shows you the contents of the root directory in HDFS. There will be multiple entries, one of which is /user. Each user has a "home" directory under this directory, named after their username; your username in this course is training, therefore your home directory is /user/training.

3. Try viewing the contents of the /user directory by running:

$ hdfs dfs -ls /user

You will see your home directory in the directory listing.


Relative Paths

In HDFS, relative (non-absolute) paths are considered relative to your home directory. There is no concept of a "current" or "working" directory as there is in Linux and similar filesystems.
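The Linux side of this contrast can be demonstrated locally (the hdfs command mentioned in the comments requires the course VM's running cluster, so it is shown for comparison only):

```shell
# Linux resolves a relative path against the current working directory:
mkdir -p /tmp/pathdemo
cd /tmp/pathdemo
touch data.txt
ls data.txt                 # relative: resolved against /tmp/pathdemo
ls /tmp/pathdemo/data.txt   # absolute: the same file

# HDFS has no working directory, so a relative path such as
#   hdfs dfs -ls mydir
# always means /user/<your-username>/mydir, wherever you run it.
```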

4. List the contents of your home directory by running:

$ hdfs dfs -ls /user/training

There are no files yet, so the command silently exits. This is different from running hdfs dfs -ls /foo, which refers to a directory that doesn't exist and would display an error message.

Note that the directory structure in HDFS has nothing to do with the directory structure of the local filesystem; they are completely separate namespaces.

Uploading Files to HDFS

Besides browsing the existing filesystem, another important thing you can do with the HDFS command line interface is to upload new data into HDFS.

5. Start by creating a new top-level directory for exercises. You will use this directory throughout the rest of the course.

$ hdfs dfs -mkdir /loudacre

6. Change directories to the Linux local filesystem directory containing the sample data we will be using in the course.

$ cd $DEVDATA

If you perform a regular Linux ls command in this directory, you will see several files and directories that will be used in this course. One of the data directories is kb. This directory holds Knowledge Base articles that are part of Loudacre's customer service website.


7. Put this directory into HDFS:

$ hdfs dfs -put kb /loudacre/

This copies the local kb directory and its contents into a remote HDFS directory named /loudacre/kb.

8. List the contents of the new HDFS directory now:

$ hdfs dfs -ls /loudacre/kb

You should see the KB articles that were in the local directory.

9. Practice uploading a directory, confirm the upload, and then remove it, as it is not actually needed for the exercises.

$ hdfs dfs -put calllogs /loudacre/

$ hdfs dfs -ls /loudacre/calllogs

$ hdfs dfs -rm -r /loudacre/calllogs

Viewing HDFS Files

Now view some of the data you just copied into HDFS.

10. Enter:

$ hdfs dfs -cat /loudacre/kb/KBDOC-00289.html | head -n 20

This prints the first 20 lines of the article to your terminal. This command is handy for viewing text data in HDFS. An individual file is often very large, making it inconvenient to view the entire file in the terminal. For this reason, it is often a good idea to pipe the output of the dfs -cat command into head, more, or less. You can also use the dfs -tail option to more efficiently view the end of a file, rather than piping the whole content.
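The piping pattern works the same way with any stream; a local stand-in (here seq generates the data, where the exercise uses hdfs dfs -cat) shows the idea:

```shell
# Create a 100-line file to stand in for a large HDFS file.
seq 1 100 > /tmp/lines.txt

# View only the first three lines rather than the whole stream
# (prints 1, 2, 3):
cat /tmp/lines.txt | head -n 3

# View only the last two lines (prints 99, 100):
tail -n 2 /tmp/lines.txt
```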


Downloading HDFS Files

In an earlier exercise, you used Impala to explore data in HDFS in the device table. You can view and work with that data directly by downloading it from HDFS to the Linux local filesystem.

11. To download a file to work with on the local filesystem, use the hdfs dfs -get command. This command takes two arguments: an HDFS path and a local Linux path. It copies the HDFS contents into the local filesystem:

$ hdfs dfs -get \
/user/hive/warehouse/device /tmp/device

$ less /tmp/device/part-m-00000

Enter the letter q to quit the less command after reviewing the downloaded file.

Viewing HDFS Command Line Help

12. There are several other operations available with the hdfs dfs command to perform most common filesystem manipulations, such as mv, cp, and mkdir. In the terminal window, enter:

$ hdfs dfs

You see a help message describing all the filesystem commands provided by HDFS.

Try playing around with a few of these commands if you like.

Using the Hue File Browser to Browse, View, and Manage Files

13. In Firefox, visit Hue by clicking the Hue bookmark, or going to URL http://localhost:8888/.


14. If your prior session has expired, log in again using the credentials you created earlier: username training and password training.

15. To access HDFS, click File Browser in the Hue menu bar. (The mouse-over text is "Manage HDFS".)

• Note: If your Firefox window is too small to display the full menu names, you will see just the icons instead.

16. By default, the contents of your HDFS home directory (/user/training) are displayed. In the directory path name, click the leading slash (/) to view the HDFS root directory.

17. The contents of the root directory are displayed, including the loudacre directory you created earlier. Click that directory to see the contents.

18. Click the name of the kb directory to see the Knowledge Base articles you uploaded.

19. View one of the files by clicking on the name of any one of the articles.

• Note: In the file viewer, the contents of the file are displayed on the right. In this case, the file is fairly small, but typical files in HDFS are very large, so rather than displaying the entire contents on one screen, Hue provides buttons to move between pages.


20. Return to the directory view by clicking View file location in the Actions panel on the left.

21. Click the up arrow to return to the /loudacre base directory.

22. To upload a file, click the Upload button on the right. You can choose to upload a plain file, or to upload a zipped file (which will be automatically unzipped after upload). In this case, select Files, then click Select Files.

23. A Linux file browser appears. Browse to /home/training/training_materials/data.

24. Choose base_stations.tsv and click the Open button.

25. When the file has uploaded, it will be displayed in the directory. Click the checkbox next to the file's icon, and then click the Actions button to see a list of actions that can be performed on the selected file(s).

26. Optional: Explore the various file actions available. When you have finished, select any additional files you have uploaded and click the Move to trash button to delete them. (Do not delete base_stations.tsv; that file will be used in later exercises.)

This is the end of the exercise


Hands-On Exercise: Run a YARN Job

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/yarn

Data files (HDFS): /loudacre/kb

In this exercise, you will submit an application to the YARN cluster, and monitor the application using both the Hue Job Browser and the YARN Web UI.

The application you will run is provided for you. It is a simple Spark application written in Python that counts the occurrence of words in Loudacre's customer service Knowledge Base (which you uploaded in a previous exercise). The focus of this exercise is not on what the application does, but on how YARN distributes tasks in a job across a cluster, and how to monitor an application and view its log files.

Important: This exercise depends on a previous exercise: "Access HDFS with the Command Line and Hue." If you did not complete that exercise, run the course catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

Exploring the YARN Cluster

1. Visit the YARN ResourceManager (RM) UI in Firefox using the provided bookmark, or by going to URL http://localhost:8088/.

No jobs are currently running, so the current view shows the cluster "at rest."


Who Is Dr. Who?

You may notice that YARN says you are logged in as dr.who. This is what is displayed when user authentication is disabled for the cluster, as it is on the training VM. If user authentication were enabled, you would have to log in as a valid user to view the YARN UI, and your actual username would be displayed, together with user metrics such as how many applications you had run, how much system resources your applications used, and so on.

2. Take note of the values in the Cluster Metrics section, which displays information such as the number of applications running currently, previously run, or waiting to run; the amount of memory used and available; and how many worker nodes are in the cluster.

3. Click the Nodes link in the Cluster menu on the left. The bottom section will display a list of worker nodes in the cluster. The pseudo-distributed cluster used for training has only a single node, which is running on the local machine. In the real world, this list would show multiple worker nodes.

4. Click the Node HTTP Address to open the NodeManager UI for that node. This displays statistics about the selected node, including the amount of available memory, currently running applications (there are none), and so on.


5. To return to the ResourceManager, click ResourceManager → RM Home on the left.

Submitting an Application to the YARN Cluster

6. In a terminal window, change to the exercise directory:

$ cd $DEVSH/exercises/yarn

7. Run the example wordcount.py program on the YARN cluster to count the frequency of words in the Knowledge Base dataset:

$ spark-submit --master yarn-client \

wordcount.py /loudacre/kb/*

The spark-submit command is used to submit a Spark program for execution on the cluster. Since Spark is managed by YARN on the course VM, this gives us the opportunity to see how the YARN UI displays information about a running job. For now, focus on learning about the YARN UI.

While the application is running, continue with the next steps. If it completes before you finish the exercise, go to the terminal, press the up arrow until you get to the spark-submit command again, and rerun the application.

Viewing the Application in the Hue Job Browser

8. Go to Hue in Firefox, and select the Job Browser. (Depending on the width of your browser, you may see the whole label, or just the icon.)


9. The Job Browser displays a list of currently running and recently completed applications. (If you don't see the application you just started, wait a few seconds; the page will automatically reload. It can take some time for the application to be accepted and start running.) Review the entry for the current job.

This page allows you to click the application ID to see details of the running application, or to kill a running job. (Do not do that now, though!)

Viewing the Application in the YARN UI

To get a more detailed view of the cluster, use the YARN UI.

10. Reload the YARN RM page in Firefox. You will now see the application you just started in the bottom section of the RM home page.

11. As you did in the first exercise section, select Nodes.

12. Select the Node HTTP Address to open the NodeManager UI.


13. Now that an application is running, you can click List of Applications to see the application you submitted.

14. If your application is still running, try clicking on List of Containers.

This will display the containers the ResourceManager has allocated on the selected node for the current application. (No containers will show if no applications are running; if you missed it because the application completed, you can run the application again. In the terminal window, use the up arrow key to recall previous commands.)

Viewing the Application Using the yarn Command

15. Open a second terminal window.

Tip: Resize the terminal window to be as wide as possible to make it easier to read the command output.


16. View the list of currently running applications.

$ yarn application -list

If your application is still running, you should see it listed, including the application ID (such as application_1469799128160_0001), the application name (PythonWordCount), the type (SPARK), and so on.

If there are no applications on the list, your application has probably finished running. By default, only current applications are included. Use the -appStates ALL option to include all applications in the list:

$ yarn application -list -appStates ALL

17. Take note of your application's ID (such as application_1469799128160_0001), and use it in place of app-id in the command below to get a detailed status report on the application.

$ yarn application -status app-id

This is the end of the exercise


Hands-On Exercise: Import Data from MySQL Using Apache Sqoop

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/sqoop

MySQL database: loudacre

MySQL table: accounts

HDFS paths: /loudacre/accounts

/loudacre/accounts_parquet

In this exercise, you will import tables from MySQL into HDFS using Sqoop.

Important: This exercise depends on a previous exercise: "Access HDFS with the Command Line and Hue." If you did not complete that exercise, run the course catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

Importing a Table from MySQL to HDFS

You can use Sqoop to look at the table layout in MySQL. With Sqoop, you can also import the table from MySQL to HDFS.

1. Open a terminal window if necessary.

2. Run the sqoop help command to familiarize yourself with the options in Sqoop:

$ sqoop help


3. List the tables in the loudacre database:

$ sqoop list-tables \

--connect jdbc:mysql://localhost/loudacre \

--username training --password training

4. Run the sqoop help import command to see its options:

$ sqoop help import

5. Use Sqoop to import the accounts table in the loudacre database and save it in HDFS under /loudacre:

$ sqoop import \

--connect jdbc:mysql://localhost/loudacre \

--username training --password training \

--table accounts \

--target-dir /loudacre/accounts \

--null-non-string '\\N'

The --null-non-string option tells Sqoop to represent null values as \N, which makes the imported data compatible with Hive and Impala.

6. Optional: While the Sqoop job is running, try viewing it in the Hue Job Browser or YARN Web UI, as you did in the previous exercise.

Viewing the Imported Data

Sqoop imports the contents of the specified tables to HDFS. You can use the command line or the Hue File Browser to view the files and their contents.

7. List the contents of the accounts directory:


$ hdfs dfs -ls /loudacre/accounts

• Note: Output of Hadoop processing jobs is saved as one or more numbered "partition" files. Partitions are covered later in the course.

8. Use either the Hue File Browser or the -tail option to the hdfs command to view the last part of the file for each of the MapReduce partition files, for example:

$ hdfs dfs -tail /loudacre/accounts/part-m-00000

$ hdfs dfs -tail /loudacre/accounts/part-m-00001

$ hdfs dfs -tail /loudacre/accounts/part-m-00002

$ hdfs dfs -tail /loudacre/accounts/part-m-00003

Importing a Table Using an Alternate File Format

9. Import the accounts table using the Parquet data format rather than the default file format (text file).

$ sqoop import \

--connect jdbc:mysql://localhost/loudacre \

--username training --password training \

--table accounts \

--target-dir /loudacre/accounts_parquet \

--as-parquetfile

10. View the results of the import command by listing the contents of the accounts_parquet directory in HDFS, using either Hue or the hdfs command. Note that the Parquet files are each given unique names, such as e8f3424e-230d-4101-abba-66b521bae8ef.parquet.

• Note: You can't directly view the contents of the Parquet files because they are binary files rather than text.


11. Choose one of the generated Parquet files and view the contents of the binary records stored in the Parquet file, using the parquet-tools head command. Substitute the actual name of the file for myfile in the command below.

$ parquet-tools head \

hdfs://localhost/loudacre/accounts_parquet/myfile.parquet

This is the end of the exercise


Hands-On Exercise: Explore RDDs Using the Spark Shell

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/spark-shell

Data files (local): $DEVDATA/frostroad.txt

$DEVDATA/weblogs/*

In this exercise, you will use the Spark shell to work with RDDs.

You will start by viewing and bookmarking the Spark documentation in your browser. Then you will start the Spark shell and read a simple text file into a Resilient Distributed Dataset (RDD). Finally, you will copy the weblogs dataset to HDFS and use RDDs to transform the data.

Important: This exercise depends on a previous exercise: "Access HDFS with the Command Line and Hue." If you did not complete that exercise, run the course catch-up script and advance to the current exercise:

Viewing the Spark Documentation

1. Start Firefox in your Virtual Machine and visit the Spark documentation on your local machine using the provided bookmark.

2. From the Programming Guides menu, select the Spark Programming Guide. Briefly review the guide. You may wish to bookmark the page for later review.

3. From the API Docs menu, select either Scala or Python, depending on your language preference. Bookmark the API page for use during class. Later exercises will refer you to this documentation.


Starting the Spark Shell

You may choose to do the remaining steps in this exercise using either Scala or Python. Follow the instructions below for Python, or skip to the next section for Scala.

Note: Instructions for Python are provided in blue, while instructions for Scala are in red.

Starting the Python Spark Shell

Follow these instructions if you are using Python to complete this exercise. Otherwise, skip this section and continue with Starting the Scala Spark Shell.

4. In a terminal window, start the pyspark shell:

$ pyspark

You may get several INFO and WARNING messages, which you can disregard. If you don't see the In [n]> prompt after a few seconds, press Enter a few times to clear the screen output.

5. Spark creates a SparkContext object for you called sc. Make sure the object exists:

pyspark> sc

Note on Shell Prompt

To help you keep track of which shell is being referenced in the instructions, the

prompt will be shown here as either pyspark> or scala>. The actual prompt

will vary depending on which version of Python or Scala you are using and what

command number you are on.

Pysparkwilldisplayinformationaboutthescobjectsuchas

<pyspark.context.SparkContext at 0x2724490>


6. Using command completion, you can see all the available SparkContext methods: type sc. (sc followed by a dot) and then the [TAB] key.

7. You can exit the shell by pressing Ctrl+D or by typing exit. However, stay in the shell for now to complete the remainder of this exercise.

Starting the Scala Spark Shell

Follow these instructions if you are using Scala to complete this exercise. Otherwise, skip this section and continue with Reading and Displaying a Text File. (Don't try to run both a Scala and a Python shell at the same time; doing so will cause errors and slow down your machine.)

8. In a terminal window, start the Scala Spark shell:

$ spark-shell

You may get several INFO and WARNING messages, which you can disregard. If you don't see the scala> prompt after a few seconds, press Enter a few times to clear the screen output.

9. Spark creates a SparkContext object for you called sc. Make sure the object exists:

scala> sc

Note on Shell Prompt

To help you keep track of which shell is being referenced in the instructions, the

prompt will be shown here as either pyspark> or scala>. The actual prompt

will vary depending on which version of Python or Scala you are using and

which command number you are on.

Scalawilldisplayinformationaboutthescobjectsuchas:

res0: org.apache.spark.SparkContext =

org.apache.spark.SparkContext@2f0301fa


10. Using command completion, you can see all the available SparkContext methods: type sc. (sc followed by a dot) and then the [TAB] key.

11. You can exit the shell at any time by typing sys.exit or pressing Ctrl+D. However, stay in the shell for now to complete the remainder of this exercise.

Reading and Displaying a Text File (Python or Scala)

12. Review the simple text file you will be using by viewing (without editing) the file in a text editor in a separate window (not the Spark shell). The file is located at: $DEVDATA/frostroad.txt.

13. Define an RDD to be created by reading in the text file on the local file system. Use the first command if you are using Python, and the second one if you are using Scala. (You only need to complete the exercises in Python or Scala. Do not attempt to run both shells at the same time; it will result in error messages and slow down your machine.)

pyspark> myrdd = sc.textFile(\

"file:/home/training/training_materials/\

data/frostroad.txt")

scala> val myrdd = sc.textFile(

"file:/home/training/training_materials/data/frostroad.txt")

• Note: In subsequent instructions, both Python and Scala commands will be shown but not noted explicitly; Python shell commands are in blue and preceded with pyspark>, and Scala shell commands are in red and preceded with scala>.

14. Spark has not yet read the file. It will not do so until you perform an operation on the RDD. Try counting the number of lines in the dataset:

pyspark> myrdd.count()


scala> myrdd.count()

The count operation causes the RDD to be materialized (created and populated). The number of lines (23) should be displayed, for example:

Out[2]: 23 (Python) or res1: Long = 23 (Scala)

15. Try executing the collect operation to display the data in the RDD. Note that this returns and displays the entire dataset. This is convenient for very small RDDs like this one, but be careful using collect for more typical large datasets.

pyspark> myrdd.collect()

scala> myrdd.collect()

16. Using command completion, you can see all the available transformations and operations you can perform on an RDD. Type myrdd. and then the [TAB] key.

A Tip for PySpark Users: Controlling Log Messages

You may have noticed that by default, PySpark displays many log messages tagged INFO. If you find this output distracting, you may temporarily override the default logging level by using the command sc.setLogLevel("WARN"). You can return to the prior level of logging with sc.setLogLevel("INFO") or by restarting the PySpark shell. Configuring logging will be covered later in the course.


Exploring the Loudacre Web Log Files

17. In this section you will be using data in ~/training_materials/data/weblogs. Review one of the .log files in the directory. Note the format of the lines, which begin with the IP address, user ID, and request:

116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /KBDOC-00031.html HTTP/1.0" 200 1388 "http://www.loudacre.com" "Loudacre CSR Browser"

18. In the previous steps you used a data file residing on the local Linux file system. In the real world, you will almost always be working with distributed data, such as files stored on the HDFS cluster, instead. Copy the dataset from the local file system to the loudacre HDFS directory. In a separate terminal window (not your Spark shell) execute:

$ hdfs dfs -put \

~/training_materials/data/weblogs/ /loudacre/

19. In the Spark shell, set a variable for the data files so you do not have to retype the path each time.

pyspark> logfiles="/loudacre/weblogs/*"

scala> val logfiles="/loudacre/weblogs/*"

20. Create an RDD from the data file.

pyspark> logsRDD = sc.textFile(logfiles)


scala> val logsRDD = sc.textFile(logfiles)

21. Create an RDD containing only those lines that are requests for JPG files.

pyspark> jpglogsRDD=\

logsRDD.filter(lambda line: ".jpg" in line)

scala> val jpglogsRDD=

logsRDD.filter(line => line.contains(".jpg"))

22. View the first 10 lines of the data using take:

pyspark> jpglogsRDD.take(10)

scala> jpglogsRDD.take(10)

23. Sometimes you do not need to store intermediate objects in a variable, in which case you can combine the steps into a single line of code. For instance, execute this single command to count the number of JPG requests. (The correct number is 64978.)

pyspark> sc.textFile(logfiles).filter(lambda line: \

".jpg" in line).count()

scala> sc.textFile(logfiles).

filter(line => line.contains(".jpg")).count()

24. Now try using the map function to define a new RDD. Start with a simple map that returns the length of each line in the log file.

pyspark> logsRDD.map(lambda line: len(line)).take(5)


scala> logsRDD.map(line => line.length).take(5)

This prints out an array of five integers corresponding to the first five lines in the file. (The correct result is: 151, 143, 154, 147, 160.)

25. That is not very useful. Instead, try mapping to an array of words for each line:

pyspark> logsRDD \

.map(lambda line: line.split(' ')).take(5)

scala> logsRDD.map(line => line.split(' ')).take(5)

This time Spark prints out five arrays, each containing the words in the corresponding log file line.

26. Now that you know how map works, define a new RDD containing just the IP addresses from each line in the log file. (The IP address is the first "word" in each line.)

pyspark> ipsRDD = \

logsRDD.map(lambda line: line.split(' ')[0])

pyspark> ipsRDD.take(5)

scala> val ipsRDD =

logsRDD.map(line => line.split(' ')(0))

scala> ipsRDD.take(5)

27. Although take and collect are useful ways to look at data in an RDD, their output is not very readable. Fortunately, though, they return arrays, which you can iterate through:

pyspark> for ip in ipsRDD.take(10): print ip


scala> ipsRDD.take(10).foreach(println)

28. Finally, save the list of IP addresses as a text file:

pyspark> ipsRDD.saveAsTextFile("/loudacre/iplist")

scala> ipsRDD.saveAsTextFile("/loudacre/iplist")

• Note: If you re-run this command, you will not be able to save to the same directory because it already exists. Be sure to delete the directory using either the hdfs command (in a separate terminal window) or the Hue file browser first.

29. In a terminal window or the Hue file browser, list the contents of the /loudacre/iplist folder. You should see multiple files, including several part-xxxxx files, which are the files containing the output data. "Part" (partition) files are numbered because there may be results from multiple tasks running on the cluster. Review the contents of one of the files to confirm that they were created correctly.

Bonus Exercise

Use RDD transformations to create a dataset consisting of the IP address and corresponding user ID for each request for an HTML file. (Disregard requests for other file types.) The user ID is the third field in each log file line.


Display the data in the form ipaddress/userid, such as:

165.32.101.206/8

100.219.90.44/102

182.4.148.56/173

246.241.6.175/45395

175.223.172.207/4115
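One way to approach this bonus exercise is to filter on ".html" and map each line to its first and third fields. The field-extraction logic can be checked in plain Python first, using the sample log line format shown earlier; this is an illustrative sketch, not the provided course solution:

```python
# Sample line in the web log format shown in this exercise
line = '116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /KBDOC-00031.html HTTP/1.0" 200 1388'

# Filter predicate: keep only HTML requests
if ".html" in line:
    fields = line.split(' ')
    # IP address is the first field, user ID the third
    result = fields[0] + "/" + fields[2]
    print(result)  # 116.180.70.237/128
```

In the Spark shell, the same predicate and mapping would be passed to logsRDD.filter and map respectively.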

This is the end of the exercise


Hands-On Exercise: Process Data Files with Apache Spark

Files and Data Used in This Exercise:

Exercise directory: $DEVSH/exercises/spark-etl

Data files (local): $DEVDATA/activations/*

$DEVDATA/devicestatus.txt (Bonus)

Stubs: ActivationModels.pyspark

ActivationModels.scalaspark

In this exercise, you will parse a set of activation records in XML format to extract the account numbers and model names.

One of the common uses for Spark is doing data Extract/Transform/Load operations. Sometimes data is stored in line-oriented records, like the weblogs in the previous exercise, but sometimes the data is in a multi-line format that must be processed as a whole file. In this exercise, you will practice working with file-based instead of line-based formats.

Important: This exercise depends on a previous exercise: "Access HDFS with the Command Line and Hue." If you did not complete that exercise, run the course catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

Reviewing the API Documentation for RDD Operations

1. Visit the Spark API page, which you might have bookmarked; if not, you can find it from the Spark Doc bookmark. Follow the link for the RDD class and review the list of available operations. (In the Scala API, the link will be near the top of the main window; in Python, scroll down to the Core Classes area.)


Reviewing the Data

2. Review the data on the local Linux file system in the directory $DEVDATA/activations. Each XML file contains data for all the devices activated by customers during a specific month.

Sample input data:

<activations>

<activation timestamp="1225499258" type="phone">

<account-number>316</account-number>

<device-id>

d61b6971-33e1-42f0-bb15-aa2ae3cd8680

</device-id>

<phone-number>5108307062</phone-number>

<model>iFruit 1</model>

</activation>

</activations>

3. Copy the entire activations directory to /loudacre in HDFS.

$ hdfs dfs -put $DEVDATA/activations /loudacre/

Processing the Files

Follow the steps below to write code to go through a set of activation XML files and extract the account number and device model for each activation, and save the list to a file as account_number:model.

The output will look something like:


1234:iFruit 1

987:Sorrento F00L

4566:iFruit 1

4. Start with the ActivationModels stub script in the exercise directory: $DEVSH/exercises/spark-etl. (A stub is provided for Scala and Python; use whichever language you prefer.) Note that for convenience you have been provided with functions to parse the XML, as that is not the focus of this exercise. Copy the stub code into the Spark shell of your choice.

5. Use wholeTextFiles to create an RDD from the activations dataset. The resulting RDD will consist of tuples, in which the first value is the name of the file, and the second value is the contents of the file (XML) as a string.

6. Each XML file can contain many activation records; use flatMap to map the contents of each file to a collection of XML records by calling the provided getActivations function. getActivations takes an XML string, parses it, and returns a collection of XML records; flatMap maps each record to a separate RDD element.

7. Map each activation record to a string in the format account-number:model. Use the provided getAccount and getModel functions to find the values from the activation record.

8. Save the formatted strings to a text file in the directory /loudacre/account-models.
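The course stub supplies getActivations, getAccount, and getModel, so you do not write the XML parsing yourself. As a standalone illustration of what that parsing amounts to, the sketch below extracts account-number:model strings from the sample record in step 2 (trimmed to the relevant elements) using Python's xml.etree; it is not the stub's actual implementation:

```python
import xml.etree.ElementTree as ET

# Sample activation record from step 2, trimmed to the relevant elements
xml = """<activations>
  <activation timestamp="1225499258" type="phone">
    <account-number>316</account-number>
    <model>iFruit 1</model>
  </activation>
</activations>"""

# Each <activation> element becomes one account-number:model string,
# mirroring what getAccount and getModel return for a record
root = ET.fromstring(xml)
for act in root.findall("activation"):
    account = act.find("account-number").text
    model = act.find("model").text
    print(account + ":" + model)  # 316:iFruit 1
```

In the exercise itself, this per-record logic runs inside the map step after flatMap has split each file into individual records.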

Bonus Exercise

If you have more time, attempt the following extra bonus exercise:

Another common part of the ETL process is data scrubbing. In this bonus exercise, you will process data in order to get it into a standardized format for later processing.


Review the contents of the file $DEVDATA/devicestatus.txt. This file contains data collected from mobile devices on Loudacre's network, including device ID, current status, location, and so on. Because Loudacre previously acquired other mobile providers' networks, the data from different subnetworks has a different format. Note that the records in this file have different field delimiters: some use commas, some use pipes (|), and so on. Your task is the following:

• Upload the devicestatus.txt file to HDFS.

• Determine which delimiter to use (hint: the character at position 19 is the first use of the delimiter).

• Filter out any records which do not parse correctly (hint: each record should have exactly 14 values).

• Extract the date (first field), model (second field), device ID (third field), and latitude and longitude (13th and 14th fields respectively).

• The second field contains the device manufacturer and model name (such as Ronin S2). Split this field by spaces to separate the manufacturer from the model (for example, manufacturer Ronin, model S2). Keep just the manufacturer name.

• Save the extracted data to comma-delimited text files in the /loudacre/devicestatus_etl directory on HDFS.

• Confirm that the data in the file(s) was saved correctly.

The solutions to the bonus exercise are in $DEVSH/exercises/spark-etl/solution/bonus.
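The per-record scrubbing logic described above can be sketched in plain Python before moving it into Spark map/filter calls. The record below is hypothetical (invented for illustration; real devicestatus.txt contents will differ), but it follows the 14-field, delimiter-at-position-19 layout the exercise describes:

```python
# Hypothetical pipe-delimited record; the date occupies characters 0-18,
# so the character at position 19 is the first use of the delimiter
line = "2014-03-15:10:10:20|Sorrento F41L|8cc3b47e-bd01-4482|7|24|39|49|26|21|44|V|V|33.68|-117.91"

delim = line[19]                       # detect the delimiter for this record
fields = line.split(delim)
if len(fields) == 14:                  # discard records that do not parse correctly
    date, model, device_id = fields[0], fields[1], fields[2]
    lat, lon = fields[12], fields[13]  # 13th and 14th fields
    manufacturer = model.split(' ')[0] # keep just the manufacturer name
    print(','.join([date, manufacturer, device_id, lat, lon]))
```

In the Spark version, the delimiter detection and field test go into a filter, and the extraction into a map, before saving with saveAsTextFile.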

This is the end of the exercise


Hands-On Exercise: Use Pair RDDs to Join Two Datasets

Files and Data Used in This Exercise:

Exercise directory: $DEVSH/exercises/spark-pairs

Data files (HDFS): /loudacre/weblogs/*

/loudacre/accounts/*

In this exercise, you will continue exploring the Loudacre web server log files, as well as the Loudacre user account data, using key-value pair RDDs.

Important: This exercise depends on two previous exercises: "Import Data from MySQL Using Apache Sqoop" and "Explore RDDs Using the Spark Shell." If you did not complete those exercises, run the course catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

Exploring Web Log Files

Continue working with the web log files, as in earlier exercises.

Tip: In this exercise, you will be reducing and joining large datasets, which can take a lot of time. You may wish to perform the exercises below using a smaller dataset, consisting of only a few of the web log files, rather than all of them. Remember that you can specify a wildcard; textFile("/loudacre/weblogs/*2.log") would include only file names ending with 2.log.

Using map-reduce logic, count the number of requests from each user.

a. Use map to create a pair RDD with the user ID as the key and the integer 1 as the value. (The user ID is the third field in each line.) Your data will look something like this:

(userid,1)
(userid,1)
(userid,1)
…

b. Use reduceByKey to sum the values for each user ID. Your RDD data will be similar to this:

(userid,5)
(userid,7)
(userid,2)
…

Use countByKey to determine how many users visited the site for each frequency. That is, how many users visited once, twice, three times, and so on.

a. Use map to reverse the key and value, like this:

(5,userid)
(7,userid)
(2,userid)
…

b. Use the countByKey action to return a map of frequency:user-count pairs.

Create an RDD where the user ID is the key, and the value is the list of all the IP addresses that user has connected from. (IP address is the first field in each request line.)

• Hint: Map to (userid,ipaddress) and then use groupByKey.
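The map/reduceByKey/countByKey flow can be sketched with plain Python dictionaries before writing the Spark version. The user IDs below are hypothetical; this is a standalone illustration, not the pair-RDD code itself:

```python
from collections import Counter

# Hypothetical user IDs, one per web log request (the result of the map step,
# where each request becomes a (userid, 1) pair)
requests = ["user1", "user2", "user1", "user3", "user1", "user2"]

# Summing the 1s per key -- the equivalent of reduceByKey(lambda a, b: a + b)
hits = Counter(requests)
print(hits["user1"])   # 3

# Reverse key and value, then count users per visit frequency (countByKey)
freq = Counter(hits.values())
print(freq[1])         # number of users who visited exactly once -> 1
```

In the Spark shell the same result comes from mapping each line to (userid, 1), applying reduceByKey, then reversing the pairs and calling countByKey.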


Joining Web Log Data with Account Data

The per-user IP address lists from the previous step will look something like this:

(userid,[20.1.34.55, 74.125.239.98])
(userid,[75.175.32.10, 245.33.1.1, 66.79.233.99])
(userid,[65.50.196.141])
…

Review the data located in /loudacre/accounts containing Loudacre's customer account data (previously imported from MySQL to HDFS using Sqoop). The first field in each line is the user ID, which corresponds to the user ID in the web server logs. The other fields include account details such as creation date, first and last name, and so on.

Join the accounts data with the weblog data to produce a dataset keyed by user ID which contains the user account information and the number of website hits for that user.

a. Create an RDD, based on the accounts data, consisting of key/value-array pairs: (userid,[values…]). Your data will look something like this:

(userid1,[userid1,2008-11-24 10:04:08,\N,Cheryl,West,4905 Olive Street,San Francisco,CA,…])
(userid2,[userid2,2008-11-23 14:05:07,\N,Elizabeth,Kerns,4703 Eva Pearl Street,Richmond,CA,…])
(userid3,[userid3,2008-11-02 17:12:12,2013-07-18 16:42:36,Melissa,Roman,3539 James Martin Circle,Oakland,CA,…])
…

b. Join the pair RDD with the set of user-id/hit-count pairs calculated in the first step.


c. Display the user ID, hit count, and first name (4th value) and last name (5th value) for the first 5 elements. The output should look similar to this:

userid1 6 Rick Hopper

userid2 8 Lucio Arnold

userid3 2 Brittany Parrott
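The display logic in step c can be sketched in plain Python on hypothetical joined records of the same shape the join produces, (userid, (account-values, hit-count)); the names and counts below are invented for illustration:

```python
# Hypothetical joined records: (userid, (account_fields, hit_count))
joined = [
    ("userid1", (["userid1", "2008-11-24 10:04:08", "\\N", "Rick", "Hopper"], 6)),
    ("userid2", (["userid2", "2008-11-23 14:05:07", "\\N", "Lucio", "Arnold"], 8)),
]

for userid, (account, hits) in joined:
    # First name is the 4th account value, last name the 5th
    print(userid, hits, account[3], account[4])
```

With the real RDD, the same unpacking is applied to the first five elements returned by take(5).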

Managed Memory Leak Error Message

When executing a join operation in Scala, you may see an error message such as this:

ERROR Executor: Managed memory leak detected

This message is caused by a Spark bug and can be disregarded.

The joined data from the previous section will look something like this:

(userid1,([userid1,2008-11-24 10:04:08,\N,Cheryl,West,4905 Olive Street,San Francisco,CA,…],4))
(userid2,([userid2,2008-11-23 14:05:07,\N,Elizabeth,Kerns,4703 Eva Pearl Street,Richmond,CA,…],8))
(userid3,([userid3,2008-11-02 17:12:12,2013-07-18 16:42:36,Melissa,Roman,3539 James Martin Circle,Oakland,CA,…],1))
…

Bonus Exercises

If you have more time, attempt the following extra bonus exercises:

Use keyBy to create an RDD of account data with the postal code (9th field in the CSV file) as the key.

Tip: Assign this new RDD to a variable for use in the next bonus exercise.

Create a pair RDD with postal code as the key and a list of names (Last Name,First Name) in that postal code as the value.


• Hint: First name and last name are the 4th and 5th fields respectively.

• Optional: Try using the mapValues operation.

Sort the data by postal code, then for the first five postal codes, display the code and list the names in that postal zone. For example:

--- 85003

Jenkins,Thad

Rick,Edward

Lindsay,Ivy

--- 85004

Morris,Eric

Reiser,Hazel

Gregg,Alicia

Preston,Elizabeth
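Outside Spark, the keyBy/groupByKey/sort flow for this bonus exercise can be sketched with a plain Python dict; the postal codes and names below are taken from the sample output above, and the grouping mirrors what groupByKey (or mapValues building a list) produces:

```python
from collections import defaultdict

# Hypothetical (postal_code, "Last,First") pairs, as produced by keyBy plus a map
accounts = [
    ("85003", "Jenkins,Thad"),
    ("85004", "Morris,Eric"),
    ("85003", "Rick,Edward"),
]

# Group names by postal code -- the equivalent of groupByKey
by_zip = defaultdict(list)
for code, name in accounts:
    by_zip[code].append(name)

# Sort by postal code and display the first five groups
for code in sorted(by_zip)[:5]:
    print("---", code)
    for name in by_zip[code]:
        print(name)
```

In Spark, sortByKey followed by take(5) gives the first five postal codes in the same way.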

This is the end of the exercise


Hands-On Exercise: Write and Run an Apache Spark Application

Files and Data Used in This Exercise:

Exercise directory: $DEVSH/exercises/spark-application

Data files (HDFS): /loudacre/weblogs

Scala project:

$DEVSH/exercises/spark-application/countjpgs_project

Scala classes: stubs.CountJPGs

solution.CountJPGs

Python stub: CountJPGs.py

Python solution:

$DEVSH/exercises/spark-application/python-solution/CountJPGs.py

In this exercise, you will write your own Spark application instead of using the interactive Spark shell application.

Write a simple program that counts the number of JPG requests in a web log file. The name of the file should be passed to the program as an argument.

This is the same task as in the "Explore RDDs Using the Spark Shell" exercise. The logic is the same, but this time you will need to set up the SparkContext object yourself.

Depending on which programming language you are using, follow the appropriate set of instructions below to write a Spark program.

Before running your program, be sure to exit from the Spark shell.


Important: This exercise depends on a previous exercise: "Explore RDDs Using the Spark Shell." If you did not complete that exercise, run the course catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

Writing a Spark Application in Python

Editing Python Files

You may use any text editor you wish. If you don't have an editor preference, you may wish to use gedit, which includes language-specific support for Python.

1. If you are using Python, follow these instructions; otherwise, skip this section and continue to Writing a Spark Application in Scala below.

2. A simple stub file to get started has been provided in the exercise project: $DEVSH/exercises/spark-application/CountJPGs.py. This stub imports the required Spark class and sets up your main code block. Open the stub file in an editor.

3. Create a SparkContext object using the following code:

sc = SparkContext()

4. In the body of the program, load the file passed into the program, count the number of JPG requests, and display the count. You may wish to refer back to the "Explore RDDs Using the Spark Shell" exercise for the code to do this.

5. At the end of the application, be sure to stop the Spark context:

sc.stop()
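The counting logic for the application body is the same as in the shell exercise, so the predicate can be checked in plain Python first (the sample lines below are hypothetical stand-ins for the weblogs data):

```python
def count_jpgs(lines):
    # Same predicate the application passes to RDD.filter before count()
    return sum(1 for line in lines if ".jpg" in line)

# Hypothetical log lines standing in for the weblogs dataset
sample = [
    '116.180.70.237 - 128 "GET /theme.jpg HTTP/1.0" 200 1388',
    '116.180.70.237 - 128 "GET /KBDOC-00031.html HTTP/1.0" 200 1388',
]
print(count_jpgs(sample))  # 1
```

In CountJPGs.py itself, the equivalent is applying that filter to sc.textFile with the file name argument and calling count(), between the SparkContext creation and sc.stop().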

6. Change to the exercise working directory, then run the program, passing the name of the log file to process, for example:


$ cd $DEVSH/exercises/spark-application/

$ spark-submit CountJPGs.py /loudacre/weblogs/*

7. Once the program completes, you might need to scroll up to see your program output. (The correct number of JPG requests is 64978.)

8. Skip the section below on writing a Spark application in Scala and continue with Submitting a Spark Application to the Cluster.

Writing a Spark Application in Scala

Editing Scala Files

You may use any text editor you wish. If you don't have an editor preference, you may wish to use gedit, which includes language-specific support for Scala. If you prefer to work in an IDE, Eclipse is included and configured for the Scala projects in the course. However, teaching use of Eclipse is beyond the scope of this course.

A Maven project to get started has been provided: $DEVSH/exercises/spark-application/countjpgs_project.

9. Edit the Scala class defined in CountJPGs.scala in src/main/scala/stubs/.

10. Create a SparkContext object using the following code:

val sc = new SparkContext()

11. In the body of the program, load the file passed to the program, count the number of JPG requests, and display the count. You may wish to refer back to the "Explore RDDs Using the Spark Shell" exercise for the code to do this.

12. At the end of the application, be sure to stop the Spark context:


sc.stop

13. Change to the project directory, then build your project using the following command:

$ cd \

$DEVSH/exercises/spark-application/countjpgs_project

$ mvn package

14. If the build is successful, Maven will generate a JAR file called countjpgs-1.0.jar in countjpgs_project/target. Run the program using the following command:

$ spark-submit \

--class stubs.CountJPGs \

target/countjpgs-1.0.jar /loudacre/weblogs/*

15. Once the program completes, you might need to scroll up to see your program output. (The correct number of JPG requests is 64978.)

Submitting a Spark Application to the Cluster

In the previous section, you ran a Python or Scala Spark application using spark-submit. By default, spark-submit runs the application locally. In this section, run the application on the YARN cluster instead.

16. Re-run the program, specifying the cluster master in order to run it on the cluster. Use one of the commands below depending on whether your application is in Python or Scala.

To run Python:

$ spark-submit \

--master yarn-client \

CountJPGs.py /loudacre/weblogs/*


To run Scala:

$ spark-submit \

--class stubs.CountJPGs \

--master yarn-client \

target/countjpgs-1.0.jar /loudacre/weblogs/*

17. After starting the application, open Firefox and visit the YARN Resource Manager UI using the provided bookmark (or by going to the URL http://localhost:8088/). While the application is running, it appears in the list of applications something like this:

After the application has completed, it will appear in the list like this:

This is the end of the exercise


Hands-On Exercise: Configure an Apache Spark Application

Files and Data Used in This Exercise:

Exercise directory: $DEVSH/exercises/spark-application

Data files (HDFS): /loudacre/weblogs

Scala project:

$DEVSH/exercises/spark-application/countjpgs_project

Scala classes: stubs.CountJPGs

solution.CountJPGs

Python stub: CountJPGs.py

Python solution:

$DEVSH/exercises/spark-application/python-solution/CountJPGs.py

In this exercise, you will practice setting various Spark configuration options. You will work with the CountJPGs program you wrote in the prior exercise.

Important: This exercise depends on a previous exercise: "Explore RDDs Using the Spark Shell." If you did not complete that exercise, run the course catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

Setting Configuration Options at the Command Line

1. Change to the correct directory (if necessary) and re-run the CountJPGs Python or Scala program you wrote in the previous exercise, this time specifying an application name. For example:


$ cd $DEVSH/exercises/spark-application/

$ spark-submit --master yarn-client \

--name 'Count JPGs' \

CountJPGs.py /loudacre/weblogs/*

$ cd \

$DEVSH/exercises/spark-application/countjpgs_project

$ spark-submit --class stubs.CountJPGs \

--master yarn-client \

--name 'Count JPGs' \

target/countjpgs-1.0.jar /loudacre/weblogs/*

2. VisittheResourceManagerUIagainandnotetheapplicationnamelistedistheonespecifiedinthecommandline.

3. Optional: From the RM application list, follow the Application Master link (if the application is still running) or the History link to visit the Spark Application UI. View the Environment tab. Take note of the spark.* properties such as master, appName, and driver properties.

Setting Configuration Options in a Properties File

4. Using a text editor, create a file in the current working directory called myspark.conf, containing settings for the properties shown below:

spark.app.name My Spark App

spark.master yarn-client

spark.executor.memory 400M

5. Re-run your application, this time using the properties file instead of the script options to configure Spark properties:


$ spark-submit --properties-file myspark.conf \

CountJPGs.py /loudacre/weblogs/*

$ spark-submit --properties-file myspark.conf \

--class stubs.CountJPGs \

target/countjpgs-1.0.jar /loudacre/weblogs/*

6. While the application is running, view the YARN UI and confirm that the Spark application name is correctly displayed as “My Spark App.”

Setting Logging Levels

7. Copy the template file /usr/lib/spark/conf/log4j.properties.template to /usr/lib/spark/conf/log4j.properties. You will need superuser privileges to do this, so use the sudo command:

$ sudo cp \

/usr/lib/spark/conf/log4j.properties.template \

/usr/lib/spark/conf/log4j.properties

8. Load the new log4j.properties file into an editor. Again, you will need to use sudo. To edit the file with gedit, for instance, do this:


$ sudo gedit /usr/lib/spark/conf/log4j.properties

gedit Warnings

While using gedit with sudo, you may see Gtk-WARNING messages indicating permission issues or non-existent files. These can be disregarded.

9. The first line currently reads:

log4j.rootCategory=INFO, console

Replace INFO with DEBUG:

log4j.rootCategory=DEBUG, console

10. Save the file, and then close the editor.

11. Rerun your Spark application. Notice that the output now contains both INFO and DEBUG messages, like this:

16/03/19 11:40:45 INFO MemoryStore: ensureFreeSpace(154293) called with curMem=0, maxMem=311387750

16/03/19 11:40:45 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 150.7 KB, free 296.8 MB)

16/03/19 11:40:45 DEBUG BlockManager: Put block broadcast_0 locally took 79 ms

16/03/19 11:40:45 DEBUG BlockManager: Put for block broadcast_0 without replication took 79 ms

Debug logging can be useful when debugging, testing, or optimizing your code, but in most cases it generates unnecessarily distracting output.

12. Edit the log4j.properties file again to replace DEBUG with WARN and try again. This time, notice that no INFO or DEBUG messages are displayed, only WARN messages.


Note: Throughout the rest of the exercises, you may change these settings depending on whether you find the extra logging messages helpful or distracting. You can also override the current setting temporarily by calling sc.setLogLevel with your preferred setting. For example, in either Scala or Python, call:

> sc.setLogLevel("INFO")

This is the end of the exercise


Hands-On Exercise: View Jobs and Stages in the Spark Application UI

Files and Data Used in This Exercise:

Exercise directory: $DEVSH/exercises/spark-stages

Data files (HDFS): /loudacre/weblogs/*

/loudacre/accounts/*

In this exercise, you will use the Spark Application UI to view the execution stages for a job.

In a previous exercise, you wrote a script in the Spark shell to join data from the accounts dataset with the weblogs dataset, in order to determine the total number of web hits for every account. Now you will explore the stages and tasks involved in that job.

Important: This exercise depends on previous exercises: “Explore RDDs Using the Spark Shell” and “Import Data from MySQL Using Apache Sqoop.” If you did not complete those exercises, run the course catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

Exploring Partitioning of File-Based RDDs

1. Start (or restart, if necessary) the Spark shell. Although you would typically run a Spark application on a cluster, your course VM cluster has only a single worker node that can support only a single executor. To simulate a more realistic multi-node cluster, run in local mode with three threads:

$ pyspark --master 'local[3]'


$ spark-shell --master 'local[3]'

2. Review the accounts dataset (/loudacre/accounts/) using Hue or the command line. Take note of the number of files.

3. Create an RDD based on a single file in the dataset, such as /loudacre/accounts/part-m-00000, and then call toDebugString on the RDD, which displays the number of partitions in parentheses () before the RDD file and ID. How many partitions are in the resulting RDD?

pyspark> accounts = sc.textFile("/loudacre/accounts/part-m-00000")

pyspark> print accounts.toDebugString()

scala> var accounts = sc.textFile("/loudacre/accounts/part-m-00000")

scala> accounts.toDebugString

4. Repeat this process, but specify a minimum of three partitions: sc.textFile(filename, 3). Does the RDD correctly have three partitions?

5. Finally, set the accounts variable to a new RDD based on all the files in the accounts dataset. How does the number of files in the dataset compare to the number of partitions in the RDD?

6. Optional: Use foreachPartition to print the first record of each partition.
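foreachPartition calls a function once per partition, passing an iterator over that partition's records. The pattern can be sketched in plain Python, with lists standing in for partitions (the function and sample data below are illustrative; in the shell you would pass the same kind of function to accounts.foreachPartition):

```python
def printFirst(records):
    # records is an iterator over one partition's records;
    # take only the first and ignore the rest.
    first = next(records)
    print(first)
    return first  # returned for convenience; foreachPartition ignores it

# Stand-in for an RDD with two partitions:
for partition in [["first record", "second record"], ["third record"]]:
    printFirst(iter(partition))
```

Note that on a real cluster, output printed inside foreachPartition appears in the executor logs rather than in the driver console; in the local-mode shell used here, it appears in the shell.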

Setting up the Job

7. Create an RDD of accounts, keyed by ID and with the string first_name,last_name for the value:


pyspark> accountsByID = accounts \

.map(lambda s: s.split(',')) \

.map(lambda values: \

(values[0],values[4] + ',' + values[3]))

scala> val accountsByID = accounts.

map(line => line.split(',')).

map(values => (values(0),values(4)+','+values(3)))

8. Construct a userReqs RDD with the total number of web hits for each user ID:

Tip: In this exercise, you will be reducing and joining large datasets, which can take a lot of time when running on a single machine such as the one you are using in this course. Therefore, rather than using all the web log files in the dataset, specify a subset of the files using a wildcard; for example, select only file names ending in 2 by specifying textFile("/loudacre/weblogs/*2.log").

pyspark> userReqs = sc \

.textFile("/loudacre/weblogs/*2.log") \

.map(lambda line: line.split()) \

.map(lambda words: (words[2],1)) \

.reduceByKey(lambda v1,v2: v1+v2)

scala> val userReqs = sc.

textFile("/loudacre/weblogs/*2.log").

map(line => line.split(' ')).

map(words => (words(2),1)).

reduceByKey((v1,v2) => v1 + v2)

9. Then join the two RDDs by user ID, and construct a new RDD with first name, last name, and total hits:


pyspark> accountHits = accountsByID.join(userReqs)\

.values()

scala> val accountHits =

accountsByID.join(userReqs).map(pair => pair._2)

10. Print the results of accountHits.toDebugString and review the output. Based on this, see if you can determine:

a. How many stages are in this job?

b. Which stages are dependent on which?

c. How many tasks will each stage consist of?

Running the Job and Reviewing the Job in the Spark Application UI

11. In your browser, visit the Spark Application UI by using the provided toolbar bookmark, or by visiting the URL http://localhost:4040/.

12. In the Spark UI, make sure the Jobs tab is selected. No jobs are yet running, so the list will be empty.

13. Return to the shell and start the job by executing an action (saveAsTextFile):

pyspark> accountHits.\

saveAsTextFile("/loudacre/userreqs")


scala> accountHits.

saveAsTextFile("/loudacre/userreqs")

14. Reload the Spark UI Jobs page in your browser. Your job will appear in the Active Jobs list until it completes, and then it will display in the Completed Jobs list.

15. Click the job description (which is the last action in the job) to see the stages. As the job progresses, you may want to refresh the page a few times.

Things to note:

a. How many stages are in the job? Does it match the number you expected from the RDD’s toDebugString output?

b. The stages are numbered, but the numbers do not relate to the order of execution. Note the times the stages were submitted to determine the order. Does the order match what you expected based on RDD dependency?

c. How many tasks are in each stage?

d. The Shuffle Read and Shuffle Write columns indicate how much data was copied between tasks. This is useful to know because copying too much data across the network can cause performance issues.

16. Click a stage to view details about that stage. Things to note:

a. The Summary Metrics area shows you how much time was spent on various steps. This can help you narrow down performance problems.

b. The Tasks area lists each task. The Locality Level column indicates whether the process ran on the same node where the partition was physically stored or not. Remember that Spark will always attempt to run tasks where the data is, but may not always be able to if the node is busy.


c. In a real-world cluster, the Executor column in the Tasks area would display the different worker nodes that ran the tasks. (In this single-node cluster, all tasks run on the same host: localhost.)

17. When the job is complete, return to the Jobs tab to see the final statistics for the number of tasks executed and the time the job took.

18. Optional: Try re-running the last action. (You will need to either delete the saveAsTextFile output directory in HDFS, or specify a different directory name.) You will probably find that the job completes much faster, and that several stages (and the tasks in them) show as “skipped.”

Bonus question: Which tasks were skipped, and why?

This is the end of the exercise


Hands-On Exercise: Persist an RDD

Files and Data Used in This Exercise:

Exercise directory: $DEVSH/exercises/spark-persist

Data files (HDFS): /loudacre/weblogs/*

/loudacre/accounts/*

Stubs: SparkPersist.pyspark

SparkPersist.scalaspark

In this exercise, you will practice persisting RDDs.

Important: This exercise depends on previous exercises: “Explore RDDs Using the Spark Shell” and “Import Data from MySQL Using Apache Sqoop.” If you did not complete those exercises, run the course catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

1. Copy the code in the SparkPersist stub script in the exercise directory ($DEVSH/exercises/spark-persist) into the Spark shell. (A stub is provided for Scala and Python; use whichever language you prefer.)

2. The stub code is very similar to the job setup code in the “View Jobs and Stages in the Spark Application UI” exercise. It sets up an RDD called accountHits that joins account data with web log data. However, this time you will start the job by performing a slightly different action than in that exercise: count the number of user accounts with a total hit count greater than five. Enter the code below into the shell:

pyspark> accountHits\

.filter(lambda (firstlast,hitcount): hitcount > 5)\

.count()


scala> accountHits.filter(pair => pair._2 > 5).count()

3. Persist the RDD to memory by calling accountHits.persist().

4. In your browser, view the Spark Application UI and select the Storage tab. At this point, you have marked your RDD to be persisted, but you have not yet performed an action that would cause it to be materialized and persisted, so you will not yet see any persisted RDDs.

5. In the Spark shell, execute the count again.

6. View the RDD’s toDebugString. Notice that the output indicates the persistence level selected.

7. Reload the Storage tab in your browser, and this time note that the RDD you persisted is shown. Click the RDD name to see details about partitions and persistence.

8. Click the Executors tab and take note of the amount of memory used and available for your one worker node.

Note that the classroom environment has a single worker node with a small amount of memory allocated, so you may see that not all of the dataset is actually cached in memory. In the real world, for good performance a cluster will have more nodes, each with more memory, so that more of your active data can be cached.

9. Optional: Set the RDD’s persistence level to DISK_ONLY and compare the storage report in the Spark Application Web UI.

• Hint: Set the RDD's persistence level to StorageLevel.DISK_ONLY. You will need to import the class first, or use the fully qualified name of the StorageLevel class when invoking persist().

• Hint: Because you have already persisted the RDD at a different level, you will need to unpersist() first before you can set a new level.
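Following both hints, one possible sequence in either shell is shown below (this assumes your RDD variable is named accountHits, as in the earlier steps; these are the standard Spark StorageLevel APIs):

```
pyspark> from pyspark import StorageLevel
pyspark> accountHits.unpersist()
pyspark> accountHits.persist(StorageLevel.DISK_ONLY)

scala> import org.apache.spark.storage.StorageLevel
scala> accountHits.unpersist()
scala> accountHits.persist(StorageLevel.DISK_ONLY)
```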


This is the end of the exercise


Hands-On Exercise: Implement an Iterative Algorithm with Apache Spark

Files and Data Used in This Exercise:

Exercise directory: $DEVSH/exercises/spark-iterative

Data files (HDFS): /loudacre/devicestatus_etl/*

Stubs: KMeansCoords.pyspark

KMeansCoords.scalaspark

In this exercise, you will practice implementing iterative algorithms in Spark by calculating k-means for a set of points.

Reviewing the Data

1. If you completed the bonus section of the “Process Data Files with Apache Spark” exercise, you used Spark to extract the date, maker, device ID, latitude, and longitude from the devicestatus.txt data file, and store the results in the HDFS directory /loudacre/devicestatus_etl.

If you did not complete that bonus exercise, upload the solution file from the local file system to HDFS now. (If you have run the course catch-up script, this is already done for you.)

$ hdfs dfs -put $DEVDATA/static_data/devicestatus_etl \

/loudacre/

2. Examine the data in the dataset. Note that the latitude and longitude are the 4th and 5th fields, respectively, as shown in the sample data below:


2014-03-15:10:10:20,Sorrento,8cc3b47e-bd01-4482-b500-28f2342679af,33.6894754264,-117.543308253

2014-03-15:10:10:20,MeeToo,ef8c7564-0a1a-4650-a655-c8bbd5f8f943,37.4321088904,-121.485029632

Calculating k-means for Device Location

3. Start with the provided KMeansCoords stub file, which contains the following convenience functions used in calculating k-means:

• closestPoint: given a (latitude/longitude) point and an array of current center points, returns the index in the array of the center closest to the given point

• addPoints: given two points, returns a point which is the sum of the two points, that is, (x1+x2, y1+y2)

• distanceSquared: given two points, returns the squared distance of the two. This is a common calculation required in graph analysis.
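To make the behavior of these helpers concrete, here is a minimal plain-Python sketch of what each might compute. This is an illustration, not the stub's actual code, which may differ in detail:

```python
def distanceSquared(p1, p2):
    # Squared Euclidean distance between two (x, y) points;
    # avoids an unnecessary square root.
    return (p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2

def addPoints(p1, p2):
    # Element-wise sum: (x1+x2, y1+y2).
    return (p1[0] + p2[0], p1[1] + p2[1])

def closestPoint(p, centers):
    # Index of the center with the smallest squared distance to p.
    best_index, best_dist = 0, float("inf")
    for i, c in enumerate(centers):
        d = distanceSquared(p, c)
        if d < best_dist:
            best_index, best_dist = i, d
    return best_index
```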

Note that the stub code sets the variable K equal to 5; this is the number of means to calculate.

4. The stub code also sets the variable convergeDist. This will be used to decide when the k-means calculation is done: when the amount the locations of the means change between iterations is less than convergeDist. A “perfect” solution would be 0; this number represents a “good enough” solution. For this exercise, use a value of 0.1.

5. Parse the input file, which is delimited by commas, into (latitude, longitude) pairs (the 4th and 5th fields in each line). Only include known locations, that is, filter out (0,0) locations. Be sure to persist the resulting RDD because you will access it each time through the iteration.
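The per-record logic for this step can be sketched in plain Python; in the exercise it would run inside the RDD's map and filter transformations (the function names here are illustrative, not from the stub):

```python
def parseLocation(line):
    # Fields are comma-delimited; latitude and longitude are the
    # 4th and 5th fields (indexes 3 and 4).
    fields = line.split(',')
    return (float(fields[3]), float(fields[4]))

def isKnownLocation(point):
    # Filter out records reported at (0, 0).
    return point != (0.0, 0.0)

sample = "2014-03-15:10:10:20,Sorrento,8cc3b47e,33.6894754264,-117.543308253"
print(parseLocation(sample))
```

In the shell, the same logic would be applied with something like sc.textFile(...).map(parseLocation).filter(isKnownLocation).persist().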

6. Create a K-length array called kPoints by taking a random sample of K location points from the RDD as starting means (center points). For example, in Python:


data.takeSample(False, K, 42)

OrinScala:

data.takeSample(false, K, 42)

7. Iteratively calculate a new set of K means until the total distance between the means calculated for this iteration and the last is smaller than convergeDist. For each iteration:

a. For each coordinate point, use the provided closestPoint function to map that point to the index in the kPoints array of the location closest to that point. The resulting RDD should be keyed by the index, and the value should be the pair (point, 1). (The value 1 will later be used to count the number of points closest to a given mean.) For example:

(1, ((37.43210, -121.48502), 1))

(4, ((33.11310, -111.33201), 1))

(0, ((39.36351, -119.40003), 1))

(1, ((40.00019, -116.44829), 1))

b. Reduce the result: for each center in the kPoints array, sum the latitudes and longitudes, respectively, of all the points closest to that center, and also find the number of closest points. For example:

(0, ((2638919.87653,-8895032.182481), 74693))

(1, ((3654635.24961,-12197518.55688), 101268))

(2, ((1863384.99784,-5839621.052003), 48620))

(3, ((4887181.82600,-14674125.94873), 126114))

(4, ((2866039.85637,-9608816.13682), 81162))


c. The reduced RDD should have (at most) K members. Map each to a new center point by calculating the average latitude and longitude for each set of closest points: that is, map (index, ((totalX, totalY), n)) to (index, (totalX/n, totalY/n)).

d. Collect these new points into a local map or array keyed by index.

e. Use the provided distanceSquared method to calculate how much the centers “moved” between the current iteration and the last. That is, for each center in kPoints, calculate the distance between that point and the corresponding new point, and sum those distances. That sum is the delta between iterations; when the delta is less than convergeDist, stop iterating.

f. Copy the new center points to the kPoints array in preparation for the next iteration.

8. When all iterations are complete, display the final K center points.
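The iteration described in steps 7 and 8 can be sketched end-to-end in plain Python, using a list in place of the RDD. In the exercise, steps a and b would be RDD transformations (map and reduceByKey); the helper functions are repeated here so the sketch is self-contained, and all names are illustrative:

```python
def distanceSquared(p1, p2):
    return (p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2

def closestPoint(p, centers):
    return min(range(len(centers)), key=lambda i: distanceSquared(p, centers[i]))

def kmeans(data, kPoints, convergeDist):
    tempDist = float("inf")
    while tempDist > convergeDist:
        # Steps a and b: key each point by its closest center, then
        # sum the points and counts per center.
        sums = {}
        for p in data:
            i = closestPoint(p, kPoints)
            (sx, sy), n = sums.get(i, ((0.0, 0.0), 0))
            sums[i] = ((sx + p[0], sy + p[1]), n + 1)
        # Steps c and d: average each group to get the new center points.
        newPoints = {i: (sx / n, sy / n) for i, ((sx, sy), n) in sums.items()}
        # Step e: total movement of the centers since the last iteration.
        tempDist = sum(distanceSquared(kPoints[i], newPoints[i]) for i in newPoints)
        # Step f: install the new centers for the next iteration.
        for i, p in newPoints.items():
            kPoints[i] = p
    return kPoints
```

For example, with two well-separated groups of points and K=2, the loop converges in a couple of iterations to the two group averages.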

This is the end of the exercise


Hands-On Exercise: Use Apache Spark SQL for ETL

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/spark-sql

MySQL database: loudacre

MySQL table: webpage

Output path (HDFS): /loudacre/webpage_files

In this exercise, you will use Spark SQL to load structured data from a Parquet file, process it, and store it to a new file.

The data you will work with in this exercise is from the webpage table in the loudacre database in MySQL. Although Spark SQL does allow you to directly access tables in a database using JDBC, doing so is not generally a best practice, because in a distributed environment it may lead to an unintentional denial-of-service attack on the database. So in this exercise, you will use Sqoop to import the data to HDFS first. You will use Parquet file format rather than a text file because this preserves the data’s schema for use by Spark SQL.

Importing Data from MySQL Using Sqoop

1. In a terminal window, use Sqoop to import the webpage table from MySQL. Use Parquet file format.

$ sqoop import \

--connect jdbc:mysql://localhost/loudacre \

--username training --password training \

--table webpage \

--target-dir /loudacre/webpage \

--as-parquetfile


2. Using Hue or the hdfs command line utility, list the data files imported to the /loudacre/webpage directory.

3. Optional: Download one of the generated Parquet files from HDFS to a local directory. Use parquet-tools head and parquet-tools schema to review the schema and some sample data. Take note of the structure of the data; you will use this data in the next exercise sections.
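For example, the sequence might look like the following, where the file name is a placeholder; substitute one of the actual file names you listed in the previous step:

```
$ hdfs dfs -get /loudacre/webpage/<filename>.parquet
$ parquet-tools schema <filename>.parquet
$ parquet-tools head <filename>.parquet
```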

Creating a DataFrame from a Table

4. If necessary, start the Spark shell.

5. The Spark shell predefines a SQL context object as sqlContext. What type is the SQL context? In either Python or Scala, view the sqlContext object:

> sqlContext

6. Create a DataFrame based on the webpage table:

pyspark> webpageDF = sqlContext \

.read.load("/loudacre/webpage")

scala> val webpageDF = sqlContext.

read.load("/loudacre/webpage")

7. Examine the schema of the new DataFrame by calling webpageDF.printSchema().

8. View the first few records in the table by calling webpageDF.show(5).

Note that the data in the associated_files column is a comma-delimited string. Loudacre would like to make this data available in an Impala table, but in order to perform the required analysis, the associated_files data must be extracted and normalized. Your goal in the next section is to use the DataFrames API to extract the data in the column, split the string, and create a new data file in HDFS containing each page ID and its associated files in separate rows.

Querying a DataFrame

9. Create a new DataFrame by selecting the web_page_num and associated_files columns from the existing DataFrame:

pyspark> assocFilesDF = \

webpageDF.select(webpageDF.web_page_num,\

webpageDF.associated_files)

scala> val assocFilesDF =

webpageDF.select($"web_page_num",$"associated_files")

10. View the schema and the first few rows of the returned DataFrame to confirm that it was created correctly.

11. In order to manipulate the data using core Spark, convert the DataFrame into a pair RDD using the map method. The input into the map method is a Row object. The key is the web_page_num value, and the value is the associated_files string.

In Python, you can dynamically reference the column value of the Row by name:

pyspark> aFilesRDD = assocFilesDF.map(lambda row: \

(row.web_page_num,row.associated_files))

In Scala, use the get method appropriate to the type of the value, referencing the column by name:

scala> val aFilesRDD = assocFilesDF.

map(row => (row.getAs[Short]("web_page_num"),

row.getAs[String]("associated_files")))


12. Now that you have an RDD, you can use the familiar flatMapValues transformation to split and extract the file names in the associated_files column:

pyspark> aFilesRDD2 = aFilesRDD \

.flatMapValues( \

lambda filestring:filestring.split(','))

scala> val aFilesRDD2 =

aFilesRDD.flatMapValues(filestring =>

filestring.split(','))

13. Import the Row class and convert the pair RDD to a Row RDD. (Note: this step is only necessary in Scala.)

scala> import org.apache.spark.sql.Row

scala> val aFilesRowRDD = aFilesRDD2.map(pair =>

Row(pair._1,pair._2))

14. Convert the RDD back to a DataFrame, using the original DataFrame’s schema:

pyspark> aFileDF = sqlContext. \

createDataFrame(aFilesRDD2,assocFilesDF.schema)

scala> val aFileDF = sqlContext.

createDataFrame(aFilesRowRDD,assocFilesDF.schema)

15. Call printSchema on the new DataFrame. Note that Spark SQL gave the columns the same names they had originally: web_page_num and associated_files. The second column name is no longer accurate, because the data in the column reflects only a single associated file.


16. Create a new DataFrame with the associated_files column renamed to associated_file:

pyspark> finalDF = aFileDF. \

withColumnRenamed('associated_files', \

'associated_file')

scala> val finalDF = aFileDF.

withColumnRenamed("associated_files",

"associated_file")

17. Call finalDF.printSchema() to confirm that the new DataFrame has the correct column names.

18. Call show(5) on the new DataFrame to confirm that the final data is correct.

19. Your final DataFrame contains the processed data, so save it in Parquet format (the default) in the directory /loudacre/webpage_files.

pyspark> finalDF.write. \

mode("overwrite"). \

save("/loudacre/webpage_files")

scala> finalDF.write.

mode("overwrite").

save("/loudacre/webpage_files")

20. Using Hue or the HDFS command line tool, list the Parquet files that were saved by Spark SQL.

21. Optional: Use parquet-tools schema and parquet-tools head to review the schema and some sample data of the generated files.


22. Optional: In the Spark web UI, try viewing the SQL tab. How many queries were completed as part of this exercise? How many jobs?

This is the end of the exercise


Hands-On Exercise: Produce and Consume Apache Kafka Messages

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/kafka

In this exercise, you will use Kafka’s command line tool to create a Kafka topic. You will also use the command line producer and consumer clients to publish and read messages.

Creating a Kafka Topic

1. Open a new terminal window and create a Kafka topic named weblogs that will contain messages representing lines in Loudacre’s web server logs. Since your exercise environment is a single-node cluster running on a virtual machine, use a replication factor of 1 and a single partition.

$ kafka-topics --create \

--zookeeper localhost:2181 \

--replication-factor 1 \

--partitions 1 \

--topic weblogs

You will see the message: Created topic "weblogs".

2. Display all Kafka topics to confirm that the new topic you just created is listed:

$ kafka-topics --list \

--zookeeper localhost:2181


Producing and Consuming Messages

You will now use Kafka command line utilities to start producers and consumers for the topic created earlier.

3. Start a Kafka producer for the weblogs topic:

$ kafka-console-producer \

--broker-list localhost:9092 \

--topic weblogs

Tip: This exercise involves using multiple terminal windows. To avoid confusion, set a different title for each one by selecting Set Title… on the Terminal menu.

Set the title for this window to “Kafka Producer.”

4. Publish a test message to the weblogs topic by typing the message text and then pressing Enter. For example:

test weblog entry 1

5. Open a new terminal window and adjust it to fit beneath the producer window. Set the title for this window to “Kafka Consumer.”

6. In the new terminal window, start a Kafka consumer that will read from the beginning of the weblogs topic:


$ kafka-console-consumer \

--zookeeper localhost:2181 \

--topic weblogs \

--from-beginning

You should see the test message you sent using the producer displayed on the consumer’s console, such as:

test weblog entry 1

7. Press Ctrl+C to stop the weblogs consumer, then restart it, but this time omit the --from-beginning option from the command. You should see that no messages are displayed.

8. Switch back to the producer window and type another test message into the terminal, followed by the Enter key:

test weblog entry 2

9. Return to the consumer window and verify that it now displays the message you published from the producer in the previous step.

Cleaning Up

10. Press Ctrl+C in the consumer terminal window to end its process.

11. Press Ctrl+C in the producer terminal window to end its process.

This is the end of the exercise


Hands-On Exercise: Collect Web Server Logs with Apache Flume

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/flume

Data files (local): $DEVDATA/weblogs/*

In this exercise, you will run a Flume agent to ingest web log data from a local directory to HDFS.

Apache web server logs are generally stored in files on the local machines running the server. In this exercise, you will simulate an Apache server by placing provided web log files into a local spool directory, and then use Flume to collect the data.

BoththelocalandHDFSdirectoriesmustexistbeforeusingthespoolingdirectorysource.

Creating an HDFS Directory for Flume-Ingested Data

1. Create a directory in HDFS called /loudacre/weblogs_flume to hold the data files Flume ingests:

$ hdfs dfs -mkdir -p /loudacre/weblogs_flume

Creating a Local Directory for Web Server Log Output

2. Create the spool directory into which the weblog simulator will store data files for Flume to ingest. On the local Linux filesystem, create /flume/weblogs_spooldir:

$ sudo mkdir -p /flume/weblogs_spooldir


3. Give all users permission to write to the /flume/weblogs_spooldir directory:

$ sudo chmod a+w -R /flume

Configuring Flume

A Flume agent configuration file has been provided for you: $DEVSH/exercises/flume/spooldir.conf.

Review the configuration file. You do not need to edit this file. Take note in particular of the following:

• The source is a spooling directory source that pulls from the local /flume/weblogs_spooldir directory.

• The sink is an HDFS sink that writes files to the HDFS /loudacre/weblogs_flume directory.

• The channel is a memory channel.
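The points above map directly onto the standard Flume properties format. The sketch below illustrates what such a configuration looks like; the actual file is the one provided at $DEVSH/exercises/flume/spooldir.conf. The agent name (agent1) and source name (webserver-log-source) appear in this exercise's output, but the sink and channel names here are assumed for illustration:

```
# Illustrative sketch only -- use the provided spooldir.conf file.
# Sink and channel names (hdfs-sink, memory-channel) are assumed.
agent1.sources  = webserver-log-source
agent1.channels = memory-channel
agent1.sinks    = hdfs-sink

agent1.sources.webserver-log-source.type     = spooldir
agent1.sources.webserver-log-source.spoolDir = /flume/weblogs_spooldir
agent1.sources.webserver-log-source.channels = memory-channel

agent1.channels.memory-channel.type = memory

agent1.sinks.hdfs-sink.type      = hdfs
agent1.sinks.hdfs-sink.hdfs.path = /loudacre/weblogs_flume
agent1.sinks.hdfs-sink.channel   = memory-channel
```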

Running the Flume Agent

Next, start the Flume agent and copy the files to the spooling directory.

4. Change directories to the exercise directory.

$ cd $DEVSH/exercises/flume

5. Start the Flume agent using the configuration you just reviewed:

$ flume-ng agent --conf /etc/flume-ng/conf \
    --conf-file spooldir.conf \
    --name agent1 -Dflume.root.logger=INFO,console


6. Wait a few moments for the Flume agent to start up. You will see a message like:

Component type: SOURCE, name: webserver-log-source started

Simulating Apache Web Server Output

7. Open a separate terminal window. Run the script to place the weblog files in the /flume/weblogs_spooldir directory:

$ $DEVSH/exercises/flume/copy-move-weblogs.sh \
    /flume/weblogs_spooldir

This script will create a temporary copy of the weblog files and move them to the spooldir directory.

8. Return to the terminal that is running the Flume agent and watch the logging output. The output will give information about the files Flume is putting into HDFS.

9. Once the Flume agent has finished, enter Ctrl+C to terminate the process.

10. Using the command line or Hue File Browser, list the files that were added by the Flume agent in the HDFS directory /loudacre/weblogs_flume.

Note that the files that were imported are tagged with a Unix timestamp corresponding to the time the file was imported, such as FlumeData.1427214989392.

This is the end of the exercise


Hands-On Exercise: Send Web Server Log Messages from Apache Flume to Apache Kafka

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/flafka

Data files (local): $DEVDATA/weblogs/*

In this exercise, you will run a Flume agent that ingests weblogs from a local spool directory and sends each line as a message to a Kafka topic.

The Flume agent is configured to send messages to the weblogs topic you created earlier.

Important: This exercise depends on two prior exercises: “Collect Web Server Logs with Flume” and “Produce and Consume Kafka Messages.” If you did not complete both of these exercises, run the catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh

Configuring a Flume Agent with a Kafka Sink

A Flume agent configuration file has been provided for you: $DEVSH/exercises/flafka/spooldir_kafka.conf

1. Review the configuration file. You do not need to edit this file. Take note in particular of the following points:

• The source and channel configurations are identical to the ones in the “Collect Web Server Logs with Flume” exercise: a spooling directory source that pulls from the local /flume/weblogs_spooldir directory, and a memory channel.

Page 79: Developer Training for Apache Spark and Hadoop: Hands-On … · distributed mode is a method of running Hadoop whereby all Hadoop daemons run on the same machine. It is, essentially,

Copyright © 2010-2016 Cloudera, Inc. All rights reserved. Not to be reproduced or shared without prior written consent from Cloudera.

79

79

• Instead of an HDFS sink, this configuration uses a Kafka sink that publishes messages to the weblogs topic.
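A Kafka sink section in a Flume configuration of this era typically looks like the sketch below. This is illustrative only; the actual file is the one provided at $DEVSH/exercises/flafka/spooldir_kafka.conf. The sink name (kafka-sink) matches this exercise's output, while the channel name and exact property names are assumed:

```
# Illustrative sketch only -- use the provided spooldir_kafka.conf file.
agent1.sinks = kafka-sink
agent1.sinks.kafka-sink.type       = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic      = weblogs
agent1.sinks.kafka-sink.brokerList = localhost:9092
agent1.sinks.kafka-sink.channel    = memory-channel
```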

Running the Flume Agent

2. Change to the exercise directory:

$ cd $DEVSH/exercises/flafka

3. Start the Flume agent using the configuration you just reviewed:

$ flume-ng agent --conf /etc/flume-ng/conf \
    --conf-file spooldir_kafka.conf \
    --name agent1 -Dflume.root.logger=INFO,console

4. Wait a few moments for the Flume agent to start up. You will see a message like:

Component type: SINK, name: kafka-sink started

Tip: This exercise involves using multiple terminal windows. To avoid confusion, set a different title for each window. Set the title of the current window to “Flume Agent.”

Testing the Flume Agent Kafka Sink

5. In a new terminal window, start a Kafka consumer that will read from the weblogs topic:

$ kafka-console-consumer \
    --zookeeper localhost:2181 \
    --topic weblogs

Tip: Set the title of this window to “Kafka Consumer.”

6. In a separate new terminal window, change to the exercise directory. Run the script to place the weblog files in the /flume/weblogs_spooldir directory:


$ cd $DEVSH/exercises/flafka

$ ./copy-move-weblogs.sh /flume/weblogs_spooldir

Note that if you completed an earlier Flume exercise or ran catchup.sh, the script will prompt you whether you want to clear out the spooldir directory. Be sure to enter y when prompted.

7. In the terminal that is running the Flume agent, watch the logging output. The output will give information about the files Flume is ingesting from the source directory.

8. In the terminal that is running the Kafka consumer, confirm that the consumer tool is displaying each message (that is, each line of the weblog file Flume is ingesting).

9. Once the Flume agent has finished, enter Ctrl+C in both the Flume agent terminal and the Kafka consumer terminal to end their respective processes.

This is the end of the exercise


Hands-On Exercise: Write an Apache Spark Streaming Application

Files and Directories Used in This Exercise:

Exercise directory: $DEVSH/exercises/spark-streaming

Python stub: stubs-python/StreamingLogs.py

Python solution: solution-python/StreamingLogs.py

Scala project:

Project directory: streaminglogs_project

Stub class: stubs.StreamingLogs

Solution class: solution.StreamingLogs

Test data (local): $DEVDATA/weblogs/*

Test script: streamtest.py

In this exercise, you will write a Spark Streaming application to count Knowledge Base article requests.

This exercise has two parts. First, you will review the Spark Streaming documentation. Then you will write and test a Spark Streaming application to read streaming web server log data and count the number of requests for Knowledge Base articles.

Reviewing the Spark Streaming Documentation

1. View the Spark Streaming API by opening the Spark API documentation for either Scala or Python and then:

For Scala:

• Scroll down and select the org.apache.spark.streaming package in the package pane on the left.


• Follow the links at the top of the package page to view the DStream and PairDStreamFunctions classes—these will show you the methods available on a DStream of regular RDDs and pair RDDs respectively.

For Python:

• Go to the pyspark.streaming module.

• Scroll down to the pyspark.streaming.DStream class and review the available methods.

2. You may also wish to view the Spark Streaming Programming Guide (select Programming Guides > Spark Streaming on the Spark documentation main page).

Simulating Streaming Web Logs

To simulate a streaming data source, you will use the provided streamtest.py Python script, which waits for a connection on the host and port specified and, once it receives a connection, sends the contents of the file(s) specified to the client (which will be your Spark Streaming application). You can specify the speed (in lines per second) at which the data should be sent.

3. Change to the exercise directory.

$ cd $DEVSH/exercises/spark-streaming

4. Stream the Loudacre weblog files at a rate of 20 lines per second using the provided test script.

$ python streamtest.py localhost 1234 20 \
    $DEVDATA/weblogs/*

This script will exit after the client disconnects, so you will need to restart the script whenever you restart your Spark application.


Writing a Spark Streaming Application

5. To help you get started writing a Spark Streaming application, stub files have been provided for you.

For Python, start with the stub file StreamingLogs.py in the $DEVSH/exercises/spark-streaming/stubs-python directory, which imports the necessary classes for the application.

For Scala, a Maven project directory called streaminglogs_project has been provided in the exercise directory ($DEVSH/exercises/spark-streaming). To complete the exercise, start with the stub code in src/main/scala/stubs/StreamingLogs.scala, which imports the necessary classes for the application.

6. Define a Streaming context with a one-second batch duration.

7. Create a DStream by reading the data from the host and port provided as input parameters.

8. Filter the DStream to only include lines containing the string KBDOC.

9. To confirm that your application is correctly receiving the streaming weblog data, display the first five records in the filtered DStream for each one-second batch. (In Scala, use the DStream print function; in Python, use pprint.)

10. For each RDD in the filtered DStream, display the number of items—that is, the number of requests for KB articles.

Tip: Python does not allow calling print within a lambda function, so create a named function to print.

11. Save the filtered logs to text files in HDFS. Use the base directory name /loudacre/streamlog/kblogs.

12. Finally, start the Streaming context, and then call awaitTermination().
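Steps 6-12 can be sketched in Python as follows. This is a minimal illustration assuming the Spark 1.x streaming API used in this course, not the provided solution; the function names are illustrative:

```python
# Minimal sketch of the StreamingLogs application (illustrative only).
import sys

def is_kb_request(line):
    # Step 8: keep only lines containing the string KBDOC
    return "KBDOC" in line

def print_count(rdd):
    # Step 10: Python cannot call print inside a lambda, so use a named function
    print("Number of KB requests: %d" % rdd.count())

def main(host, port):
    # pyspark imports are deferred so the helpers above stand alone
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="StreamingLogs")
    ssc = StreamingContext(sc, 1)                    # step 6: one-second batches
    logs = ssc.socketTextStream(host, port)          # step 7
    kblogs = logs.filter(is_kb_request)              # step 8
    kblogs.pprint(5)                                 # step 9: first five records
    kblogs.foreachRDD(print_count)                   # step 10
    kblogs.saveAsTextFiles("/loudacre/streamlog/kblogs")  # step 11
    ssc.start()                                      # step 12
    ssc.awaitTermination()

if __name__ == "__main__" and len(sys.argv) == 3:
    main(sys.argv[1], int(sys.argv[2]))
```
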


Testing the Application

13. In a new terminal window, change to the correct directory for the language you are using for your application.

For Python, change to the exercise directory:

$ cd $DEVSH/exercises/spark-streaming

For Scala, change to the project directory for the exercise:

$ cd $DEVSH/exercises/spark-streaming/streaminglogs_project

14. If you are using Scala, build your application JAR file using the mvn package command.

15. Use spark-submit to run your application locally, and be sure to specify two threads; at least two threads or nodes are required to run a streaming application, and the VM cluster has only one node. The StreamingLogs application takes two parameters: the host name and the port number to connect the DStream to. Specify the same host and port at which the test script you started earlier is listening.

$ spark-submit --master 'local[2]' \
    stubs-python/StreamingLogs.py localhost 1234

Note: Use solution-python/StreamingLogs.py to run the solution application instead.

$ spark-submit --master 'local[2]' \
    --class stubs.StreamingLogs \
    target/streamlog-1.0.jar localhost 1234

Note: Use --class solution.StreamingLogs to run the solution class instead.


16. After a few moments, the application should connect to the test script’s simulated stream of web server log output. Confirm that for every batch of data received (every second), the application displays the first few Knowledge Base requests and the count of requests in the batch. Review the HDFS files the application saved in /loudacre/streamlog.

17. Return to the terminal window in which you started the streamtest.py test script earlier. Stop the test script by typing Ctrl+C. You do not need to wait until all the weblog data has been sent.

Warning: Stopping Your Application

You must stop the test script before stopping your Spark Streaming application.

If you attempt to stop the application while the test script is still running, you

may find that the application appears to hang while it takes several minutes to

complete. (It will make repeated attempts to reconnect with the data source,

which the test script does not support.)

18. After the test script has stopped, stop your application by typing Ctrl+C in the terminal window the application is running in.

This is the end of the exercise


Hands-On Exercise: Process Multiple Batches with Apache Spark Streaming

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/spark-streaming-multi

Python stub: stubs-python/StreamingLogsMB.py

Python solution: solution-python/StreamingLogsMB.py

Scala project:

Project directory: streaminglogsMB_project

Stub class: stubs.StreamingLogsMB

Solution class: solution.StreamingLogsMB

Data (local): $DEVDATA/weblogs/*

In this exercise, you will write a Spark Streaming application to count web page requests over time.

Simulating Streaming Web Logs

To simulate a streaming data source, you will use the provided streamtest.py Python script, which waits for a connection on the host and port specified and, once it receives a connection, sends the contents of the file(s) specified to the client (which will be your Spark Streaming application). You can specify the speed (in lines per second) at which the data should be sent.


1. Change to the exercise directory.

$ cd $DEVSH/exercises/spark-streaming-multi

2. Stream the Loudacre weblog files at a rate of 20 lines per second using the provided test script.

$ python streamtest.py localhost 1234 20 \
    $DEVDATA/weblogs/*

This script exits after the client disconnects, so you will need to restart the script when you restart your Spark application.

Displaying the Total Request Count

3. A stub file for this exercise has been provided for you in the exercise directory. The stub code creates a Streaming context for you, and creates a DStream called logs based on weblog request messages received on a network socket.

For Python, start with the stub file StreamingLogsMB.py in the stubs-python directory.

For Scala, a Maven project directory called streaminglogsMB_project has been provided in the exercise directory. To complete the exercise, start with the stub code in src/main/scala/stubs/StreamingLogsMB.scala.

4. Enable checkpointing to a directory called logcheckpt.

5. Count the number of page requests over a window of five seconds. Print out the updated five-second total every two seconds.

• Hint: Use the countByWindow function.
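Steps 4-5 can be sketched in Python as follows. This is a minimal illustration, not the provided solution; the stub already creates the Streaming context and the logs DStream, but both are recreated here so the sketch is self-contained:

```python
# Minimal sketch of the StreamingLogsMB application (illustrative only).
import sys

# Spark requires window and slide durations to be multiples of the batch
# duration; 5 and 2 are both multiples of a 1-second batch.
BATCH_SECS, WINDOW_SECS, SLIDE_SECS = 1, 5, 2

def valid_window(batch, window, slide):
    # True when the window and slide are multiples of the batch duration
    return window % batch == 0 and slide % batch == 0

def main(host, port):
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="StreamingLogsMB")
    ssc = StreamingContext(sc, BATCH_SECS)
    ssc.checkpoint("logcheckpt")      # step 4: window operations require this
    logs = ssc.socketTextStream(host, port)
    # Step 5: count requests over a 5-second window, updated every 2 seconds
    logs.countByWindow(WINDOW_SECS, SLIDE_SECS).pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__" and len(sys.argv) == 3:
    main(sys.argv[1], int(sys.argv[2]))
```
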

Building and Running Your Application

6. In a different terminal window than the one in which you started the streamtest.py script, change to the correct directory for the language you are using for your application.


For Python, change to the exercise directory:

$ cd $DEVSH/exercises/spark-streaming-multi

For Scala, change to the project directory for the exercise:

$ cd $DEVSH/exercises/spark-streaming-multi/streaminglogsMB_project

7. If you are using Scala, build your application JAR file using the mvn package command.

8. Use spark-submit to run your application locally, and be sure to specify two threads; at least two threads or nodes are required to run a streaming application, and the VM cluster has only one node. Your application takes two parameters: the host name and the port number to connect the DStream to. Specify the same host and port at which the test script you started earlier is listening.

$ spark-submit --master 'local[2]' \
    stubs-python/StreamingLogsMB.py localhost 1234

• Note: Use solution-python/StreamingLogsMB.py to run the solution application instead.

$ spark-submit --master 'local[2]' \
    --class stubs.StreamingLogsMB \
    target/streamlogmb-1.0.jar localhost 1234

• Note: Use --class solution.StreamingLogsMB to run the solution class instead.

9. After a few moments, the application should connect to the test script’s simulated stream of web server log output. Confirm that every two seconds the application displays an updated count of the page requests received over the last five seconds.


10. Return to the terminal window in which you started the streamtest.py test script earlier. Stop the test script by typing Ctrl+C. You do not need to wait until all the weblog data has been sent.

Warning: Stopping Your Application

You must stop the test script before stopping your Spark Streaming application.

If you attempt to stop the application while the test script is still running, you

may find that the application appears to hang while it takes several minutes to

complete. (It will make repeated attempts to reconnect with the data source,

which the test script does not support.)

11. After the test script has stopped, stop your application by typing Ctrl+C in the terminal window the application is running in.

Bonus Exercise

Extend the application you wrote above to also count the total number of page requests by user from the start of the application, and then display the top ten users with the highest number of requests.

Follow the steps below to implement a solution for this bonus exercise:

1. Use map-reduce to count the number of times each user made a page request in each batch (a hit-count).

• Hint: Remember that the user ID is the third field in each line.

2. Define a function called updateCount that takes an array (in Python) or sequence (in Scala) of hit-counts and an existing hit-count for a user. The function should return the sum of the new hit-counts plus the existing count.

• Hint: An example of an updateCount function is in the course material, and the code can be found in $DEVSH/examples/spark/spark-streaming.

3. Use updateStateByKey with your updateCount function to create a new DStream of users and their hit-counts over time.


4. Use transform to call the sortByKey transformation to sort by hit-count.

• Hint: You will have to swap the key (user ID) with the value (hit-count) to sort.

Note: The solution files for this exercise include code for this bonus exercise.
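The bonus steps can be sketched in Python as follows. This is illustrative only (the solution files contain the actual code); it assumes the weblog fields are space-separated with the user ID as the third field:

```python
# Minimal sketch of the bonus exercise (illustrative only).

def user_id(line):
    # Step 1 helper: the user ID is the third space-separated field
    return line.split(' ')[2]

def update_count(new_counts, existing):
    # Step 2: sum of the new hit-counts plus the existing count
    # (existing is None the first time a user's key is seen)
    return sum(new_counts) + (existing or 0)

def show_top_users(logs):
    # logs is the DStream of weblog lines created by the stub
    user_reqs = logs.map(lambda line: (user_id(line), 1)) \
                    .reduceByKey(lambda a, b: a + b)        # step 1
    totals = user_reqs.updateStateByKey(update_count)       # step 3
    # Step 4: swap (user, count) to (count, user) and sort descending
    ranked = totals.transform(
        lambda rdd: rdd.map(lambda pair: (pair[1], pair[0]))
                       .sortByKey(ascending=False))
    ranked.pprint(10)   # display the top ten users
```
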

This is the end of the exercise


Hands-On Exercise: Process Apache Kafka Messages with Apache Spark Streaming

Files and Data Used in This Exercise

Exercise directory: $DEVSH/exercises/spark-streaming-kafka

Python stub: stubs-python/StreamingLogsKafka.py

Python solution: solution-python/StreamingLogsKafka.py

Scala project:

Project directory: streaminglogskafka_project

Stub class: stubs.StreamingLogsKafka

Solution class: solution.StreamingLogsKafka

Data (local): $DEVDATA/weblogs/*

In this exercise, you will write an Apache Spark Streaming application to handle weblogs received as messages on a Kafka topic.

In a prior exercise, you started a Flume agent that collects weblog files from a local spool directory and passes them to a Kafka sink. In this exercise, you will use the same Flume agent to produce data and publish it to Kafka. The Kafka topic will be the data source for your Spark Streaming application.

Important: This exercise depends on a prior exercise: “Send Web Server Log Messages from Flume to Kafka.” If you were unable to complete that exercise, run the catch-up script and advance to the current exercise:

$ $DEVSH/scripts/catchup.sh


Consuming Messages from a Kafka Direct DStream

1. For Python, start with the stub file StreamingLogsKafka.py in the stubs-python directory, which imports the necessary classes for the application.

For Scala, a Maven project directory called streaminglogskafka_project has been provided in the exercise directory. To complete the exercise, start with the stub code in src/main/scala/stubs/StreamingLogsKafka.scala, which imports the necessary classes for the application.

2. Create a DStream using KafkaUtils.createDirectStream.

• The broker list consists of a single broker: localhost:9092.

• The topic list consists of a single topic: the argument to the main function passed in by the user when the application is submitted.

Refer to the course materials for the details of creating a Kafka stream.

3. Kafka messages are in (key, value) form, but for this application, the key is null and only the value is needed. (The value is the weblog line.) Map the DStream to remove the key and use only the value.

4. To verify that the DStream is correctly receiving messages, display the first 10 elements in each batch.

5. For each RDD in the DStream, display the number of items—that is, the number of requests.

Tip: Python does not allow calling print within a lambda function, so define a named function to print.

6. Save the logs to text files in HDFS. Use the base directory name /loudacre/streamlog/kafkalogs.
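Steps 1-6 can be sketched in Python as follows. This is a minimal illustration using the Spark 1.x KafkaUtils direct-stream API referenced in the course, not the provided solution; a one-second batch duration is assumed:

```python
# Minimal sketch of the StreamingLogsKafka application (illustrative only).
import sys

def message_value(pair):
    # Step 3: Kafka messages arrive as (key, value); the key is null here
    return pair[1]

def print_count(rdd):
    # Step 5: use a named function, since print cannot appear in a lambda
    print("Number of requests: %d" % rdd.count())

def main(topic):
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="StreamingLogsKafka")
    ssc = StreamingContext(sc, 1)                 # batch duration assumed
    # Step 2: direct stream with a single broker and a single topic
    kafka_stream = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": "localhost:9092"})
    logs = kafka_stream.map(message_value)        # step 3
    logs.pprint(10)                               # step 4
    logs.foreachRDD(print_count)                  # step 5
    logs.saveAsTextFiles("/loudacre/streamlog/kafkalogs")  # step 6
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__" and len(sys.argv) == 2:
    main(sys.argv[1])
```
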

Building and Running Your Application

7. Change to the correct directory for the language you are using for your application.


For Python, change to the exercise directory:

$ cd $DEVSH/exercises/spark-streaming-kafka

For Scala, change to the project directory for the exercise:

$ cd $DEVSH/exercises/spark-streaming-kafka/streaminglogskafka_project

8. If you are using Scala, you will need to build your application JAR file using the mvn package command.

9. Use spark-submit to run your application locally, and be sure to specify two threads; at least two threads or nodes are required to run a streaming application, and the VM cluster has only one node. Your application takes one parameter: the name of the Kafka topic from which the DStream will read messages, weblogs.

$ spark-submit --master 'local[2]' \
    stubs-python/StreamingLogsKafka.py weblogs

• Note: Use solution-python/StreamingLogsKafka.py to run the solution application instead.

$ spark-submit --master 'local[2]' \
    --class stubs.StreamingLogsKafka \
    target/streamlogkafka-1.0.jar weblogs

• Note: Use --class solution.StreamingLogsKafka to run the solution class instead.

Producing Messages for Spark Streaming

10. In a separate terminal window, start a Flume agent using the configuration file from the Flume/Kafka exercise:


$ flume-ng agent --conf /etc/flume-ng/conf \
    --conf-file $DEVSH/exercises/flafka/spooldir_kafka.conf \
    --name agent1 -Dflume.root.logger=INFO,console

11. Wait a few moments for the Flume agent to start up. You will see a message like:

Component type: SINK, name: kafka-sink started

12. In a separate new terminal window, run the script to place the weblog files in the /flume/weblogs_spooldir directory.

If you completed the Flume exercises or ran catchup.sh previously, the script will prompt whether you want to clear out the spooldir directory. Be sure to enter y when prompted.

$ $DEVSH/exercises/flafka/copy-move-weblogs.sh \
    /flume/weblogs_spooldir

• Note: You can rerun the copy-move-weblogs.sh script to send the weblog data to Spark Streaming again if needed to test your application.

13. Return to the terminal window where your Spark application is running to verify the count output. Also review the contents of the saved files in the HDFS directories /loudacre/streamlog/kafkalogs-<time-stamp>. These directories hold part files containing the page requests.

Cleaning Up

14. Stop the Flume agent by pressing Ctrl+C. You do not need to wait until all the weblog data has been sent.

15. Stop the Spark application by pressing Ctrl+C. (You may see several error messages resulting from the interruption of the job in Spark; you may disregard these.)


This is the end of the exercise


Appendix A: Enabling Jupyter Notebook for PySpark

Jupyter (IPython Notebook) is installed on the VM for this course. To use it instead of the command-line version of PySpark, follow these steps:

1. Open the following file for editing: /home/training/.bashrc

2. Uncomment the following line (remove the leading #).

# export PYSPARK_DRIVER_PYTHON_OPTS='notebook ……..jax'

3. Save the file.

4. Open a new terminal window. (It must be a new terminal so it reloads your edited .bashrc file.)

5. Confirm the changes with the following Linux command:

$ env | grep PYSPARK

The output should be as follows. If you do not see this output, the .bashrc file was not edited or saved properly.

PYSPARK_DRIVER_PYTHON=ipython

PYSPARK_DRIVER_PYTHON_OPTS=notebook --ip 127.0.0.1

--port 3333 --no-mathjax

6. Enter pyspark in the terminal. This will cause a browser window to open and display the Jupyter notebook home page.


7. On the right-hand side of the page, select Python 2 from the New menu.

8. Enter some Spark code in the notebook cell and use the play button to execute it.

9. Notice the output displayed.


Appendix B: Managing Services on the Course Virtual Machine

On the course VM, Hadoop is installed in pseudo-distributed mode. This means all the Hadoop services (daemons) that would normally be running on different machines in a cluster are running on a single machine. This machine plays the roles of client nodes, master nodes, and worker nodes when you perform the exercises.

There are many services that could be running on the VM at any given time, such as the NameNode, DataNode, YARN ResourceManager, and Hue. You can discover what services are available on the VM by listing the contents of the /etc/init.d/ directory on the Linux filesystem. If, for example, the NameNode daemon stopped working, you would need to know the name of the service in order to check its status, start it, stop it, and so on. This is the command to find the name of that service:

$ ls -c1 /etc/init.d/*hdfs*

From the list that the ls command produces, you can see the specific name of the NameNode service: hadoop-hdfs-namenode. To manage the service, you need superuser privileges. You could switch to the root user (password training) or use the sudo command (“superuser do”). Use the service command with sudo to manage the service. To list the available options you can specify, use sudo service with a service name and no additional options:

$ sudo service hadoop-hdfs-namenode

To check the status of a service, use the status option:

$ sudo service hadoop-hdfs-namenode status

To start a service that is not running, use the start option:


$ sudo service hadoop-hdfs-namenode start

To restart a service that is already running, use the restart option:

$ sudo service hadoop-hdfs-namenode restart

Below is a list of services installed on the VM that are pertinent to this course:

General Apache Hadoop and YARN Services

• hadoop-hdfs-datanode

• hadoop-hdfs-namenode

• hadoop-yarn-nodemanager

• hadoop-yarn-resourcemanager

Hadoop MapReduce Services

• hadoop-mapreduce-historyserver

Apache Spark Services

• spark-history-server

Apache Hive/Apache Impala Services

• impala-catalog

• impala-state-store

• impala-server

• hive-metastore

• hive-server2


Other Important Services

• hue

• kafka-server

• mysqld

• zookeeper-server