spark and bloomberg by sudarshan kadambi and partha nageswaran
TRANSCRIPT
Spark @ Bloomberg: Dynamic Composable Analytics
Partha NageswaranSudarshan KadambiBLOOMBERG L.P.
Spark at Bloomberg: Dynamic Composable Analytics
• AdaptationofSparkinBloombergisevolvingfromcraftingstand-aloneSparkAppstoServerized SparkApps
Spark Cluster
Spark App
Spark Cluster
Spark App
Spark Cluster
Spark Server
Spark App
Spark App
A Couple of Tenets• Preferenceforontheflycalculationsoverpre-computedvalues
• Supportanalyticsonanalytics,adinfinituminadynamicmanner
Spark Serverization - Motivation• Stand-aloneSparkAppsonisolatedclustersposechallenges:
– Managementofclusters,replicationofdata,etc.
– Analyticsareconfined tospecificcontentsetsmakingCross-AssetAnalytics muchharder
– NeedtohandleReal-timeingestion ineachApp!
– Redundancy in:
» CraftingandManagingofRDDs/DFs
» Codingofthesameorsimilartypesoftransforms/actions
Spark Cluster
Spark App
Spark Cluster
Spark App
Spark Serverization - Approach• Long-runningprocess(SparkServer)maintainsashared
SparkContext
– SparkAppswithinprocess sharethesameSparkContext
• ProvideaContainerbasedapproachtocapabilitiessuchas:
– Deploying andManagingSparkApplications
– Deploying, ManagingandSharingRDDs/DFsacrossSparkApps
– Handling on-the-flyanalyticsonStreamingdata
– Declarativeorchestrationofhigher-order analyticsonRDDs/DFsandacrossSparkApplications
Spark Server
Spark Context
Spark App
Spark App
DF DF
DF DF
Benefits of Container Approach• ProvideLifecycleManagement forSparkApps
• ProvideLifecycleManagement forRDDs/DFs
• Provideotherdeclarative qualitiesofservices, suchas:
– RoutingofRequests toappropriateSparkApps
– NamingServicecapabilities forRDDs/DFs
– IngestionServices
– SecuringaccesstoSparkApps,andRDDs/DFs
Spark Server
Lifecycle Services
Naming Services
Security Services
RoutingServices
Query Services
IngestionServices
…
Introducing Managed DataFrames(MDFs)
• AManagedDataFrame (MDF)isanamedDataFrame,optionallycombinedwithExecutionMetadata
– MDFscanbesearchedbynameORbyanyColumnNamedefined intheSchemaof thecorrespondingDF
• ExecutionMetadataincludes:
– DataDistribution metadatacapturesinformationabout thedatadepth, histogram information, etc.
– E.g.:AmanagedDataFrame forpricingof stocks,representing 2yearsofhistoricaldata andanotherforrepresenting 30yearsofhistoricaldata
MDF
Price DF<ID, Price>
Name: Shallow
PriceMDF
ExecutionMetadata:* 2 Yr Price
History
MDF
Price DF<ID, Price>
Name: Deep
PriceMDF
ExecutionMetadata:
* 30 Yr Price History
Introducing Managed DataFrames(MDFs)
– DataDerivationmetadatawhicharemathematicalexpressions thatdefinehowadditional columnscanbesynthesized fromexistingcolumns intheschema
– E.g.:adjPrice isaderivedColumn, definedintermsofthebasePricecolumn
– Inessence,anMDFwithdataderivationmetadatahaveaSchemathatisaunionofthecontainedDFandthederivedcolumns
MDF
Name:ShallowPriceDF
ExecutionMetadata:* 2 Yr Price
History* adjPrice =
Price – 3% of Price
Price DF<ID, Price>
MDF
Name:Deep
PriceDF
ExecutionMetadata:
* 30 Yr Price History
* adjPrice = Price – 1.75% of
Price
Price DF<ID, Price>
Introducing MDF Registry• ARegistry, calledtheMDFRegistry
withintheSparkServer providessupportfor:
– BindingMDFsbyName
– LookingupMDFsbyName
– LookingupMDFbyaColumn Name(anelementoftheMDFSchema),etc.
• TheMDFRegistrymaintainsa'table'thatassociates theNameoftheMDFwiththeDFreference andColumnsintheDF
MDFRegistryName Columns DF
Ref.MetaData
ShallowPriceDF
Price,adjPrice
… …
DeepPriceDF
……
…
Price,adjPrice
Introducing DF Function Transform Libraries (FTLs)
• Standard(Analytics)functionscanbeexpressedas'pre-canned'SparkTransforms/ActionsinFunctionTransformLibraries(FTLs)
• SparkAppscancomposepre-cannedtransformswithotherapplicationlogictransformsonMDFs,bylookingupthepre-cannedtransformsfromFTLs
– E.g.:convertedPriceDF =FTL.apply(prices,“ConvertCurrency”,params);
Function Transform Library (FTL)
ConvertCurrency
…
df.join(rates.filter(rates("toCCY")=== toCCY), df("CURRENCY")=== rates("fromCCY") && df("DATE") === rates("DATE")).select(df("ID"), df("DATE"), rates("RATE") * df("VALUE"), rates("toCCY"))
Introducing Request Processor• SparkApps(calledRequestProcessors - RP)
withintheSparkServerareimplementedcomplianttospecifications
– TheseRPsareprovidedaccesstotheRegistryandFTLs
– AreresponsibleforcomposingtransformsandactionsononeormoreMDFs
– MaydynamicallybindadditionalMDFs(materializedorotherwise)forusebyotherApps
Request Handler
Request Processor
.
MDF Registry
lookup MDFs
FTLs
applyFunction
MDFs
decoratewithTransforms
collect
Spark Context
Request Processor
Request Processor
Client Query Request Processor
Request Handler (E.g.: Apache CXF running in Tomcat)
MDF Registry
MDF
12
Function Transform Library
(FTL)
ConvertCurrency
…
use MDF
MDF
MDF
Bloomberg Spark Server
Bloomberg Spark Server
13
Spark Context
Request Processor
Request Processor
Request Processor
Request Handler (E.g.: Apache CXF running in Tomcat)
MDF Registry
MDF1
MDF2
Function Transform Library
(FTL)
currConv …1 2
1 2
Ingestion Manager
Bloomberg Spark Server
14
Spark Context
Request Processor
Request Processor
Request Processor
Request Handler (E.g.: Apache CXF running in Tomcat)
MDF Registry
MDF1
MDF2
Function Transform Library
(FTL)
currConv …1 2
1 2
Ingestion Manager
Schema Repository
15
• Enterprise-widedatapipeline
• External(toSpark)schemarepositoryandservice
• EnablesMDFlookupbyadatasetschemaelement
• Analyticexpressionscannowbecomposedoverdataelements
Execution Metadata
16
• Connection Identifiers
• BackingStores
• Real-time Topics
• StorageLevel&RefreshRate
• SubsetPredicate,etc.
Cross-Domain Analytics
17
• Registrationofpre-materializedDataFrames
• Collaborativeanalyticsbetweenapplicationworkflows
• DynamiccreationofManagedDataFrames
• Ad-hoc cross-domainanalytics
Subsetting
18
• Highvaluedatasub-settedwithinSpark
• Reducecostofqueryingexternaldatastore
• Specifiedasafilterpredicateattimeofregistration
• E.g.Membercompaniesofpopularindices[Dow30,S&P500,…]haverecordsplacedwithinSpark
Subsetting
19
• SeamlessstitchingbetweendatainSpark(DFsubset)andbackingstore(DFsubset’)
• (DFsubset UDFsubset’).filter(query) =DFsubset.filter(query) UDFsubset’.filter(query)
• Future:Predicatelogicbetween queryandsubsetpredicates
• Datasetownersprovidedknobsforcostvsperformance.
• Future:LRUcache
Ingestion: AB Swap
20
• PeriodicdatapullintoSparkfromthebackingstore
• Subsetcriteriaappliedduringdataretrieval
• Scenariowhenbackingstorekeptcontinuouslyupdated,externaltoSpark
• Avro-deserializationpusheddownintovariousconnectors
Ingestion: Stream Reconciliation
21
• Analyticsneedstobelow-latencywithrespecttoqueries,butalsodatafreshness
• Sincedataisbeingsub-settedwithinSpark,needtokeepthesubsetuptodate
• DatasetspublishedtodifferentKafkatopics.
• 1:1mappingbetween datasets,topicsandDStreams.
Ingestion: Stream Reconciliation
22
Backing Store
U1 U2 U3 UN DFsubset
S1 S2 S3 SNDFN
MDF A
Real-Time Stream
(updateStateByKey)
(Avro Deserialize, Subset Predicate)
(foreachRDD, convert to DF)
Reference Counting
23
• AnMDFcontainsmultiplegenerationofDFs,beinggeneratedanddestroyed
• MultiplegenerationsoperateduponbyRPsatgivenpointintime
• ReferencecountingtokeeptrackofwhatDFsarebeingusedandbywhom
• Longrunningqueriesabortedforforcedreclamation
Snapshot Consistency
24
• Multiplequeriesneedtooperateonsamesnapshotofdata
• Howtoachieve,ifdataconstantlychangingunderneath?
• EachDFwithinMDFassociatedwithtimeepoch
• Registrylookupwithareferencetime
• Time-alignsub-setted dataframeswithdatainbackingstore
Spark Challenges
25
• Lowlatencyperformanceconsistency
• EfficientStreamreconciliation
• SparkDriverHA
• Strongconsistencyacrosscontextsneedsstateexternalization
MDF Acknowledgements
Andrew Foster Joe Davey Shubham Chopra
Hamel KothariNimbus Goehausen
THANK [email protected]@bloomberg.net