(BDT401) Big Data Orchestra - Harmony Within Data Analysis Tools | AWS re:Invent 2014
DESCRIPTION
Yes, you can build a data analytics solution with a relational database, but should you? What about scalability? What about flexibility? What about cost? In this session, we demonstrate how to build a real-world solution for location-based data analytics, with the combination of Amazon Kinesis, Amazon DynamoDB, Amazon Redshift, Amazon CloudSearch, and Amazon EMR. We discuss how to integrate these services to create a robust solution in terms of security, simplicity, speed, and low cost.
TRANSCRIPT
(in 45 minutes)
Queue · Table · Textual search (%LIKE%) · Analytical reports · BLOBs · Geospatial queries · Events table
(Related sessions: MBL310, MBL311)
Amazon SQS vs. Amazon Kinesis
- Amazon SQS: auto-scaling; simple to set up and operate, easy to deploy a new version to a new queue; "at least once" delivery; easy to start with a single worker.
- Amazon Kinesis: shard provisioning; more cost effective at high scale, once you have tuned the system; multiple consumers, each with "exactly once, in order" delivery; a set of dedicated workers running at different intervals and performing different operations.
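For the Kinesis side of this comparison, a minimal producer sketch in Python with boto might look like the following; the stream name, region, and event fields are illustrative assumptions, not code shown in the session:

import json

from boto import kinesis

# Assumed region and stream name; the session does not show the producer code.
conn = kinesis.connect_to_region('us-east-1')

event = {'event_id': 'evt-001', 'latitude': 47.62, 'longitude': -122.33}

# Partitioning by event_id spreads records across shards; each shard's
# consumer then reads its records in order.
conn.put_record('StreamName', json.dumps(event), partition_key=event['event_id'])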
import copy

import requests
from flask import Flask, g, request
from boto.cloudsearch2.document import CommitMismatchError
from boto.dynamodb2.items import Item

application = Flask(__name__)

# main entry point for SQS, accepting only POST requests
@application.route("/sqs/", methods=['POST'])
def sqs():
    application.logger.debug('Message was received for processing!')
    doc = parse_request(request)
    load_to_cloudsearch(doc)
    put_into_dynamodb(doc)
    return ""  # OK

def load_to_cloudsearch(doc):
    # index document
    doc_serv = g.domain.get_document_service()
    doc_serv.add(doc['id'], doc)
    application.logger.debug('Inserting docId: %s', doc['id'])
    # send index batch to CloudSearch
    try:
        doc_serv.commit()
    except CommitMismatchError as e:
        application.logger.error('CommitMismatchError raised')
        for msg in e.errors:
            application.logger.error('Error: %s', msg)
        raise
    finally:
        doc_serv.clear_sdf()  # clear SDF for next iteration

def put_into_dynamodb(doc):
    itemData = copy.deepcopy(doc)  # I want different fields in DynamoDB
    del itemData['day']  # TS is enough, day is only for faceting
    # Using GeoHashing for DynamoDB lookup index
    geojson_location = "{{\"coordinates\":[{0},{1}],\"type\":\"Point\"}}".format(doc['latitude'], doc['longitude'])
    itemData['location'] = geojson_location
    geo_server_url = "http://geo-server.elasticbeanstalk.com/wl-dynamodb-geo?point={0},{1}".format(doc['latitude'], doc['longitude'])
    itemData['geohash'] = int(requests.get(geo_server_url).content)
    # dropping the last 13 digits keeps the geohash prefix, so nearby points share the same coarse "geobox"
    itemData['geobox'] = itemData['geohash'] // 10000000000000
    # PUT into DynamoDB Table
    item = Item(g.eventsTable, data=itemData)
    item.save()
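The parse_request helper above is not shown on the slides; a rough sketch, assuming the Elastic Beanstalk worker daemon POSTs each SQS message body to /sqs/ as a JSON event, could be:

def parse_request(request):
    # Hypothetical helper: the message body is assumed to be a JSON document
    # describing one event.
    doc = request.get_json(force=True)
    # 'day' is assumed to be derived from the event time, used only for faceting.
    doc.setdefault('day', doc.get('time', '')[:10])
    return doc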
DynamoDB Geospatial
// Using Leaflet to show a map
function show_map(position) {
    var latitude = position.coords.latitude;
    var longitude = position.coords.longitude;
    map = L.map('map').setView([latitude, longitude], 15);
    L.tileLayer('http://{s}.tiles.mapbox.com/v3/guyernest.jngcdfig/{z}/{x}/{y}.png', {
        attribution: 'Map data ©…', maxZoom: 18
    }).addTo(map);

    // Call DDB to show markers on the map
    var bounds = map.getBounds();
    var request = { minLng: bounds.getWest(), maxLng: bounds.getEast(), minLat: bounds.getSouth(), maxLat: bounds.getNorth() };
    $.ajax({
        type: "POST",
        url: 'http://geo-server.elasticbeanstalk.com/wl-dynamodb-geo',
        data: JSON.stringify({ action: 'query-rectangle', request: request }),
        success: success,
        dataType: "json"
    });

    function success(data) {
        data.result.forEach(function(entry) {
            var marker = L.marker([parseFloat(entry.latitude), parseFloat(entry.longitude)]);
            marker.addTo(map);
            marker.bindPopup("<b>" + entry.comment + "</b><br>" + "<img src='" + entry.img + "'>")
                  .openPopup();
        });
    }
}
Amazon CloudSearch Native Geospatial support
• Latitude and Longitude data types
• Region search
• Distance sort
• Supports mobile
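A hedged sketch of what such a geospatial search might look like against a CloudSearch 2013-01-01 search endpoint; the endpoint URL, the latlon field name location, and the coordinates are assumptions, not taken from the session:

import requests

# Hypothetical search endpoint of the CloudSearch domain created earlier.
SEARCH_ENDPOINT = 'http://search-SearchDomain-xxxxxxxx.us-east-1.cloudsearch.amazonaws.com/2013-01-01/search'

params = {
    'q': 'matchall',
    'q.parser': 'structured',
    # region search: bounding-box filter on an assumed latlon field 'location'
    # (upper-left corner, lower-right corner)
    'fq': "location:['47.70,-122.45','47.55,-122.25']",
    # distance sort: compute distance from a reference point with haversin()
    'expr.distance': 'haversin(47.62,-122.33,location.latitude,location.longitude)',
    'sort': 'distance asc',
    'return': 'distance,comment',
}
results = requests.get(SEARCH_ENDPOINT, params=params).json()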
EMR
# cluster-creation command (beginning truncated on the slide), installing Spark via a bootstrap action:
… MyKeyPair --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark
{
"ClusterId": "j-38X214F58P62M"
}
>> aws emr ssh --cluster-id j-38X214F58P62M --key-pair-file MyKeyPair.pem
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
// Define the schema using a case class.
case class Event(event_id: String, time: String, latitude: Float, longitude: Float)
// Create an RDD of Event objects from S3 "folder" and register it as a table.
val events = sc.textFile("s3://spark-bucket-demo/spark/events").map(_.split(",")).
  map(p => Event(p(0), p(1), p(2).trim.toFloat, p(3).trim.toFloat))
events.registerAsTable("events")
// SQL statements can be run by using the SQL methods provided by sqlContext.
val oct = sql("SELECT event_id FROM events WHERE time >= '2014-10-01' AND time <= '2014-10-31'")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
oct.map(t => "event-id: " + t(0)).collect().foreach(println)
AWS CloudTrail
permissions
############### Setup for initial small environment ###############
# Kinesis
aws kinesis create-stream --stream-name "StreamName" --shard-count 1
# CloudSearch
aws cloudsearch create-domain --domain-name "SearchDomain"
# DynamoDB
aws dynamodb create-table --table-name "TableName" --key-schema AttributeName=Id,KeyType=HASH --attribute-definitions Attribute…
# Redshift
aws redshift create-cluster --cluster-identifier "ClusterID" --cluster-type single-node --node-type dw2.large --master-username "master-username" --master-user-password "master-user-password"
# EMR
aws emr run-job-flow --name "JobFlow"
  --instances {"MasterInstanceType": "m1.medium", "SlaveInstanceType": "m1.medium", "InstanceCount": …}
  --steps [
    {
      "Name": "Analyze Positions",
      "ActionOnFailure": "CONTINUE",
      "HadoopJarStep": { "Jar": "s3://emr-steps/AnalizePositions.jar" }
    }
  ]
#################### Scaling the infrastructure when needed ####################
# Kinesis
aws kinesis split-shard --stream-name "StreamName" --shard-to-split $SHARD_ID --new-starting-hash-key $MID_HASH
# CloudSearch
aws cloudsearch update-scaling-parameters --domain-name "SearchDomain" --scaling-parameters DesiredInstanceType=search.m2.xlarge,DesiredReplicationCount=2
# DynamoDB
aws dynamodb update-table --table-name "TableName" --provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=20
# Redshift
aws redshift modify-cluster --cluster-identifier "ClusterID" --number-of-nodes 2
# EMR
aws emr add-instance-groups --job-flow-id "JobFlow"
--instance-groups Name=insGroup,Market=SPOT,InstanceRole=TASK,BidPrice='0.3',InstanceType=m1.medium,InstanceCount=2
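The $MID_HASH above is typically the midpoint of the shard's hash-key range; a rough boto sketch of computing it for the first shard (stream name and region are assumptions):

from boto import kinesis

conn = kinesis.connect_to_region('us-east-1')

# Look up the hash-key range of the shard to split.
shard = conn.describe_stream('StreamName')['StreamDescription']['Shards'][0]
start = int(shard['HashKeyRange']['StartingHashKey'])
end = int(shard['HashKeyRange']['EndingHashKey'])

# Splitting at the midpoint divides the shard's key space (and its load) roughly in half.
mid_hash = (start + end) // 2
conn.split_shard('StreamName', shard['ShardId'], str(mid_hash))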
// update an event to 'close' only if the report is coming from the same geo-box
Table table = dynamo.getTable(TABLE_NAME);
table.updateItem("event-id", "7982e605-dc7d-4199-bc3e-d449733932e2",
    // update expression ("status" is a DynamoDB reserved word, so it needs a name placeholder)
    "set #s = :close",
    // condition expression
    "geobox = :geobox",
    new NameMap().with("#s", "status"),
    new ValueMap()
        .withString(":close", "close")
        .withInt(":geobox", 515811)
);
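If a later report arrives from a different geobox, the condition fails and DynamoDB rejects the write with a ConditionalCheckFailedException, leaving the event unchanged.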
SDKs
Java, Python (boto), PHP, .NET, Ruby, Node.js, iOS, Android, JavaScript (new!)
AWS Toolkit for Visual Studio, AWS Toolkit for Eclipse, AWS Tools for Windows PowerShell, AWS CLI
Learn from AWS big data experts: start-to-finish posts on analyzing and visualizing big data
blogs.aws.amazon.com/bigdata
Please give us your feedback on this session.
Complete session evaluations and earn re:Invent swag.
http://bit.ly/awsevals