Data Pipeline at Tapad

@tobym @TapadEng
Who am I?

• Toby Matejovsky
• First engineer hired at Tapad, 3+ years ago
• Scala developer
• @tobym
What are we talking about?
Outline

• What Tapad does
• Why bother with a data pipeline?
• Evolution of the pipeline
• Day in the life of an analytics pixel
• What’s next
What Tapad Does

• Cross-platform advertising and analytics
• Process billions of events per day

The Tapad Difference. A Unified View.
Cross platform?

• Device graph: node = device, edge = inferred connection
• A billion devices, a quarter billion edges
• 85+% accuracy
Why a Data Pipeline?

• Graph building
• Sanity while processing big data
• Decouple components
• Data accessible at multiple stages
Graph Building

• Realtime mode, but don’t impact bidding latency
• Batch mode
Sanity

• Billions of events, terabytes of logs per day
• We don’t have the NSA’s budget
• Clear data retention policy
• Store aggregations
Decouple Components

• Bidder only bids; the graph-building process only builds the graph
• Data stream can split and merge
Data accessible at multiple stages

• Logs on edge of system
• Local spool of data
• Kafka broker
• Consumer local spool
• HDFS
Evolution of the Data Pipeline

• Dark Ages: monolithic, synchronous process
• Renaissance: queues, asynchronous work in the same process
• Age of Exploration: inter-process communication, ad hoc batching
• Age of Enlightenment: standardize on Kafka and Avro
Dark Ages

Monolithic, synchronous process. It was fast enough, and we had to start somewhere.
Renaissance

Queues, asynchronous work in the same process. No, it wasn’t fast enough.
Age of Exploration

Inter-process communication, ad hoc batching. Servers at the edge batch up events and ship them to another service.
Age of Enlightenment

Standardize on Kafka and Avro: properly engineered, supported, and reliable.
Tangent! Batching, queues, and serialization
Batching

Batching is great and will really help throughput. Batching != slow.
Queues

Queues are amazing, until they explode and destroy the Rube Goldberg machine.

“I’ll just increase the buffer size.” (Spoken one day before someone ended up on double PagerDuty rotation.)
Care and feeding of your queue

• Monitor
• Back-pressure
• Buffering
• Spooling
• Degraded mode
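Two of these items, buffering and back-pressure, can be sketched with a bounded queue from `java.util.concurrent`. This is an illustration of the idea, not Tapad's actual implementation:

```scala
import java.util.concurrent.ArrayBlockingQueue

object BackPressureDemo {
  // Bounded buffer: offer() returns false once the queue is full, so the
  // producer can spool to disk or enter a degraded mode instead of letting
  // the buffer grow until the process dies.
  val queue = new ArrayBlockingQueue[String](2)

  def enqueue(event: String): Boolean = queue.offer(event)
}
```

A consumer draining the queue with `queue.take()` frees capacity again; the point of the bound is that it forces an explicit policy for the full case rather than "just increase the buffer size."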
Serialization - Protocol Buffers

• Tagged fields
• Sort of self-describing
• required, optional, repeated fields in schema
• “Map” type:

    message StringPair {
      required string key = 1;
      optional string value = 2;
    }
Serialization - Avro

• Optional field: union { null, long } user_timestamp = null;
• Splittable (Hadoop world)
• Schema evolution and storage
Day in the life of a pixel

• Browser loads pixel from the pixel server
• Pixel server immediately responds with a 200 and a transparent GIF, then serializes requests into a batch file
• Batch file ships every few seconds or when the file reaches 2K

pixel server - pixel ingress - kafka - consumer - hdfs - hadoop jobs
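The ship-on-size-or-age rule can be sketched as a tiny batcher. The class name and default thresholds here are illustrative, not the pixel server's actual code:

```scala
// Accumulates serialized events; a batch should ship either when it reaches
// the size threshold (2 KB, as in the slide) or when it has been open
// longer than the flush interval.
class Batcher(maxBytes: Int = 2048, maxAgeMillis: Long = 5000L) {
  private val buf = new StringBuilder
  private var openedAt = System.currentTimeMillis()

  def add(event: String): Unit = buf.append(event)

  def shouldShip(now: Long = System.currentTimeMillis()): Boolean =
    buf.length >= maxBytes || (now - openedAt) >= maxAgeMillis

  // Returns the current batch and resets for the next one.
  def ship(): String = {
    val batch = buf.toString
    buf.clear()
    openedAt = System.currentTimeMillis()
    batch
  }
}
```

The dual trigger matters: the size bound keeps files small enough to ship cheaply, while the age bound keeps latency low on quiet servers.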
Day in the life of a pixel

• Pixel ingress server receives the 2-kilobyte file containing serialized web requests
• Deserialize, process some requests immediately (update database), then convert into Avro records with a schema hash header, and publish to various Kafka topics
Day in the life of a pixel

• Producer client figures out where to publish via the broker it connects to
• Kafka topics are partitioned into multiple chunks; each has a master and a slave on different servers to survive an outage
• Configurable retention based on time
• Can add topics dynamically
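The routing decision can be sketched with the usual hash-mod scheme. The real Kafka producer's partitioner is more sophisticated, so treat this as a simplification of the idea:

```scala
object Partitioner {
  // Deterministic key -> partition mapping: the same device id always lands
  // on the same partition, which preserves per-key ordering.
  def partitionFor(key: String, numPartitions: Int): Int =
    math.abs(key.hashCode % numPartitions)
}
```

Because the mapping is a pure function of the key and the partition count, any producer computes the same answer, so no coordination is needed per event.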
Day in the life of a pixel

• Consumer processes are organized into groups
• Many consumer groups can read from the same Kafka topic
• Plugins:

    trait Plugin[A] {
      def onStartup(): Unit
      def onSuccess(a: A): Unit
      def onFailure(a: A): Unit
      def onShutdown(): Unit
    }

GraphitePlugin, BatchingLogfilePlaybackPlugin, TimestampDrivenClockPlugin, BatchingTimestampDrivenClockPlugin, …
Day in the life of a pixel

    import scala.collection.mutable.ArrayBuffer

    trait Plugins[A] {
      private val _plugins = ArrayBuffer.empty[Plugin[A]]
      def plugins: Seq[Plugin[A]] = _plugins
      def registerPlugin(plugin: Plugin[A]) = _plugins += plugin
    }
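Wiring the two traits together looks roughly like this. `CountingPlugin` and `Consumer` are hypothetical names for illustration, and the `Plugin` callbacks are given no-op defaults (a small tweak to the slide's abstract trait) to keep the sketch short:

```scala
import scala.collection.mutable.ArrayBuffer

// Local copies of the slide's traits so the sketch is self-contained.
trait Plugin[A] {
  def onStartup(): Unit = ()
  def onSuccess(a: A): Unit = ()
  def onFailure(a: A): Unit = ()
  def onShutdown(): Unit = ()
}

trait Plugins[A] {
  private val _plugins = ArrayBuffer.empty[Plugin[A]]
  def plugins: Seq[Plugin[A]] = _plugins.toList // snapshot for safe iteration
  def registerPlugin(plugin: Plugin[A]): Unit = _plugins += plugin
}

// Hypothetical plugin: counts successfully processed events,
// the kind of thing a GraphitePlugin would report as a metric.
class CountingPlugin extends Plugin[String] {
  var count = 0
  override def onSuccess(a: String): Unit = count += 1
}

// Hypothetical consumer that notifies its plugins after each event.
class Consumer extends Plugins[String] {
  def consume(event: String): Unit = plugins.foreach(_.onSuccess(event))
}
```

The consumer core stays ignorant of metrics, playback, and clocks; those concerns bolt on as plugins.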
Day in the life of a pixel

    object KafkaConsumer {
      sealed trait Result {
        def notify[A](plugins: Seq[Plugin[A]], a: A): Unit
      }

      case object Success extends Result {
        def notify[A](plugins: Seq[Plugin[A]], a: A) {
          plugins.foreach(_.onSuccess(a))
        }
      }
    }
    /** Decorate a Function1[A, B] with retry logic */
    case class Retry[A, B](maxAttempts: Int, backoff: Long)(f: A => B) {
      def apply(a: A): Result[A, B] = {
        def execute(attempt: Int, errorLog: List[Throwable]): Result[A, B] = {
          val result = try {
            Success(this, a, f(a))
          } catch {
            … Failure(this, a, e :: errorLog) …
          }
          result match {
            case failure @ Failure(_, _, errorLog) if errorLog.size < maxAttempts =>
              val _backoff = (math.pow(2, attempt) * backoff).toLong
              Thread.sleep(_backoff)         // wait before the next invocation
              execute(attempt + 1, errorLog) // try again
            case failure @ Failure(_, _, errorLog) =>
              failure
          }
        }
        execute(attempt = 0, errorLog = Nil)
      }
    }
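The elided pieces of the slide's Retry make it hard to run as-is; here is a self-contained simplification of the same decorate-with-retry idea, with the slide's Result/Success/Failure pared down to an Either and the exponential backoff kept:

```scala
// Simplified retry decorator: run f, retrying with exponential backoff
// until it succeeds or maxAttempts failures have accumulated.
case class Retry[A, B](maxAttempts: Int, backoffMillis: Long)(f: A => B) {
  def apply(a: A): Either[List[Throwable], B] = {
    def execute(attempt: Int, errorLog: List[Throwable]): Either[List[Throwable], B] = {
      val result =
        try Right(f(a))
        catch { case e: Exception => Left(e :: errorLog) }
      result match {
        case Left(errors) if errors.size < maxAttempts =>
          Thread.sleep((math.pow(2, attempt) * backoffMillis).toLong) // wait before retrying
          execute(attempt + 1, errors)
        case other => other
      }
    }
    execute(attempt = 0, errorLog = Nil)
  }
}
```

The decorator keeps retry policy out of the wrapped function: callers get either the value or the full error log after the final attempt.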
Day in the life of a pixel

• Consumers log into “permanent storage” in HDFS
• File format is Avro, written in batches
• Data retention policy is essential
Day in the life of a pixel

• Hadoop 2 - YARN
• Scalding to write map-reduce jobs easily
• Rewrite Avro files as Parquet
• Oozie to schedule regular jobs
pixel server - pixel ingress - kafka - consumer - hdfs - hadoop jobs (YARN)
Scalding

    class WordCountJob(args: Args) extends Job(args) {
      TextLine(args("input"))
        .flatMap('line -> 'word) { line: String => tokenize(line) }
        .groupBy('word) { _.size }
        .write(Tsv(args("output")))

      // Split a piece of text into individual words.
      def tokenize(text: String): Array[String] = {
        // Lowercase each word and remove punctuation.
        text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
      }
    }
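What the job computes can be checked with plain Scala collections, no Hadoop needed; this mirrors the slide's tokenize and its flatMap/groupBy pipeline:

```scala
object WordCount {
  // Same normalization as the slide: lowercase, strip punctuation.
  def tokenize(text: String): Array[String] =
    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")

  // Collections analogue of TextLine -> flatMap -> groupBy -> size -> Tsv.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(tokenize).groupBy(identity).map { case (w, ws) => (w, ws.size) }
}
```

Scalding's appeal is exactly this: the distributed job reads like the collections code above, with the framework handling the map-reduce plumbing.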
Parquet

• Column-oriented storage for Hadoop
• Nested data is okay
• Projections
• Predicates
Parquet

    val requests = ParquetAvroSource
      .project[Request](args("requests"),
        Projection[Request]("header.query_params", "partner_id"))
      .read
      .sample(args("sample-rate").toDouble)
      .mapTo('Request -> ('queryParams, 'partnerId)) { req: TapestryRequest =>
        (req.getHeader.getQueryParams, req.getPartnerId)
      }
Oozie

    <workflow-app name="combined_queries" xmlns="uri:oozie:workflow:0.3">
      <start to="devices-location"/>
      <!--<start to="export2db"/>-->
      <action name="devices-location">
        <shell xmlns="uri:oozie:shell-action:0.1">
          <job-tracker>${jobTracker}</job-tracker>
          <name-node>${nameNode}</name-node>
          <exec>hadoop</exec>
          <argument>fs</argument>
          <argument>-cat</argument>
          <argument>${devicesConfig}</argument>
          <capture-output/>
        </shell>
        <ok to="networks-location"/>
        <error to="kill"/>
      </action>
Day in the life of a pixel

Near real-time consumers and batch Hadoop jobs generate data cubes from incoming events and save those aggregations into Vertica for fast and easy querying with SQL.
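The shape of such an aggregation can be sketched in a few lines. The `Event` fields here are hypothetical dimensions; the real jobs run this in Scalding, with the results landing in Vertica:

```scala
// Hypothetical event dimensions: which partner, platform, and hour.
case class Event(partnerId: Int, platform: String, hour: Int)

object Cube {
  // Roll events up into counts per (partner, platform, hour) cell: the kind
  // of pre-computed aggregate a SQL GROUP BY in Vertica then serves quickly.
  def build(events: Seq[Event]): Map[(Int, String, Int), Long] =
    events.groupBy(e => (e.partnerId, e.platform, e.hour))
      .map { case (cell, es) => (cell, es.size.toLong) }
}
```

Storing cells instead of raw events is what makes the retention policy workable: billions of events collapse into a cube small enough to query interactively.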
Stack summary

• Scala, Jetty/Netty, Finagle
• Avro, Protocol Buffers, Parquet
• Kafka
• Zookeeper
• Hadoop - YARN and HDFS
• Vertica
• Scalding
• Oozie, Sqoop
What’s next?

• Hive
• Druid
• Impala
• Oozie alternative
Thank You (yes, we’re hiring!)

Toby Matejovsky, Director of Engineering
[email protected]
@tobym @TapadEng