Page 1: Processamento de dados em tempo real - QConSP...IT Coordinator and Software Architect at Movile Msc. in Electrical Engineering Apache Cassandra MVP (2014/2015 e 2015/2016) Cassandra

Data Processing in "Real Time" with Apache Spark Structured Streaming

Eiti Kimura

QConSP19

Eiti Kimura

● IT Coordinator and Software Architect at Movile
● MSc in Electrical Engineering
● Apache Cassandra MVP (2014/2015 and 2015/2016)
● Cassandra Summit Speaker (2014 and 2015)
● Strata Hadoop World Singapore Speaker (2016)
● Spark Summit Speaker (2017)
● RedisConf Speaker (2018)

eitikimura

+1 billion messages per month

OUTLINE

● Introduction to Apache Spark
● The new Structured Streaming API
● Use case: a "real-time" processing application
● Lessons learned

Introduction to Apache Spark

Apache Spark™ is a fast and general engine for large-scale data processing.

What is Apache Spark used for?

● ETL processes

● Interactive queries (SQL)

● Advanced data analytics

● Machine Learning

● Streaming over large datasets

Data stream

Spark Structured Streaming

A high-level API for building continuous applications that process data streams, integrating with storage systems in a consistent and fault-tolerant way.

Apache Spark Structured Streaming

• New high-level API
• Joins of continuous data with static content
• Integrations with a variety of data sources
• Fault tolerant (checkpoints)
• Handling of out-of-order events (watermark)

Batch Processing with DataFrames

// Read a CSV file
val input = spark.read
  .format("csv")
  .load("source-path")

// Apply selection and filters
val result = input
  .select("device", "signal")
  .where("signal > 15")

// Write in Parquet format
result.write
  .format("parquet")
  .save("dest-path")

Streaming Processing with DataFrames

// Read a stream of CSV files: read is replaced by readStream
// (a streaming file source also needs an explicit .schema(...), omitted on the slide)
val input = spark.readStream
  .format("csv")
  .load("source-path")

// Apply selection and filters: this code does not change!
val result = input
  .select("device", "signal")
  .where("signal > 15")

// Write a Parquet stream: write is replaced by writeStream, and save by start
// (a file sink also needs a checkpointLocation option, omitted on the slide)
result.writeStream
  .format("parquet")
  .start("dest-path")

Types of Data Input (Input Sources)

● Distributed file system, directory

● Apache Kafka stream

● Socket connection

Output Modes

● Complete: all rows resulting from the processing are sent to the output

● Update: only the rows that changed since the last execution

● Append: only the new rows generated by the processing
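
The difference between the three modes can be illustrated with a toy model in plain Scala (no Spark involved). This is only a sketch under simplifying assumptions: the "result table" is a running count per key, and the names `OutputModesSketch`, `fold`, `complete`, `updated`, and `appended` are hypothetical. In Spark itself, append mode over an aggregation additionally waits for the watermark to finalize a row, which this model ignores.

```scala
object OutputModesSketch {
  type Table = Map[String, Int]

  // Fold one micro-batch of keys into the running counts (the result table).
  def fold(table: Table, batch: Seq[String]): Table =
    batch.foldLeft(table)((t, key) => t.updated(key, t.getOrElse(key, 0) + 1))

  // Complete: the whole result table is emitted after every micro-batch.
  def complete(before: Table, after: Table): Table = after

  // Update: only rows whose value is new or changed in this micro-batch.
  def updated(before: Table, after: Table): Table =
    after.filter { case (key, value) => !before.get(key).contains(value) }

  // Append: only rows that did not exist before this micro-batch.
  def appended(before: Table, after: Table): Table =
    after.filter { case (key, _) => !before.contains(key) }
}
```

Feeding the batches `Seq("a", "b")` and then `Seq("a", "c")` makes the contrast visible: complete emits all three keys, update emits `a` and `c`, and append emits only `c`.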

Types of Data Output (Output Sinks)

● Distributed file system

● Apache Kafka stream

● Memory/Console

● Foreach extension

Fault Tolerance with Checkpoints

Checkpointing: metadata (e.g. offsets) is written to write-ahead logs on disk (S3/HDFS) so that processing can be recovered after a failure.
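
The write-ahead idea can be sketched in plain Scala with a local file. This is not Spark's on-disk checkpoint format; `OffsetLogSketch`, `commit`, and `recover` are hypothetical names, and a local path stands in for S3/HDFS. The offset range of a batch is recorded before the batch is processed, so a restarted job can find out where it stopped.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardOpenOption}

object OffsetLogSketch {
  // Append "batchId,endOffset" to the log before the batch is processed.
  def commit(log: Path, batchId: Long, endOffset: Long): Unit =
    Files.write(
      log,
      s"$batchId,$endOffset\n".getBytes(UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND)

  // After a restart, return the last recorded (batchId, endOffset), if any.
  def recover(log: Path): Option[(Long, Long)] =
    if (!Files.exists(log)) None
    else
      new String(Files.readAllBytes(log), UTF_8)
        .split("\n")
        .filter(_.nonEmpty)
        .lastOption
        .map { line =>
          val Array(batchId, endOffset) = line.split(",")
          (batchId.toLong, endOffset.toLong)
        }
}
```

An empty or missing log means there is nothing to resume, so `recover` returns `None` and processing starts from the beginning.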

Handling Out-of-Order Data: Watermarking
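
A watermark is a moving threshold: the latest event time seen so far minus an allowed delay. Events older than the threshold are considered too late and are discarded. The plain-Scala sketch below models this per event; it is a simplification, since Spark advances the watermark per micro-batch rather than per event, and `WatermarkSketch`, `State`, and `process` are hypothetical names.

```scala
object WatermarkSketch {
  // maxEventTime: latest event time seen so far; accepted: events that were kept.
  final case class State(maxEventTime: Long, accepted: Vector[Long])

  // delayMs plays the role of the delay threshold given to withWatermark.
  def process(eventTimes: Seq[Long], delayMs: Long): State =
    eventTimes.foldLeft(State(Long.MinValue, Vector.empty)) { (state, ts) =>
      // Before the first event there is no watermark, so nothing is late.
      val watermark =
        if (state.maxEventTime == Long.MinValue) Long.MinValue
        else state.maxEventTime - delayMs
      if (ts < watermark) state // too late: the event is dropped
      else State(math.max(state.maxEventTime, ts), state.accepted :+ ts)
    }
}
```

With a delay of 3000 ms and event times 1000, 5000, 4500, 1500, 6000, the event at 4500 arrives out of order but is kept (the watermark is 2000 at that point), while the event at 1500 falls behind the watermark and is dropped.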

USE CASE

Near-real-time data processor

Subscription & Billing System, a.k.a. SBS

+110 million transactions per day

4 major carriers in Brazil

Continuous Processor Architecture

CSV Data Sample (Gzipped)

838,2,5500000000,100015,"{""authCode"":""3215"",""transactionIdAuth"":""1011706220428374938""}",SUBSCRIPTION_050,0.5,11,0,1,14,Subscription renew.,2017-07-18 13:22:59.518,,,19,PRE,false,Charge Fail. CTN[31984771092][PRE][GSM]: Without Credit.,,,,0,458,,engine2dc2,23,3,5,2017-07-18 13:22:59.544,,FE1952B0-571D-11E7-8A17-CA2EE9B22EAB,NT0359

838,2,5500000000,100008,"{""authCode"":""9496"",""transactionIdAuth"":""1171707031925409718""}",SUBSCRIPTION_099,0.99,11,0,1,14,Subscription renew.,2017-07-18 13:22:58.893,,,19,PRE,false,Charge Fail. CTN[21976504467][PRE][GSM]: Without Credit.,,,,0,1074,,engine2dc2,24,3,5,2017-07-18 13:22:58.928,,3ADF36D0-6040-11E7-9619-A2D6E78E4511,NT0360

703,2,5500000000,100004,"{""authCode"":""6838"",""transactionIdAuth"":""1181707061206948526""}",SUBSCRIPTION_299,2.99,11,0,1,14,Subscription renew.,2017-07-18 13:22:59.246,,,19,PRE,false,Charge Fail. CTN[84994640470][PRE][GSM]: Without Credit.,,,,0,748,,engine2dc2,24,3,5,2017-07-18 13:22:59.254, NT0299

Data Input Source

INPUT

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Create the session first; the stream reader depends on it
val conf = new SparkConf().setAppName("Structured Streaming")
val spark = SparkSession.builder()
  .config(conf).getOrCreate()

val streamReader = spark.readStream
  .format("csv")
  .option("header", false)
  .option("mode", "DROPMALFORMED") // skip rows that do not match the schema
  .schema(ReadSchemas.csvTransactionSchema)
  .load("hdfs://YOUR_PATH/20*/*/*/*.gz")

Scala code fragment

Anatomy of a Schema Field Definition

StructField("origin_id", IntegerType, true)

● "origin_id": field name
● IntegerType: field type
● true: can the field be null?

Defining the Schema for Reading Data

import org.apache.spark.sql.types._

// the csv data schema
def csvTransactionLogSchema: StructType = StructType(Array(
  StructField("id", StringType, true),
  StructField("application_id", IntegerType, true),
  StructField("carrier_id", IntegerType, true),
  StructField("phone", StringType, true),
  StructField("price", DoubleType, true),
  StructField("origin_id", IntegerType, true)
  // . . .
))

Scala code fragment

Processing (Spark SQL API) v1

import org.apache.spark.sql.functions._
import spark.implicits._

val query = streamReader
  // select case when: derive the date and one indicator column per status
  .withColumn("date", $"creation_date".cast("date"))
  .withColumn("successful_charges", when($"transaction_status_id" === 2, 1).otherwise(0))
  .withColumn("no_credit", when($"transaction_status_id" === 0, 1).otherwise(0))
  .withColumn("error", when($"transaction_status_id" === 3, 1).otherwise(0))
  // filtering
  .filter("carrier_id IN (1,2,4,5)")
  .filter("transaction_status_id NOT IN (5, 6)")
  .filter("transaction_action_id IN (0, 1)")
  // aggregation: 5-minute windows per carrier, accepting data up to 3 hours late
  .withWatermark("creation_date", "3 hours")
  .groupBy($"carrier_id", window($"creation_date", "5 minutes").as("window"))
  // carrier_id is already kept as a grouping column, so it is not repeated in agg
  .agg(
    avg($"response_time").as("avg_response_time"),
    sum($"successful_charges").as("successful_charges"),
    sum($"no_credit").as("no_credit"),
    sum($"error").as("error"),
    count($"carrier_id").as("total_attempts"))
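
The 5-minute grouping window assigns each event to a fixed, non-overlapping time bucket. A minimal plain-Scala sketch of that assignment follows, assuming tumbling windows aligned to the Unix epoch (the default alignment when no start offset is given); `TumblingWindowSketch` and `windowOf` are hypothetical names.

```scala
import java.time.{Duration, Instant}

object TumblingWindowSketch {
  // Returns the [start, end) window that contains the given event time.
  def windowOf(eventTime: Instant, size: Duration): (Instant, Instant) = {
    val sizeMs = size.toMillis
    // Round the timestamp down to the nearest multiple of the window size.
    val startMs = Math.floorDiv(eventTime.toEpochMilli, sizeMs) * sizeMs
    (Instant.ofEpochMilli(startMs), Instant.ofEpochMilli(startMs + sizeMs))
  }
}
```

For example, an event at 2017-07-18 13:22:59.518 UTC lands in the window from 13:20 to 13:25, the same window boundaries that appear in the consolidated result table of this talk.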

Processing (Spark SQL API) v2

val streamReader = spark.readStream
  .format("csv")
  .schema(ReadSchemas.csvTransactionSchema)
  .load("hdfs://YOUR_PATH/20*/*/*/*.gz")

// Register the watermarked stream as a temporary view so it can be queried with SQL
streamReader
  .withWatermark("creation_date", "3 hour")
  .createOrReplaceTempView("transaction_temp_table")

Processing (Spark SQL) v2

import org.apache.spark.sql.DataFrame

// The FROM clause must name the temporary view registered from the stream
val query: DataFrame = spark.sql("""
  SELECT carrier_id,
         TO_DATE(creation_date) as record_date,
         HOUR(creation_date) as hour_of_day,
         WINDOW(creation_date, "5 minutes").start as start_date,
         AVG(response_time) as avg_response_time,
         SUM(CASE WHEN transaction_status_id = 2 THEN 1 ELSE 0 END) as successful_charges,
         SUM(CASE WHEN transaction_status_id = 0 THEN 1 ELSE 0 END) as no_credit,
         count(carrier_id) as total_attempts
  FROM transaction_temp_table
  WHERE carrier_id IN (1,2,4,5)
    AND transaction_action_id IN (0, 1)
    AND transaction_status_id NOT IN (5, 6)
  GROUP BY carrier_id,
           TO_DATE(creation_date),
           HOUR(creation_date),
           WINDOW(creation_date, "5 minutes").start
""")

Data Output Sink

OUTPUT

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// JDBCSink is the talk's custom writer; .foreach requires it to be a ForeachWriter
val jdbcWriter = new JDBCSink(resource, username, password)

val foreachStream = query
  .select($"carrier_id", $"date", $"hour_of_day", $"start_date", $"end_date")
  .writeStream
  .foreach(jdbcWriter)
  .outputMode(OutputMode.Update()) // emit only rows changed since the last trigger
  .trigger(Trigger.ProcessingTime("2 minute"))
  .option("checkpointLocation", "hdfs://YOUR_PATH/checkpoint-complete/")
  .start()

foreachStream.awaitTermination()
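
Because the query runs in update mode, the row for a given carrier and window can reach the sink more than once as late data arrives. The slides do not show how JDBCSink handles this; one common approach, sketched here in plain Scala as an assumption, is to key rows by carrier and window start and let a re-emitted row overwrite the previous one. `UpsertSinkSketch`, `WindowRow`, and `upsert` are hypothetical names, and an in-memory map stands in for the database table.

```scala
object UpsertSinkSketch {
  final case class WindowRow(carrierId: Int, windowStart: String, totalAttempts: Long)

  type Table = Map[(Int, String), WindowRow]

  // A row with an existing (carrier, window start) key replaces the stored row;
  // a row with a new key is inserted.
  def upsert(table: Table, batch: Seq[WindowRow]): Table =
    batch.foldLeft(table)((t, row) => t.updated((row.carrierId, row.windowStart), row))
}
```

Writing this way keeps the table at one row per carrier and window no matter how many times the aggregation is refined.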

Result table under consolidation

+--+-----------+-----+------+------+------------+--------+---------+-----+-------+
|id|record_date|hour |start |end   |avg_resp_tm |success |no_credit|error|tot_att|
+--+-----------+-----+------+------+------------+--------+---------+-----+-------+
|1 |2017-07-18 |13   |13:20 |13:25 |618.8061297 |4       |2607     |195  |2806   |
|2 |2017-07-18 |13   |13:20 |13:25 |1456.424283 |13      |10912    |1503 |12428  |
|5 |2017-07-18 |13   |13:20 |13:25 |1161.730896 |9       |2796     |532  |3337   |
|4 |2017-07-18 |13   |13:20 |13:25 |2950.642105 |4       |1364     |54   |1425   |
+--+-----------+-----+------+------+------------+--------+---------+-----+-------+

Window (5 mins)

Consolidation

Watermark (ts)

Aggregations

Carriers

RESULTS

BEFORE

● 60-90 mins: file processing via ETL, persisted to the analytics database

● 10-15 seconds: consolidation query for display on the dashboard

AFTER

● 3-5 mins: continuous processing with Apache Spark Structured Streaming

● < 1 second: query for display on the dashboard

faster

~30x faster
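
As a rough check on these figures, the speedup ranges implied by the before and after latencies can be computed directly. The sketch below is only arithmetic on the numbers shown on the slide; `SpeedupSketch` and `speedup` are hypothetical names, and which before/after pair the "~30x" caption refers to is an assumption, since the slide layout does not survive in the transcript.

```scala
object SpeedupSketch {
  // Ratio between a "before" and an "after" latency in the same unit.
  def speedup(before: Double, after: Double): Double = before / after

  // Processing latency in minutes: 60-90 before, 3-5 after.
  val processingLowerBound: Double = speedup(60, 5) // slowest "after" vs fastest "before"
  val processingUpperBound: Double = speedup(90, 3) // fastest "after" vs slowest "before"

  // Dashboard query in seconds: 10-15 before, under 1 after, so at least this ratio.
  val dashboardLowerBound: Double = speedup(10, 1)
}
```

Under these assumptions the processing latency improves by a factor between 12 and 30, and the dashboard query by a factor of at least 10.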

Lessons Learned

Schema EVOLUTION

import org.apache.spark.sql.types._

def csvSchema: StructType = StructType(Array(
  StructField("id", StringType, true),
  StructField("application_id", IntegerType, true),
  StructField("carrier_id", IntegerType, true),
  StructField("creation_date", TimestampType, true)
))

Resilience in Stream Processing

// Drop malformed rows instead of failing the query
val input = spark.readStream
  .option("mode", "DROPMALFORMED")

// Skip corrupt files instead of failing the query
spark.sqlContext
  .setConf("spark.sql.files.ignoreCorruptFiles", "true")

Performance Check

WARN ProcessingTimeExecutor:66 - Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 19455 milliseconds

Apache Spark Releases

● (2.3) Continuous Processing in Structured Streaming
  ○ New execution engine for streaming queries with sub-millisecond end-to-end latency

● (2.4) Kafka Client upgrade
  ○ Kafka client version upgraded from 0.10.0.1 to 2.0.0

Thank you

Questions?

github.com/eitikimura/structured-streaming

eitikimura