Big Data Systems - courses.cs.washington.edu
TRANSCRIPT
Big Data Systems
Big Data Parallelism
• Huge dataset
• crawled documents, web request logs, etc.
• Natural parallelism:
• can work on different parts of data independently
• image processing, grep, indexing, many more
Challenges
• Parallelize application
• Where to place input and output data?
• Where to place computation?
• How to communicate data? How to manage threads? How to avoid network bottlenecks?
• Balance computations
• Handle failures of nodes during computation
• Scheduling several applications that want to share infrastructure
Goal of MapReduce
• To solve these distribution/fault-tolerance issues once in a reusable library
• To shield the programmer from having to re-solve them for each program
• To obtain adequate throughput and scalability
• To provide the programmer with a conceptual framework for designing their parallel program
MapReduce
• Overview:
• Partition large dataset into M splits
• Run map on each partition, which produces R local partitions, using a partition function
• Hidden intermediate shuffle phase
• Run reduce on each intermediate partition, which produces R output files
Details
• Input values: set of key-value pairs
• Job will read chunks of key-value pairs
• "key-value" pairs are a good enough abstraction
• Map(key, value):
• System will execute this function on each key-value pair
• Generate a set of intermediate key-value pairs
• Reduce(key, values):
• Intermediate key-value pairs are sorted
• Reduce function is executed on these intermediate key-values
Count words in web-pages
Map(key, value) {
  // key is url; value is the content of the url
  For each word W in the content
    Generate(W, 1);
}
Reduce(key, values) {
  // key is word (W); values are basically all 1s
  Sum = sum of all 1s in values
  // generate word-count pairs
  Generate(key, Sum);
}
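The pseudocode above can be run as a toy, single-process sketch. The runner and helper names (run_mapreduce, map_word_count, and so on) and the sample documents are invented for illustration; grouping intermediate pairs by key stands in for the hidden shuffle phase:

```python
from collections import defaultdict

def map_word_count(url, content):
    # key is url; value is the content of the url
    for word in content.split():
        yield (word, 1)

def reduce_word_count(word, counts):
    # counts are basically all 1s; sum them into a word-count pair
    return (word, sum(counts))

def run_mapreduce(inputs, map_fn, reduce_fn):
    # "Shuffle": group intermediate key-value pairs by key.
    groups = defaultdict(list)
    for key, value in inputs:
        for k, v in map_fn(key, value):
            groups[k].append(v)
    # Run reduce on each intermediate partition (here, each key).
    return dict(reduce_fn(k, vs) for k, vs in sorted(groups.items()))

docs = [("url1", "the quick fox"), ("url2", "the lazy dog")]
print(run_mapreduce(docs, map_word_count, reduce_word_count))
# {'dog': 1, 'fox': 1, 'lazy': 1, 'quick': 1, 'the': 2}
```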
Reverse web-link graph
Go to google advanced search: "find pages that link to the page:" cnn.com
Map(key, value) {
  // key = url; value = content
  For each url "target" that the page links to
    Generate(target, url);
}
Reduce(key, values) {
  // key = target url; values = all urls that point to the target url
  Generate(key, list of values);
}
• Question: how do we implement "join" in MapReduce?
• Imagine you have a log table L and some other table R that contains, say, user information
• Perform Join(L.uid == R.uid)
• Say size of L >> size of R
• Bonus: consider real world zipf distributions
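One common answer to the join question is a reduce-side (repartition) join: tag each record with its source table, emit it keyed by uid, and cross the two sides inside reduce. A minimal sketch; the records and helper names are made up, and the shuffle is simulated with an in-memory grouping:

```python
from collections import defaultdict

# Toy tables: L is a log table, R holds user info (illustrative data only).
L = [("log", 1, "login"), ("log", 2, "click"), ("log", 1, "logout")]
R = [("user", 1, "alice"), ("user", 2, "bob")]

def map_join(record):
    # Tag each record with its source table and emit it keyed by uid.
    table, uid, payload = record
    yield (uid, (table, payload))

def reduce_join(uid, tagged):
    # Cross the log rows with the user rows that share this uid.
    logs = [p for t, p in tagged if t == "log"]
    users = [p for t, p in tagged if t == "user"]
    return [(uid, log, user) for log in logs for user in users]

# Simulated shuffle: group intermediate pairs by uid.
groups = defaultdict(list)
for rec in L + R:
    for k, v in map_join(rec):
        groups[k].append(v)

joined = [row for uid, vals in sorted(groups.items()) for row in reduce_join(uid, vals)]
print(joined)
# [(1, 'login', 'alice'), (1, 'logout', 'alice'), (2, 'click', 'bob')]
```

Since size of L >> size of R, a map-side join that ships a copy of R to every mapper avoids shuffling L at all; and with zipf-skewed uids, hot keys can be split ("salted") across several reduce tasks to avoid one overloaded reducer.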
Comparisons
• Worth comparing it to other programming models:
• distributed shared memory systems
• bulk synchronous parallel programs
• key-value storage accessed by general programs
• More constrained programming model for MapReduce
• Other models are latency sensitive, have poor throughput efficiency
• MapReduce provides for easy fault recovery
Implementation
• Depends on the underlying hardware: shared memory, message passing, NUMA shared memory, etc.
• Inside Google:
• commodity workstations
• commodity networking hardware (1-10 Gbps now at the node level, with much smaller bisection bandwidth)
• cluster = 100s or 1000s of machines
• storage is through GFS
MapReduce Input
• Where does input come from?
• Input is striped + replicated over GFS in 64 MB chunks
• But in fact Map always reads from a local disk
• They run the Maps on the GFS server that holds the data
• Tradeoff:
• Good: Map reads at disk speed (local access)
• Bad: only two or three choices of where a given Map can run
• potential problem for load balance, stragglers
Intermediate Data
• Where does MapReduce store intermediate data?
• On the local disk of the Map server (not in GFS)
• Tradeoff:
• Good: local disk write is faster than writing over the network to a GFS server
• Bad: only one copy, potential problem for fault-tolerance and load-balance
Output Storage
• Where does MapReduce store output?
• In GFS, replicated, separate file per Reduce task
• So output requires network communication -- slow
• It can then be used as input for a subsequent MapReduce
Question
• What are the scalability bottlenecks for MapReduce?
Scaling
• Map calls probably scale
• but input might not be infinitely partitionable, and small input/intermediate files incur high overheads
• Reduce calls probably scale
• but can't have more workers than keys, and some keys could have more values than others
• Network may limit scaling
• Stragglers could be a problem
Fault Tolerance
• The main idea: Map and Reduce are deterministic, functional, and independent
• so MapReduce can deal with failures by re-executing
• What if a worker fails while running Map?
• Can we restart just that Map on another machine?
• Yes: GFS keeps a copy of each input split on 3 machines
• Master knows, tells Reduce workers where to find intermediate files
Fault Tolerance
• If a Map finishes, then that worker fails, do we need to re-run that Map?
• Intermediate output now inaccessible on worker's local disk.
• Thus need to re-run Map elsewhere unless all Reduce workers have already fetched that Map's output.
• What if Map had started to produce output, then crashed?
• Need to ensure that Reduce does not consume the output twice
• What if a worker fails while running Reduce?
Role of the Master
• Tracks the state of each worker machine (pings each machine)
• Reschedules work corresponding to failed machines
• Orchestrates the passing of intermediate-file locations to reduce functions
Load Balance
• What if some Map machines are faster than others?
• Or some input splits take longer to process?
• Solution: many more input splits than machines
• Master hands out more Map tasks as machines finish
• Thus faster machines do a bigger share of work
• But there's a constraint:
• Want to run Map task on machine that stores input data
• GFS keeps 3 replicas of each input data split
• only three efficient choices of where to run each Map task
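The "many more splits than machines" idea can be simulated in a few lines. The worker speeds, split count, and function name below are made up, and the GFS locality constraint is ignored for simplicity; the point is only that handing tasks to whichever worker finishes first gives fast workers a proportionally bigger share:

```python
import heapq

def assign_splits(num_splits, worker_speeds):
    # Priority queue of (time when worker becomes free, worker id).
    free_at = [(0.0, w) for w in range(len(worker_speeds))]
    heapq.heapify(free_at)
    tasks_done = [0] * len(worker_speeds)
    for _ in range(num_splits):
        t, w = heapq.heappop(free_at)   # master waits for the next idle worker
        tasks_done[w] += 1              # ...and hands it one more Map task
        heapq.heappush(free_at, (t + 1.0 / worker_speeds[w], w))
    return tasks_done

# Worker 0 is 4x faster than worker 2; with 40 splits and 3 workers it ends up
# with roughly 4x as many tasks.
print(assign_splits(40, [4.0, 2.0, 1.0]))
```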
Stragglers
• Often one machine is slow at finishing the very last task
• bad hardware, overloaded with some other work
• Load balance only balances newly assigned tasks
• Solution: always schedule multiple copies of the very last tasks!
How many MR tasks?
• Paper uses M = 10x number of workers, R = 2x.
• More =>
• finer grained load balance.
• less redundant work for straggler reduction.
• spread tasks of failed worker over more machines.
• overlap Map and shuffle, shuffle and Reduce.
• Less => big intermediate files with less overhead.
• M and R also may be constrained by how data is striped in GFS (e.g., 64 MB chunks)
Discussion
• What are the constraints imposed on map and reduce functions?
• How would you like to expand the capability of MapReduce?
MapReduce Criticism
• "Giant step backwards" in programming model
• Sub-optimal implementation
• "Not novel at all"
• Missing most of the DB features
• Incompatible with all of the DB tools
Comparison to Databases
• Huge source of controversy; claims:
• parallel databases have much more advanced data processing support that leads to much better efficiency
• support an index; selection is accelerated
• provide query optimization
• parallel databases support a much richer semantic model
• support a schema; sharing across apps
• support SQL, efficient joins, etc.
Where does MR win?
• Scaling
• Loading data into system
• Fault tolerance (partial restarts)
• Approachability
Spark Motivation
• MR problems
• cannot support complex applications efficiently
• cannot support interactive applications efficiently
• Root cause
• inefficient data sharing
In MapReduce, the only way to share data across jobs is stable storage -> slow!
Motivation
Goal: In-Memory Data Sharing
Challenge
• How to design a distributed memory abstraction that is both fault tolerant and efficient?
Other options
• Existing storage abstractions have interfaces based on fine-grained updates to mutable state
• E.g., RAMCloud, databases, distributed mem, Piccolo
• Requires replicating data or logs across nodes for fault tolerance
• Costly for data-intensive apps
• 10-100x slower than memory write
RDD Abstraction
• Restricted form of distributed shared memory
• immutable, partitioned collection of records
• can only be built through coarse-grained deterministic transformations (map, filter, join, ...)
• Efficient fault-tolerance using lineage
• Log coarse-grained operations instead of fine-grained data updates
• An RDD has enough information about how it's derived from other datasets
• Recompute lost partitions on failure
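The lineage idea can be sketched with a toy class (this is not the Spark API; the class and method names are invented): each RDD records only its parent and the coarse-grained transformation that derived it, so a lost partition is recovered by replaying that chain rather than by keeping a replica.

```python
class ToyRDD:
    def __init__(self, parent, transform):
        self.parent = parent          # lineage: which RDD this came from
        self.transform = transform    # lineage: how it was derived

    def map(self, f):
        return ToyRDD(self, lambda part: [f(x) for x in part])

    def filter(self, pred):
        return ToyRDD(self, lambda part: [x for x in part if pred(x)])

    def compute(self, partition):
        # Recompute by replaying the lineage from the source partition.
        if self.parent is None:
            return partition
        return self.transform(self.parent.compute(partition))

source = ToyRDD(None, None)
errors = source.filter(lambda s: s.startswith("ERROR")).map(lambda s: s.split("\t")[1])

# If the node holding a derived partition fails, just replay the lineage:
lost_input = ["ERROR\tdisk full", "INFO\tok", "ERROR\ttimeout"]
print(errors.compute(lost_input))
# ['disk full', 'timeout']
```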
Fault-tolerance
Design Space
Operations
• Transformations (e.g. map, filter, groupBy, join)
• Lazy operations to build RDDs from other RDDs
• Actions (e.g. count, collect, save)
• Return a result or write it to storage
lines = spark.textFile("hdfs://...")
errors = lines.filter(lambda s: s.startswith("ERROR"))
messages = errors.map(lambda s: s.split('\t')[2])
messages.persist()
messages.filter(lambda s: "foo" in s).count()
messages.filter(lambda s: "bar" in s).count()
...
(Slide callouts: lines is the base RDD; errors and messages are transformed RDDs; count is an action.)
Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)
Result: scaled to 1 TB data in 5-7 sec (vs 170 sec for on-disk data)
Example: Mining Console Logs
Load error messages from a log into memory, then interactively search
RDD Fault Tolerance
RDDs track the transformations used to build them (their lineage) to recompute lost data
E.g.:
messages = textFile(...).filter(lambda s: "ERROR" in s).map(lambda s: s.split('\t')[2])
Lineage chain: HadoopRDD (path = hdfs://...) -> FilteredRDD (func = contains(...)) -> MappedRDD (func = split(...))
Lineage
• Spark uses the lineage to schedule jobs
• Transformations on the same partition form a stage
• Joins, for example, are a stage boundary
• Need to reshuffle data
• A job runs a single stage
• pipeline transformations within a stage
• Schedule job where the RDD partition is
Lineage & Fault Tolerance
• Great opportunity for efficient fault tolerance
• Let's say one machine fails
• Want to recompute only its state
• The lineage tells us what to recompute
• Follow the lineage to identify all partitions needed
• Recompute them
• For the last example, identify the missing partitions of lines
• All dependencies are "narrow"; each partition is dependent on one parent partition
• Need to read the missing partition of lines; recompute the transformations
Fault Recovery
Example: PageRank
Optimizing Placement
• links & ranks repeatedly joined
• Can co-partition them (e.g., hash both on URL)
• Can also use app knowledge, e.g., hash on DNS name
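Why co-partitioning helps the repeated links-ranks join can be shown in plain Python: if both datasets are hash-partitioned by URL the same way, every join key lands in the same partition on both sides, so the join needs no shuffle. The tiny datasets, partition count, and function name are made up for illustration:

```python
NUM_PARTITIONS = 2

def partition_by_url(pairs):
    # Hash-partition key-value pairs by URL, as a co-partitioner would.
    parts = [dict() for _ in range(NUM_PARTITIONS)]
    for url, value in pairs:
        parts[hash(url) % NUM_PARTITIONS][url] = value
    return parts

links = [("a.com", ["b.com"]), ("b.com", ["a.com", "c.com"]), ("c.com", ["a.com"])]
ranks = [("a.com", 1.0), ("b.com", 1.0), ("c.com", 1.0)]

# Both datasets use the same partitioning function, so they are co-partitioned.
link_parts = partition_by_url(links)
rank_parts = partition_by_url(ranks)

# Each partition pair can be joined locally, with no cross-partition traffic.
joined = []
for lp, rp in zip(link_parts, rank_parts):
    for url in lp:
        assert url in rp  # co-partitioning guarantees the match is local
        joined.append((url, lp[url], rp[url]))
print(sorted(joined))
# [('a.com', ['b.com'], 1.0), ('b.com', ['a.com', 'c.com'], 1.0), ('c.com', ['a.com'], 1.0)]
```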
PageRank Performance
TensorFlow: System for ML
• Open source, lots of developers, external contributors
• Used in: RankBrain (rank results), Photos (image recognition), SmartReply (automatic email responses)
Three types of ML
• Large scale training: huge datasets, generate models
• Google's previous DistBelief for 100s of machines
• Low latency inference: running models in datacenters, phones, etc.
• Custom engines
• Testing new ideas
• Single node flexible systems (Torch, Theano)
TensorFlow
• Common way to write programs
• Dataflow + Tensors
• Mutable state
• Simple mathematical operations
• Automatic differentiation
Background: NN Training
• Take input image
• Compute loss function (forward pass)
• Compute error gradients (backward pass)
• Update weights
• Repeat
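The training loop above can be sketched for the simplest possible "network": fitting y = w * x to data with squared loss and hand-derived gradients. All numbers and names are illustrative; real frameworks compute the backward pass with automatic differentiation rather than by hand.

```python
data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]  # points on the line y = 2x
w = 0.0    # single trainable weight
lr = 0.05  # learning rate

for step in range(100):
    # Forward pass: loss = mean of (w*x - y)^2 over the data.
    loss = sum((w * x - y) ** 2 for x, y in data) / len(data)
    # Backward pass: dloss/dw = mean of 2*(w*x - y)*x.
    grad = sum(2 * (w * x - y) * x for x, y in data) / len(data)
    # Update weights, then repeat.
    w -= lr * grad

print(round(w, 3))
# 2.0
```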
Computation is a DFG
Example Code
Example Code
Parameter Server Architecture
Stateless workers, stateful parameter servers (DHT)
Commutative updates to parameter server
TensorFlow
• Flexible architecture for mapping operators and parameter servers to different devices
• Supports multiple concurrent executions on overlapping subgraphs of the overall graph
• Individual vertices may have mutable state that can be shared between different executions of the graph
TensorFlow handles the glue
Synchrony?
• Asynchronous execution is sometimes helpful, addresses stragglers
• Asynchrony causes consistency problems
• TensorFlow: pursues synchronous training
• But adds k backup machines to reduce the straggler problem
• Uses domain specific knowledge to enable this optimization
Open Research Problems
• Automatic placement: dataflow is a great mechanism, but it is not clear how to use it appropriately
• mutable state: split round-robin across parameter server nodes; stateless tasks replicated on GPUs as much as fits, the rest on CPUs
• How to take the dataflow representation and generate more efficient code?