Embulk, an open-source plugin-based parallel bulk data loader


Uploaded by sadayuki-furuhashi

Posted on 14-Jul-2015






Page 1: Embulk, an open-source plugin-based parallel bulk data loader

Sadayuki Furuhashi, Founder & Software Architect

Treasure Data, Inc.

Embulk: an open-source plugin-based parallel bulk data loader that makes painful data integration work relaxed.

Sharing our knowledge on RubyGems to manage arbitrary files.


A little about me...

> Sadayuki Furuhashi
  > github/twitter: @frsyuki
> Treasure Data, Inc.
  > Founder & Software Architect
> Open-source hacker
  > MessagePack - Efficient object serializer
  > Fluentd - A unified data collection tool
  > Prestogres - PostgreSQL protocol gateway for Presto
  > Embulk - A plugin-based parallel bulk data loader
  > ServerEngine - A Ruby framework to build multiprocess servers
  > LS4 - A distributed object storage with cross-region replication
  > kumofs - A distributed, strongly consistent key-value data store


Today’s talk

> What’s Embulk?
> How does Embulk work?
> The architecture
> Writing Embulk plugins
> Roadmap & Development
> Q&A + Discussion


What’s Embulk?

> An open-source parallel bulk data loader

> using plugins

> to make data integration relaxed.


What’s Embulk?

> An open-source parallel bulk data loader
  > loads records from “A” to “B”
> using plugins
  > for various kinds of “A” and “B”: Storage, RDBMS, NoSQL, Cloud Service
> to make data integration relaxed.
  > which was very painful: broken records, transactions (idempotency), performance, …


The pains of bulk data loading

Example: load a 10GB CSV file into PostgreSQL
> 1. First attempt → fails
> 2. Write a script to clean the records
  • Convert “20150127T190500Z” → “2015-01-27 19:05:00 UTC”
  • Convert “\N” → “”
  • many more cleanups…
> 3. Second attempt → another error
  • Convert “Inf” → “Infinity”
> 4. Fix the script, retry, retry, retry…
> 5. Oh, some data got loaded twice!?


The pains of bulk data loading

Example: load a 10GB CSV file into PostgreSQL
> 6. Ok, the script worked.
> 7. Register it in cron to sync data every day.
> 8. One day… it fails with another error
  • Convert invalid UTF-8 byte sequences to U+FFFD
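The cleanup rules from steps 2, 3 and 8 can be sketched as small string transforms. This is a minimal Ruby sketch, not Embulk code: the `Cleaning` module and its method names are hypothetical, and it assumes every field arrives as a raw string.

```ruby
# Hypothetical cleanup helpers mirroring the conversions listed above.
module Cleaning
  # Compact ISO 8601 UTC timestamp, e.g. "20150127T190500Z".
  COMPACT_TS = /\A(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})Z\z/

  # "20150127T190500Z" -> "2015-01-27 19:05:00 UTC"; other values pass through.
  def self.timestamp(value)
    m = COMPACT_TS.match(value)
    return value unless m
    Time.utc(*m.captures.map(&:to_i)).strftime('%Y-%m-%d %H:%M:%S UTC')
  end

  # The two-character NULL marker "\N" -> empty string.
  def self.null_marker(value)
    value == '\N' ? '' : value
  end

  # "Inf" -> "Infinity"
  def self.infinity(value)
    value == 'Inf' ? 'Infinity' : value
  end

  # Replace invalid UTF-8 byte sequences with U+FFFD.
  def self.utf8(value)
    value.dup.force_encoding(Encoding::UTF_8).scrub("\uFFFD")
  end

  # Repair the encoding first so the regexp never sees invalid bytes.
  def self.clean(value)
    infinity(null_marker(timestamp(utf8(value))))
  end
end

puts Cleaning.clean("20150127T190500Z")
```

Each rule is a separate method so a new kind of broken record becomes one more small, testable function instead of another patch to a monolithic script, which is the kind of effort the talk proposes packaging as a plugin.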


The pains of bulk data loading

Example: load 10GB CSV × 720 files
> Most scripts are slow.
  • People have little time to optimize bulk load scripts
> One file takes 1 hour → 720 files take 1 month (!?)
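The arithmetic behind the “1 month” estimate, plus a hedged what-if for parallel execution with $p$ workers. The second line assumes the 720 files are independent and that throughput scales perfectly with $p$, which real storage and networks rarely allow.

```latex
\[
720~\text{files} \times 1~\tfrac{\text{h}}{\text{file}} = 720~\text{h} = \tfrac{720}{24}~\text{days} = 30~\text{days}
\]
\[
T_p = \frac{720~\text{h}}{p}, \qquad T_{16} = \frac{720~\text{h}}{16} = 45~\text{h}
\]
```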

A lot of integration effort for each storage:
> XML, JSON, Apache log format (+some custom), …
> SAM, BED, BAI2, HDF5, TDE, SequenceFile, RCFile…
> MongoDB, Elasticsearch, Redshift, Salesforce, …


The problems:

> Data cleaning (normalization)
  > How to normalize broken records?
> Error handling
  > How to remove broken records?
> Idempotent retrying
  > How to retry without duplicated loading?
> Performance optimization
  > How to optimize the code or parallelize?


The problems at Treasure Data

Treasure Data Service?
> “Fast, powerful SQL access to big data from connected applications and products, with no new infrastructure or special skills required.”
> Customers want to try Treasure Data, but
  > SEs write scripts to bulk load their data. Hard work :(
> Customers want to migrate their big data, but
  > Hard work :(
> Fluentd solved streaming data collection, but
  > bulk data loading is another problem.


A solution:

> Package the efforts as a plugin.
  > data cleaning, error handling, retrying
> Share & reuse the plugin.
  > don’t repeat the pains!
> Keep improving the plugin code.
  > rather than throwing away the efforts every time
  > using OSS-style pull-reqs & frequent releases.


Embulk

Embulk is an open-source, plugin-based parallel bulk data loader that makes data integration work relaxed.




[Diagram: Amazon S3, CSV Files]










[Diagram: Amazon S3, CSV Files, bulk load]

✓ Parallel execution
✓ Data validation
✓ Error recovery
✓ Deterministic behavior
✓ Idempotent retrying




[Diagram: Amazon S3 and CSV Files bulk loaded through Plugins on both the input and output sides]


How does Embulk work?


# install
$ wget https://bintray.com/artifact/download/embulk/maven/embulk-0.2.0.jar -O embulk.jar
$ chmod 755 embulk.jar

Installing Embulk

Bintray releases

Embulk is released on Bintray

wget embulk.jar



# guess
$ vi partial-config.yml
$ ./embulk.jar guess partial-config.yml -o config.yml

Guess format & schema by guess plugins

# partial-config.yml
in:
  type: file
  paths: [data/examples/]
out:
  type: example

# config.yml (written by guess)
in:
  type: file
  paths: [data/examples/]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - name: time
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
    - name: account
      type: long
    - name: purchase
      type: timestamp
      format: '%Y%m%d'
    - name: comment
      type: string
out:
  type: example
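Going from `partial-config.yml` to the filled-in config can be pictured as folding each guess plugin's result into the partial config. This Ruby sketch assumes a recursive (deep) merge applied under `in:`; `deep_merge` is a hypothetical helper, not Embulk's API, and the charset/CSV guess result is invented for illustration.

```ruby
# Hypothetical helper: recursively merge nested hashes; non-hash values
# from `guessed` win. Returns a new hash and leaves `base` untouched.
def deep_merge(base, guessed)
  base.merge(guessed) do |_key, old, new|
    old.is_a?(Hash) && new.is_a?(Hash) ? deep_merge(old, new) : new
  end
end

partial = {
  "in"  => { "type" => "file", "paths" => ["data/examples/"] },
  "out" => { "type" => "example" },
}

# Results shaped like those returned by the gzip and newline guess
# plugins shown later; the third one is made up for this example.
guesses = [
  { "decoders" => [{ "type" => "gzip" }] },
  { "parser" => { "newline" => "CRLF" } },
  { "parser" => { "charset" => "UTF-8", "type" => "csv" } },
]

config = guesses.reduce(partial) { |acc, g| deep_merge(acc, { "in" => g }) }
p config["in"]["parser"]
```

Because two guessers both contribute keys under `parser`, a shallow `Hash#merge` would let the later one overwrite the earlier; the recursive merge keeps both.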



# preview
$ ./embulk.jar preview config.yml
$ vi config.yml  # if necessary

+-------------------------+----------+-------------+
| time:timestamp          | uid:long | word:string |
+-------------------------+----------+-------------+
| 2015-01-27 19:23:49 UTC |   32,864 | embulk      |
| 2015-01-27 19:01:23 UTC |   14,824 | jruby       |
| 2015-01-28 02:20:02 UTC |   27,559 | plugin      |
| 2015-01-29 11:54:36 UTC |   11,270 | fluentd     |
+-------------------------+----------+-------------+

Preview & fix config



# run
$ ./embulk.jar run config.yml -o config.yml

in:
  type: file
  paths: [data/examples/]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - name: time
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
    - name: account
      type: long
    - name: purchase
      type: timestamp
      format: '%Y%m%d'
    - name: comment
      type: string
  last_paths: [data/examples/sample_001.csv.gz]
out:
  type: example

Deterministic run


in:
  type: file
  paths: [data/examples/]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - name: time
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
    - name: account
      type: long
    - name: purchase
      type: timestamp
      format: '%Y%m%d'
    - name: comment
      type: string
  last_paths: [data/examples/sample_002.csv.gz]
out:
  type: example



# repeat
$ ./embulk.jar run config.yml -o config.yml
$ ./embulk.jar run config.yml -o config.yml
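Comparing the two configs above, `last_paths` moves from `sample_001.csv.gz` to `sample_002.csv.gz`, which is what lets `run` be repeated without reloading files. A minimal Ruby sketch of that bookkeeping follows. It assumes new files are those whose paths sort lexicographically after the recorded last path; the real file input plugin's rule may differ, and `plan_run` is hypothetical.

```ruby
# Hypothetical sketch of incremental loading driven by last_paths.
# Assumption: a file is new if its path sorts after the largest recorded path.
def plan_run(all_paths, last_paths)
  last = last_paths.max
  files = all_paths.sort.select { |path| last.nil? || path > last }
  # Record the newest loaded path for the next run; keep the old value
  # when nothing new was found so a rerun stays a no-op.
  next_last_paths = files.empty? ? last_paths : [files.last]
  [files, next_last_paths]
end

day1 = ["data/examples/sample_001.csv.gz"]
day2 = day1 + ["data/examples/sample_002.csv.gz"]

files1, last1 = plan_run(day1, [])     # first run: everything is new
files2, last2 = plan_run(day2, last1)  # only sample_002 is new
files3, last3 = plan_run(day2, last2)  # rerun: nothing to load
```

The state needed for the next run is carried in the config output, so a cron job only has to keep feeding the previous output back in.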


The architecture


[Diagram: InputPlugin (read records) → Embulk executor plugin → OutputPlugin (write records)]
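The executor's role between the two plugins can be sketched with plain Ruby threads: run one task per thread, hand the records each input task reads to the output side, and gather per-task commit reports so the caller commits only when every task finished. `execute` and the lambdas are hypothetical stand-ins, not Embulk's executor API.

```ruby
# Hypothetical stand-in for an executor: one thread per task index.
# read_task:     index -> array of records   (InputPlugin side)
# write_records: (index, records) -> ignored (OutputPlugin side)
def execute(task_count, read_task, write_records)
  threads = (0...task_count).map do |index|
    Thread.new do
      Thread.current.report_on_exception = false # errors surface via #value
      records = read_task.call(index)
      write_records.call(index, records)
      { "task" => index, "records" => records.size } # per-task commit report
    end
  end
  # Thread#value joins each thread and re-raises its exception, so this
  # returns only when every task succeeded; the caller can then commit.
  threads.map(&:value)
end

output = Array.new(2)
reports = execute(2,
                  ->(i) { Array.new(3) { |n| [i, n] } },
                  ->(i, records) { output[i] = records })
```

Collecting the commit reports before committing mirrors the BEGIN/COMMIT comments in the plugin examples later in this deck: a failed task raises instead of producing a partial commit.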


[Diagram: InputPlugin → Embulk executor plugin → OutputPlugin, with plugins for MySQL, Cassandra, HBase, Elasticsearch, Treasure Data, …]












[Diagram: read files → parse files into records → Embulk executor plugin → format records into files → write files]










[Diagram: Embulk executor plugin, with file storages HDFS, S3, Riak CS, … and decoders/encoders gzip, bzip2, 3des, …]
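The file-side stages of the architecture (reading and parsing files into records on the input side, formatting records into files and writing them on the output side) can be sketched with Ruby's standard `zlib`, `stringio` and `csv` libraries. The four helper methods are hypothetical stand-ins for decoder, parser, formatter and encoder plugins, not Embulk's plugin API, and they work on in-memory strings instead of HDFS or S3 files.

```ruby
require 'csv'
require 'stringio'
require 'zlib'

# Decoder: gzip bytes -> text (assumes UTF-8 content).
def decode_gzip(bytes)
  Zlib::GzipReader.new(StringIO.new(bytes)).read.force_encoding(Encoding::UTF_8)
end

# Parser: CSV text with a header line -> array of records (hashes).
def parse_csv(text)
  CSV.parse(text, headers: true).map(&:to_h)
end

# Formatter: records -> CSV text with a header line.
def format_csv(records)
  CSV.generate do |csv|
    csv << records.first.keys
    records.each { |record| csv << record.values }
  end
end

# Encoder: text -> gzip bytes.
def encode_gzip(text)
  gz = Zlib::GzipWriter.new(StringIO.new(String.new))
  gz.write(text)
  gz.finish.string # finish returns the underlying StringIO without closing it
end

csv_text = "time,account\n2015-01-27 19:05:00,32864\n"
records = parse_csv(decode_gzip(encode_gzip(csv_text)))
```

Keeping the byte-level codec separate from the record-level parser is what lets one CSV parser serve gzip, bzip2 or uncompressed inputs alike.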









Writing Embulk plugins



module Embulk
  class InputExample < InputPlugin
    Plugin.register_input('example', self)

    def self.transaction(config, &control)
      # read config
      task = {
        'message' => config.param('message', :string, default: nil)
      }
      threads = config.param('threads', :int, default: 2)

      columns = [
        Column.new(0, 'col0', :long),
        Column.new(1, 'col1', :double),
        Column.new(2, 'col2', :string),
      ]

      # BEGIN here
      commit_reports = yield(task, columns, threads)

      # COMMIT here
      puts "Example input finished"
      return {}
    end

    def run(task, schema, index, page_builder)
      # use the arguments; @index and @page_builder are never assigned here
      puts "Example input thread #{index}..."
      10.times do |i|
        page_builder.add([i, 10.0, "example"])
      end
      page_builder.finish

      commit_report = {}
      return commit_report
    end
  end
end



module Embulk
  class OutputExample < OutputPlugin
    Plugin.register_output('example', self)

    def self.transaction(config, schema, processor_count, &control)
      # read config
      task = { 'message' => config.param('message', :string, default: "record") }

      puts "Example output started."
      commit_reports = yield(task)
      puts "Example output finished. Commit reports = #{commit_reports.to_json}"
      return {}
    end

    def initialize(task, schema, index)
      puts "Example output thread #{index}..."
      super
      @message = task.prop('message', :string)
      @records = 0
    end

    def add(page)
      page.each do |record|
        hash = Hash[schema.names.zip(record)]
        puts "#{@message}: #{hash.to_json}"
        @records += 1
      end
    end

    def finish; end
    def abort; end

    def commit
      commit_report = { "records" => @records }
      return commit_report
    end
  end
end



# guess_gzip.rb
module Embulk
  class GzipGuess < GuessPlugin
    Plugin.register_guess('gzip', self)

    GZIP_HEADER = "\x1f\x8b".force_encoding('ASCII-8BIT').freeze

    def guess(config, sample_buffer)
      if sample_buffer[0,2] == GZIP_HEADER
        return {"decoders" => [{"type" => "gzip"}]}
      end
      return {}
    end
  end
end

# guess_
module Embulk
  class GuessNewline < TextGuessPlugin
    Plugin.register_guess('newline', self)

    def guess_text(config, sample_text)
      cr_count = sample_text.count("\r")
      lf_count = sample_text.count("\n")
      crlf_count = sample_text.scan(/\r\n/).length
      if crlf_count > cr_count / 2 && crlf_count > lf_count / 2
        return {"parser" => {"newline" => "CRLF"}}
      elsif cr_count > lf_count / 2
        return {"parser" => {"newline" => "CR"}}
      else
        return {"parser" => {"newline" => "LF"}}
      end
    end
  end
end



Releasing to RubyGems

Examples:
> embulk-plugin-postgres-json.gem
  > https://github.com/frsyuki/embulk-plugin-postgres-json
> embulk-plugin-redis.gem
  > https://github.com/komamitsu/embulk-plugin-redis
> embulk-plugin-input-sfdc-event-log-files.gem
  > https://github.com/nahi/embulk-plugin-input-sfdc-event-log-files
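The example plugins above are distributed as ordinary gems. Here is a minimal gemspec sketch, assuming the `embulk-plugin-<name>` naming seen in those examples. The gem name, version, author and file path are placeholders, and where Embulk expects the plugin file to live is an assumption not covered here.

```ruby
require 'rubygems'

# Placeholder metadata for a hypothetical plugin gem.
spec = Gem::Specification.new do |s|
  s.name          = 'embulk-plugin-example'
  s.version       = '0.1.0'
  s.summary       = 'Example Embulk plugin (placeholder)'
  s.authors       = ['Your Name']
  s.files         = ['lib/embulk/input_example.rb'] # hypothetical path
  s.require_paths = ['lib']
end

puts spec.full_name
```

Saved as `embulk-plugin-example.gemspec` with the `puts` line removed (so the specification is the file's last expression), `gem build embulk-plugin-example.gemspec` produces `embulk-plugin-example-0.1.0.gem`, and `gem push embulk-plugin-example-0.1.0.gem` publishes it to RubyGems.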


Roadmap & Development



> Add missing JRuby Plugin APIs
  > ParserPlugin, FormatterPlugin
  > DecoderPlugin, EncoderPlugin
> Add Executor plugin SPI
  > Add ssh distributed executor
    > embulk run --command ssh %host embulk run %task
  > Add MapReduce executor
> Add support for nested records (?)


Contributing to the Embulk project

> Pull-requests & issues on GitHub
> Posting blogs
  > “I tried Embulk. Here is how it worked”
  > “I read Embulk code. Here is how it’s written”
  > “Embulk is good because…but bad because…”
> Talking on Twitter with the word “embulk”
> Writing & releasing plugins
> Windows support
> Integration with other software
  > ETL tools, Fluentd, Hadoop, Presto, …


Q&A + Discussion?

Embulk committers:
Hiroshi Nakamura @nahi
Muga Nishizawa @muga_nishizawa
Sadayuki Furuhashi @frsyuki



Cloud service for the entire data pipeline. We’re hiring!