embulk, an open-source plugin-based parallel bulk data loader


Sadayuki Furuhashi
Founder & Software Architect, Treasure Data, Inc.

Embulk: An open-source plugin-based parallel bulk data loader that makes painful data integration work relaxed.

Sharing our knowledge on RubyGems to manage arbitrary files.

A little about me…

> Sadayuki Furuhashi
> github/twitter: @frsyuki
> Treasure Data, Inc. Founder & Software Architect
> Open-source hacker:
  > MessagePack - Efficient object serializer
  > Fluentd - A unified data collection tool
  > Prestogres - PostgreSQL protocol gateway for Presto
  > Embulk - A plugin-based parallel bulk data loader
  > ServerEngine - A Ruby framework to build multiprocess servers
  > LS4 - A distributed object storage with cross-region replication
  > kumofs - A distributed, strongly consistent key-value data store

Today’s talk

> What’s Embulk?
> How Embulk works
> The architecture
> Writing Embulk plugins
> Roadmap & Development
> Q&A + Discussion

What’s Embulk?

> An open-source parallel bulk data loader
  > loads records from “A” to “B” (storage, RDBMS, NoSQL, cloud services, etc.)
> using plugins
  > for various kinds of “A” and “B”
> to make data integration relaxed
  > which was very painful: broken records, transactions (idempotency), performance, …

The pains of bulk data loading

Example: load a 10GB CSV file to PostgreSQL
> 1. First attempt → fails
> 2. Write a script to clean the records (a sketch of such a script follows below)
  • Convert “20150127T190500Z” → “2015-01-27 19:05:00 UTC”
  • Convert “\N” → “”
  • many cleanings…
> 3. Second attempt → another error
  • Convert “Inf” → “Infinity”
> 4. Fix the script, retry, retry, retry…
> 5. Oh, some data got loaded twice!?
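For illustration, a minimal sketch of the kind of ad-hoc cleaning script step 2 alludes to. The slides never show one, so this is hypothetical; it assumes the CSV is piped through stdin/stdout:

require 'csv'

# Hypothetical one-off cleaning script for a CSV destined for PostgreSQL.
CSV.filter($stdin, $stdout) do |row|
  row.map! do |field|
    case field
    when '\N'  then ''          # MySQL-style NULL marker -> empty string
    when 'Inf' then 'Infinity'  # PostgreSQL spells infinity this way
    when /\A(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})Z\z/
      # 20150127T190500Z -> 2015-01-27 19:05:00 UTC
      "#{$1}-#{$2}-#{$3} #{$4}:#{$5}:#{$6} UTC"
    else field
    end
  end
end

Each new error (step 3 and beyond) means another `when` branch, and another retry of the whole 10GB load.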

The pains of bulk data loading

Example: load a 10GB CSV file to PostgreSQL
> 6. Ok, the script worked.
> 7. Register it to cron to sync data every day.
> 8. One day… it fails with another error
  • Convert invalid UTF-8 byte sequences to U+FFFD

The pains of bulk data loading

Example: load 10GB CSV × 720 files
> Most such scripts are slow.
  • People have little time to optimize bulk load scripts.
> One file takes 1 hour → 720 files take 1 month (!?)

A lot of integration effort for each storage and format:
> XML, JSON, Apache log format (+ some custom formats), …
> SAM, BED, BAI2, HDF5, TDE, SequenceFile, RCFile, …
> MongoDB, Elasticsearch, Redshift, Salesforce, …

The problems:

> Data cleaning (normalization): how to normalize broken records?
> Error handling: how to remove broken records?
> Idempotent retrying: how to retry without loading data twice? (see the sketch after this list)
> Performance optimization: how to optimize the code, or parallelize it?
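Of these, idempotent retrying is the least obvious, so here is a minimal sketch of a standard workaround, assuming PostgreSQL: stage the rows, then commit them together with a unique per-load marker, so a retry of an already-committed load fails fast instead of duplicating rows. All table and helper names here are hypothetical, not from the talk:

require 'pg'

session = 'load-2015-01-27'  # deterministic name per input chunk
conn = PG.connect(dbname: 'mydb')

conn.exec('CREATE TABLE IF NOT EXISTS load_sessions (name text PRIMARY KEY)')
conn.exec('CREATE TABLE IF NOT EXISTS events_staging (LIKE events)')
conn.exec('TRUNCATE events_staging')   # a retry always restarts cleanly

copy_csv_into(conn, 'events_staging')  # hypothetical helper using COPY

conn.transaction do |tx|
  # the PRIMARY KEY makes a second commit of the same session impossible
  tx.exec_params('INSERT INTO load_sessions VALUES ($1)', [session])
  tx.exec('INSERT INTO events SELECT * FROM events_staging')
end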

The problems at Treasure Data

What is the Treasure Data Service?
> “Fast, powerful SQL access to big data from connected applications and products, with no new infrastructure or special skills required.”

> Customers want to try Treasure Data, but
  > SEs write scripts to bulk load their data. Hard work :(
> Customers want to migrate their big data, but
  > Hard work :(
> Fluentd solved streaming data collection, but
  > bulk data loading is another problem.

A solution:

> Package the efforts as a plugin.
  > data cleaning, error handling, retrying
> Share & reuse the plugin.
  > don’t repeat the pains!
> Keep improving the plugin code,
  > rather than throwing away the efforts every time,
  > using OSS-style pull requests & frequent releases.

Embulk is an open-source, plugin-based parallel bulk data loader that makes data integration work relaxed.

[Diagram: Embulk sits in the middle and bulk-loads records between HDFS, MySQL, Amazon S3, CSV files, SequenceFile, Salesforce.com, Elasticsearch, Cassandra, Hive, and Redis, with plugins handling each side.]

✓ Parallel execution
✓ Data validation
✓ Error recovery
✓ Deterministic behavior
✓ Idempotent retrying

How Embulk works

Installing Embulk

Embulk is released on Bintray as a single executable jar.

# install
$ wget https://bintray.com/artifact/download/embulk/maven/embulk-0.2.0.jar -O embulk.jar
$ chmod 755 embulk.jar

Guess format & schema

# guess
$ vi partial-config.yml
$ ./embulk guess partial-config.yml -o config.yml

Write only the parts you know (partial-config.yml):

in:
  type: file
  paths: [data/examples/]
out:
  type: example

Guess plugins fill in the format and schema (config.yml):

in:
  type: file
  paths: [data/examples/]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - name: time
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
    - name: account
      type: long
    - name: purchase
      type: timestamp
      format: '%Y%m%d'
    - name: comment
      type: string
out:
  type: example

Preview & fix config

# preview
$ ./embulk preview config.yml
$ vi config.yml  # if necessary

+-------------------------+----------+-------------+
| time:timestamp          | uid:long | word:string |
+-------------------------+----------+-------------+
| 2015-01-27 19:23:49 UTC |   32,864 | embulk      |
| 2015-01-27 19:01:23 UTC |   14,824 | jruby       |
| 2015-01-28 02:20:02 UTC |   27,559 | plugin      |
| 2015-01-29 11:54:36 UTC |   11,270 | fluentd     |
+-------------------------+----------+-------------+

Deterministic run

# run
$ ./embulk run config.yml -o config.yml

After the run, Embulk records which files were loaded back into the config (the rest of config.yml is unchanged):

in:
  type: file
  paths: [data/examples/]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - name: time
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
    - name: account
      type: long
    - name: purchase
      type: timestamp
      format: '%Y%m%d'
    - name: comment
      type: string
  last_paths: [data/examples/sample_001.csv.gz]
out:
  type: example

Repeat

# repeat
$ ./embulk run config.yml -o config.yml
$ ./embulk run config.yml -o config.yml

Each repeated run updates last_paths (e.g. to data/examples/sample_002.csv.gz), so only files added since the previous run are loaded.
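A minimal sketch of the idea behind last_paths. The selection rule (sort file names, keep only those after the last recorded path) is assumed from the behavior the slides show, not taken from Embulk’s source:

# Sketch of last_paths-style incremental file selection.
last_paths = ['data/examples/sample_001.csv.gz']

all_paths  = Dir.glob('data/examples/*.csv.gz').sort
next_paths = all_paths.select { |path| path > (last_paths.max || '') }

# next_paths now holds sample_002.csv.gz and anything newer; after this
# run succeeds, next_paths.max is recorded as the new last_paths entry.

This is what makes runs deterministic and repeatable: the config plus last_paths fully determines which files a run will load.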

The architecture

[Diagram: InputPlugin → Embulk executor plugin → OutputPlugin. The executor reads records from the InputPlugin and writes them to the OutputPlugin. Inputs and outputs include MySQL, Cassandra, HBase, Elasticsearch, Treasure Data, …]

[Diagram: for file-based storages the two sides are layered. Input side: FileInputPlugin reads files into buffers (HDFS, S3, Riak CS, …) → DecoderPlugin decompresses (gzip, bzip2, 3des, …) → ParserPlugin parses the files into records (CSV, JSON, RCFile, …). Output side mirrors it: FormatterPlugin formats records into files → EncoderPlugin compresses → FileOutputPlugin writes the files.]
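To make the layering concrete, here is a toy Ruby sketch of the input side; the lambdas stand in for plugin instances and are not the real Embulk interfaces:

require 'zlib'
require 'csv'

# Toy stand-ins for the three input-side stages.
file_input = -> { [Zlib.gzip("a,1\nb,2\n")] }                 # FileInputPlugin: read files as buffers
decoder    = ->(bufs) { bufs.map { |b| Zlib.gunzip(b) } }     # DecoderPlugin: decompress
parser     = ->(bufs) { bufs.flat_map { |b| CSV.parse(b) } }  # ParserPlugin: buffers -> records

records = parser.(decoder.(file_input.()))
p records  # => [["a", "1"], ["b", "2"]]

The output side composes the same way in reverse: FormatterPlugin turns records back into buffers, EncoderPlugin compresses them, and FileOutputPlugin writes them out.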

Writing Embulk plugins

InputPlugin

module Embulk
  class InputExample < InputPlugin
    Plugin.register_input('example', self)

    def self.transaction(config, &control)
      # read config
      task = {
        'message' => config.param('message', :string, default: nil)
      }
      threads = config.param('threads', :int, default: 2)

      columns = [
        Column.new(0, 'col0', :long),
        Column.new(1, 'col1', :double),
        Column.new(2, 'col2', :string),
      ]

      # BEGIN here

      commit_reports = yield(task, columns, threads)

      # COMMIT here
      puts "Example input finished"

      return {}
    end

    def run(task, schema, index, page_builder)
      puts "Example input thread #{index}..."

      10.times do |i|
        page_builder.add([i, 10.0, "example"])
      end
      page_builder.finish

      commit_report = {}
      return commit_report
    end
  end
end

OutputPlugin

module Embulk
  class OutputExample < OutputPlugin
    Plugin.register_output('example', self)

    def self.transaction(config, schema, processor_count, &control)
      # read config
      task = {
        'message' => config.param('message', :string, default: "record")
      }

      puts "Example output started."
      commit_reports = yield(task)
      puts "Example output finished. Commit reports = #{commit_reports.to_json}"

      return {}
    end

    def initialize(task, schema, index)
      puts "Example output thread #{index}..."
      super
      @message = task.prop('message', :string)
      @records = 0
    end

    def add(page)
      page.each do |record|
        hash = Hash[schema.names.zip(record)]
        puts "#{@message}: #{hash.to_json}"
        @records += 1
      end
    end

    def finish
    end

    def abort
    end

    def commit
      commit_report = { "records" => @records }
      return commit_report
    end
  end
end

GuessPlugin

# guess_gzip.rb
module Embulk
  class GzipGuess < GuessPlugin
    Plugin.register_guess('gzip', self)

    GZIP_HEADER = "\x1f\x8b".force_encoding('ASCII-8BIT').freeze

    def guess(config, sample_buffer)
      if sample_buffer[0,2] == GZIP_HEADER
        return {"decoders" => [{"type" => "gzip"}]}
      end
      return {}
    end
  end
end

# guess_newline.rb
module Embulk
  class GuessNewline < TextGuessPlugin
    Plugin.register_guess('newline', self)

    def guess_text(config, sample_text)
      cr_count = sample_text.count("\r")
      lf_count = sample_text.count("\n")
      crlf_count = sample_text.scan(/\r\n/).length
      if crlf_count > cr_count / 2 && crlf_count > lf_count / 2
        return {"parser" => {"newline" => "CRLF"}}
      elsif cr_count > lf_count / 2
        return {"parser" => {"newline" => "CR"}}
      else
        return {"parser" => {"newline" => "LF"}}
      end
    end
  end
end
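A toy illustration of what happens with these return values; the deep-merge shown here is assumed behavior of `embulk guess`, not code from the talk. Each guess plugin returns a partial config fragment, and the fragments are merged into the final config:

# Fragments as the plugins above would return them.
fragments = [
  {"decoders" => [{"type" => "gzip"}]},                   # from GzipGuess
  {"parser"   => {"newline" => "CRLF"}},                  # from GuessNewline
  {"parser"   => {"type" => "csv", "delimiter" => ","}},  # from a CSV guess
]

# One-level deep merge of the fragments into a single config hash.
config = fragments.reduce({}) do |merged, frag|
  merged.merge(frag) { |_key, old, new| old.is_a?(Hash) ? old.merge(new) : new }
end
# => {"decoders"=>[{"type"=>"gzip"}],
#     "parser"=>{"newline"=>"CRLF", "type"=>"csv", "delimiter"=>","}}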

Releasing to RubyGems

Examples:
> embulk-plugin-postgres-json.gem
  > https://github.com/frsyuki/embulk-plugin-postgres-json
> embulk-plugin-redis.gem
  > https://github.com/komamitsu/embulk-plugin-redis
> embulk-plugin-input-sfdc-event-log-files.gem
  > https://github.com/nahi/embulk-plugin-input-sfdc-event-log-files
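Since plugins are plain gems, publishing one is just `gem build` plus `gem push`. A minimal hypothetical gemspec (all names and metadata are illustrative, not from the talk):

# embulk-plugin-example.gemspec (hypothetical)
Gem::Specification.new do |spec|
  spec.name     = "embulk-plugin-example"
  spec.version  = "0.1.0"
  spec.authors  = ["Your Name"]
  spec.summary  = "Example input/output plugin for Embulk"
  spec.homepage = "https://github.com/you/embulk-plugin-example"
  spec.license  = "Apache-2.0"
  spec.files    = Dir["lib/**/*.rb"]  # e.g. lib/embulk/input_example.rb
end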

Roadmap & Development

Roadmap

> Add missing JRuby plugin APIs
  > ParserPlugin, FormatterPlugin
  > DecoderPlugin, EncoderPlugin
> Add executor plugin SPI
> Add ssh distributed executor
  > embulk run --command ssh %host embulk run %task
> Add MapReduce executor
> Add support for nested records (?)

Contributing to the Embulk project

> Pull requests & issues on GitHub
> Posting blogs
  > “I tried Embulk. Here is how it worked”
  > “I read Embulk code. Here is how it’s written”
  > “Embulk is good because… but bad because…”
> Talking on Twitter with the word “embulk”
> Writing & releasing plugins
> Windows support
> Integration with other software
  > ETL tools, Fluentd, Hadoop, Presto, …

Q&A + Discussion

Embulk committers:
> Sadayuki Furuhashi @frsyuki
> Muga Nishizawa @muga_nishizawa
> Hiroshi Nakamura @nahi

Cloud service for the entire data pipeline. We’re hiring!
https://jobs.lever.co/treasure-data
