Using Morphlines for On-the-Fly ETL
DESCRIPTION
Cloudera Morphlines is a new open source framework, recently added to the CDK, that reduces the time and skills necessary to integrate, build, and change Hadoop processing applications that extract, transform, and load data into Apache Solr, Apache HBase, HDFS, enterprise data warehouses, or analytic online dashboards.
TRANSCRIPT
Using Morphlines for on-the-fly ETL
Wolfgang Hoschek (@whoschek) SF Data Engineering Meetup July 2013
Agenda
• Big Data, ETL and Search – setting the stage
• Cloudera Morphlines Architecture
• Component Deep Dive
• Cloudera Search Use Cases
• What’s next?
Feel free to ask questions as we go!
Example ETL Use Case: Distributed Search on Hadoop
[Diagram: inside the Hadoop cluster, Flume, MR (reading from HDFS), and HBase each feed an "Index (ETL)" step into SolrCloud (three Solr servers); Hue UI, a custom UI, and a custom app send queries to SolrCloud.]
Cloudera Morphlines Architecture
[Diagram: logs, tweets, social media, HTML, images, PDF, text (anything you want to index) flow through Flume, the MR indexer, the HBase indexer, etc., or your own application, each embedding the Morphline library, into SolrCloud (three Solr servers).]
Morphlines can be embedded in any application, including your own app.
Cloudera Morphlines
• Open source framework for simple ETL
• Consume any kind of data from any kind of data source, process it, and load it into any app or storage system
• Designed for Near Real Time apps & batch apps
• Ships as part of the Cloudera Development Kit (CDK) and Cloudera Search
• It’s a Java library
• ASL licensed on GitHub: https://github.com/cloudera/cdk
• Similar to Unix pipelines, but more convenient & efficient
• Configuration over coding (reduces time & skills)
• Supports common file formats
  • Log files & text
  • Avro, sequence file
  • JSON, HTML & XML
  • Etc. (pluggable)
• Extensible set of transformation commands
Extraction, Transformation and Loading
• Chain of pipelined commands
• Simple and flexible data mapping & transformation
• Reusable across multiple index workloads
• Over time, extend and re-use across platform workloads
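[Diagram: a syslog event enters the Solr sink of a Flume agent, where the Morphline library runs the commands readLine, grok, and loadSolr; the event becomes a record that is passed from command to command and is loaded into Solr as a document.]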
Like a Unix Pipeline
• Like Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads
• Designed to be embedded into Hadoop components such as Search, Flume, MapReduce, Pig, Hive, Sqoop
Stdlib + plugins
• Framework ships with a set of frequently used high-level transformation and I/O commands that can be combined in application-specific ways
• The plugin system allows adding new transformations and I/O commands, and integrates existing functionality and third-party systems in a straightforward manner
Flexible Data Model
• A record is a set of named fields where each field has an ordered list of one or more Java Objects (i.e. Guava’s ArrayListMultimap)
• A field can have multiple values, and any two records need not use common field names
• Corresponds exactly to the Solr/Lucene data model
• Pass not only structured data, but also arbitrary binary data
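The record model above can be sketched in plain Java. This is only an illustration under stated assumptions, not the Morphlines record class: RecordSketch and its methods are hypothetical names, and a LinkedHashMap of lists stands in for Guava's ArrayListMultimap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a morphline record: named fields, each holding an ordered list of values. */
final class RecordSketch {
    private final Map<String, List<Object>> fields = new LinkedHashMap<>();

    /** Appends a value to a field, creating the field on first use. */
    void put(String name, Object value) {
        fields.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
    }

    /** Returns the values of a field in insertion order, or an empty list if the field is absent. */
    List<Object> get(String name) {
        return fields.getOrDefault(name, Collections.emptyList());
    }

    public static void main(String[] args) {
        RecordSketch tweet = new RecordSketch();
        tweet.put("tags", "hadoop");
        tweet.put("tags", "solr");   // a field can hold several values
        tweet.put("retweets", 42);   // values are arbitrary Java objects

        RecordSketch logLine = new RecordSketch();
        logLine.put("message", "listening on 0.0.0.0 port 22"); // shares no field names with tweet

        System.out.println(tweet.get("tags"));
        System.out.println(logLine.get("tags"));
    }
}
```

Because every field is a list, a single-valued field and a multi-valued field are handled the same way, which mirrors how multi-valued fields work in a Solr document.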
Passing Binary Data
• _attachment_body field (optional)
  • java.io.InputStream or Java byte[]
• Optional fields assist with detecting & parsing the data type
  • _attachment_mimetype field, e.g. "application/pdf"
  • _attachment_charset field, e.g. "UTF-8"
  • _attachment_name field, e.g. "cars.pdf"
• Conceptually similar to email and HTTP headers/body
Processing Model
• Morphline commands manipulate continuous or arbitrarily large streams of records
• A command transforms a record into zero or more records
• The output records of a command are passed to the next command in the chain
• A command can contain nested commands
• A morphline is a tree of commands, essentially a push-based data flow engine
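A push-based chain of this kind can be sketched in a few lines of Java. The sketch below is an assumption-laden illustration, not the Morphlines API: the Command interface, the split, minLength, and collect commands, and the use of plain strings as records are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a push-based command chain; not the Morphlines API. */
final class PipelineSketch {

    /** A command receives one record and pushes zero or more records to its child. */
    interface Command {
        boolean process(String record);
    }

    /** Splits a line on whitespace and pushes each token downstream (one record in, many out). */
    static Command split(Command child) {
        return record -> {
            for (String token : record.trim().split("\\s+")) {
                if (!token.isEmpty() && !child.process(token)) {
                    return false; // a downstream failure stops processing
                }
            }
            return true;
        };
    }

    /** Drops records shorter than minLength (one record in, zero or one out). */
    static Command minLength(int minLength, Command child) {
        return record -> record.length() < minLength || child.process(record);
    }

    /** Terminal command that collects whatever reaches the end of the chain. */
    static Command collect(List<String> sink) {
        return record -> sink.add(record);
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // The chain is built back to front, since each command holds a reference to its child.
        Command chain = split(minLength(4, collect(out)));
        chain.process("sshd listening on port 22");
        System.out.println(out); // prints [sshd, listening, port]
    }
}
```

Each command calls its child directly, so a record travels through the whole chain in the caller's thread before the next record is read; nothing is queued between commands.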
Processing Model Non-Goals
• Designed to be embedded into multiple host systems, thus…
  • No notion of persistence, durability, distributed computing, or node failover
• Basically just a chain of in-memory transformations in the current thread
• No need to manage multiple nodes or threads; host systems such as MapReduce, Flume, and Storm already cover that
• However, a morphline does support passing notifications
  • E.g. BEGIN_TRANSACTION, COMMIT_TRANSACTION, ROLLBACK_TRANSACTION, SHUTDOWN
Performance and Scaling
• The runtime compiles the morphline on the fly
• The runtime processes all commands of a given morphline in the same thread
• For scalability, deploy many morphline instances on a cluster in many Flume agents and MapReduce tasks
Syntax
• HOCON format (Human-Optimized Config Object Notation)
• Basically JSON slightly adjusted for the configuration file use case
• Came out of typesafe.com
• Also used by the Akka and Play frameworks
Example: Indexing log4j w/ stacktraces
Record 1:
juil. 25, 2012 10:49:40 AM hudson.triggers.SafeTimerTask run ok
Record 2:
juil. 25, 2012 10:49:46 AM hudson.triggers.SafeTimerTask run failed
com.amazonaws.AmazonClientException: Unable to calculate a request signature
    at com.amazonaws.auth.AbstractAWSSigner.signAndBase64Encode(AbstractAWSSigner.java:71)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: com.amazonaws.AmazonClientException: Unable to calculate a request signature
    at com.amazonaws.auth.AbstractAWSSigner.sign(AbstractAWSSigner.java:90)
    at com.amazonaws.auth.AbstractAWSSigner.signAndBase64Encode(AbstractAWSSigner.java:68)
    ... 14 more
Caused by: java.lang.IllegalArgumentException: Empty key
    at javax.crypto.spec.SecretKeySpec.<init>(SecretKeySpec.java:96)
    at com.amazonaws.auth.AbstractAWSSigner.sign(AbstractAWSSigner.java:87)
    ... 15 more
Record 3:
juil. 25, 2012 10:49:54 AM hudson.slaves.SlaveComputer tryReconnect
Example: Indexing log4j w/ stacktraces
morphlines : [
  {
    id : morphline1
    importCommands : ["com.cloudera.**", "org.apache.solr.**"]
    commands : [
      {
        readMultiLine {
          regex : "(^.+Exception: .+)|(^\\s+at .+)|(^\\s+... \\d+ more)|(^\\s*Caused by:.+)"
          what : previous
          charset : UTF-8
        }
      }
      { logDebug { format : "output record: {}", args : ["@{}"] } }
      { loadSolr {} }
    ]
  }
]
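The effect of the regex combined with "what : previous" can be approximated in plain Java. This is a rough sketch under assumptions, not the readMultiLine implementation: MultiLineSketch and group are hypothetical names, and the sketch assumes that a line matching the regex in full is appended to the record before it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of "append matching lines to the previous record"; not the readMultiLine code. */
final class MultiLineSketch {

    // Same regex as in the morphline configuration, written as a Java string literal.
    static final Pattern CONTINUATION = Pattern.compile(
        "(^.+Exception: .+)|(^\\s+at .+)|(^\\s+... \\d+ more)|(^\\s*Caused by:.+)");

    /** Groups lines into records: a line matching the pattern is appended to the record before it. */
    static List<String> group(List<String> lines) {
        List<StringBuilder> records = new ArrayList<>();
        for (String line : lines) {
            boolean continuation = CONTINUATION.matcher(line).matches();
            if (continuation && !records.isEmpty()) {
                records.get(records.size() - 1).append('\n').append(line);
            } else {
                records.add(new StringBuilder(line)); // any other line starts a new record
            }
        }
        List<String> result = new ArrayList<>();
        for (StringBuilder sb : records) {
            result.add(sb.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "juil. 25, 2012 10:49:40 AM hudson.triggers.SafeTimerTask run ok",
            "juil. 25, 2012 10:49:46 AM hudson.triggers.SafeTimerTask run failed",
            "com.amazonaws.AmazonClientException: Unable to calculate a request signature",
            "\tat java.util.TimerThread.run(Timer.java:505)",
            "Caused by: java.lang.IllegalArgumentException: Empty key",
            "\t... 15 more",
            "juil. 25, 2012 10:49:54 AM hudson.slaves.SlaveComputer tryReconnect");
        System.out.println(group(lines).size() + " records"); // prints "3 records"
    }
}
```

The stack trace lines all match one of the four alternatives, so they fold into the "run failed" line, while the timestamped lines match none and each start a new record.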
Example: Escape to Java Code
morphlines : [
  {
    id : morphline1
    importCommands : ["com.cloudera.**", "org.apache.solr.**"]
    commands : [
      {
        java {
          code: """
            List tags = record.get("tags");
            if (!tags.contains("hello")) {
              return false;
            }
            tags.add("world");
            return child.process(record);
          """
        }
      }
    ]
  }
]
Current Command Library
• Integrate with and load into Apache Solr
• Flexible log file analysis
  • Single-line records, multi-line records, CSV files
  • Regex-based pattern matching and extraction
• Integration with Avro, JSON, XML, HTML
• Integration with Apache Hadoop Sequence Files
• Integration with SolrCell and all Apache Tika parsers
• Auto-detection of MIME types from binary data using Apache Tika
Current Command Library (cont’d)
• Scripting support for dynamic Java code
• Operations on fields for assignment and comparison
• Operations on fields with list and set semantics
• if-then-else conditionals
• A small rules engine (tryRules)
• String and timestamp conversions
• slf4j logging
• Yammer metrics and counters
• Decompression and unpacking of arbitrarily nested container file formats
• etc.
Plugin Commands
• Easy to add new I/O & transformation commands
• Integrate existing functionality and third-party systems
• Implement the Java interface Command or subclass AbstractCommand
• Add it to the Java classpath
• No registration or other administrative action required
Morphline Example – syslog with grok
morphlines : [
  {
    id : morphline1
    importCommands : ["com.cloudera.**", "org.apache.solr.**"]
    commands : [
      { readLine {} }
      {
        grok {
          dictionaryFiles : [/tmp/grok-dictionaries]
          expressions : {
            message : """<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"""
          }
        }
      }
      { loadSolr {} }
    ]
  }
]
Example Input
<164>Feb 4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22
Output Record
syslog_pri:164
syslog_timestamp:Feb 4 10:46:14
syslog_hostname:syslog
syslog_program:sshd
syslog_pid:607
syslog_message:listening on 0.0.0.0 port 22
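To show what the grok expression does to the example input, here is a rough plain-Java equivalent using java.util.regex. It is a sketch under assumptions: SyslogGrokSketch and parse are hypothetical names, the sub-patterns are simplified approximations of the grok dictionary entries (POSINT, SYSLOGTIMESTAMP, SYSLOGHOST, DATA, GREEDYDATA), and the group names drop the syslog_ prefix because Java group names cannot contain underscores.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical plain-regex approximation of the syslog grok expression; not the grok command itself. */
final class SyslogGrokSketch {

    // Each named group approximates one %{PATTERN:field} reference from the grok expression.
    static final Pattern SYSLOG = Pattern.compile(
        "<(?<pri>[1-9][0-9]*)>"                                            // ~ POSINT
        + "(?<timestamp>[A-Z][a-z]{2} +\\d{1,2} \\d{2}:\\d{2}:\\d{2}) "    // ~ SYSLOGTIMESTAMP
        + "(?<hostname>\\S+) "                                             // ~ SYSLOGHOST
        + "(?<program>.*?)"                                                // ~ DATA
        + "(?:\\[(?<pid>[1-9][0-9]*)\\])?: "                               // optional [pid]
        + "(?<message>.*)");                                               // ~ GREEDYDATA

    /** Returns the extracted fields, or an empty map if the line does not match. */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = SYSLOG.matcher(line);
        if (!m.matches()) {
            return fields;
        }
        fields.put("syslog_pri", m.group("pri"));
        fields.put("syslog_timestamp", m.group("timestamp"));
        fields.put("syslog_hostname", m.group("hostname"));
        fields.put("syslog_program", m.group("program"));
        if (m.group("pid") != null) {
            fields.put("syslog_pid", m.group("pid")); // the pid is optional in the expression
        }
        fields.put("syslog_message", m.group("message"));
        return fields;
    }

    public static void main(String[] args) {
        String input = "<164>Feb 4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22";
        parse(input).forEach((name, value) -> System.out.println(name + ":" + value));
    }
}
```

The grok version is preferable in practice because the dictionary patterns are reusable and more thorough than these hand-written approximations; the sketch only makes the field extraction concrete.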
Potential New Plugin Commands
• Extract, clean, transform, join, integrate, enrich and decorate records
• Examples
  • Join records with external data sources such as relational databases, key-value stores, local files or IP geo lookup tables
  • Perform DNS resolution, expand shortened URLs
  • Fetch linked metadata from social networks
  • Do sentiment analysis & annotate the record accordingly
  • Continuously maintain stats over sliding windows
  • Compute exact or approximate distinct values & quantiles
Use Case: Cloudera Search
An Integrated Part of the Hadoop System
One pool of data
One security framework
One set of system resources
One management interface
What is Cloudera Search?
• Full-text, interactive search and faceted navigation
• Batch, near real-time, and on-demand indexing
• Apache Solr integrated with CDH
  • Established, mature search with vibrant community
  • Separate runtime like MapReduce, Impala
  • Incorporated as part of the Hadoop ecosystem
• Open Source
  • 100% Apache, 100% Solr
  • Standard Solr APIs
ETL for Distributed Search on Apache Hadoop
[Diagram: inside the Hadoop cluster, Flume, MR (reading from HDFS), and HBase each feed an "Index (ETL)" step into SolrCloud (three Solr servers); Hue UI, a custom UI, and a custom app send queries to SolrCloud.]
Near Real Time ETL & Indexing with Flume
Apache Solr and Apache Flume
• Data ingest at scale
• Flexible extraction and mapping
• Indexing at data ingest
• Packaged as Flume Morphline Solr Sink
[Diagram: log files are read by Flume agents, each running an indexer with a morphline; HDFS also appears in the diagram.]
Flume.conf
agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.solrSink.morphlineFile = /etc/flume-ng/conf/morphline.conf
Cloudera Manager Flume Morphline GUI
Scalable Batch ETL & Indexing
Solr and MapReduce
• Flexible, scalable batch indexing
• Start serving new indices with no downtime
• On-demand indexing, cost-efficient re-indexing
• Packaged as MapReduceIndexerTool
[Diagram: files in HDFS are processed by indexers with morphlines, producing index shards that are served by Solr servers.]
hadoop ... MapReduceIndexerTool --morphline-file morphline.conf ...
MapReduceIndexerTool
hadoop ... MapReduceIndexerTool --morphline-file morphline.conf ...
[Diagram: input files go to extractors (mappers), which feed leaf shards (reducers) S0_0_0, S0_0_1, S0_1_0, S0_1_1, S1_0_0, S1_0_1, S1_1_0, S1_1_1, which are merged into root shards (mappers) S0 and S1.]
• Morphline runs inside Mapper
Near Real Time indexing of Apache HBase
[Diagram: interactive load goes into HBase on HDFS; Lily HBase Indexer(s) with a morphline trigger on updates and feed several Solr servers.]
HBase (large-scale tabular data, immediate access & updates) + Search (fast & flexible information discovery) = BIG DATA DATA MANAGEMENT
Batch & Near Real Time ETL
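[Diagram: tweets, log formats, social media, HTML, images, and OLTP data reach Solr along several paths, each running a morphline: Flume (MorphlineSink, alongside HdfsSink writing to HDFS), MapReduceIndexerTool over HDFS, and the Lily HBase Indexer over HBase. Data in HDFS is also available to MapReduceIndexerTool, Impala, HBase, Mahout, EDW, MR, etc. Hue UI, a custom UI, and a custom app query Solr.]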
What’s next
• More work on Apache HBase integration
• Integration into Apache Crunch
• Stream analytics
Conclusion
• Cloudera Development Kit w/ Morphlines
• Open Source, ASL License
• Version 0.4.1 shipping
• Extensive documentation
• Send your questions and feedback to the cdk-dev mailing list
• Also ships integrated with Cloudera Search
• Free QuickStart VM also available!