Spark Streaming and HDFS ETL with Kubernetes

Piotr Mrowczynski, CERN IT-DB-SAS
Prasanth Kothuri, CERN IT-DB-SAS

Operational challenges for Big Data at CERN

• YARN with HDFS storage
  • Physical machines are allocated, which means no resource elasticity; compute is coupled with data storage.
  • Good for stable, predictable production workloads and for use cases that rely on data locality (database-like systems, ETL, ad hoc analytics of archives in HDFS).

• Challenges
  • Mixed workloads: ETL, interactive Spark with EOS/S3/(HDFS in some formats?), Spark Streaming with Kafka.
  • No user isolation: high-priority and low-priority jobs share the same environment regardless of job requirements.
  • Limited to the capacity and resources of the on-premise Hadoop clusters; difficult to scale horizontally.

On-Premise YARN (HDFS) vs Cloud K8s (External Storage)

• Data stored on disk can be large, and compute nodes can be scaled separately.

• Trade-off between data locality and compute elasticity (also between data locality and networking infrastructure).

• Data locality is important for some data formats, so that jobs avoid reading more data than needed.

On-Premise YARN (HDFS) vs Cloud K8s (External Storage)

• Kubernetes natively supports ad hoc clusters, scaling of nodes, and spot instances (a subset of VMs can be pre-empted at any time).

• Cloud-managed clusters simplify the DevOps work required to provision and maintain clusters.

Kubernetes Authentication: User Service Accounts and Tokens

[Diagram components: Kubernetes Resource Manager; Resources Namespace A; Resources Namespace B]

    apiVersion: v1
    clusters:
    - cluster:
        certificate-authority-data: <base64-encoded-cert>
        server: https://188.184.93.9:6443
      name: k8s-spark-gp
    contexts:
    - context:
        cluster: k8s-spark-gp
        namespace: A
        user: spark
      name: default
    current-context: default
    kind: Config
    preferences: {}
    users:
    - name: spark
      user:
        token: <decoded-token>

Install Spark Services

    $ helm install --name spark \
        https://gitlab.cern.ch/db/spark-service/spark-service-charts/raw/master/cern-spark-services-1.0.0.tgz

Install default user

    $ helm install --name spark-user-default --set namespace=default \
        https://gitlab.cern.ch/db/spark-service/spark-service-charts/raw/master/cern-spark-user-1.0.0.tgz
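The kubeconfig on this slide carries a decoded service-account token, while Kubernetes stores Secret values base64-encoded. A minimal Python sketch of that decoding step follows. The function name build_kubeconfig and the placeholder token are assumptions made for illustration and are not part of the CERN charts; the field layout mirrors the kubeconfig shown on the slide.

```python
import base64
import json


def build_kubeconfig(server, ca_data_b64, token_b64,
                     cluster="k8s-spark-gp", user="spark", namespace="default"):
    """Return a kubeconfig dict for token-based service-account access.

    Hypothetical helper: the name and signature are illustrative only.
    """
    # Secret values are base64-encoded; the kubeconfig needs the raw token.
    token = base64.b64decode(token_b64).decode("utf-8")
    return {
        "apiVersion": "v1",
        "kind": "Config",
        "preferences": {},
        "clusters": [{
            "name": cluster,
            "cluster": {
                # The CA bundle stays base64-encoded in this field.
                "certificate-authority-data": ca_data_b64,
                "server": server,
            },
        }],
        "users": [{"name": user, "user": {"token": token}}],
        "contexts": [{
            "name": "default",
            "context": {"cluster": cluster, "user": user, "namespace": namespace},
        }],
        "current-context": "default",
    }


if __name__ == "__main__":
    # Placeholder inputs; real values come from the service account's Secret.
    example = build_kubeconfig(
        server="https://188.184.93.9:6443",
        ca_data_b64="<base64-encoded-cert>",
        token_b64=base64.b64encode(b"example-token").decode("ascii"),
        namespace="A",
    )
    # JSON is a subset of YAML, so this output can be saved as a kubeconfig.
    print(json.dumps(example, indent=2))
```

Emitting JSON keeps the sketch free of third-party YAML libraries; the result can be passed to kubectl through the KUBECONFIG environment variable.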

Kubernetes as failure-tolerant scheduler for YARN applications

[Diagram components: K8s Job with failure policies (only Spark Driver); Kubernetes Resource Manager; Spark/Hadoop (Spark Executors); YARN Resource Manager; HDFS (Hadoop Distributed File System)]

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: hdfs-etl
      namespace: default
    spec:
      template:
        metadata:
          labels:
            app: hdfs-etl
        spec:
          containers:
          - name: hdfs-etl
            # take image with spark configuration release from 26.04.2019
            # to pull a new image each time: imagePullPolicy: Always
            image: gitlab-registry.cern.ch/db/spark-service/spark-k8s-examples:v1.1-060519
            # source configuration and submit job
            command: ["/bin/sh"]
            args:
            - -c
            # literal block scalar keeps the backslash line continuations intact
            - |
              source hadoop-setconf.sh analytix; spark-submit \
                --conf spark.driver.host=$POD_HOSTNAME \
                --class ch.cern.monitoring.HdfsETL \
                $SPARK_HOME/examples/jars/spark-k8s-examples-assembly-1.1.jar;
            env:
            # authenticate yarn/hdfs access
            - name: KRB5CCNAME
              value: "/etc/krb5/krb5cc"
            # define driver hostname
            - name: POD_HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
            volumeMounts:
            # authenticate yarn/hdfs access
            - name: krb5
              readOnly: true
              mountPath: "/etc/krb5"
          restartPolicy: OnFailure
          volumes:
          # authenticate yarn/hdfs access
          - name: krb5
            secret:
              secretName: krb5
          # required for access to the driver host ports from yarn cluster
          hostNetwork: true

Kubernetes as failure-tolerant scheduler for YARN applications

    apiVersion: batch/v1beta1
    kind: CronJob
    metadata:
      name: hdfs-etl
    spec:
      schedule: "* * * * *"        # every minute
      concurrencyPolicy: Forbid    # only 1 job at a time
      jobTemplate:
        spec:
          # cleanup for concurrency policy (a Job spec field, so it sits under jobTemplate)
          ttlSecondsAfterFinished: 100
          template:
            spec:
              containers: …
              # Job pods accept only OnFailure or Never
              restartPolicy: OnFailure
              …

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: hdfs-etl
      namespace: default
    spec:
      template:
        spec:
          containers: …
          # Job pods accept only OnFailure or Never
          restartPolicy: OnFailure
          …

Runs in a K8s Job/CronJob:
- If the concurrency policy is Forbid, two runs of the same CronJob cannot execute at the same time
- The restart policy restarts the container if it exits without reaching the Complete state
- Liveness probes on containers for alarms
- A combination of CronJob and failure policies (ttlSecondsAfterFinished, activeDeadlineSeconds) can cover most of the failure cases
- Requires configuration from IT-DB-SAS
- https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#define-a-liveness-command
- https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
- https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/
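The combination of CronJob settings and failure policies can be sketched as a small manifest builder. This is an illustrative Python sketch rather than the IT-DB-SAS tooling: the function name cron_job and its parameters are assumptions. It places ttlSecondsAfterFinished and activeDeadlineSeconds in the Job spec, where the Kubernetes API defines them, and rejects restart policies other than OnFailure or Never, which are the only values a Job pod template accepts.

```python
import json

# Pod templates inside a Job may only use these restart policies.
ALLOWED_RESTART_POLICIES = {"OnFailure", "Never"}


def cron_job(name, schedule, containers, restart_policy="OnFailure",
             ttl_seconds_after_finished=100, active_deadline_seconds=None):
    """Build a CronJob manifest as a dict (hypothetical helper)."""
    if restart_policy not in ALLOWED_RESTART_POLICIES:
        raise ValueError(
            "restartPolicy must be OnFailure or Never, got %r" % restart_policy)
    job_spec = {
        # Finished Jobs are cleaned up after this many seconds.
        "ttlSecondsAfterFinished": ttl_seconds_after_finished,
        "template": {
            "spec": {"containers": containers, "restartPolicy": restart_policy},
        },
    }
    if active_deadline_seconds is not None:
        # Upper bound on how long one Job run may stay active.
        job_spec["activeDeadlineSeconds"] = active_deadline_seconds
    return {
        "apiVersion": "batch/v1beta1",  # version used on the slide
        "kind": "CronJob",
        "metadata": {"name": name},
        "spec": {
            "schedule": schedule,
            # Forbid skips a new run while the previous one is still active.
            "concurrencyPolicy": "Forbid",
            "jobTemplate": {"spec": job_spec},
        },
    }


if __name__ == "__main__":
    container = {
        "name": "hdfs-etl",
        "image": "gitlab-registry.cern.ch/db/spark-service/spark-k8s-examples:v1.1-060519",
    }
    manifest = cron_job("hdfs-etl", "* * * * *", [container],
                        active_deadline_seconds=600)  # 600 is an arbitrary example
    print(json.dumps(manifest, indent=2))
```

The JSON output can be applied with kubectl in the same way as a YAML manifest.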

Spark Streaming with K8s Jobs

[Diagram components: Kafka; Spark/K8s Executors; K8S Job (Driver); Kubernetes Resource Manager]

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: kafka
      namespace: default
    spec:
      template:
        metadata:
          labels:
            app: kafka
        spec:
          serviceAccountName: spark
          containers:
          - name: kafka
            # take image with spark configuration release from 26.04.2019
            imagePullPolicy: Always
            image: gitlab-registry.cern.ch/db/spark-service/spark-k8s-examples:v1.1-070519
            # source configuration and submit job
            command: ["/bin/sh"]
            args:
            - -c
            # source hadoop-setconf.sh k8s-spark-gp
            # literal block scalar keeps the backslash line continuations intact
            - |
              (kubectl proxy --port 8080 &); spark-submit \
                --deploy-mode client --master k8s://http://127.0.0.1:8080 \
                --conf spark.kubernetes.container.image=gitlab-registry.cern.ch/db/spark-service/spark-k8s-examples:v1.1-060519 \
                --conf spark.kubernetes.namespace=default \
                --conf spark.driver.host=$POD_HOSTNAME \
                --class ch.cern.monitoring.KafkaRead \
                $SPARK_HOME/examples/jars/spark-k8s-examples-assembly-1.1.jar \
                "fts_raw_complete" \
                "monit-kafkax-dev-b3b87d619c.cern.ch:9092";
            env:
            - name: POD_HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
            ports:
            - containerPort: 4040
          restartPolicy: OnFailure
          hostNetwork: true

Spark App controlled by K8s Job:
- As with HDFS/YARN, requires configuration from IT-DB-SAS (no self-service)
- The K8s Job controls failure tolerance
- Checkpoints need to be written to persistent storage such as S3 or CephFS (operator defaults)
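The client-mode submission used inside the K8s Job can also be assembled programmatically, which avoids shell quoting and line-continuation problems. The Python sketch below is an assumption-laden illustration: the helper spark_submit_args is hypothetical, and the driver host value is a placeholder for what the Job injects as POD_HOSTNAME. The options and configuration keys are the ones used on the slide.

```python
import shlex


def spark_submit_args(main_class, jar, app_args, master, conf,
                      deploy_mode="client"):
    """Return the spark-submit command as an argument list (hypothetical helper)."""
    args = ["spark-submit", "--deploy-mode", deploy_mode, "--master", master]
    # Sort keys so the generated command is deterministic.
    for key in sorted(conf):
        args += ["--conf", "%s=%s" % (key, conf[key])]
    # The application jar follows the options; its own arguments come last.
    args += ["--class", main_class, jar]
    args += list(app_args)
    return args


if __name__ == "__main__":
    command = spark_submit_args(
        main_class="ch.cern.monitoring.KafkaRead",
        jar="/opt/spark/examples/jars/spark-k8s-examples-assembly-1.1.jar",  # assumed path
        app_args=["fts_raw_complete", "monit-kafkax-dev-b3b87d619c.cern.ch:9092"],
        master="k8s://http://127.0.0.1:8080",  # kubectl proxy endpoint from the slide
        conf={
            "spark.kubernetes.container.image":
                "gitlab-registry.cern.ch/db/spark-service/spark-k8s-examples:v1.1-060519",
            "spark.kubernetes.namespace": "default",
            "spark.driver.host": "10.0.0.1",  # placeholder for $POD_HOSTNAME
        },
    )
    # Quote each argument so the printed line can be pasted into a shell.
    print(" ".join(shlex.quote(arg) for arg in command))
```

Passing the list to subprocess.run would launch the job without going through a shell at all.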

Spark Streaming with K8s Operator

[Diagram components: Kafka; Spark/K8s Executors/Driver; Spark/K8s Operator; Kubernetes Resource Manager]

    #spark-submit \
    #--master local[*] \
    #--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
    #--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
    #--class ch.cern.db.StreamQuery /build/target/scala-2.11/spark-k8s-examples-assembly-1.1.jar \
    #"spark-streaming" "dbnile-kafka-a-5:9093,dbnile-kafka-b-5:9093,dbnile-kafka-c-5:9093" "./kafka.jks" "password"
    #

    apiVersion: "sparkoperator.k8s.io/v1beta1"
    kind: SparkApplication
    metadata:
      name: stream-query
      namespace: default
    spec:
      type: Scala
      mode: cluster
      image: gitlab-registry.cern.ch/db/spark-service/spark-k8s-examples:v1.1-060519
      # to pull a new image each time: imagePullPolicy: Always
      sparkVersion: 2.4.1
      mainClass: ch.cern.db.StreamQuery
      mainApplicationFile: local:///usr/hdp/spark/examples/jars/spark-k8s-examples-assembly-1.1.jar
      sparkConf:
        # krb auth
        "spark.driver.extraJavaOptions": "-Djava.security.auth.login.config=./jaas.conf"
        "spark.executor.extraJavaOptions": "-Djava.security.auth.login.config=./jaas.conf"
      arguments:
      - "spark-streaming" # topic
      - "dbnile-kafka-a-5:9093,dbnile-kafka-b-5:9093,dbnile-kafka-c-5:9093" # brokers
      - "./kafka.jks" # tls
      - "password" # tls
      driver:
        cores: 1
        coreLimit: "1000m"
        memory: "1024m"
        labels:
          version: 2.4.0
        serviceAccount: spark
        volumeMounts:
        - name: krb5
          readOnly: true
          mountPath: "/etc/krb5"
        envVars:
          KRB5CCNAME: "/etc/krb5/krb5cc"
      executor:
        instances: 1
        cores: 1
        memory: "2000m"
        labels:
          version: 2.4.0
        volumeMounts:
        - name: krb5
          readOnly: true
          mountPath: "/etc/krb5"
        envVars:
          KRB5CCNAME: "/etc/krb5/krb5cc"
      restartPolicy:
        type: Always
      volumes:
      - name: krb5
        secret:
          secretName: krb5

Spark Job controlled by Spark Operator:
- SparkApplication YAML defines the job
- Checkpoints need to be written to persistent storage such as S3 or CephFS (operator defaults)
- Self-service (the operator controls everything)
- https://gitlab.cern.ch/db/spark-service/spark-k8s-examples
- https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
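Since a restarted driver can only recover its streaming state from storage that outlives the pod, one way to satisfy the checkpoint requirement is to set a default checkpoint location in the SparkApplication's sparkConf. The Python sketch below assumes that the application does not set its own checkpointLocation option, so Spark's spark.sql.streaming.checkpointLocation setting applies. The helper name and the S3 bucket path are hypothetical.

```python
import copy
import json

# Spark SQL setting that provides a default checkpoint location
# for streaming queries that do not specify one themselves.
CHECKPOINT_KEY = "spark.sql.streaming.checkpointLocation"


def with_checkpoint_location(app, location):
    """Return a copy of a SparkApplication dict with a checkpoint location set.

    Hypothetical helper; the input dict is left unmodified.
    """
    updated = copy.deepcopy(app)
    spark_conf = updated["spec"].setdefault("sparkConf", {})
    spark_conf[CHECKPOINT_KEY] = location
    return updated


if __name__ == "__main__":
    # Trimmed-down version of the SparkApplication from the slide.
    app = {
        "apiVersion": "sparkoperator.k8s.io/v1beta1",
        "kind": "SparkApplication",
        "metadata": {"name": "stream-query", "namespace": "default"},
        "spec": {
            "type": "Scala",
            "mode": "cluster",
            "mainClass": "ch.cern.db.StreamQuery",
            "restartPolicy": {"type": "Always"},
        },
    }
    # The bucket name is an assumption; any persistent store reachable by
    # the driver and executors (S3, a CephFS mount) could be used instead.
    persistent = with_checkpoint_location(
        app, "s3a://example-bucket/checkpoints/stream-query")
    print(json.dumps(persistent, indent=2))
```

Returning a copy keeps the base manifest reusable for several jobs that differ only in their checkpoint paths.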

Demo Spark Operator

https://gitlab.cern.ch/db/spark-service/spark-k8s-examples/tree/add_stream_pipeline#structured-streaming-with-operator

Spark K8s Monitoring

Kubernetes Dashboard to monitor jobs:
- Use the K8s Dashboard to log/exec/edit/delete/create/get-ports
- Go to https://<master-ip>:<dashboard-port>, e.g. https://k8s-spark-gp-3hz46xpv6wpy-master-0:31023

Driver Web UI:
- With SparkOperator: get the port of the SparkApplication (kubectl describe sparkapplication <name>)
- With K8s Job: get the host and port of the Pod (kubectl describe pod -l "app=kafka")

Spark History UI:
- Go to http://<master-ip>:<history-port>, e.g. http://k8s-spark-test-q7oapjm5lwle-master-0:32112

Spark Metrics:
- Go to https://hadoop-grafana.web.cern.ch/d/1/sparkmetrics
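Besides the web pages listed above, the Spark driver UI and the Spark History Server expose a REST API under /api/v1, which can be polled from a script once the host and port are known. The Python sketch below is a minimal illustration using only the standard library. The function names are hypothetical, and port 4040 is taken from the containerPort in the K8s Job example.

```python
import json
import urllib.request


def applications_url(host, port=4040):
    """Build the REST endpoint that lists Spark applications."""
    return "http://%s:%d/api/v1/applications" % (host, port)


def list_applications(host, port=4040, timeout=5.0):
    """Fetch the application list from a driver UI or history server.

    Performs a network call, so it only works from a machine that can
    reach the given host and port.
    """
    with urllib.request.urlopen(applications_url(host, port),
                                timeout=timeout) as response:
        return json.load(response)


if __name__ == "__main__":
    # Only the URL is printed here; call list_applications(...) with the
    # host and port obtained from kubectl describe to query a live driver.
    print(applications_url("<driver-host>"))
```

The same helper can target the history server by passing its port instead of 4040.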