Embulk, an open-source plugin-based parallel bulk data loader

Sadayuki Furuhashi Founder & Software Architect Treasure Data, inc. Embulk An open-source plugin-based parallel bulk data loader that makes painful data integration work relaxed. Sharing our knowledge on RubyGems to manage arbitrary files.

Upload: sadayuki-furuhashi

Post on 14-Jul-2015


TRANSCRIPT

Page 1: Embulk, an open-source plugin-based parallel bulk data loader

Sadayuki Furuhashi
Founder & Software Architect
Treasure Data, Inc.

Embulk

An open-source plugin-based parallel bulk data loader that makes painful data integration work relaxed.

Sharing our knowledge on RubyGems to manage arbitrary files.

Page 2: Embulk, an open-source plugin-based parallel bulk data loader

A little about me...

> Sadayuki Furuhashi
  > github/twitter: @frsyuki
> Treasure Data, Inc.
  > Founder & Software Architect
> Open-source hacker
  > MessagePack - Efficient object serializer
  > Fluentd - A unified data collection tool
  > Prestogres - PostgreSQL protocol gateway for Presto
  > Embulk - A plugin-based parallel bulk data loader
  > ServerEngine - A Ruby framework to build multiprocess servers
  > LS4 - A distributed object storage with cross-region replication
  > kumofs - A distributed strong-consistent key-value data store

Page 3: Embulk, an open-source plugin-based parallel bulk data loader

Today’s talk

> What’s Embulk?
> How does Embulk work?
> The architecture
> Writing Embulk plugins
> Roadmap & Development
> Q&A + Discussion

Page 4: Embulk, an open-source plugin-based parallel bulk data loader

What’s Embulk?

> An open-source parallel bulk data loader

> using plugins

> to make data integration relaxed.

Page 5: Embulk, an open-source plugin-based parallel bulk data loader

What’s Embulk?

> An open-source parallel bulk data loader
  > loads records from “A” to “B”
> using plugins
  > for various kinds of “A” and “B” (Storage, RDBMS, NoSQL, Cloud Service, etc.)
> to make data integration relaxed.
  > which was very painful… (broken records, transactions (idempotency), performance, …)

Page 6: Embulk, an open-source plugin-based parallel bulk data loader

The pains of bulk data loading

Example: load a 10GB CSV file to PostgreSQL

> 1. First attempt → fails
> 2. Write a script to make the records cleaned
  • Convert “20150127T190500Z” → “2015-01-27 19:05:00 UTC”
  • Convert “\N” → “”
  • many cleanings…
> 3. Second attempt → another error
  • Convert “Inf” → “Infinity”
> 4. Fix the script, retry, retry, retry…
> 5. Oh, some data got loaded twice!?
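The cleaning rules listed on this slide can be sketched as one small Ruby function. This is only an illustration of the kind of throwaway script the talk describes: `clean_field` and `COMPACT_TIME` are hypothetical names, not part of Embulk, and the three rules are simply the ones named on the slide.

```ruby
# Hypothetical cleaning helper; the names here are made up for illustration.
COMPACT_TIME = /\A(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})Z\z/

def clean_field(value)
  case value
  when "\\N"          # the two-character NULL marker \N becomes an empty string
    ""
  when "Inf"          # "Inf" is rewritten to "Infinity", as on the slide
    "Infinity"
  when COMPACT_TIME   # 20150127T190500Z -> 2015-01-27 19:05:00 UTC
    parts = Regexp.last_match.captures.map(&:to_i)
    Time.utc(*parts).strftime("%Y-%m-%d %H:%M:%S UTC")
  else
    value             # everything else passes through unchanged
  end
end

row = ["20150127T190500Z", "\\N", "Inf", "embulk"]
cleaned = row.map { |field| clean_field(field) }
p cleaned
```

Every new kind of broken value means another `when` branch and another retry, which is the pain the following slides describe.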

Page 7: Embulk, an open-source plugin-based parallel bulk data loader

The pains of bulk data loading

Example: load a 10GB CSV file to PostgreSQL

> 6. Ok, the script worked.
> 7. Register it to cron to sync data every day.
> 8. One day… it fails with another error
  • Convert invalid UTF-8 byte sequence to U+FFFD
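In Ruby, the last fix on this slide can be done with `String#scrub`, which replaces invalid byte sequences with a replacement string. The sample string below is made up for illustration.

```ruby
# "\xFF" can never appear in valid UTF-8, so this string is broken on purpose.
broken = "caf\xC3\xA9 \xFF latte"

# Replace every invalid byte sequence with U+FFFD (REPLACEMENT CHARACTER).
fixed = broken.scrub("\uFFFD")

puts broken.valid_encoding?   # false
puts fixed.valid_encoding?    # true
```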

Page 8: Embulk, an open-source plugin-based parallel bulk data loader

The pains of bulk data loading

Example: load 10GB CSV × 720 files

> Most scripts are slow.
  • People have little time to optimize bulk load scripts
> One file takes 1 hour → 720 files take 1 month (!?)

A lot of integration effort for each storage and format:

> XML, JSON, Apache log format (+some custom), …
> SAM, BED, BAI2, HDF5, TDE, SequenceFile, RCFile…
> MongoDB, Elasticsearch, Redshift, Salesforce, …
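The one-month figure is plain arithmetic: 720 files at 1 hour each is 720 hours, or 30 days. The sketch below also shows what parallel loading would buy under an idealized assumption of perfect speedup with no shared bottleneck. The worker count of 8 and the helper name `days_with_workers` are assumptions for illustration, not numbers from the talk.

```ruby
files = 720
hours_per_file = 1

# Sequential loading: 720 hours in total.
sequential_days = files * hours_per_file / 24.0

# Idealized parallel loading: each worker takes an equal share of the files.
def days_with_workers(files, hours_per_file, workers)
  (files.to_f / workers).ceil * hours_per_file / 24.0
end

puts sequential_days                              # 30.0
puts days_with_workers(files, hours_per_file, 8)  # 3.75
```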

Page 9: Embulk, an open-source plugin-based parallel bulk data loader

The problems:

> Data cleaning (normalization)
  > How to normalize broken records?
> Error handling
  > How to remove broken records?
> Idempotent retrying
  > How to retry without duplicated loading?
> Performance optimization
  > How to optimize the code or parallelize?

Page 10: Embulk, an open-source plugin-based parallel bulk data loader

The problems at Treasure Data

Treasure Data Service?

> “Fast, powerful SQL access to big data from connected applications and products, with no new infrastructure or special skills required.”
> Customers want to try Treasure Data, but
  > SEs write scripts to bulk load their data. Hard work :(
> Customers want to migrate their big data, but
  > Hard work :(
> Fluentd solved streaming data collection, but
  > bulk data loading is another problem.

Page 11: Embulk, an open-source plugin-based parallel bulk data loader

A solution:

> Package the efforts as a plugin.
  > data cleaning, error handling, retrying
> Share & reuse the plugin.
  > don’t repeat the pains!
> Keep improving the plugin code.
  > rather than throwing away the efforts every time
  > using OSS-style pull-reqs & frequent releases.

Page 12: Embulk, an open-source plugin-based parallel bulk data loader

Embulk

Embulk is an open-source, plugin-based parallel bulk data loader that makes data integration work relaxed.

Page 13: Embulk, an open-source plugin-based parallel bulk data loader

[Diagram: Embulk in the center, surrounded by HDFS, MySQL, Amazon S3, CSV Files, SequenceFile, Salesforce.com, Elasticsearch, Cassandra, Hive, and Redis]

Page 14: Embulk, an open-source plugin-based parallel bulk data loader

[Diagram: Embulk in the center performing “bulk load” between HDFS, MySQL, Amazon S3, CSV Files, SequenceFile, Salesforce.com, Elasticsearch, Cassandra, Hive, and Redis]

✓ Parallel execution
✓ Data validation
✓ Error recovery
✓ Deterministic behavior
✓ Idempotent retrying

Page 15: Embulk, an open-source plugin-based parallel bulk data loader

[Diagram: the same “bulk load” picture, with “Plugins” on both sides of Embulk connecting it to HDFS, MySQL, Amazon S3, CSV Files, SequenceFile, Salesforce.com, Elasticsearch, Cassandra, Hive, and Redis]

✓ Parallel execution
✓ Data validation
✓ Error recovery
✓ Deterministic behavior
✓ Idempotent retrying

Page 16: Embulk, an open-source plugin-based parallel bulk data loader

How does Embulk work?

Page 17: Embulk, an open-source plugin-based parallel bulk data loader

Installing embulk

# install
$ wget https://bintray.com/artifact/download/embulk/maven/embulk-0.2.0.jar -O embulk.jar
$ chmod 755 embulk.jar

[Screenshot: Bintray releases page. Embulk is released on Bintray; wget embulk.jar]

Page 18: Embulk, an open-source plugin-based parallel bulk data loader

Guess format & schema

# guess
$ vi partial-config.yml
$ ./embulk guess partial-config.yml -o config.yml

partial-config.yml (input):

in:
  type: file
  paths: [data/examples/]
out:
  type: example

config.yml (output, guessed by guess plugins):

in:
  type: file
  paths: [data/examples/]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - name: time
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
    - name: account
      type: long
    - name: purchase
      type: timestamp
      format: '%Y%m%d'
    - name: comment
      type: string
out:
  type: example

Page 19: Embulk, an open-source plugin-based parallel bulk data loader

Preview & fix config

# preview
$ ./embulk preview config.yml
$ vi config.yml # if necessary

+-------------------------+----------+-------------+
| time:timestamp          | uid:long | word:string |
+-------------------------+----------+-------------+
| 2015-01-27 19:23:49 UTC |   32,864 | embulk      |
| 2015-01-27 19:01:23 UTC |   14,824 | jruby       |
| 2015-01-28 02:20:02 UTC |   27,559 | plugin      |
| 2015-01-29 11:54:36 UTC |   11,270 | fluentd     |
+-------------------------+----------+-------------+

Page 20: Embulk, an open-source plugin-based parallel bulk data loader

Deterministic run

# run
$ ./embulk run config.yml -o config.yml

config.yml after the run:

in:
  type: file
  paths: [data/examples/]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - name: time
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
    - name: account
      type: long
    - name: purchase
      type: timestamp
      format: '%Y%m%d'
    - name: comment
      type: string
  last_paths: [data/examples/sample_001.csv.gz]
out:
  type: example

Page 21: Embulk, an open-source plugin-based parallel bulk data loader

Repeat

# repeat
$ ./embulk run config.yml -o config.yml
$ ./embulk run config.yml -o config.yml

config.yml after the next run:

in:
  type: file
  paths: [data/examples/]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - name: time
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
    - name: account
      type: long
    - name: purchase
      type: timestamp
      format: '%Y%m%d'
    - name: comment
      type: string
  last_paths: [data/examples/sample_002.csv.gz]
out:
  type: example
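The `last_paths` entry written back by each run is what makes a repeated run pick up only new files. A minimal sketch of that idea follows, assuming the file input skips every path that sorts at or before the recorded one. That selection rule and the helper name `paths_to_load` are assumptions for illustration; the slides do not spell out the exact rule.

```ruby
# Hypothetical helper: choose the files a run should load, given the last
# path recorded by the previous run (nil on the very first run).
def paths_to_load(all_paths, last_path)
  sorted = all_paths.sort
  return sorted if last_path.nil?
  sorted.select { |path| path > last_path }
end

# First run: only sample_001 exists yet.
first_run = paths_to_load(["data/examples/sample_001.csv.gz"], nil)
last_path = first_run.last

# Second run: sample_002 has arrived, sample_001 is skipped.
second_run = paths_to_load(
  ["data/examples/sample_001.csv.gz", "data/examples/sample_002.csv.gz"],
  last_path
)
p second_run

# Third run with no new files: nothing is loaded twice.
third_run = paths_to_load(
  ["data/examples/sample_001.csv.gz", "data/examples/sample_002.csv.gz"],
  second_run.last
)
p third_run
```

Because the state lives in the config file itself, re-running the same command is safe: a run with no new input selects nothing.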

Page 22: Embulk, an open-source plugin-based parallel bulk data loader

The architecture

Page 23: Embulk, an open-source plugin-based parallel bulk data loader

[Diagram: InputPlugin and OutputPlugin on top of the Embulk executor plugin]

> InputPlugin: read records
> OutputPlugin: write records
> Embulk executor plugin

Page 24: Embulk, an open-source plugin-based parallel bulk data loader

[Diagram: records flow from InputPlugin to OutputPlugin through the Embulk executor plugin]

> InputPlugin / OutputPlugin: MySQL, Cassandra, HBase, Elasticsearch, Treasure Data, …
> Data passed between them: record

Page 25: Embulk, an open-source plugin-based parallel bulk data loader

[Diagram: file-based input and output are split into smaller plugins, all on top of the Embulk executor plugin]

InputPlugin
> FileInputPlugin: read files
> DecoderPlugin: decompress
> ParserPlugin: parse files into records

OutputPlugin
> FormatterPlugin: format records into files
> EncoderPlugin: compress
> FileOutputPlugin: write files

Page 26: Embulk, an open-source plugin-based parallel bulk data loader

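[Diagram: the same plugin pipeline, annotated with example implementations and the data passed between stages]

> FileInputPlugin / FileOutputPlugin: HDFS, S3, Riak CS, …
> DecoderPlugin / EncoderPlugin: gzip, bzip2, 3des, …
> ParserPlugin / FormatterPlugin: CSV, JSON, RCFile, …
> Data passed between stages: FileInput → buffer → Decoder → buffer → Parser → record → record → Formatter → buffer → Encoder → buffer → FileOutput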
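The buffer → record → buffer flow in these diagrams can be imitated in a few lines of plain Ruby. This is a toy model of the plugin roles only: the four functions are hypothetical stand-ins, not Embulk classes or APIs, and the CSV parsing is deliberately naive (no quoting or escaping).

```ruby
require "zlib"

# DecoderPlugin role: compressed buffer -> plain buffer
def decode(buffer)
  Zlib.gunzip(buffer)
end

# ParserPlugin role: plain buffer -> records (naive comma split, no quoting)
def parse_csv(text)
  text.each_line.map { |line| line.chomp.split(",") }
end

# FormatterPlugin role: records -> plain buffer (tab-separated here)
def format_tsv(records)
  records.map { |record| record.join("\t") + "\n" }.join
end

# EncoderPlugin role: plain buffer -> compressed buffer
def encode(text)
  Zlib.gzip(text)
end

# FileInputPlugin role, faked: pretend this buffer was read from a .csv.gz file.
input_buffer = Zlib.gzip("1,embulk\n2,jruby\n")

records = parse_csv(decode(input_buffer))    # buffer -> buffer -> records
output_buffer = encode(format_tsv(records))  # records -> buffer -> buffer

p records
puts Zlib.gunzip(output_buffer)
```

Splitting the stages this way is what lets a single CSV parser be reused with any file source or compression format.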

Page 27: Embulk, an open-source plugin-based parallel bulk data loader

Writing Embulk plugins

Page 28: Embulk, an open-source plugin-based parallel bulk data loader

InputPlugin

module Embulk
  class InputExample < InputPlugin
    Plugin.register_input('example', self)

    def self.transaction(config, &control)
      # read config
      task = {
        'message' => config.param('message', :string, default: nil)
      }
      threads = config.param('threads', :int, default: 2)

      columns = [
        Column.new(0, 'col0', :long),
        Column.new(1, 'col1', :double),
        Column.new(2, 'col2', :string),
      ]

      # BEGIN here

      # hand control back to Embulk, which runs the tasks
      # and returns their commit reports
      commit_reports = yield(task, columns, threads)

      # COMMIT here
      puts "Example input finished"

      return {}
    end

    def run(task, schema, index, page_builder)
      # index and page_builder are the arguments passed to run
      puts "Example input thread #{index}…"

      # emit 10 records matching the 3 columns declared in transaction
      10.times do |i|
        page_builder.add([i, 10.0, "example"])
      end
      page_builder.finish

      commit_report = { }
      return commit_report
    end
  end
end
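The control flow around `yield` in `transaction` can be hard to see on a slide, so here is a toy model in plain Ruby that runs without Embulk. `ToyInput`, its simplified method signatures, and the block that plays the executor are all hypothetical; Embulk's real executor is more involved and may run tasks in parallel.

```ruby
# Toy model of the transaction/run control flow; not Embulk's API.
class ToyInput
  def self.transaction(config)
    task = { "message" => config["message"] }
    task_count = config.fetch("threads", 2)

    # BEGIN here: set-up that must happen once, before any task runs
    commit_reports = yield(task, task_count)
    # COMMIT here: runs once, only after every task has finished

    { "committed_tasks" => commit_reports.size }
  end

  def run(task, index, sink)
    3.times { |i| sink << [index, i, task["message"]] }
    { "records" => 3 }   # this task's commit report
  end
end

sink = []
result = ToyInput.transaction({ "message" => "example", "threads" => 2 }) do |task, task_count|
  # The block plays the executor's role: run every task, gather the reports.
  (0...task_count).map { |index| ToyInput.new.run(task, index, sink) }
end

p sink.size
p result
```

The point of the shape is that begin and commit logic live in one place, while `run` only has to deal with its own slice of the work.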

Page 29: Embulk, an open-source plugin-based parallel bulk data loader

OutputPlugin

module Embulk
  class OutputExample < OutputPlugin
    Plugin.register_output('example', self)

    def self.transaction(config, schema, processor_count, &control)
      # read config
      task = {
        'message' => config.param('message', :string, default: "record")
      }

      puts "Example output started."
      commit_reports = yield(task)
      puts "Example output finished. Commit reports = #{commit_reports.to_json}"

      return {}
    end

    def initialize(task, schema, index)
      puts "Example output thread #{index}..."
      super
      @message = task.prop('message', :string)
      @records = 0
    end

    def add(page)
      # print each record as JSON, keyed by column name
      page.each do |record|
        hash = Hash[schema.names.zip(record)]
        puts "#{@message}: #{hash.to_json}"
        @records += 1
      end
    end

    def finish
    end

    def abort
    end

    def commit
      commit_report = {
        "records" => @records
      }
      return commit_report
    end
  end
end
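The `Hash[schema.names.zip(record)]` line in `add` pairs each column name with the value at the same position. A standalone illustration in plain Ruby follows, with made-up column names and values standing in for what `schema.names` and a record would hold.

```ruby
require "json"

names  = ["col0", "col1", "col2"]   # stand-in for schema.names
record = [0, 10.0, "example"]       # stand-in for one record in a page

# zip pairs names with values; Hash[] turns the pairs into a hash.
hash = Hash[names.zip(record)]
line = "record: #{hash.to_json}"
puts line
```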

Page 30: Embulk, an open-source plugin-based parallel bulk data loader

GuessPlugin

# guess_gzip.rb
module Embulk

  class GzipGuess < GuessPlugin
    Plugin.register_guess('gzip', self)

    # first two bytes of every gzip stream
    GZIP_HEADER = "\x1f\x8b".force_encoding('ASCII-8BIT').freeze

    def guess(config, sample_buffer)
      if sample_buffer[0,2] == GZIP_HEADER
        return {"decoders" => [{"type" => "gzip"}]}
      end
      return {}
    end
  end

end

# guess_
module Embulk

  class GuessNewline < TextGuessPlugin
    Plugin.register_guess('newline', self)

    def guess_text(config, sample_text)
      # count each newline style in the sample and pick the dominant one
      cr_count = sample_text.count("\r")
      lf_count = sample_text.count("\n")
      crlf_count = sample_text.scan(/\r\n/).length
      if crlf_count > cr_count / 2 && crlf_count > lf_count / 2
        return {"parser" => {"newline" => "CRLF"}}
      elsif cr_count > lf_count / 2
        return {"parser" => {"newline" => "CR"}}
      else
        return {"parser" => {"newline" => "LF"}}
      end
    end
  end

end
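Both guess heuristics are easy to try outside Embulk. The sketch below copies their logic into two standalone functions; `gzip?` and `guess_newline` are hypothetical names, and the sample strings are made up.

```ruby
require "zlib"

GZIP_MAGIC = "\x1f\x8b".b.freeze   # binary copy of the two-byte gzip header

# Same check as the gzip guess plugin: compare the first two bytes.
def gzip?(sample_buffer)
  sample_buffer[0, 2] == GZIP_MAGIC
end

# Same counting rule as the newline guess plugin.
def guess_newline(sample_text)
  cr_count = sample_text.count("\r")
  lf_count = sample_text.count("\n")
  crlf_count = sample_text.scan(/\r\n/).length
  if crlf_count > cr_count / 2 && crlf_count > lf_count / 2
    "CRLF"
  elsif cr_count > lf_count / 2
    "CR"
  else
    "LF"
  end
end

p gzip?(Zlib.gzip("time,account\n"))   # true
p gzip?("time,account\n")              # false
p guess_newline("a,b\r\nc,d\r\n")      # "CRLF"
p guess_newline("a,b\rc,d\r")          # "CR"
p guess_newline("a,b\nc,d\n")          # "LF"
```

Note that every CRLF also counts once as a CR and once as an LF, which is why the rule compares against half of each count rather than the full count.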

Page 31: Embulk, an open-source plugin-based parallel bulk data loader

Releasing to RubyGems

Examples

> embulk-plugin-postgres-json.gem
  > https://github.com/frsyuki/embulk-plugin-postgres-json
> embulk-plugin-redis.gem
  > https://github.com/komamitsu/embulk-plugin-redis
> embulk-plugin-input-sfdc-event-log-files.gem
  > https://github.com/nahi/embulk-plugin-input-sfdc-event-log-files

Page 32: Embulk, an open-source plugin-based parallel bulk data loader

Roadmap & Development

Page 33: Embulk, an open-source plugin-based parallel bulk data loader

Roadmap

> Add missing JRuby Plugin APIs
  > ParserPlugin, FormatterPlugin
  > DecoderPlugin, EncoderPlugin
> Add Executor plugin SPI
> Add ssh distributed executor
  > embulk run --command ssh %host embulk run %task
> Add MapReduce executor
> Add support for nested records (?)

Page 34: Embulk, an open-source plugin-based parallel bulk data loader

Contributing to the Embulk project

> Pull-requests & issues on GitHub
> Posting blogs
  > “I tried Embulk. Here is how it worked”
  > “I read Embulk code. Here is how it’s written”
  > “Embulk is good because…but bad because…”
> Talking on Twitter with a word “embulk”
> Writing & releasing plugins
> Windows support
> Integration to other software
  > ETL tools, Fluentd, Hadoop, Presto, …

Page 35: Embulk, an open-source plugin-based parallel bulk data loader

Q&A + Discussion?

Embulk committers:

> Hiroshi Nakamura @nahi
> Muga Nishizawa @muga_nishizawa
> Sadayuki Furuhashi @frsyuki

Page 36: Embulk, an open-source plugin-based parallel bulk data loader

https://jobs.lever.co/treasure-data

Cloud service for the entire data pipeline. We’re hiring!