exemplos de uso de apache spark usando aws elastic map reduce
TRANSCRIPT
Exemplos de Uso de Apache Spark usando AWS Elastic MapReduceAWS Meetup Rio - April 2016
Felipe Almeida ([email protected] | queirozf.com)
Project url: https://github.com/queirozfcom/aws-meetup-april-2016
Estrutura da palestra
● Introdução Spark● Introdução EMR● Casos de uso comuns do EMR● Criação de um cluster no EMR● Casos de uso Spark (batch)● Exemplo para rodar no EMR● Casos de uso Spark (streaming)● Exemplo para rodar no EMR
2
Introdução Spark
● Ferramenta para processamento de dados distribuídos em memória, feita na linguagem Scala
4
Introdução Spark
● Ferramenta para processamento de dados distribuídos em memória, feita na linguagem Scala
● Começou como uma alternativa ao Hadoop MapReduce, atacando duas áreas principais, consideradas fraquezas do MapReduce:○ Processamento iterativo (várias tarefas em sequência)○ Processamento interativo (análise exploratória de dados)
5
Introdução Spark
● Ferramenta para processamento de dados distribuídos em memória, feita na linguagem Scala
6
Este pequeno pedaço de código lê um arquivo texto do HDFS e executa uma contagem de palavras no mesmo, de forma distribuída
Introdução Spark
● Ferramenta para processamento de dados distribuídos em memória, feita na linguagem Scala
7
Diferença no tempo de execução de duas tarefas de regressão logística (um algoritmo de aprendizado de máquina) em Hadoop MapReduce e Spark
Este pequeno pedaço de código lê um arquivo texto do HDFS e executa uma contagem de palavras no mesmo, de forma distribuída
Introdução Spark
● Atualmente (versão 1.6.1), o Spark:○ Pode fazer quaisquer operações do tipo map/reduce○ Tem módulo para tarefas de aprendizado de máquina○ Tem módulo para processamento de streams de dados○ Tem módulo para análise de dados com DataFrames, como
em R, Pandas (python) e similares○ etc.
8
Introdução Spark
● Atualmente (versão 1.6.1), o Spark:○ Pode fazer quaisquer operações do tipo map/reduce○ Tem módulo para tarefas de aprendizado de máquina○ Tem módulo para processamento de streams de dados○ Tem módulo para análise de dados com DataFrames, como
em R, Pandas (python) e similares○ etc.
● Tudo isso○ de forma distribuída, em memória○ com APIs amigáveis, em Scala, Java, Python e também R.
9
Introdução Spark
Há dois modos principais de execução:● Batch
○ Você inicia o processamento de um arquivo ou dataset finito, o Spark processa a(s) tarefa(s) configurada(s) e pára.
10
Introdução Spark
Há dois modos principais de execução● Batch
○ Você inicia o processamento de um arquivo ou dataset finito, o Spark processa a(s) tarefa(s) configurada(s) e pára.
● Streaming○ Você processa um stream de dados contínuo; a execução não
pára até que haja algum erro ou você termine a aplicação manualmente.
11
Introdução EMR
O AWS Elastic MapReduce (EMR) é um serviço gerenciado de clusters Hadoop
● O AWS EMR também dá suporte ao Spark (desde 2015)
● O EMR é acessível pelo console AWS:
13
Detalhe: console AWS
Introdução EMR
Importante: Há dois modos de execução de clusters EMR.● Cluster mode
○ Após a criação, o cluster só é desligado manualmente ou se houver um erro
14
Introdução EMR
Importante: Há dois modos de execução de clusters EMR.● Cluster mode
○ Após a criação, o cluster só é desligado manualmente ou se houver um erro
● Step execution (ou autotermination)○ Após a criação, o cluster executa as tarefas que você
configurou e é desligado automaticamente:
15Escolha do modo de execução
Introdução EMR
Obs: Há também uma ferramenta de linha de comando (CLI) que lhe permite fazer todas as ações que foram feitas pelo console da AWS.
16
Introdução EMR
Obs: Há também uma ferramenta de linha de comando (CLI) que lhe permite fazer todas as ações que foram feitas pelo console da AWS:
Alguns comandos disponíveis:● create-cluster● add-steps● list-clusters● install-applications● ssh
(Veja todos os comandos disponíveis neste neste link)
17
Casos de uso comuns do EMR
● Terceirizar custo e expertise de criação e manutenção de um cluster (com dezenas ou até centenas de máquinas) para a AWS
19
Casos de uso comuns do EMR
● Terceirizar custo e expertise de criação e manutenção de um cluster (com dezenas ou até centenas de máquinas) para a AWS
● Executar jobs esporádicos sem a necessidade de possuir um cluster○ No modo step execution, você só paga o tempo que a tarefa
está sendo executada
20
Casos de uso comuns do EMR
● Terceirizar custo e expertise de criação e manutenção de um cluster (com dezenas ou até centenas de máquinas) para a AWS
● Executar jobs esporádicos sem a necessidade de possuir um cluster○ No modo step execution, você só paga o tempo que a tarefa
está sendo executada
● Conectar serviços da AWS, e.g. S3, Kinesis, DynamoDB.
21
Criação de um cluster no EMR
● O Spark não é incluído por default nos clusters. É preciso marcar a opção correta:
23
Na tela de criação de cluster, selecione a opção que inclui o Spark
Criação de um cluster no EMR
● Se você quiser ter acesso à interface de administração do Spark e do YARN, é necessário criar o cluster usando uma chave de autenticação:
24
Opcionalmente, escolha uma chave de acesso que você tenha criado
Casos de uso Spark Batch
● Análise exploratória de grandes conjuntos de dados
● Fazer Data Warehousing em cima de um grande conjunto de dados, estilo OLAP
27
Casos de uso Spark Batch
● Análise exploratória de grandes conjuntos de dados
● Fazer Data Warehousing em cima de um grande conjunto de dados, estilo OLAP
● Treinar um modelo de aprendizado de máquina sobre um grande conjunto de dados
28
Casos de uso Spark Batch
● Análise exploratória de grandes conjuntos de dados
● Fazer Data Warehousing em cima de um grande conjunto de dados, estilo OLAP
● Treinar um modelo de aprendizado de máquina sobre um grande conjunto de dados
● Qualquer tarefa de analytics que antes era feita via Hadoop MapReduce
29
Exemplo Execução Spark Batch no EMR
As etapas para ter uma tarefa Spark Batch rodando no AWS EMR são:● Escrita do código e empacotamento do mesmo em um JAR (Java
Archive File)● Upload dos dados que serão analisados para o S3● Upload do código (JAR) para o S3● Criação do cluster em modo step, ou autotermination, configurado
para rodar o seu código● Esperar o cluster subir e seu job completar● Em caso de sucesso, pegar o resultado do seu job, caso tenha
sido salvo em algum meio externo (e.g. S3)
31
Exemplo Execução Spark Batch no EMR
O código de exemplo é o seguinte: ● Ler um arquivo em formato texto do S3 (tão grande quanto se
queira)● Calcular, de forma distribuída, o número de palavras no arquivo● Salvar o resultado (ou seja, o número de vezes que cada palavra
ocorre no arquivo inteiro) em outro bucket do S3
32
Exemplo Execução Spark Batch no EMR
Código completo: (disponível em http://bit.do/spark-batch-example-aws-meetup)
object WordCount{
def main(args:Array[String]){
if(args.length < 1){
System.err.println("Please set arguments for <s3_input_dir> <s3_output_dir>")
System.exit(1)
}
val inputDir = args(0)
val outputDir = args(1)
val cnf = new SparkConf().setAppName("Spark Distributed WordCount")
val sc = new SparkContext(cnf)
val textFile = sc.textFile(inputDir)
val counts = textFile.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey( (a,b) => a+b )
counts.saveAsTextFile(outputDir)
sc.stop()
}
}
33
Exemplo Execução Spark Batch no EMR
Código completo: (disponível em http://bit.do/spark-batch-example-aws-meetup)
object WordCount{
def main(args:Array[String]){
if(args.length < 1){
System.err.println("Please set arguments for <s3_input_dir> <s3_output_dir>")
System.exit(1)
}
val inputDir = args(0)
val outputDir = args(1)
val cnf = new SparkConf().setAppName("Spark Distributed WordCount")
val sc = new SparkContext(cnf)
val textFile = sc.textFile(inputDir)
val counts = textFile.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey( (a,b) => a+b )
counts.saveAsTextFile(outputDir)
sc.stop()
}
}
34
O endereço do bucket de input e de output são parâmetros do job
Exemplo Execução Spark Batch no EMR
Código completo: (disponível em http://bit.do/spark-batch-example-aws-meetup)
object WordCount{
def main(args:Array[String]){
if(args.length < 1){
System.err.println("Please set arguments for <s3_input_dir> <s3_output_dir>")
System.exit(1)
}
val inputDir = args(0)
val outputDir = args(1)
val cnf = new SparkConf().setAppName("Spark Distributed WordCount")
val sc = new SparkContext(cnf)
val textFile = sc.textFile(inputDir)
val counts = textFile.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey( (a,b) => a+b )
counts.saveAsTextFile(outputDir)
sc.stop()
}
}
35
Leitura do arquivo do S3
Exemplo Execução Spark Batch no EMR
Código completo: (disponível em http://bit.do/spark-batch-example-aws-meetup)
object WordCount{
def main(args:Array[String]){
if(args.length < 1){
System.err.println("Please set arguments for <s3_input_dir> <s3_output_dir>")
System.exit(1)
}
val inputDir = args(0)
val outputDir = args(1)
val cnf = new SparkConf().setAppName("Spark Distributed WordCount")
val sc = new SparkContext(cnf)
val textFile = sc.textFile(inputDir)
val counts = textFile.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey( (a,b) => a+b )
counts.saveAsTextFile(outputDir)
sc.stop()
}
}
36
Essa linha faz todo o processamento distribuído: a partição do arquivo, contagem de palavras e a agregação
Exemplo Execução Spark Batch no EMR
Código completo: (disponível em http://bit.do/spark-batch-example-aws-meetup)
object WordCount{
def main(args:Array[String]){
if(args.length < 1){
System.err.println("Please set arguments for <s3_input_dir> <s3_output_dir>")
System.exit(1)
}
val inputDir = args(0)
val outputDir = args(1)
val cnf = new SparkConf().setAppName("Spark Distributed WordCount")
val sc = new SparkContext(cnf)
val textFile = sc.textFile(inputDir)
val counts = textFile.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey( (a,b) => a+b )
counts.saveAsTextFile(outputDir)
sc.stop()
}
}
37
O resultado do processamento é salvo no bucket S3 de destino
Casos de uso Spark Streaming
● Monitoramento de ativos, serviços
● Processamento de eventos em tempo real para alimentação de dashboards
42
Casos de uso Spark Streaming
● Monitoramento de ativos, serviços
● Processamento de eventos em tempo real para alimentação de dashboards
● Processamento de dados vindos de serviços como ○ AWS Kinesis ○ Apache Kafka○ Apache Flume○ Twitter○ ZeroMQ○ MQTT
43
Casos de uso Spark Streaming
● Monitoramento de ativos, serviços
● Processamento de eventos em tempo real para alimentação de dashboards
● Processamento de dados vindos de serviços como ○ AWS Kinesis ○ Apache Kafka○ Apache Flume○ Twitter○ ZeroMQ○ MQTT
44
Há clientes oficiais do Spark para todos esses serviços!
Exemplo de Execução Spark Streaming no EMR
As etapas para ter uma tarefa Spark Streaming rodando no AWS EMR são:
● Criação do cluster em modo cluster● Escrita do código e empacotamento do mesmo em um JAR (Java
Archive File)● Upload do código (JAR) para o S3● Adição do seu job no cluster● Caso não tenha havido erro na inicialização, a aplicação vai ficar
rodando até haver algum erro ou ela ser finalizada manualmente.
46
Exemplo de Execução Spark Streaming no EMR
As etapas para ter uma tarefa Spark Streaming rodando no AWS EMR são:
● Criação do cluster em modo cluster● Escrita do código e empacotamento do mesmo em um JAR (Java
Archive File)● Upload do código (JAR) para o S3● Adição do seu job no cluster● Caso não tenha havido erro na inicialização, a aplicação vai ficar
rodando até haver algum erro ou ela ser finalizada manualmente.
47
Note que, neste caso, o cluster é criado antes do envio do job
Exemplo de Execução Spark Streaming no EMR
● Exemplo: contar quantos tweets em português são criados a cada 10 segundos: (disponível em http://bit.do/spark-streaming-example-aws-meetup)
val config = new SparkConf().setAppName("aws-meetup-rio-2016-streaming")
val ssc = new StreamingContext(config,batchDuration)
val twitterConf = new ConfigurationBuilder()
twitterConf.setOAuthAccessToken(accessToken)
twitterConf.setOAuthAccessTokenSecret(accessTokenSecret)
twitterConf.setOAuthConsumerKey(apiKey)
twitterConf.setOAuthConsumerSecret(apiSecret)
val auth = AuthorizationFactory.getInstance(twitterConf.build())
val tweets = TwitterUtils.createStream(ssc,Some(auth))
val portuguesetweets = tweets.map(status => status.getLang) .filter(lang => lang == "pt" )
portuguesetweets.count().print()
ssc.start()
48
Exemplo de Execução Spark Streaming no EMR
● Exemplo: contar quantos tweets em português são criados a cada 10 segundos: (disponível em http://bit.do/spark-streaming-example-aws-meetup)
val config = new SparkConf().setAppName("aws-meetup-rio-2016-streaming")
val ssc = new StreamingContext(config,batchDuration)
val twitterConf = new ConfigurationBuilder()
twitterConf.setOAuthAccessToken(accessToken)
twitterConf.setOAuthAccessTokenSecret(accessTokenSecret)
twitterConf.setOAuthConsumerKey(apiKey)
twitterConf.setOAuthConsumerSecret(apiSecret)
val auth = AuthorizationFactory.getInstance(twitterConf.build())
val tweets = TwitterUtils.createStream(ssc,Some(auth))
val portuguesetweets = tweets.map(status => status.getLang).filter(lang => lang == "pt" )
portuguesetweets.count().print()
ssc.start()
49
Criação do stream twitter
Filtragem pelo idioma (“pt”)
Agregação (count) e impressão na saída padrão
Exemplo de Execução Spark Streaming no EMR
● O resultado é impresso na saída padrão:
50
Podemos ver telas de logs, saída padrão e erro do YARN no EMR