Elasticsearch and Spark
ANIMESH PANDEY
PROJECT CONSILIENCE
Agenda
Who am I?
Text searching
Full text based
Term based
Databases vs. Search engines
Why not simple SQL?
Why need Lucene?
Elasticsearch
Concepts/APIs
Network/Discovery
Split-brain Issue
Solutions
Data Structure
Inverted Index
SOLR – Dataverse’s Search
Why not SOLR for Consilience?
Elasticsearch – Consilience’s Search
Language integration
Python
Java
Scala
SPARK
Why Spark?
Where Spark?
When Spark?
Language support
Conclusion and Questions
Who am I?
Animesh Pandey
Computer Science grad student @
Northeastern University, Boston
Intern for Project Consilience for
Summer 2015
Job: integration of Elasticsearch and
Spark into the existing project
Text Searching
Text – a form of data
Text – available from various sources
Internet, books, articles etc.
We are concerned with digital text, or with converting traditional text to digital text
Digital text – internet, news articles, blogs, research papers
Traditional text – any text from a physical book, manuscript, typed papers,
newspapers etc.
Traditional text conversion to digital text
Automatic - Optical Character Recognizers (OCR) e.g. Tesseract by Google Inc.
Manual - typing the text into a system
Full text based vs. Term based
Full text based search
Most general kind of search
Used every day when searching with
Google, Bing or Yahoo
In the background it is much more
than a simple character by
character match
A lot of pre-processing is involved in
a full text search
Term based search
Generally consists of exact term
matching
You can think of it as a SQL query
where we try to find documents that
contain an exact match of a
specified word
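Here is a minimal Python sketch of the difference. It uses three made-up documents and a deliberately tiny analysis step (lowercase, keep alphanumeric words), and it runs without Elasticsearch. Term based search compares the query word with the stored text as-is. Full text search analyzes the query and the documents the same way before comparing them.

```python
import re

# Hypothetical documents used only for this illustration
docs = {
    1: "Elasticsearch and Spark",
    2: "Searching text with Elasticsearch",
    3: "spark streaming",
}


def term_search(docs, term):
    """Term based: ids of documents whose raw text contains the exact, unprocessed word."""
    return sorted(doc_id for doc_id, text in docs.items() if term in text.split())


def analyze(text):
    """A tiny analysis step: lowercase and keep alphanumeric words."""
    return re.findall(r"[a-z0-9]+", text.lower())


def full_text_search(docs, query):
    """Full text: analyze query and documents alike, match on any shared term."""
    query_terms = set(analyze(query))
    return sorted(doc_id for doc_id, text in docs.items() if query_terms & set(analyze(text)))


print(term_search(docs, "Spark"))        # [1]
print(full_text_search(docs, "SPARK!"))  # [1, 3]
```

The exact-match search misses document 3 because of case alone. The analyzed search also tolerates the trailing punctuation in the query.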
Databases vs. Search Engines
Both have unique strengths, but they also have overlapping capabilities
Similarities:
Both can be used as data stores
Basic updates and modifications can be done using both
Differences:
Search Engines
Used for both structured as well
as unstructured data
Results are ordered by their
relevance to the query
Databases
Used for structured data
There is no relevance
ranking between the
query and the results: a row either matches or it doesn't
Why not simple SQL?
MySQL provides some ways to perform full text search along with term
based search, BUT …
Needs the MyISAM storage engine (InnoDB supports full text indexes only from MySQL 5.6). MyISAM was the default storage engine of MySQL before 5.5.
MyISAM is optimized for read operations with few or no write operations.
But you cannot avoid write (update/modify) operations.
MyISAM creates one index per table.
No. of tables = No. of indexes => more tables, more complexity.
Relational DBs use locks: a read/write operation has to wait if a conflicting
operation is already being executed (MyISAM locks the whole table).
How does a search engine help?
Efficient indexing of data
You don’t need multiple indices like you do in databases
Index is on all fields/combinations of fields
Analyzing data
Text search
Tokenizing => splitting of text
Stemming => converting words to their root forms
Filtering => removal of certain words
Relevance Scoring
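The analysis steps and relevance scoring listed above can be illustrated with a toy pipeline in plain Python. This is a sketch under stated assumptions, not Lucene. The stop list is hypothetical. The stemmer is a crude suffix stripper standing in for a real one such as Porter's. The score is a bare TF-IDF sum; Lucene's scoring in Elasticsearch 1.x is also TF-IDF based, but it adds normalization factors.

```python
import math
import re

STOP_WORDS = {"a", "an", "and", "are", "is", "of", "the", "to"}  # hypothetical stop list
SUFFIXES = ("ing", "es", "s")  # crude stand-in for a real stemmer


def tokenize(text):
    """Tokenizing: split text into lowercase alphanumeric words."""
    return re.findall(r"[a-z0-9]+", text.lower())


def stem(token):
    """Stemming: strip the first matching suffix (toy rule, not Porter)."""
    for suffix in SUFFIXES:
        if token.endswith(suffix) and len(token) > len(suffix) + 2:
            return token[: -len(suffix)]
    return token


def analyze(text):
    """Tokenize, filter out stop words, then stem."""
    return [stem(t) for t in tokenize(text) if t not in STOP_WORDS]


def score(query, docs):
    """Relevance scoring: sum of tf * idf over query terms, highest score first."""
    analyzed = {doc_id: analyze(text) for doc_id, text in docs.items()}
    n_docs = len(docs)
    results = []
    for doc_id, terms in analyzed.items():
        total = 0.0
        for term in set(analyze(query)):
            tf = terms.count(term)
            if tf:
                df = sum(1 for other in analyzed.values() if term in other)
                total += tf * math.log(n_docs / df)
        results.append((doc_id, total))
    return sorted(results, key=lambda pair: pair[1], reverse=True)


docs = {
    1: "Searching the documents",
    2: "Indexing documents and searching documents",
    3: "Spark clusters",
}
print(analyze("Searching the documents"))               # ['search', 'document']
print([doc_id for doc_id, _ in score("documents", docs)])  # [2, 1, 3]
```

Document 2 ranks first because "document" occurs twice in it after stemming. Document 3 scores zero.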
To solve the problems mentioned above, there are several
open source search engines…
APACHE LUCENE
Information Retrieval Software Library
Free/Open Source
Supported by the Apache Software Foundation
Created by Doug Cutting
Since 1999
Two popular search servers, both written in Java, are built on it…
APACHE SOLR
Built on Lucene
Perfect for single server search
Part of the Apache Lucene project (Lucene and Solr are developed and released together)
Large user and developer base
This is Dataverse’s search engine. We will later discuss why using
Elasticsearch there wouldn't make a big difference
ELASTICSEARCH
Response of a running node to GET http://localhost:9200/ :
{
  "status" : 200,
  "name" : "Fafnir",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "1.4.2",
    "build_hash" : "927caff6f05403e936c20bf4529f144f0c89fd8c",
    "build_timestamp" : "2014-12-16T14:11:12Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.2"
  },
  "tagline" : "You Know, for Search"
}
Free/Open source
Built on top of Lucene
Created by Shay Banon @kimchy
Current stable version is 1.6.0 (Summer 2015)
Has wrappers in many languages
RESTful Service
JSON API over HTTP
Chrome Plugins – Marvel Sense and Postman
Can be used from Java, Python and many other languages
High availability and clustering are very easy to set up
Long term persistence
What does Elasticsearch add to Lucene?
Elasticsearch is a “download and use” distro
├── bin                (executables)
│   ├── elasticsearch
│   ├── elasticsearch.in.sh
│   └── plugin
├── config             (node configs)
│   ├── elasticsearch.yml
│   └── logging.yml
├── data               (data storage)
│   └── cluster1
├── lib                (jar distributions)
│   ├── elasticsearch-x.y.z.jar
│   └── ...
└── logs               (log files)
    ├── elasticsearch.log
    ├── elasticsearch_index_search_slowlog.log
    └── elasticsearch_index_indexing_slowlog.log
elasticsearch.yml – Config file of Elasticsearch
Here we can initialize the basic configuration
required to start an ES node. The following
settings are the ones generally changed:
cluster.name – the cluster the node will join
node.name – name of the node
node.master – whether the node is eligible to be elected master
node.data – whether the node will hold data
path.data – path where the index data is stored
path.conf – path of the config folder (scripts or
any other file put in this folder)
path.logs – path of the logs
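Node-level settings such as node.master and path.conf belong in elasticsearch.yml, e.g.
node.master: true
path.conf: D:/social_media/config/
Index-level settings are sent when the index is created:
curl -XPUT "http://localhost:9200/social_media/" -d'
{
  "settings": {
    "index": {
      "number_of_shards": 3,
      "number_of_replicas": 1
    }
  }
}'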
Underlying Lucene Inverted Index
This is a term-to-document mapping
Inverted index contains terms mapped to
all documents in which they occur
Every document is paired with the term
frequency of the term being considered
Sum all term frequencies to get corpus
frequency of the term
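Here is a minimal inverted index in plain Python. It assumes whitespace tokenization and lowercasing only. Each term maps to the documents it occurs in, paired with its frequency there. Summing those frequencies gives the corpus frequency.

```python
from collections import Counter, defaultdict


def build_inverted_index(docs):
    """Map each term to {doc_id: term frequency in that document}."""
    index = defaultdict(dict)
    for doc_id, text in docs.items():
        for term, freq in Counter(text.lower().split()).items():
            index[term][doc_id] = freq
    return index


def corpus_frequency(index, term):
    """Sum the per-document term frequencies of a term (0 if the term is unknown)."""
    return sum(index.get(term, {}).values())


# Hypothetical two-document corpus
docs = {
    1: "spark reads text",
    2: "elasticsearch indexes text and more text",
}
index = build_inverted_index(docs)
print(index["text"])                    # {1: 1, 2: 2}
print(corpus_frequency(index, "text"))  # 3
```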
Shards and Replicas
Primary Shard
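Created when the index is created
Index has 1..N primary shards
Persistent: the number of primary shards cannot be changed after the index is created
This is the actual data
Replica Shard
Each primary shard has 0..N replicas
Not fixed: the number of replicas can be changed at any time
This is a copy of the data
Promoted to primary shard if the node holding the primary fails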
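The routing formula shard = hash(routing) % number_of_primary_shards decides which primary shard a document lands on. The routing value defaults to the document's _id. The sketch below uses zlib.crc32 as the hash. Elasticsearch uses its own hash function, so its actual shard numbers will differ. The sketch also shows why the number of primary shards is fixed once the index exists: with a different shard count, the same routing value can map to a different shard.

```python
import zlib


def shard_for(routing, number_of_primary_shards):
    """shard = hash(routing) % number_of_primary_shards, with crc32 as a stand-in hash."""
    return zlib.crc32(routing.encode("utf-8")) % number_of_primary_shards


# Hypothetical document ids used as routing values
doc_ids = ["616272192012165183", "616272192012165120", "42", "tweet-7"]
with_3 = {doc_id: shard_for(doc_id, 3) for doc_id in doc_ids}
with_4 = {doc_id: shard_for(doc_id, 4) for doc_id in doc_ids}
print(with_3)  # assignment with 3 primary shards
print(with_4)  # compare: the same ids with 4 primary shards
```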
Node discovery
By default, node discovery in ES 1.x uses multicast
Unicast is also possible
Can be changed in elasticsearch.yml
In multicast, a node sends ping requests to all nodes on the network to find
the ones that are available to join the cluster
To switch to unicast:
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3"]
Split-brain Issue
Suppose we have a three-node cluster with 1 master and 2 slave nodes
Suppose that, for some reason, the connection to NODE 2 fails
NODE 2 will promote its replica shards to primary shards and elect itself as master
Cluster will be in an inconsistent state
Indexing requests sent to NODE 2 won’t be reflected on NODE 1 and NODE 3
This will result in two different indices => different results
Solving the Split-brain issue
Specify the minimum number of master-eligible nodes needed to elect a master
discovery.zen.minimum_master_nodes = (N/2) + 1 (integer division), where N is the number of
master-eligible nodes in the cluster
In the three-node cluster this gives 2: the single-node side of a partition cannot elect a master,
so it fails instead of forming a second cluster, and production will come to know about the issue
discovery.zen.ping.timeout should be increased on a slow network so that nodes get
extra time to ping each other
Default value is 3 seconds
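You can check the quorum rule with a few lines of Python. This sketch assumes every node is master-eligible. It models a partition only by how many nodes are on each side.

```python
def minimum_master_nodes(master_eligible_nodes):
    """Quorum of master-eligible nodes: (N / 2) + 1 with integer division."""
    return master_eligible_nodes // 2 + 1


def can_elect_master(nodes_in_partition, master_eligible_nodes):
    """A partition may elect a master only if it still sees a quorum."""
    return nodes_in_partition >= minimum_master_nodes(master_eligible_nodes)


# Three-node cluster split into {NODE 1, NODE 3} and {NODE 2}
print(minimum_master_nodes(3))  # 2
print(can_elect_master(2, 3))   # True: NODE 1 + NODE 3 keep a master
print(can_elect_master(1, 3))   # False: NODE 2 cannot elect itself
# With two nodes the quorum is also 2, so losing either node blocks elections
print(minimum_master_nodes(2))  # 2
```

The two-node case is one reason the design patterns later recommend an odd number of nodes.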
Elasticsearch APIs
Elasticsearch provides a number of APIs. We will
cover the ones useful to us:
INDEX API
SETTINGS API
MAPPING API
TERMVECTOR/MTERMVECTOR API
BULK API
SEARCH API
Processing of Text using Analyzers (Settings API)
Analyzers help in manipulating the
text that is to be indexed.
An analyzer combines a tokenizer with token
filters (lowercasing, stemming, stop words etc.).
Analyzers are usually given a name/id
so that they can be reused later with
any type of text.
There are other analyzers as well that
are based on term replacement,
regular-expression patterns or
punctuation characters.
Custom analyzers can also be
created in ES.
Analysis settings are easiest to supply when the index is created (changing them on an existing index requires closing it first):
curl -XPUT "http://localhost:9200/social_media" -d'
{
  "settings": {
    "index": {
      "number_of_shards": 3,
      "number_of_replicas": 1
    },
    "analysis": {
      "analyzer": {
        "my_english": {
          "type": "custom",
          "tokenizer": "whitespace",
          "filter": [
            "lowercase",
            "type_as_payload",
            "cust_stop"
          ]
        }
      },
      "filter": {
        "cust_stop": {
          "type": "stop",
          "stopwords_path": "stoplist.txt"
        }
      }
    }
  }
}'
Mapping of Documents to be indexed (Mappings API)
curl -XPUT "http://localhost:9200/social_media/tweet/_mapping" -d'
{
  "tweet": {
    "_id": {
      "store": true,
      "index": "not_analyzed"
    },
    "properties": {
      "text": {
        "type": "multi_field",
        "fields": {
          "text": {
            "include_in_all": false,
            "type": "string",
            "store": false,
            "index": "not_analyzed"
          },
          "_analyzed": {
            "type": "string",
            "store": true,
            "index": "analyzed",
            "term_vector": "with_positions_offsets_payloads",
            "analyzer": "my_english"
          }
        }
      }
    }
  }
}'
Elasticsearch auto-maps fields but we
can also specify the types.
Data types provided by ES:
String
Number
Boolean
Date-time
Geo-point (coordinates)
Attachment (requires plugin)
Consilience uses this for indexing PDF
files
Creation of Index
Sending the settings and mappings to Elasticsearch in a PUT request initializes the index
Now the task is to send documents to Elasticsearch
We have to keep in mind the mappings of each field in the document
Document Metadata fields
_id : identifier of the document
_index : index name
_type : mapping type
_source : the original JSON document (can be enabled/disabled)
_timestamp
_ttl
_size : size of uncompressed _source
_version
Indexing a document (Index API)
Indexing a new document:
curl -XPOST "http://localhost:9200/social_media/tweet/616272192012165183" -d'
{
  "_source": {
    "text": "random text",
    "exact_text": "random text"
  }
}'
For ES 1.6.0+
curl -XPOST "http://localhost:9200/social_media/tweet/616272192012165183" -d'
{
  "text": "random text",
  "exact_text": "random text"
}'
Document structure:
{
  '_index': 'social_media',
  '_type': 'tweet',
  '_id': '616272192012165120',
  '_source': {
    'text': '@bshor Thanks for the info; this will help us. Are these the 2 datasets you were uploading? https://t.co/W1M4vrQUEI https://t.co/ITRycQnPKz',
    'exact_text': '@bshor Thanks for the info; this will help us. Are these the 2 datasets you were uploading? https://t.co/W1M4vrQUEI https://t.co/ITRycQnPKz'
  }
}
Retrieving term vectors (Termvector API)
The termvector (one document) and mtermvector (multiple documents)
APIs are used for getting the term vectors
We can change the request DSL below according to
our needs
curl -XGET "http://localhost:9200/social_media/tweet/616272192012165183/_termvector" -d'
{
"fields" : ["text"],
"offsets" : true,
"payloads" : true,
"positions" : true,
"term_statistics" : true,
"field_statistics" : true
}'
{
"_index": "social_media",
"_type": "tweet",
"_id": "616272192012165183",
"_version": 1,
"found": true,
"term_vectors": {
"text": {
"field_statistics": {
"sum_doc_freq": 65,
"doc_count": 6,
"sum_ttf": 66
},
"terms": {
"random": {
"doc_freq": 1,
"ttf": 1,
"term_freq": 1,
"tokens": [
{
"position": 0,
"start_offset": 0,
"end_offset": 6,
"payload": "d29yZA=="
}
]
},
"text": {
"doc_freq": 1,
"ttf": 1,
"term_freq": 1,
"tokens": [
{
"position": 1,
"start_offset": 7,
"end_offset": 11,
"payload": "d29yZA=="
}
]
}
}
}
}
}
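To make the statistics in this response concrete, the sketch below recomputes them for a tiny corpus of already-analyzed token lists. The corpus is a hypothetical stand-in for the indexed text field. The statistics are defined as follows:
- doc_freq counts the documents that contain a term.
- ttf counts all occurrences of a term.
- sum_doc_freq and sum_ttf add these up over every term of the field.
- doc_count counts the documents with at least one token.

Elasticsearch computes these per shard, so they are approximations for the whole index.

```python
from collections import Counter


def term_vector_stats(field_values):
    """Term-vector style statistics for one field.

    field_values: list of already-analyzed token lists, one per document.
    """
    doc_freq = Counter()  # number of documents containing each term
    ttf = Counter()       # total term frequency across all documents
    for tokens in field_values:
        counts = Counter(tokens)
        ttf.update(counts)
        doc_freq.update(counts.keys())
    field_statistics = {
        "doc_count": sum(1 for tokens in field_values if tokens),
        "sum_doc_freq": sum(doc_freq.values()),
        "sum_ttf": sum(ttf.values()),
    }
    return field_statistics, doc_freq, ttf


corpus = [["random", "text"], ["more", "text", "text"]]
stats, doc_freq, ttf = term_vector_stats(corpus)
print(stats)                          # {'doc_count': 2, 'sum_doc_freq': 4, 'sum_ttf': 5}
print(doc_freq["text"], ttf["text"])  # 2 3
```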
Processing independent documents
Text that is not stored in the index can be processed with the Analyze API
The analyzer my_english was defined earlier (Settings API example)
Analyzing the text “Text to analyze” with it returns the tokens below
curl -XGET "http://localhost:9200/social_media/_analyze?analyzer=my_english&text=Text%20to%20analyze"
{
"tokens": [
{
"token": "text",
"start_offset": 0,
"end_offset": 4,
"type": "word",
"position": 1
},
{
"token": "analyze",
"start_offset": 8,
"end_offset": 15,
"type": "word",
"position": 3
}
]
}
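The token stream above can be reproduced with a small Python model of my_english. This assumes that stoplist.txt contains the word "to" (its contents are not shown), and it leaves out type_as_payload. `\S+` approximates the whitespace tokenizer. The removed stop word still consumes position 2, which is why "analyze" ends up at position 3.

```python
import re


def my_english_like(text, stop_words):
    """Whitespace tokenizer + lowercase + stop filter, keeping offsets and positions.

    Positions start at 1, as in the Analyze API output; a removed stop word
    still uses up its position, leaving a gap.
    """
    tokens = []
    for position, match in enumerate(re.finditer(r"\S+", text), start=1):
        term = match.group().lower()  # lowercase filter
        if term in stop_words:
            continue  # stop filter: drop the token, keep the position gap
        tokens.append({
            "token": term,
            "start_offset": match.start(),
            "end_offset": match.end(),
            "type": "word",
            "position": position,
        })
    return tokens


for token in my_english_like("Text to analyze", {"to"}):  # hypothetical stop list
    print(token)
```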
Working with Shingles
Shingles are a way to index groups of adjacent
tokens as single tokens, e.g. bigrams alongside unigrams.
Shingle filter for bigrams (min and max shingle size 2):
"shingle_filter" : {
"type" : "shingle",
"min_shingle_size" : 2,
"max_shingle_size" : 2,
"output_unigrams": true
}
curl -XGET "http://localhost:9200/social_media/_analyze?analyzer=my_english_shingle&text=Text%20to%20analyze"
{
"tokens": [
{
"token": "text",
"start_offset": 0,
"end_offset": 4,
"type": "word",
"position": 1
},
{
"token": "text _",
"start_offset": 0,
"end_offset": 8,
"type": "shingle",
"position": 1
},
{
"token": "_ analyze",
"start_offset": 8,
"end_offset": 15,
"type": "shingle",
"position": 2
},
{
"token": "analyze",
"start_offset": 8,
"end_offset": 15,
"type": "word",
"position": 3
}
]
}
This filter can be used with the
termvector API to get
vectors containing both
unigrams and bigrams
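Here is a simplified Python model of the bigram shingle filter above. It assumes the filler token "_" (the shingle filter's default filler_token) for position gaps left by removed stop words. Fed with the two tokens from the my_english output, it yields the same four tokens and positions shown in the response.

```python
def shingles(tokens, filler="_", size=2, output_unigrams=True):
    """Build shingles from (term, position) pairs, filling position gaps with a filler."""
    if not tokens:
        return []
    by_position = {pos: term for term, pos in tokens}
    first, last = min(by_position), max(by_position)
    # Full position sequence; gaps left by removed stop words become the filler
    sequence = [(pos, by_position.get(pos, filler)) for pos in range(first, last + 1)]
    output = []
    for i, (pos, term) in enumerate(sequence):
        if output_unigrams and term != filler:
            output.append((term, pos))
        window = sequence[i : i + size]
        if len(window) == size:
            output.append((" ".join(t for _, t in window), pos))
    return output


print(shingles([("text", 1), ("analyze", 3)]))
```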
Searching in Index (Search API)
Default search (match query): the query text is analyzed, and documents containing any of the resulting terms match (default OR operator)
curl -XGET "http://localhost:9200/social_media/tweet/_search" -d'
{
  "query": {
    "match": {
      "text._analyzed": "some Texts"
    }
  },
  "explain": true
}'
Exact phrase matching (match_phrase query): searches for “some Texts” as a phrase; since the text field is not_analyzed, the whole field value has to be “some Texts”
curl -XGET "http://localhost:9200/social_media/tweet/_search" -d'
{
  "query": {
    "match_phrase": {
      "text": "some Texts"
    }
  },
  "explain": true
}'
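The difference between the two queries can be modelled in plain Python over already-analyzed tokens. This is a simplification: there is no scoring and no slop, and the analysis step is assumed to have already happened. match succeeds if any query term is present. match_phrase needs the terms at consecutive positions, in query order.

```python
def positions(tokens):
    """Map each term to the list of positions where it occurs."""
    index = {}
    for pos, term in enumerate(tokens):
        index.setdefault(term, []).append(pos)
    return index


def match(query_terms, doc_positions):
    """match with the default OR operator: any query term present."""
    return any(term in doc_positions for term in query_terms)


def match_phrase(query_terms, doc_positions):
    """match_phrase: all terms at consecutive positions, in query order."""
    if not query_terms:
        return False
    for start in doc_positions.get(query_terms[0], []):
        if all(start + offset in doc_positions.get(term, [])
               for offset, term in enumerate(query_terms)):
            return True
    return False


query = ["some", "texts"]  # hypothetical analyzed query
d1 = positions(["some", "texts", "here"])
d2 = positions(["texts", "about", "some", "topics"])
print(match(query, d1), match(query, d2))                # True True
print(match_phrase(query, d1), match_phrase(query, d2))  # True False
```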
Recommended Design Patterns
Keep the number of nodes odd
Take precautions to avoid the split-brain issue
Regularly refresh indices
Add refresh_interval to settings
Manage heap size
ES_HEAP_SIZE <= ½ of the system’s RAM but not more than 32GB
export ES_HEAP_SIZE=10g
or
./bin/elasticsearch -Xmx10g -Xms10g
Use Aliases
Searches go through an alias that points to the actual index
Reindexing into a new index and switching the alias prevents cluster downtime or delays during the update/modification of the index
Delete aliases when they become old and create new ones
You can create time-based aliases as well
Use Routing
Routing decides which shard a document is stored in
Searches that pass the routing value only hit that shard, which reduces lookup time
When bulk indexing
Pause (timeout) after every push
Each push should be at most 2-3 MB
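Here is a sketch of the bulk-indexing advice in Python. Documents are packed into newline-delimited Bulk API bodies, each an action line followed by the source line, and each body stays under a byte limit. The 2 MB default comes from the lower end of the 2-3 MB guideline. The index and type names are the ones used earlier. Sending each body to the _bulk endpoint, and pausing between pushes, is left out.

```python
import json


def bulk_bodies(docs, index, doc_type, max_bytes=2 * 1024 * 1024):
    """Yield Bulk API bodies of at most max_bytes (unless one document alone is larger).

    docs: iterable of (doc_id, source_dict) pairs.
    """
    batch, size = [], 0
    for doc_id, source in docs:
        action = json.dumps({"index": {"_index": index, "_type": doc_type, "_id": doc_id}})
        pair = action + "\n" + json.dumps(source) + "\n"  # bulk bodies end with a newline
        pair_size = len(pair.encode("utf-8"))
        if batch and size + pair_size > max_bytes:
            yield "".join(batch)
            batch, size = [], 0
        batch.append(pair)
        size += pair_size
    if batch:
        yield "".join(batch)


# Hypothetical documents; a small limit makes the batching visible
docs = [(str(i), {"text": "random text %d" % i}) for i in range(1000)]
bodies = list(bulk_bodies(docs, "social_media", "tweet", max_bytes=10000))
print(len(bodies), "bulk requests")
```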
Why not SOLR?
SOLR is a better search engine than Elasticsearch
But we need term vectors and text analysis more than search itself
ES provides better APIs for analytics
termvector with field and term statistics
mtermvector
search with explain enabled
function_score queries (not covered above)
If you need only a search engine, go for SOLR. If you need something more
than that, Elasticsearch is the best choice.
Language Support
We have
JAVA wrappers : org.elasticsearch.*
Python wrapper: py-elasticsearch
Scala wrapper : elastic4s
Domain Specific Language (DSL) : JSON over HTTP (cURL), as shown in every
example above
Let's add some SPARK to ES…
Apache Spark is an engine for large scale data processing
It claims to run programs up to 100 times faster than Hadoop MapReduce (in memory)
Has language support for Python, Java, Scala and R
For Project Consilience:
Initially I had planned to make Spark the start and end point of the whole
application
i.e. read files using Spark, index them using Elasticsearch and cluster them
using Spark’s MLlib
Flat file reading is very direct in Spark
sc.textFile() => reads the file in parallel, in chunks (partitions), one record per line
sc.wholeTextFiles() => loads each complete file into memory as a single (filename, content) record
Earlier experiments were done in
Scala
Scala gave us the advantage
of functional programming
along with parallel
processing
Java 8 now also provides
functional programming (lambdas), so
Scala and Java won’t make
much difference
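import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // ES-Spark connector (elasticsearch-hadoop)

val conf = new SparkConf()
  .setAppName("super_spark")
  .setMaster("local[2]")
  .set("spark.executor.memory", "1g")
  .set("spark.rdd.compress", "true")
  .set("spark.storage.memoryFraction", "1")
  .set("es.index.auto.create", "true")
  .set("es.nodes", "localhost") // Elasticsearch host(s)
  .set("es.port", "9200")       // SparkConf values are strings
  // other configurations can be added as well
val sc = new SparkContext(conf)

// parallelize a local collection; same syntax in Java and Python
// filter on the cluster first, then collect only the small result
val data = sc.parallelize(1 to 10000).filter(_ < 100).collect()
data.foreach(println)

val textFile = sc.textFile("/home/cloudera/Documents/pg2265.txt")
val counts = textFile
  .flatMap(line => line.split(" "))                          // all tokens
  .map(word => word.replaceAll("\\p{P}", "").toLowerCase())  // remove punctuation, convert to lower case
  .filter(_.nonEmpty)                                        // remove empty tokens
  .map(word => (word, 1))
  .reduceByKey(_ + _)                                        // add counts per word
val wordCounts = counts.collect()

// Index documents into index "spark", type "docs" (the slide left the documents as a placeholder):
// sc.makeRDD(<put a Mapping here>).saveToEs("spark/docs")
// e.g. (illustrative) index each word count as one document:
counts.map { case (word, n) => Map("word" -> word, "count" -> n) }.saveToEs("spark/docs")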
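Here is the same word count in plain Python, with no Spark involved, for readers less familiar with the RDD operations. Each step is labelled after the Spark transformation it mirrors. Python's re module has no `\p{P}` class, so ASCII string.punctuation is used as an approximation.

```python
import string
from collections import Counter

PUNCTUATION = str.maketrans("", "", string.punctuation)  # ASCII stand-in for \p{P}


def word_count(lines):
    """Plain-Python equivalent of flatMap -> map -> filter -> reduceByKey."""
    tokens = (token for line in lines for token in line.split(" "))      # flatMap
    cleaned = (token.translate(PUNCTUATION).lower() for token in tokens)  # map
    words = (word for word in cleaned if word)                            # filter empty
    return Counter(words)                                                 # reduceByKey(_ + _)


counts = word_count(["To be, or not to be:", "that is the question."])
print(counts["to"], counts["be"], counts["question"])  # 2 2 1
```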
I tried the Spark-Hadoop-Elasticsearch connector but noticed some
overhead and unnecessary computation
The project currently doesn't receive large volumes of data, nor does it receive data
frequently, so fast computation isn't really required
What we need are features for clustering, and those features can easily be
provided by Elasticsearch
Maybe in the future, Spark will be added to the first phase of the project.
For now, Spark will be used for clustering the documents; its
MLlib library provides APIs for this
THANKS!
QUESTIONS??