divolte collector - meetup presentation

Post on 06-Jul-2015

419 Views

Category:

Technology

4 Downloads

Preview:

Click to see full reader

DESCRIPTION

Meetup talk about Divolte Collector (https://github.com/divolte/divolte-collector)

TRANSCRIPT

GoDataDrivenPROUDLY PART OF THE XEBIA GROUP

@asnare / @fzk signal@godatadriven.com

Divolte Collector

Andrew Snare / Friso van Vollenhoven

Because life’s too short for log file parsing

99% of all data in Hadoop156.68.7.63 - - [28/Jul/1995:11:53:28 -0400] "GET /images/WORLD-logosmall.gif HTTP/1.0" 200 669 137.244.160.140 - - [28/Jul/1995:11:53:29 -0400] "GET /images/WORLD-logosmall.gif HTTP/1.0" 304 0 163.205.160.5 - - [28/Jul/1995:11:53:31 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 4324 163.205.160.5 - - [28/Jul/1995:11:53:40 -0400] "GET /shuttle/countdown/count70.gif HTTP/1.0" 200 46573 140.229.50.189 - - [28/Jul/1995:11:53:54 -0400] "GET /shuttle/missions/sts-67/images/images.html HTTP/1.0" 200 4464 163.206.89.4 - - [28/Jul/1995:11:54:02 -0400] "GET /shuttle/technology/sts-newsref/sts-mps.html HTTP/1.0" 200 215409 163.206.89.4 - - [28/Jul/1995:11:54:05 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204 163.206.89.4 - - [28/Jul/1995:11:54:05 -0400] "GET /images/shuttle-patch-logo.gif HTTP/1.0" 200 891 131.110.53.48 - - [28/Jul/1995:11:54:07 -0400] "GET /shuttle/technology/sts-newsref/stsref-toc.html HTTP/1.0" 200 84905 163.205.160.5 - - [28/Jul/1995:11:54:14 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204 130.160.196.81 - - [28/Jul/1995:11:54:15 -0400] "GET /shuttle/resources/orbiters/challenger.html HTTP/1.0" 200 8089 131.110.53.48 - - [28/Jul/1995:11:54:16 -0400] "GET /images/shuttle-patch-small.gif HTTP/1.0" 200 4179 137.244.160.140 - - [28/Jul/1995:11:54:16 -0400] "GET /shuttle/missions/sts-69/mission-sts-69.html HTTP/1.0" 200 10136 131.110.53.48 - - [28/Jul/1995:11:54:18 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204 131.110.53.48 - - [28/Jul/1995:11:54:19 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1713 130.160.196.81 - - [28/Jul/1995:11:54:19 -0400] "GET /shuttle/resources/orbiters/challenger-logo.gif HTTP/1.0" 200 4179 163.205.160.5 - - [28/Jul/1995:11:54:25 -0400] "GET /shuttle/missions/sts-70/images/images.html HTTP/1.0" 200 8657 130.181.4.158 - - [28/Jul/1995:11:54:26 -0400] "GET /history/rocket-history.txt HTTP/1.0" 200 26990 137.244.160.140 - - [28/Jul/1995:11:54:30 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 304 0 137.244.160.140 - - [28/Jul/1995:11:54:31 -0400] "GET /images/launch-logo.gif HTTP/1.0" 304 0 137.244.160.140 - - [28/Jul/1995:11:54:38 -0400] "GET /history/apollo/images/apollo-logo1.gif HTTP/1.0" 304 0 168.178.17.149 - - [28/Jul/1995:11:54:48 -0400] "GET /shuttle/missions/sts-65/mission-sts-65.html HTTP/1.0" 200 131165 140.229.50.189 - - [28/Jul/1995:11:54:53 -0400] "GET /shuttle/missions/sts-67/images/KSC-95EC-0390.jpg HTTP/1.0" 200 128881 131.110.53.48 - - [28/Jul/1995:11:54:58 -0400] "GET /shuttle/missions/missions.html HTTP/1.0" 200 8677 131.110.53.48 - - [28/Jul/1995:11:55:02 -0400] "GET /images/launchmedium.gif HTTP/1.0" 200 11853 131.110.53.48 - - [28/Jul/1995:11:55:05 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786 128.159.111.141 - - [28/Jul/1995:11:55:09 -0400] "GET /procurement/procurement.html HTTP/1.0" 200 3499 128.159.111.141 - - [28/Jul/1995:11:55:10 -0400] "GET /images/op-logo-small.gif HTTP/1.0" 200 14915 128.159.111.141 - - [28/Jul/1995:11:55:11 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786 128.159.111.141 - - [28/Jul/1995:11:55:11 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204 192.213.154.220 - - [28/Jul/1995:11:55:15 -0400] "GET /shuttle/countdown/tour.html HTTP/1.0" 200 4347 192.213.154.220 - - [28/Jul/1995:11:55:15 -0400] "GET /images/KSC-94EC-412-small.gif HTTP/1.0" 200 20484

GoDataDriven

USER

HTTP request:/org/apache/hadoop/io/IOUtils.html

log transportservice

log event:2012-07-01T06:00:02.500Z /org/apache/hadoop/io/IOUtils.html

transport logs tocompute cluster

off line analytics /model training

batch updatemodel state

serve model result(e.g. recommendations) streaming log

processingstreaming updatemodel state

Typical web optimization architecture

GoDataDriven

Parse HTTP server logs

access.log

GoDataDriven

How did it get there?

Option 1: parse HTTP server logs

• Ship log files on a schedule

• Parse using MapReduce jobs

• Batch analytics jobs feed online systems

GoDataDriven

HTTP server log parsing

• Inherently batch oriented

• Schema-less (URL format is the schema)

• Initial job to parse logs into structured format

• Usually multiple versions of parsers required

• Requires sessionizing

• Logs usually have more than you ask for (bots, image requests, spiders, health check, etc.)

GoDataDriven

Stream HTTP server logs

access.logMessage Queue or Event Transport

(Kafka, Flume, etc.) EVENTS

tail -F

EVENTS

OTHER CONSUMERS

GoDataDriven

How did it get there?

Option 2: stream HTTP server logs

• tail -F logfiles

• Use a queue for transport (e.g. Flume or Kafka)

• Parse logs on the fly

• Or write semi-schema’d logs, like JSON

• Parse again for batch work load

GoDataDriven

Stream HTTP server logs

• Allows for near real-time event handling when consuming from queues

• Sessionizing? Duplicates? Bots?

• Still requires parser logic

• No schema

GoDataDriven

Tagging

index.html script.

js

web server

access.log

tracking server

Message Queue or Event Transport(Kafka, Flume, etc.) EVENTS

OTHER CONSUMERS

web page traffic

tracking traffic(asynchronous)

structured events

structured events

GoDataDriven

How did it get there?

Option 3: tagging

• Instrument pages with special ‘tag’, i.e. special JavaScript or image just for logging the request

• Create special endpoint that handles the tag request in a structured way

• Tag endpoint handles logging the events

GoDataDriven

Tagging

• Not a new idea (Google Analytics, Omniture, etc.)

• Less garbage traffic, because a browser is required to evaluate the tag

• Event logging is asynchronous

• Easier to do inflight processing (apply a schema, add enrichments, etc.)

• Allows for custom events (other than page view)

GoDataDriven

Also…

• Manage session through cookies on the client side

• Incoming data is already sessionized

• Extract additional information from clients

• Screen resolution

• Viewport size

• Timezone

GoDataDriven

Looks familiar?

<script> (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); ! ga('create', 'UA-40578233-2', 'godatadriven.com'); ga('send', 'pageview'); !</script>

GoDataDriven

Divolte Collector

Tag based click stream data collection for

Hadoop and Kafka.

GoDataDriven

Divolte Collector

index.html script.

js

web server

access.log

tracking server

Message Queue or Event Transport(Kafka, Flume, etc.) EVENTS

OTHER CONSUMERS

web page traffic

tracking traffic(asynchronous)

structured events

structured events

The TAG

<script src="//tr.example.com/divolte.js" defer async> </script>

Schema!

{ "namespace": "com.example.record", "type": "record", "name": "ClickEventRecord", "fields": [ { "name": "productNumber", "type": ["null", "string"], "default": null }, { "name": "shop", "type": ["null", "string"], "default": null }, { "name": "category", "type": ["null", "string"], "default": null }, { "name": "advisor", "type": ["null", "string"], "default": null }, { "name": "searchPhrase", "type": ["null", "string"], "default": null }, { "name": "basketProductNumber", "type": ["null", "string"], "default": null }, { "name": "basketSizeCode", "type": ["null", "string"], "default": null }, { "name": "basketProductCount", "type": ["null", "string"], "default": null } ] }

Mapping// Page type detector: // http://.../basket basket = "^https?://[^/]+/basket(?:[?#].*)?$" !// Page type detector: // http://.../search?q=fiets search = "^https?://[^/]+/search\\?.*$" !// Page type detector: // http://.../checkout checkout = "^https?://[^/]+/checkout(?:[?#].*)?$" !// Page type detector: // http://.../thankyou payment_ok = "^https://[^/]+/thankyou(?:[?#].*)?$"

MappingpageType { type = regex_name regexes = [ home, category, shop, basket, search, customercare ] field = location } productNumber { type = regex_group regex = pdp field = location group = product } viewportPixelWidth = viewportPixelWidth viewportPixelHeight = viewportPixelHeight screenPixelWidth = screenPixelWidth screenPixelHeight = screenPixelHeight

Configure

divolte { server { host = 0.0.0.0 use_x_forwarded_for = true landing_page = false } ! tracking { cookie_domain = .example.com include "click-schema-mapping.conf" schema_file = /etc/divolte/ClickEventRecord.avsc } !…

Configure

kafka_flusher { enabled = true producer = { metadata.broker.list = [ "broker1:9092", "broker2:9092", "broker3:9092" ] } } !…

Configure hdfs_flusher { hdfs { replication = 3 } ! simple_rolling_file_strategy { roll_every = 60 minutes sync_file_after_records = 1000 sync_file_after_duration = 10 seconds ! working_dir = /divolte/inflight publish_dir = /divolte/published } } }

Run

./bin/divolte-collector

Demo: Javadoc analytics!

javadoc -d outputdir \ -bottom '<script src="//localhost:8290/divolte.js" defer async></script>' \ -subpackages .

Kafka event consumer

GoDataDriven

private static class JavadocEventHandler implements EventHandler<JavadocEventRecord> { private static final String TCP_SERVER_HOST = "127.0.0.1"; private static final int TCP_SERVER_PORT = 1234;! private Socket socket = null; private OutputStream stream;! @Override public void setup() throws Exception { socket = new Socket(TCP_SERVER_HOST, TCP_SERVER_PORT); stream = socket.getOutputStream(); }! @Override public void handle(JavadocEventRecord event) throws Exception { if (!event.getDetectedDuplicate()) { // Avro's toString already produces JSON. stream.write(event.toString().getBytes(StandardCharsets.UTF_8)); stream.write("\n".getBytes(StandardCharsets.UTF_8)); } }! @Override public void shutdown() throws Exception { if (null != stream) stream.close(); if (null != socket) socket.close(); }}

GoDataDriven

public static void main(String[] args) { final DivolteKafkaConsumer<JavadocEventRecord> consumer = DivolteKafkaConsumer.createConsumer( KAFKA_TOPIC, ZOOKEEPER_QUORUM, KAFKA_CONSUMER_GROUP_ID, NUM_CONSUMER_THREADS, () -> new JavadocEventHandler(), JavadocEventRecord.getClassSchema());! Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("Shutting down consumer."); consumer.shutdownConsumer(); }));! System.out.println("Starting consumer."); consumer.startConsumer();}

SQL FTW!

GoDataDriven

CREATE EXTERNAL TABLE javadoc_analytics ( firstInSession boolean -- other fields are created automatically from schema)ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'LOCATION '/divolte/published'TBLPROPERTIES ( 'avro.schema.url'='hdfs:///JavadocEventRecord.avsc');

Python & Spark

GoDataDriven

export IPYTHON=1export IPYTHON_OPTS="notebook --ip=0.0.0.0"pyspark \ --jars divolte-spark-assembly-0.1.jar \ --driver-class-path divolte-spark-assembly-0.1.jar \ --num-executors 40

GoDataDriven

Spark&

Spark Streaming

GoDataDriven

import io.divolte.spark.avro._ import org.apache.avro.generic.IndexedRecord import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ !val sc = new SparkContext() val events = sc.newAvroFile[IndexedRecord](path) !// And then… val records = events.toRecords // or val eventFields = events.fields("sessionId", "location", "timestamp")

GoDataDriven

// Kafka configuration. val consumerConfig = Map( "group.id" -> "some-id-for-the-consumer-group", "zookeeper.connect" -> "zookeeper-connect-string", "auto.commit.interval.ms" -> "5000", "auto.offset.reset" -> "largest" ) val topicSettings = Map("divolte" -> Runtime.getRuntime.availableProcessors()) !val sc = new SparkContext() val ssc = new StreamingContext(sc, Seconds(15)) !// Establish the source event stream. val stream = ssc.divolteStream[GenericRecord](consumerConfig, topicSettings, StorageLevel.MEMORY_ONLY) !// And then… val eventStream = stream.toRecords // or val locationStream = stream.fields("location")

Also in the box

GoDataDriven

Zero config deploy

• Easy to use for local development

• Works out of the box with zero custom config

• Comes with a built in schema and mapping

• Works on local machine without Hadoop

• Flushes to /tmp on local file system

GoDataDriven

Collector has no global state

• Load balancer friendly

• Horizontally scalable

• Shared nothing

• (other than HDFS and Kafka)

GoDataDriven

In stream de-duplication

• The internet is a mean place; data will have noise

• In stream hash based de-duplication

• Low false negative rate

• Virtually zero false positive rate

• Requires URI based routing from load balancer

• Easy to setup on nginx

• Supported on many hardware load balancers

GoDataDriven

Corrupt request detection

• The internet is still a mean place… Some URLS are truncated

• Incomplete events detected and discarded

GoDataDriven

Defeat Chrome’s pre-rendering

• Chrome sometimes speculatively pre-renders pages in the background

• This triggers JS even if the page is not shown

• Unless you use the Page Visibility API to detect this

• Which we do

• We take care of many other JS caveats as well

GoDataDriven

Custom events

• Divolte presents itself as a JS library

• Map custom event parameters directly onto Avro fields

<!-- client side --> <script> divolte.signal("addToBasket", { count: 2, productId: "a3bc38de" }) </script>

// server side mapping eventType = eventType !basketProductId { type = event_parameter name = productId }

GoDataDriven

Bring your own IDs

• Generate page view ID on server side

• Possible to relate server side logging to page views and other client side events

<script src="//…/divolte.js#a28de3bf42a5dc98c03" defer async> </script>

GoDataDriven

User agent parsing

• On the fly parsing of user agent string

• Uses: http://uadetector.sourceforge.net/

• Updates user agent database at runtime without restart

GoDataDriven

IP to geo coordinates

• On the fly enrichment with geo coordinates based on IP address

• MaxMind geoIP database

• https://www.maxmind.com/en/geoip2-databases

• Updates database at runtime without restart

• Sets:

• Latitude & longitude

• Country, City, Subdivision

GoDataDriven

https://github.com/divolte/divolte-collector

https://github.com/divolte/divolte-examples

https://github.com/divolte/divolte-spark

https://github.com/divolte/divolte-kafka-consumer

GoDataDriven

We’re hiring / Questions? / Thank you!

@asnare / @fzk signal@godatadriven.com

Andrew Snare / Friso van Vollenhoven

top related