London Cassandra Meetup 10/23: Apache Cassandra at British Gas Connected Homes for an Internet of...

Connected Homes Jim Anning, Head of Data & Analytics Josep Casals, Lead Data Engineer

Upload: planet-cassandra

Post on 15-Jul-2015

TRANSCRIPT

Page 1: London Cassandra Meetup 10/23: Apache Cassandra at British Gas Connected Homes for an Internet of Things Use Case

Connected Homes

Jim Anning, Head of Data & Analytics

Josep Casals, Lead Data Engineer

Page 2:
Page 3:

Data Science

Analytics

Data Engineering

Page 4:
Page 5:
Page 6:

Hive

100K - 2 minutes

Page 7:
Page 8:

2.3 Billion Events

Page 9:
Page 10:

My Energy

3.8M - Monthly
400K - Daily
200K - Half Hourly
5K - 10 seconds

Page 11:

Insight

Page 12:

218 dimension dataset expressed in 3d using T-distributed Stochastic Neighbour Embedding (Cool Data Science Stuff)

Page 13:

C*

Spark

RabbitMQ

R

Python

Scala

DataStax OpsCenter

Java

Page 14:

Non-Intrusive Load Monitoring

Page 15:

30 minute data
Filtering
Time Series Magic
Clustering
Markov Model
Fridge Consumption

Fridges - They are exciting… really
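The slide names a Markov model as one step but does not show it. As a rough illustration only, and not the speakers' algorithm, the sketch below estimates the transition probabilities of a two-state (on/off) Markov chain by counting consecutive pairs. It assumes the half-hourly readings have already been reduced to an on/off sequence; the object and function names are hypothetical.

```scala
object FridgeMarkov {
  // Estimate P(next state | current state) for an on/off sequence
  // by counting consecutive pairs of states.
  // Assumption: true means "compressor on", false means "off".
  def transitionProbabilities(states: Seq[Boolean]): Map[(Boolean, Boolean), Double] = {
    val pairs = states.zip(states.drop(1))
    // How many transitions start from each state
    val fromCounts = pairs.groupBy(_._1).map { case (from, ps) => from -> ps.size }
    // Share of each (from, to) pair among the transitions leaving "from"
    pairs.groupBy(identity).map { case (pair, ps) =>
      pair -> ps.size.toDouble / fromCounts(pair._1)
    }
  }

  def main(args: Array[String]): Unit = {
    val observed = Seq(true, true, false, false, true)
    transitionProbabilities(observed).foreach { case ((from, to), p) =>
      println(s"P($to | $from) = $p")
    }
  }
}
```

Pairs that never occur in the input are simply absent from the result, so a caller would need to treat a missing key as probability zero.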

Page 16:
Page 17:

Page 18:

Home device data: Collection, Processing, Storing

30 TB

Page 19:

Use cases

• Data storage
  • Spark Streaming from queue
• Data processing
  • Transformations and Joins
• Data analytics
  • Data science productionising

Page 20:

Data Streaming

End 2015:

• Hive Home -> 200k users
  • ~ 15000 messages / s
• Connected boilers -> 25k users
  • ~ 2500 messages / s
• Live Energy -> 50k users
  • ~ 8500 messages / s
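To put the three rates above side by side, the following sketch adds them up and derives the average gap between messages per user. The user counts and rates are copied from the slide; the per-day figure is an extrapolation that assumes the rate is sustained around the clock, which the slide does not state.

```scala
object StreamRates {
  // Figures copied from the slide: (product, users, approximate messages per second)
  val streams: Seq[(String, Int, Int)] = Seq(
    ("Hive Home", 200000, 15000),
    ("Connected boilers", 25000, 2500),
    ("Live Energy", 50000, 8500)
  )

  // Combined ingest rate across the three streams
  val totalPerSecond: Int = streams.map(_._3).sum

  // Assumption: the combined rate holds for all 86400 seconds of a day
  val totalPerDay: Long = totalPerSecond.toLong * 86400L

  // Average number of seconds between two messages from one user
  def secondsBetweenMessages(users: Int, perSecond: Int): Double =
    users.toDouble / perSecond

  def main(args: Array[String]): Unit = {
    println(s"Total: $totalPerSecond messages/s, about $totalPerDay per day")
    streams.foreach { case (name, users, rate) =>
      println(f"$name: one message per user every ${secondsBetweenMessages(users, rate)}%.1f s")
    }
  }
}
```

Under these assumptions the combined load is 26000 messages per second, with each user sending a message roughly every 6 to 13 seconds depending on the product.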

Page 21:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf

object StreamingDemo {
  def main(args: Array[String]): Unit = {
    // Connect to the Cassandra cluster
    // (CassandraConnector is a helper class whose definition is not shown on the slide)
    val client: CassandraConnector = new CassandraConnector()
    val cassandraNode = if (args.length > 0) args(0) else "127.0.0.1"
    client.connect(cassandraNode)
    client.execute("CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } ;")
    client.execute("CREATE TABLE IF NOT EXISTS streaming_test.words (word text PRIMARY KEY, count int);")

    // Create a StreamingContext with a SparkConf configuration
    val sparkConf = new SparkConf()
      .setAppName("StreamingDemo")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create a DStream that will connect to serverIP:serverPort
    val lines = ssc.socketTextStream("localhost", 9999)

    client.execute("USE streaming_test;")
    lines.foreachRDD(rdd => {
      // collect() brings each micro-batch to the driver, where the client lives
      rdd.collect().foreach(line => {
        if (line.length > 1) {
          client.execute("INSERT INTO words (word, count) " + "VALUES ('" + line + "', 1);")
        }
        println("Line from RDD: " + line)
      })
    })
    ssc.start()
    ssc.awaitTermination() // block the driver while batches are processed
  }
}

(1) Spark Streaming - Storing to C*

Use Spark-C* connector

Replace by a RabbitMQ or Kafka stream.
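The demo builds its INSERT by concatenating the raw input line into the statement, so a line containing a single quote would produce invalid CQL. A minimal sketch of one way around that is shown below: CQL string literals escape an embedded single quote by doubling it. The object and helper names are hypothetical and not part of the talk; with a real driver, bound parameters or the Spark-C* connector mentioned above would usually be the more robust choice.

```scala
object CqlLiterals {
  // CQL string literals are single-quoted; an embedded quote is written twice
  def quote(text: String): String = "'" + text.replace("'", "''") + "'"

  // Hypothetical helper that mirrors the INSERT statement from the slide
  def insertWord(word: String, count: Int): String =
    s"INSERT INTO words (word, count) VALUES (${quote(word)}, $count);"

  def main(args: Array[String]): Unit =
    println(insertWord("it's", 1))
}
```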

Page 22:

Data Processing

Energy readings from British Gas head ends:

• 30 M rows monthly ~ 1 M rows daily
• 48 columns per row (one reading every half hour)
• 4M rows in the contracts table
• Need to join both tables based on contract date
• Need to convert columns to rows for data science

Difficult to handle on a relational DB
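At 48 readings per row, 30 M rows a month become 1.44 billion readings once the columns are turned into rows. The sketch below shows that columns-to-rows step on plain Scala collections, without Spark or Cassandra. The column naming (t0000, t0030, ..., t2330) follows the code shown later in the talk; the Reading case class and the unpivot function are hypothetical names introduced for illustration.

```scala
object WideToLong {
  // Column names t0000, t0030, ..., t2330: one per half hour of the day
  val halfHourlyColumns: Seq[String] =
    (0 to 47).map(i => f"t${i / 2}%02d${30 * (i % 2)}%02d")

  // Hypothetical long-format record: one reading per row
  final case class Reading(mpxn: String, day: String, slot: String, value: Double)

  // Turn one wide row (meter id, day, column -> value) into 48 long rows.
  // Columns missing from the input default to 0.0.
  def unpivot(mpxn: String, day: String, wide: Map[String, Double]): Seq[Reading] =
    halfHourlyColumns.map(col => Reading(mpxn, day, col.drop(1), wide.getOrElse(col, 0.0)))

  def main(args: Array[String]): Unit = {
    val rows = unpivot("1234567890", "2015-01-01", Map("t0000" -> 0.12, "t0030" -> 0.15))
    rows.take(3).foreach(println)
  }
}
```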

Page 23:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

object TransformReadings {
  def main(args: Array[String]): Unit = {
    val cassandraHost = if (args.length > 0) args(0) else "127.0.0.1"
    val conf = new SparkConf(true).setAppName("TransformReadings")
      .set("spark.cassandra.connection.host", cassandraHost)
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    // Group the raw readings by mpxn
    val serIngest = sc.cassandraTable("ser_ingest", "profile_reads")
      .groupBy(row => row.get[String]("mpxn"))

    // Group the contracts by the last 10 characters of their mpxn
    val contracts = sc.cassandraTable("ser_ingest", "contract")
      .select("mpxn", "move_in_date", "business_partner", "contract_account", "premise", "postcode_gis")
      .groupBy(row => row.get[String]("mpxn").takeRight(10))

    // Join both tables, turn each wide row into half-hourly rows and store them
    serIngest.join(contracts)
      .flatMap(transformRow)
      .saveToCassandra("ser",
        "half_hourly_reads",
        SomeColumns("mxpn",
          "month",
          "energy_type",
          "readdate",
          "reading",
          "business_partner_id",
          "contract_id",
          "premise_id",
          "postcode"
        )
      )
  }

  def transformRow(data: (String, (Iterable[CassandraRow], Iterable[CassandraRow]))) = {…}
}

(2) Transformations and Joins

join + transform

Page 24:

import java.text.SimpleDateFormat
import java.util.Date

def transformRow(data: (String, (Iterable[CassandraRow], Iterable[CassandraRow]))) = {
  val halfHourlyColumnNames = (0 to 47).map(i => f"t${i/2}%02d${30*(i%2)}%02d") // Create seq. of column names
  val row = data._2._1.head
  val contracts = data._2._2
  val readDate = row.get[Date]("reading_date")
  // Most recent contract that started before the reading date
  val current_contract = contracts.filter(c => c.get[Date]("move_in_date").before(readDate)).toList
    .sortBy(_.get[Date]("move_in_date"))
    .reverse
    .head

  // One output row per half-hourly column
  halfHourlyColumnNames.map(col => (
    row.get[String]("mpxn"),
    new SimpleDateFormat("yyyy-MM").format(readDate),
    "elec",
    // HH is the 24-hour field; hh (1-12) would read 12:00 and 12:30 as 00:00 and 00:30
    new SimpleDateFormat("yyyy-MM-dd HHmm").parse(new SimpleDateFormat("yyyy-MM-dd ").format(readDate) + col.drop(1)),
    row.get[Option[Double]](col).getOrElse(0.0),
    current_contract.get[String]("business_partner"),
    current_contract.get[String]("contract_account"),
    current_contract.get[String]("premise"),
    current_contract.get[String]("postcode_gis")
  ))
}

(2) Transformations and Joins

Get the right

Create a row for
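The contract lookup in the code above takes the latest contract whose move-in date precedes the reading date, and calling .head will throw if no contract qualifies. The sketch below isolates that step on plain Scala types and returns an Option instead. The Contract case class is a hypothetical, simplified stand-in for the contract rows, and java.time.LocalDate is used here in place of java.util.Date.

```scala
import java.time.LocalDate

object ContractLookup {
  // Hypothetical, simplified contract record; the real table has more columns
  final case class Contract(account: String, moveInDate: LocalDate)

  // The contract in force on readDate: the latest one that started strictly before it.
  // Returns None when no contract started before readDate.
  def currentContract(contracts: Iterable[Contract], readDate: LocalDate): Option[Contract] = {
    val started = contracts.filter(_.moveInDate.isBefore(readDate))
    if (started.isEmpty) None else Some(started.maxBy(_.moveInDate.toEpochDay))
  }

  def main(args: Array[String]): Unit = {
    val contracts = Seq(
      Contract("A", LocalDate.of(2014, 1, 1)),
      Contract("B", LocalDate.of(2015, 3, 1))
    )
    println(currentContract(contracts, LocalDate.of(2015, 6, 1)))
  }
}
```

Returning an Option leaves it to the caller to decide whether readings without a matching contract are dropped or kept with empty contract fields.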

Page 25:

Data science productionising

• Data scientists write algorithms in R and C#
• Fridge energy calculation algorithm
  • -> 1000 loc in C#
• Spark allows us to reduce from 1000 to 400 loc
• We translate R and C# to Java / Scala
• Would be great if Scala was the language of choice for Data scientists

Page 26:

def main(args: Array[String]) = {
  val sparkConf = new SparkConf().{…}

  val (month, keyspace, interimSave) = parseParams(args, config)

  val sc = new SparkContext(sparkConf)

  // Readings for the requested month, grouped per mxpn
  val readings = sc.cassandraTable("ser", "profile_reads").filter({ r: CassandraRow => r.getString("month") == month })

  val grouped = readings.groupBy({ r: CassandraRow => r.getLong("mxpn") })

  def buildStructurePartial = buildStructure _

  val initial = grouped.map(buildStructurePartial.tupled).flatMap(m => m)

  // Each stage marks its share of the consumption and passes the structure on
  val baseload = initial.map({ x: MonthlyBreakdown => {
    x.markProcessed(getBaseload(x), "appliances", "baseload")
    x }})

  val fridged = baseload.map({ x: MonthlyBreakdown => {
    x.markProcessed(getFridge(x), "appliances", "fridge")
    x }})

  if (interimSave) {
    val fridgeresults = fridged.map({ x: MonthlyBreakdown => x.interimStage("fridge") }).flatMap(identity).flatMap(identity)
    fridgeresults.saveToCassandra(keyspace, "interim_breakdown", Seq("mxpn", "stage", "month", "readdate", "breakdown"))
  }

  val results = fridged.map({ x: MonthlyBreakdown => x.rollupResults() }).flatMap(identity)

  results.saveToCassandra(keyspace, "energy_breakdown", Seq("business_partner_id", "premise_id", "mpxn", "customer_type", "start_date", "end_date", "group", "energy_type", "category", "value"))
}

(3) Data science productionising

Spark allows us to chain different algorithms
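The chaining idea can be sketched without Spark: each algorithm is a function from a breakdown to an enriched breakdown, and the stages are composed in order. Everything below is hypothetical. Breakdown is a simplified, immutable stand-in for the talk's MonthlyBreakdown, and the two estimators are placeholders invented for illustration, since the real baseload and fridge algorithms are not shown in the talk.

```scala
object StageChain {
  // Hypothetical stand-in for MonthlyBreakdown: total energy plus per-appliance shares
  final case class Breakdown(total: Double, appliances: Map[String, Double] = Map.empty)

  type Stage = Breakdown => Breakdown

  // Placeholder estimators, invented for illustration only
  val baseload: Stage = b => b.copy(appliances = b.appliances + ("baseload" -> b.total / 5))
  val fridge: Stage = b => b.copy(appliances = b.appliances + ("fridge" -> b.total / 10))

  // Function.chain composes the stages left to right into a single function
  val pipeline: Stage = Function.chain(Seq(baseload, fridge))

  def main(args: Array[String]): Unit =
    println(pipeline(Breakdown(100.0)))
}
```

In a Spark job the same composed function could be passed to a single map over the RDD, or each stage could stay a separate map as in the talk's code, which makes it easy to save interim results between stages.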

Page 27:

We are Hiring :)

twitter: @JimAnning @jcasals

mail: [email protected] [email protected]