hadoop + cassandra: fast queries on data lakes, and wikipedia search tutorial
DESCRIPTION
Today’s services rely on massive amount of data to be processed, but require at the same time to be fast and responsive. Building fast services on big data batch- oriented frameworks is definitely a challenge. At ING, we have worked on a stack that can alleviate this problem. Namely, we materialize data model by map-reducing Hadoop queries from Hive to Cassandra. Instead of sinking the results back to hdfs, we propagate the results into Cassandra key-values tables. Those Cassandra tables are finally exposed via a http API front-end service.TRANSCRIPT
Fast Queries on Data Lakes
Exposing bigdata and streaming analytics using hadoop, cassandra, akka and spray
Natalino Busa@natalinobusa
Big and Fast. Tools Architecture Hands on Application!
Parallelism Hadoop Cassandra Akka
Machine Learning Statistics Big Data
Algorithms Cloud Computing Scala Spray
Natalino Busa@natalinobusa
www.natalinobusa.com
Challenges
Not much time to reactEvents must be delivered fast to the new machine APIsIt’s Web, and Mobile Apps: latency budget is limited
Loads of information to processUnderstand well the user historyAccess a larger context
OK, let’s build some apps
home brewedwikipedia search engine … Yeee ^-^/
Tools of the day:
Hadoop: Distributed Data OS
ReliableDistributed, Replicated File System
Low cost↓ Cost vs ↑ Performance/Storage
Computing Powerhouse
All clusters CPU’s working in parallel for running queries
Cassandra: A low-latency 2D store
ReliableDistributed, Replicated File System
Low latencySub msec. read/write operations
Tunable CAPDefine your level of consistency
Data model: hashed rows, sorted wide columns
Architecture model: No SPOF, ring of nodes, omogeneous system
Lambda architecture
BatchComputing
HTTP RESTful API
In-MemoryDistributed Database
In-memoryDistributed DB’s
Lambda ArchitectureBatch + Streaming
low-latencyWeb API services
StreamingComputing
All Data Fast Data
wikipedia abstracts(url, title, abstract, sections)
hadoopmapper.py
hadoopreducer.py
Publish pages on Cassandra
Produce inverted index entries
Top 10 Urls per word go to Cassandra
How to: Build an inverted index : Apple -> Apple Inc, Apple Tree, The Big Apple
CREATE KEYSPACE wikipedia WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
CREATE TABLE wikipedia.pages ( url text, title text, abstract text, length int, refs int, PRIMARY KEY (url) );
CREATE TABLE wikipedia.inverted ( keyword text, relevance int, url text, PRIMARY KEY ((keyword), relevance) ); Data model ...
memory
disk compute disk
diskcomputedisk
memory
disk diskcompute
memory
disk diskcompute
memory
memory
diskcomputedisk
map (k,v) shuffle & sort reduce (k,list(v))
compute
cat enwiki-latest-abstracts.xml | ./mapper.py | ./reducer.py
Map-Reduce
demystified
./mapper.py
produces tab separated triplets:
element 008930 http://en.wikipedia.org/wiki/Goldwith 008930 http://en.wikipedia.org/wiki/Goldsymbol 008930 http://en.wikipedia.org/wiki/Goldatomic 008930 http://en.wikipedia.org/wiki/Goldnumber 008930 http://en.wikipedia.org/wiki/Golddense 008930 http://en.wikipedia.org/wiki/Goldsoft 008930 http://en.wikipedia.org/wiki/Goldmalleable 008930 http://en.wikipedia.org/wiki/Goldductile 008930 http://en.wikipedia.org/wiki/Gold
Map-Reduce
demistified
./reducer.py
produces tab separated triplets for the same key:
ductile 008930 http://en.wikipedia.org/wiki/Goldductile 008452 http://en.wikipedia.org/wiki/Hydroformingductile 007930 http://en.wikipedia.org/wiki/Liquid_metal_embrittlement...
Map-Reduce
demistified
memory
disk compute disk
diskcomputedisk
memory
disk diskcompute
memory
disk diskcompute
memory
memory
diskcomputedisk
map (k,v) shuffle & sort reduce (k,list(v))
compute
def main(): global cassandra_client
logging.basicConfig() cassandra_client = CassandraClient() cassandra_client.connect(['127.0.0.1']) readLoop() cassandra_client.close()
Mapper ...
doc = ET.fromstring(doc) ...
#extract words from title and abstract words = [w for w in txt.split() if w not in STOPWORDS and len(w) > 2]
#relevance algorithm relevance = len(abstract) * len(links)
#mapper output to cassandra wikipedia.pages table cassandra_client.insertPage(url, title, abstract, length, refs)
#emit unique the key-value pairs emitted = list() for word in words: if word not in emitted: print '%s\t%06d\t%s' % (word, relevance, url) emitted.append(word)
Mapper ...
T split !!!
wikipedia abstracts(url, title, abstract, sections)
hadoopmapper.sh
hadoopreducer.sh
Publish pages on Cassandra
Extract inverted index
Top 10 Urls per word go to Cassandra
Inverted index : Apple -> Apple Inc, Apple Tree, The Big Apple
Export during the
"map" phase
memory
disk compute disk
diskcomputedisk
memory
disk diskcompute
memory
disk diskcompute
memory
memory
diskcomputedisk
map (k,v) shuffle & sort reduce (k,list(v))
computecassandra
cassandra
cassandra
from cassandra.cluster import Cluster
class CassandraClient: session = None insert_page_statement = None
def connect(self, nodes): cluster = Cluster(nodes) metadata = cluster.metadata self.session = cluster.connect() log.info('Connected to cluster: ' + metadata.cluster_name) prepareStatements()
def close(self): self.session.cluster.shutdown() self.session.shutdown() log.info('Connection closed.')
Cassandra client
def prepareStatement(self): self.insert_page_statement = self.session.prepare(""" INSERT INTO wikipedia.pages (url, title, abstract, length, refs) VALUES (?, ?, ?, ?, ?); """)
def insertPage(self, url, title, abstract, length, refs): self.session.execute( self.insert_page_statement.bind( (url, title, abstract, length, refs)))
Cassandra client
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar \ -files mapper.py,reducer.py \ -mapper ./mapper.py \ -reducer ./reducer.py \ -jobconf stream.num.map.output.key.fields=1 \ -jobconf stream.num.reduce.output.key.fields=1 \ -jobconf mapred.reduce.tasks=16 \ -input wikipedia-latest-abstract \ -output $HADOOP_OUTPUT_DIR
YARN: mapreduce v2
Using map-reduce and yarn
wikipedia abstracts(url, title, abstract, sections)
hadoopmapper.sh
hadoopreducer.sh
Publish pages on Cassandra
Extract inverted index
Top 10 Urls per word go to Cassandra
Inverted index : Apple -> Apple Inc, Apple Tree, The Big Apple
Export inverted inded
during "reduce" phase
SELECT TRANSFORM (url, abstract, links) USING 'mapper.py' AS (relevance, url)FROM hive_wiki_tableORDER BY relevance LIMIT 50;
Hive UDF
functions and
hooks
Second method: using hive sql queriesdef emit_ranking(n=100): global sorted_dict for i in range(n): cassandra_client.insertWord(current_word, relevance, url)
…
def readLoop(): # input comes from STDIN for line in sys.stdin: # parse the input we got from mapper.py word, relevance, url = line.split('\t', 2)
if current_word == word : sorted_dict[relevance] = url else: if current_word: emit_ranking()… Reducer ...
memory
disk compute disk
diskcomputedisk
memory
disk diskcompute
memory
disk diskcompute
memory
memory
diskcomputedisk
map (k,v) shuffle & sort reduce (k,list(v))
compute
cassandra
cassandra
Front-end:
@app.route('/word/<keyword>')def fetch_word(keyword): db = get_cassandra() pages = [] results = db.fetchWordResults(keyword) for hit in results: pages.append(db.fetchPageDetails(hit["url"]))
return Response(json.dumps(pages), status=200, mimetype="application/json")
if __name__ == '__main__': app.run()
Front-End:
prototyping in Flask
Expose during Map or Reduce?
Expose Map
- only access to local information
- simple, distributed "awk" filter
Expose in Reduce
- need to collect data scattered across your cluster
- analysis on all the available data
Latency tradeoffs
Two runtimes frameworks: cassandra : in-memory, low-latencyhadoop : extensive, exhaustive, churns all the data
Statistics and machine learning:
Python and R : they can be used for batch and/or realtime
Fastest analysis: still the domain on C, Java, Scala
Some lessons learned
● Use mapreduce to (pre)process data● Connect to Cassandra during MR ● Use MR as for batch heavy lifting
● Lambda architecture: Fast Data + All Data
Some lessons learned
Expose results to Cassandra for fast access
- responsive apps
- high troughput / low latency
Hadoop as a background tool
- data validation, new extractions, new algorithms
- data harmonization, correction, immutable system of records
The tutorial is on github
https://github.com/natalinobusa/wikipedia
Parallelism Mathematics Programming
Languages Machine Learning Statistics
Big Data Algorithms Cloud Computing
Natalino Busa@natalinobusa
www.natalinobusa.com
Thanks !Any questions?