Anomaly Detection with Apache Spark
Uploaded by Centro de Investigación para la Gestión Tecnológica del Riesgo (CIGTR). Posted 27-Jan-2015.
DESCRIPTION

Sean Owen, Director of Data Science, Cloudera. Summer course "Disruptive Innovation in Security Technologies" (Curso de Verano "Innovación Disruptiva en tecnologías de seguridad"), Vicálvaro Campus of the URJC.

TRANSCRIPT
Slide 1: Anomaly Detection with Apache Spark: Workshop
Sean Owen / Director of Data Science / Cloudera
Slide 2: Anomaly Detection

• What is “unusual”?
  • Server metrics
  • Access patterns
  • Transactions
• Labeled, or not
  • Sometimes we know examples of “unusual”; sometimes not
• Applications
  • Network security
  • IT monitoring
  • Fraud detection
  • Error detection
Slide 3: Clustering

• Identify dense clusters of data points
• Unusual = far from any cluster (see the sketch after this slide)
• What is “far”?
• Unsupervised learning
  • Can “supervise” with some labels to improve or interpret

en.wikipedia.org/wiki/Cluster_analysis
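The core idea fits in a few lines. Below is a minimal sketch in plain Scala (names like `isAnomaly` and the threshold are hypothetical, not from the slides): score a point by its distance to the nearest cluster center and flag it if that distance exceeds a threshold.

```scala
// Minimal sketch: a point is anomalous if it is far from every cluster center.
def distance(a: Array[Double], b: Array[Double]): Double =
  math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

def isAnomaly(point: Array[Double],
              centers: Seq[Array[Double]],
              threshold: Double): Boolean =
  centers.map(distance(point, _)).min > threshold
```

The same Euclidean distance reappears in the Spark code later in the deck.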
Slide 4: k-means++ Clustering

• Simple, well-known, parallel algorithm
• Iteratively assign points, update centers (“means”); see the sketch after this list
• Goal: points close to their nearest cluster center
• Must choose k, the number of clusters

mahout.apache.org/users/clustering/fuzzy-k-means.html
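As a companion to the bullets above, here is a minimal local sketch of one k-means iteration (an illustration only, not MLlib's implementation), reusing the `distance` helper from the previous sketch: assign each point to its nearest center, then recompute each center as the mean of its assigned points.

```scala
// One k-means iteration: assign points, then update the means.
def kmeansStep(points: Seq[Array[Double]],
               centers: IndexedSeq[Array[Double]]): IndexedSeq[Array[Double]] = {
  // assignment step: group points by the index of their nearest center
  val assigned: Map[Int, Seq[Array[Double]]] =
    points.groupBy(p => centers.indices.minBy(i => distance(p, centers(i))))
  // update step: each center moves to the mean of its assigned points
  centers.indices.map { i =>
    assigned.get(i) match {
      case Some(ps) =>
        ps.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
          .map(_ / ps.size)
      case None => centers(i) // no points assigned: keep the old center
    }
  }
}
```

Iterating `kmeansStep` until the centers stop moving (within some tolerance) is what the `setEpsilon` call later in the deck controls.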
Slide 5: Anomaly Detection in KDD Cup ’99
Slide 6: KDD Cup 1999

• Annual ML competition: www.sigkdd.org/kddcup/index.php
• ’99 task: computer network intrusion detection
• 4.9M connections
• Most normal, but many known to be attacks
• Not a realistic sample!
Slide 7

```
0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.
```

The slide's callouts mark the service (“http”), the bytes received (45076), the % SYN errors column, and the label (“normal.”) at the end of the record.
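To make the callouts concrete, here is an illustrative snippet (field positions assumed from the published KDD Cup ’99 schema; `record` is the line shown above) that pulls out those fields:

```scala
// Illustrative only: extract the annotated fields from the sample record.
val record = "0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1," +
  "0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal."
val fields = record.split(",")
val service      = fields(2)          // "http"
val bytesReceived = fields(5).toLong  // 45076 (dst_bytes)
val synErrorRate = fields(24).toDouble // 0.00 (serror_rate, "% SYN errors")
val label        = fields.last        // "normal."
```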
Slide 8: Apache Spark: Something for Everyone

• From MS Dryad, UC Berkeley, Databricks
• Scala-based
  • Expressive, efficient
  • JVM-based
• Scala-like API
  • Distributed code works like local code, works like streaming (illustrated below)
  • Collection-like, as Apache Crunch is
• Interactive REPL
• Distributed
• Hadoop-friendly
  • Integrates with where the data and cluster already are
  • ETL is no longer a separate step
• MLlib
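To illustrate “distributed works like local” (a hedged example; `sc` is the SparkContext that spark-shell provides), the same collection-style operations run on a local List and on an RDD:

```scala
// Local Scala collection
val local = List(1, 2, 3, 4).map(_ * 2).filter(_ > 4)   // List(6, 8)

// Same shape of code, but distributed across the cluster
val dist = sc.parallelize(List(1, 2, 3, 4)).map(_ * 2).filter(_ > 4).collect()
// dist: Array[Int] = Array(6, 8)
```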
Slide 9: Clustering, Take #0
Slide 10

```scala
val rawData = sc.textFile("/user/srowen/kddcup.data", 120)
// rawData: org.apache.spark.rdd.RDD[String] = MappedRDD[13] at textFile at <console>:15

rawData.count
// ...
// res1: Long = 4898431
```
Slide 11

```
0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.
```
Slide 12

```scala
import scala.collection.mutable.ArrayBuffer

val dataAndLabel = rawData.map { line =>
  val buffer = ArrayBuffer[String]()
  buffer.appendAll(line.split(","))
  buffer.remove(1, 3)                          // drop the three categorical columns
  val label = buffer.remove(buffer.length - 1) // the label is the last field
  val vector = buffer.map(_.toDouble).toArray
  (vector, label)
}

val data = dataAndLabel.map(_._1).cache()
```
Slide 13

```scala
import org.apache.spark.mllib.clustering._

val kmeans = new KMeans()
val model = kmeans.run(data)

model.clusterCenters.foreach(centroid =>
  println(java.util.Arrays.toString(centroid)))

val clusterAndLabel = dataAndLabel.map { case (data, label) =>
  (model.predict(data), label)
}
val clusterLabelCount = clusterAndLabel.countByValue

clusterLabelCount.toList.sorted.foreach { case ((cluster, label), count) =>
  println(f"$cluster%1s$label%18s$count%8s")
}
```
Slide 14

```
cluster  label              count
      0  back.               2203
      0  buffer_overflow.      30
      0  ftp_write.             8
      0  guess_passwd.         53
      0  imap.                 12
      0  ipsweep.           12481
      0  land.                 21
      0  loadmodule.            9
      0  multihop.              7
      0  neptune.         1072017
      0  nmap.               2316
      0  normal.           972781
      0  perl.                  3
      0  phf.                   4
      0  pod.                 264
      0  portsweep.         10412
      0  rootkit.              10
      0  satan.             15892
      0  smurf.           2807886
      0  spy.                   2
      0  teardrop.            979
      0  warezclient.        1020
      0  warezmaster.          20
      1  portsweep.             1
```

All but a single point land in cluster 0, regardless of label. Terrible.
Slide 15: Clustering, Take #1: Choose k
Slide 16

```scala
import scala.math._
import org.apache.spark.rdd._

def distance(a: Array[Double], b: Array[Double]) =
  sqrt(a.zip(b).map(p => p._1 - p._2).map(d => d * d).sum)

def clusteringScore(data: RDD[Array[Double]], k: Int) = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  val model = kmeans.run(data)
  val centroids = model.clusterCenters
  // score = mean distance from each point to its nearest centroid
  data.map(datum =>
    distance(centroids(model.predict(datum)), datum)).mean
}

val kScores = (5 to 40 by 5).par.map(k => (k, clusteringScore(data, k)))
```
Slide 17: (chart of clustering score vs. k)
Slide 18

```
(5,  1938.8583418059309)
(10, 1614.7511288131)
(15, 1406.5960973638971)
(20, 1111.5970245349558)
(25,  905.536686115762)
(30,  931.7399112938756)
(35,  550.3231624120361)
(40,  443.10108628017787)
```

The score should fall as k increases, yet it rises from k=25 to k=30: k-means converged to a local minimum, which motivates more runs and a tighter convergence threshold on the next slide.
Slide 19

```scala
kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6)
(30 to 100 by 10)
```

```
(30,  886.974050712821)
(40,  747.4268153420192)
(50,  370.2801596900413)
(60,  325.883722754848)
(70,  276.05785104442657)
(80,  193.53996444359856)
(90,  162.72596475533814)
(100, 133.19275833671574)
```
Slide 20: Clustering, Take #2: Normalize
Slide 21

```scala
data.unpersist(true)

val numCols = data.take(1)(0).length
val n = data.count
val sums = data.reduce((a, b) => a.zip(b).map(t => t._1 + t._2))
// Per-column sum of squares. The slide used fold with
// (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2), which squares partition
// results again when they are combined; aggregate separates the per-element
// step from the combiner.
val sumSquares = data.aggregate(new Array[Double](numCols))(
  (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2),
  (a, b) => a.zip(b).map(t => t._1 + t._2))
// population standard deviation: sqrt(n*sum(x^2) - sum(x)^2) / n
val stdevs = sumSquares.zip(sums).map {
  case (sumSq, sum) => sqrt(n * sumSq - sum * sum) / n
}
val means = sums.map(_ / n)

// standard score: (value - mean) / stdev, guarding against zero stdev
val normalizedData = data.map(
  (_, means, stdevs).zipped.map((value, mean, stdev) =>
    if (stdev <= 0) (value - mean) else (value - mean) / stdev)
).cache()

val kScores = (50 to 120 by 10).par.map(k =>
  (k, clusteringScore(normalizedData, k)))
```
Slide 22

```
(50,  0.008184436460307516)
(60,  0.005003794119180148)
(70,  0.0036252446694127255)
(80,  0.003448993315406253)
(90,  0.0028508261816040984)
(100, 0.0024371619202127343)
(110, 0.002273862516438719)
(120, 0.0022075535103855447)
```
Slide 23: Clustering, Take #3: Categoricals
Slide 24

```scala
val protocols = rawData.map(
  _.split(",")(1)).distinct.collect.zipWithIndex.toMap
...

val dataAndLabel = rawData.map { line =>
  val buffer = ArrayBuffer[String]()
  buffer.appendAll(line.split(","))
  val protocol = buffer.remove(1)
  val vector = buffer.map(_.toDouble)

  // one-hot encode the protocol as protocols.size indicator columns
  val newProtocolFeatures = new Array[Double](protocols.size)
  newProtocolFeatures(protocols(protocol)) = 1.0
  ...
  vector.insertAll(1, newProtocolFeatures)
  ...
  (vector.toArray, label)
}
```

(The `...` elisions are from the slide itself; the other categorical columns are handled the same way.)
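For clarity, here is a self-contained sketch of the one-hot encoding the slide applies (assuming, as in the full KDD data, the three protocol values tcp, udp, and icmp; the map's actual ordering comes from `zipWithIndex` and may differ):

```scala
// Each categorical value becomes an indicator vector with a single 1.0:
//   "tcp"  -> [1.0, 0.0, 0.0]
//   "udp"  -> [0.0, 1.0, 0.0]
//   "icmp" -> [0.0, 0.0, 1.0]
val protocols = Map("tcp" -> 0, "udp" -> 1, "icmp" -> 2)

def oneHot(protocol: String): Array[Double] = {
  val features = new Array[Double](protocols.size)
  features(protocols(protocol)) = 1.0
  features
}
```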
Slide 25

```
(50,  0.09807063330707691)
(60,  0.07344136010921463)
(70,  0.05098421746285664)
(80,  0.04059365147197857)
(90,  0.03647143491690264)
(100, 0.02384443440377552)
(110, 0.016909326439972006)
(120, 0.01610738339266529)
(130, 0.014301399891441647)
(140, 0.008563067306283041)
```
Slide 26: Clustering, Take #4: Labels, Entropy
Slide 27

```
0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.
```

The callout marks the label (“normal.”) at the end of the record.
Slide 28: Using Labels with Entropy

• Measures mixed-ness
• Bad clusters have very mixed labels
• A function of the cluster’s label frequencies, p(x)
• Good clustering = low-entropy clusters

entropy = −Σ p(x) log p(x)
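A quick worked example of the formula (plain Scala with a hypothetical `entropyOf` name; the deck’s own `entropy` function appears on the next slide): a pure cluster scores 0, and an evenly mixed one scores log 2.

```scala
import scala.math.log

// entropy of one cluster's label counts: -sum(p * log p)
def entropyOf(counts: Seq[Int]): Double = {
  val total = counts.sum.toDouble
  counts.filter(_ > 0).map { c =>
    val p = c / total
    -p * log(p)
  }.sum
}

entropyOf(Seq(100, 0)) // pure cluster: 0.0
entropyOf(Seq(50, 50)) // evenly mixed: log(2) ≈ 0.693
entropyOf(Seq(99, 1))  // nearly pure: ≈ 0.056
```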
Slide 29

```scala
def entropy(counts: Iterable[Int]) = {
  val values = counts.filter(_ > 0)
  val sum: Double = values.sum
  values.map { v =>
    val p = v / sum
    -p * log(p)
  }.sum
}

def clusteringScore(data: RDD[Array[Double]], labels: RDD[String], k: Int) = {
  ...
  val labelsInCluster = data.map(model.predict(_)).zip(labels).
    groupByKey.values
  val labelCounts = labelsInCluster.map(
    _.groupBy(l => l).map(t => t._2.length))
  val n = data.count
  // average entropy across clusters, weighted by cluster size
  labelCounts.map(m => m.sum * entropy(m)).sum / n
}
```
Slide 30

```
(30,  1.0266922080881913)
(40,  1.0226914826265483)
(50,  1.019971839275925)
(60,  1.0162839563855304)
(70,  1.0108882243857347)
(80,  1.0076114958062241)
(95,  0.4731290640152461)
(100, 0.5756131018520718)
(105, 0.9090079450132587)
(110, 0.8480807836884104)
(120, 0.3923520444828631)
```
Slide 31: Detecting an Anomaly
Slide 32

```scala
val kmeans = new KMeans()
kmeans.setK(95)
kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6)
val model = kmeans.run(normalizedData)

def distance(a: Array[Double], b: Array[Double]) =
  sqrt(a.zip(b).map(p => p._1 - p._2).map(d => d * d).sum)

val centroids = model.clusterCenters
val distances = normalizedData.map(datum =>
  (distance(centroids(model.predict(datum)), datum), datum))

// the five points farthest from their nearest centroid
distances.top(5)(Ordering.by[(Double, Array[Double]), Double](_._1))
```
Slide 33: From Here to Production?

• A real data set!
• Algorithmic improvements
  • Other distance metrics
  • k-means|| init (see the sketch after this list)
  • Use data point IDs
• Real-time
  • Spark Streaming?
  • Storm?
• Continuous pipeline
• Visualization
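As one example of the algorithmic directions listed above, MLlib's KMeans already exposes k-means|| initialization; a minimal sketch, assuming the same `normalizedData` as earlier:

```scala
import org.apache.spark.mllib.clustering.KMeans

val kmeans = new KMeans()
kmeans.setK(95)
// k-means|| seeding instead of random initial centers
kmeans.setInitializationMode(KMeans.K_MEANS_PARALLEL)
val model = kmeans.run(normalizedData)
```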