
Post on 15-Apr-2017


Log Ingestion: Kafka -> Impala using Apex

Apache Apex Meetup

Apache Apex™ is now a Top Level Project

Agenda

● Use Case - Fraud Analysis and Reporting

● Technologies Used

● Use Case - Specifications

● DAG Design

● Batch Vs Streaming Approach

Use Case - Fraud Analysis and Reporting

[Diagram: data flow. Labels: Web Server, Events, Kafka, Events, Files, Parquet Files on HDFS, Queries, Impala, Queries]

● Interactive querying and analysis of huge volume of data stored in HDFS

● Generating Reports for the customers


What’s Kafka

● Terminologies
  ○ Topic
  ○ Producer
  ○ Consumer
  ○ Broker

What’s Impala

● Massively distributed, massively parallel query engine

● SQL on top of Hadoop

● Completely new execution engine, faster results
  ○ Unlike Hive, does not build on MapReduce
  ○ High throughput, low latency

Impala Performs Best When...

● Partitioning
  ○ Physically divides data based on one or more column values
  ○ Speeds up queries that test those columns

● File Format
  ○ For large volumes of data, Parquet performs best

● Block Size
  ○ Number of blocks defines the degree of parallelism
  ○ For Parquet, 256 MB is recommended

What’s Parquet

● Column-based storage format for Hadoop

● Optimized for querying a subset of columns within a “wide” table

● Limits IO to data actually needed
  ○ Loads only the columns that need to be accessed

● Saves space
  ○ Columnar layout compresses better


Parquet Format

● Row Group
  ○ A group of rows in columnar format
  ○ Maximum size buffered in memory while writing
  ○ Roughly: 50 MB < row group < 1 GB

● Column Chunk
  ○ The data for one column in a row group
  ○ Column chunks can be read independently for efficient scans

● Page
  ○ Unit of access in a column chunk
  ○ Minimum size to read to access a single record
  ○ Roughly: 8 KB < page < 1 MB

Use Case - Specifications

● Events from 5000 customers landing on multiple Kafka topics as JSON records

● Maximum size of each JSON record: 10 KB

● 700 columns per record

● Average data inflow of 5k records / sec
  ○ ~ 4 TB / day
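The ~4 TB / day figure can be checked with a quick calculation. The sketch below assumes every record is at the 10 KB maximum and uses decimal units (1 KB = 1000 bytes, 1 TB = 10^12 bytes); the function name is illustrative and not from the talk.

```python
# Back-of-the-envelope check of the daily volume quoted in the slide.
RECORDS_PER_SEC = 5_000          # average inflow from the slide
MAX_RECORD_BYTES = 10 * 1000     # 10 KB per record, decimal KB assumed
SECONDS_PER_DAY = 24 * 60 * 60


def daily_volume_tb(records_per_sec: int, record_bytes: int) -> float:
    """Upper-bound raw JSON volume per day, in decimal terabytes."""
    return records_per_sec * record_bytes * SECONDS_PER_DAY / 1e12


if __name__ == "__main__":
    volume = daily_volume_tb(RECORDS_PER_SEC, MAX_RECORD_BYTES)
    print(f"{volume:.2f} TB/day")
```

Because 10 KB is the maximum record size, this is an upper bound on raw JSON volume; the Parquet footprint on HDFS would likely be smaller after columnar compression.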

Use Case - Specifications Cont’d

● Time taken for a record to be available to query must NOT exceed 3 mins

● Data to be stored in Parquet file format based on a partition scheme

Sample records:

{ "date_time": "2016-01-21 01:41:25", "cust_id": "MED-4", "event_id": 0, "device_first_seen": "2015-07-29 16:12:32", "flash_os": "win7", "flash_system_state": 53}

{ "date_time": "2016-02-23 21:41:25", "cust_id": "MED-4", "event_id": 0, "device_first_seen": "2015-07-29 16:12:32", "flash_os": "win7", "flash_system_state": 53}

Resulting partition layout:

cust=MED-4
  year=2016
    month=01  month=02  month=03
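As an illustration of the partition scheme, the following sketch derives a partition directory from a JSON record's `cust_id` and `date_time` fields. The function name `partition_path` and the exact path format are assumptions for illustration; the talk only shows the `cust=` / `year=` / `month=` hierarchy.

```python
import json
from datetime import datetime


def partition_path(record: dict) -> str:
    """Map a record to its cust/year/month partition directory (hypothetical helper)."""
    ts = datetime.strptime(record["date_time"], "%Y-%m-%d %H:%M:%S")
    return f"cust={record['cust_id']}/year={ts.year:04d}/month={ts.month:02d}"


# A trimmed version of the second sample record from the slide.
sample = json.loads(
    '{"date_time": "2016-02-23 21:41:25", "cust_id": "MED-4", "event_id": 0}'
)

if __name__ == "__main__":
    print(partition_path(sample))
```

Partitioning on customer first matches the reporting use case, where queries typically filter on a single customer and a time range.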

DAG Design

Kafka Input Operator -> (JSON) -> JSON Parser -> (POJO) -> Parquet Writer -> (File Meta Data) -> Compact MetaData Emitter -> (Compact Meta Data) -> Compact Data Writer Operator

● Kafka Input Operator: reads from multiple Kafka topics

● JSON Parser: parses JSON records to POJO

● Parquet Writer: writes Parquet files based on partitions; files are ready for Impala queries

● Compact MetaData Emitter: tracks size of files per customer

● Compact Data Writer Operator: compacts small files for performant Impala queries
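The bookkeeping done by the Compact MetaData Emitter might look roughly like the sketch below: accumulate file sizes per customer partition and emit a batch of files to compact once they add up to a target size. This is not the Apex operator from the talk. The class name, method names, and the 256 MB target (borrowed from the Parquet block size recommended for Impala) are all assumptions.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional

TARGET_BYTES = 256 * 1024 * 1024  # assumed compaction target


@dataclass
class CompactionPlanner:
    """Hypothetical stand-in for the file-size tracking step."""
    target_bytes: int = TARGET_BYTES
    pending: dict = field(default_factory=lambda: defaultdict(list))

    def on_file(self, partition: str, path: str, size: int) -> Optional[list]:
        """Record a newly written file.

        Returns the list of file paths to compact once the pending files of
        this partition reach the target size, otherwise None.
        """
        files = self.pending[partition]
        files.append((path, size))
        if sum(s for _, s in files) >= self.target_bytes:
            batch = [p for p, _ in files]
            files.clear()  # start accumulating a fresh batch
            return batch
        return None
```

A downstream writer would then read the files in each emitted batch and rewrite them as one larger Parquet file, which keeps the number of small files per partition low.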

Limitation & Solution

● Impala queries (read) and file operations (write) happen on the same directory

● Impala query performance may be affected

● Solution:
  ○ Maintain “Active” and “Passive” directories for each partition
  ○ Point Impala to “Active”
  ○ Compact on “Passive”
  ○ Swap (“Active”, “Passive”)

[Diagram: before, Impala queries and file writing share the directory cust=MED-4 / year=2016 / month=02 /. After, the same partition has an ACTIVE directory (file writing, Impala queries) and a PASSIVE directory (compaction).]
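The Active/Passive idea can be sketched as a small piece of per-partition state: queries read from one directory, compaction works on the other, and the roles are swapped when compaction finishes. The directory names `dir_a` and `dir_b` and the class below are hypothetical. The sketch also leaves out how Impala is repointed and how data is kept in sync between the two directories, since the slides do not describe either.

```python
from dataclasses import dataclass


@dataclass
class PartitionDirs:
    """Tracks which of two sibling directories is currently Active."""
    base: str               # e.g. "cust=MED-4/year=2016/month=02"
    active: str = "dir_a"   # hypothetical directory names
    passive: str = "dir_b"

    def query_path(self) -> str:
        """Directory Impala should read from."""
        return f"{self.base}/{self.active}"

    def compaction_path(self) -> str:
        """Directory where compaction may rewrite files."""
        return f"{self.base}/{self.passive}"

    def swap(self) -> None:
        """Exchange roles once compaction on the passive side is done."""
        self.active, self.passive = self.passive, self.active


if __name__ == "__main__":
    dirs = PartitionDirs("cust=MED-4/year=2016/month=02")
    print(dirs.query_path(), dirs.compaction_path())
    dirs.swap()
    print(dirs.query_path(), dirs.compaction_path())
```

The design choice is that readers never see a directory in which files are being rewritten, at the cost of storing each partition in two places.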

Improved DAG Design

[Diagram: improved DAG. Operators: Kafka Input Operator, JSON Parser, Parquet Writer (x2), Compact MetaData Emitter (x2), Compact Data Writer Operator (x2), Impala Operator. Streams: JSON, POJO, File Meta Data, Compact Meta Data.]

Batch Approach

● MR + Oozie + MySQL

● 80-85% of code related to non-functional aspects of dataflow
  ○ Making data flow transactional
  ○ Handling failures manually
  ○ Developing own checkpoint mechanisms

● Oozie
  ○ Takes 30-45 seconds to initiate a job
  ○ Separate JVM invoked for each node in the DAG, increasing overhead

● Wastage of hardware resources in repeated job initialization

Streaming Approach

● Apex

● Built-in checkpointing mechanism for failure recovery

● Reused available operators

● Optimized on hardware usage

Resources


• Apache Apex website - http://apex.apache.org/

• Subscribe - http://apex.apache.org/community.html

• Download - http://apex.apache.org/downloads.html

• Twitter - @ApacheApex; Follow - https://twitter.com/apacheapex

• Facebook - https://www.facebook.com/ApacheApex/

• Meetup - http://www.meetup.com/topics/apache-apex

• Startup Program – Free Enterprise License for startups, Universities, Non-Profits

Thank You
