Symantec: Cassandra Data Modelling Techniques in Action
TRANSCRIPT
1. Product Overview
2. Summarization Overview
3. Intermediate Summary Schema
Product Overview
• The ‘Information Map’ is a multi-tenant, Veritas-managed cloud service
• It gives our customers better insight into their data and lets them manage it more efficiently
• We collect metadata for various objects, such as files on file servers spread across multiple data centers
• Users can slice and dice the data across numerous filters and, from there, drill down to specific entities
C* Cluster
• 18-node C* cluster on AWS
  – Instance type: i2.2xlarge
  – 8 vCPUs
  – 60 GB RAM
  – 1.6 TB SSD (2 × 800 GB)
• C* cluster stats
  – 5 billion items total; average 300 million per customer; largest customer has 1.9 billion items
  – 380–410 GB per node (~7 TB across the cluster)
Summarization Overview
• Customers need interactive queries (response time < 5 s) and visualization across extremely large data sets
  – Indexing all data and using index statistics aggregation doesn't scale
  – Based on pre-defined queries and ranges, data can be pre-aggregated into summaries
Two Stage Summarization
[Pipeline diagram: Cassandra → ETL → Elasticsearch]
• Cassandra: items are ingested into the Item Metadata table; several billion items, grouped into buckets containing 100s of millions of rows
• 1st stage: items are summarised into the Intermediate Summary table (Cassandra)
• 2nd stage: a daily ETL job aggregates and transforms the summaries into a fully searchable Elasticsearch index of 10s of millions of items
Summary Data - Example
Item Metadata (Cassandra)

  ItemId | Location | Content Source (file server) | Container (Share) | Size   | FileType | Creation Date
  1      | Reading  | Server1                      | Share1            | 100 KB | Word     | 17/09/2015
  2      | Reading  | Server1                      | Share1            | 80 KB  | Word     | 18/09/2015
  3      | NewYork  | Server2                      | Share2            | 150 KB | Excel    | 11/01/2015
  4      | NewYork  | Server2                      | Share2            | 150 KB | Excel    | 13/01/2015
  5      | NewYork  | Server2                      | Share2            | 600 KB | Excel    | 13/02/2015

1st stage summarization (Cassandra)

  Container (Share) | Weekly Bucket (from 01-01-2015) | FileType | Items with Size
  Share1            | 37                              | Word     | 1-100KB, 2-80KB
  Share2            | 2                               | Excel    | 3-150KB, 4-250KB
  Share2            | 7                               | Excel    | 5-600KB

2nd stage summarization (Elasticsearch)

  Location | Content Source (file server) | Container (Share) | Age Bucket (relative to today) | FileType | TotalCount | TotalSize
  Reading  | Server1                      | Share1            | Last Month                     | Word     | 2          | 180 KB
  NewYork  | Server2                      | Share2            | Last Year                      | Excel    | 3          | 1000 KB
Intermediate Summary Schema
• We needed a schema that allows us to
  – Calculate count and size for various aggregation buckets
  – Perform inserts and updates in an idempotent manner
  – Iterate relatively quickly through the rows related to one 'container', as containers are the logical unit of operation when summarizing
Making it idempotent
• Pre-2.1 Cassandra counters are not idempotent
• Used a map of itemId to size, stored against each bucket combination
  – Makes updates idempotent and allows navigating back to the item
• First schema had (see the CQL sketch below)
  – PRIMARY KEY (ContainerId, Item_Type_bucket, Size_Bucket, Owner_bucket, cDate_Bucket, mDate_bucket, aDate_bucket)
  – This is a compound primary key with ContainerId as the partition key
  – It allows us to
    • enumerate all the data for a container
    • run efficient slice range queries for known buckets
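As a concrete illustration, a minimal CQL sketch of that first schema; the column types and the map value type are assumptions, only the PRIMARY KEY shape and the itemId-to-size map come from the slides:

    -- Sketch of the first intermediate summary schema (column types assumed)
    CREATE TABLE intermediate_summary (
        ContainerId       text,
        Item_Type_bucket  text,
        Size_Bucket       text,
        Owner_bucket      text,
        cDate_Bucket      text,
        mDate_bucket      text,
        aDate_bucket      text,
        items             map<text, bigint>,  -- itemId -> size; re-writing the same
                                              -- entry is a no-op, so updates are idempotent
        PRIMARY KEY (ContainerId, Item_Type_bucket, Size_Bucket,
                     Owner_bucket, cDate_Bucket, mDate_bucket, aDate_bucket)
    );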
Problem with first attempt
• Cassandra places data on nodes by partition key
• Using ContainerId as the partition key leads to hotspots and excessively wide rows
[Diagram: one partition per container]
ContainerId1 partition:
  Bucket_a1|Bucket_b1|Bucket_c1 -> {itemId1,size1; itemId2,size2; …}
  Bucket_a1|Bucket_b2|Bucket_c1 -> {itemId3,size3; itemId4,size4; …}
  Bucket_a1|Bucket_b1|Bucket_c2 -> {itemId1,size1; itemId2,size2; …}
  … all other bucket combinations for ContainerId1
ContainerId2 partition:
  Bucket_a2|Bucket_b1|Bucket_c1 -> {itemId11,size11; itemId12,size12; …}
  Bucket_a2|Bucket_b2|Bucket_c1 -> {itemId13,size13; itemId14,size14; …}
  Bucket_a2|Bucket_b1|Bucket_c2 -> {itemId14,size21; itemId15,size22; …}
  … all other bucket combinations for ContainerId2
Handling excessively wide rows
- We have pre-known combinations of item type and size buckets
- By combining them with ContainerId into a composite partition key, we can reduce the physical row size
  – PRIMARY KEY ((ContainerId, Item_Type_bucket, Size_Bucket), Owner_bucket, cDate_Bucket, mDate_bucket, aDate_bucket)
[Diagram: one partition per container/item-type/size-bucket combination]
Container1/ItemType1/SizeBucket1 partition:
  C1|O1|CDt1|MDt1|ADt1 -> {itemId1,size1; itemId2,size2; …}
  C1|O1|CDt2|MDt2|ADt1 -> {itemId3,size3; itemId4,size4; …}
  C1|O1|CDt1|MDt1|ADt1 -> {itemId1,size1; itemId2,size2; …}
  … all other combinations for Container1, ItemType1 & SizeBucket1
Container1/ItemType2/SizeBucket2 partition:
  C1|O1|CDt1|MDt1|ADt1 -> {itemId11,size11; itemId12,size12; …}
  C1|O1|CDt2|MDt2|ADt1 -> {itemId13,size13; itemId14,size14; …}
  C1|O1|CDt1|MDt1|ADt1 -> {itemId14,size21; itemId15,size22; …}
  … all other combinations for Container1, ItemType2 & SizeBucket2
Handling excessively wide rows – contd.
- To enumerate all the rows for a container, we issue queries for every known combination of 'item type' and 'size bucket' together with that container id
- For example, if there are 50 values of 'item type' and 10 of 'size bucket', we make 500 queries for that container id, one by one (see the example query below):
  - For containerId1
    - For each 'item type'
      - For each 'size bucket'
        - Page through all rows from the intermediate_summary table by partition key, i.e. (containerId1, item_type1, size_bucket1)
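Each of those per-combination reads is a plain full-partition-key query that the client pages through; a sketch using the column names assumed earlier, with placeholder literal values:

    -- One of the ~500 per-container queries; the client pages through the results
    SELECT Owner_bucket, cDate_Bucket, mDate_bucket, aDate_bucket, items
      FROM intermediate_summary
     WHERE ContainerId = 'containerId1'
       AND Item_Type_bucket = 'item_type1'
       AND Size_Bucket = 'size_bucket1';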
Handling hot spots
- Depending on the data profile, this could still lead to hot spots
- Added a 'salt' as part of the composite partition key (sketched below)
- The salt is derived by hashing some of the item metadata
  – PRIMARY KEY ((ContainerId, Item_Type_bucket, Size_Bucket, salt), Owner_bucket, cDate_Bucket, mDate_bucket, aDate_bucket)
- For a salt range of 1–50, each partition is now roughly 1/50th of its previous size, assuming an even distribution
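A sketch of the salted layout under the same assumptions as before; the slides only say the salt is derived by hashing item metadata, so the derivation in the comment is illustrative:

    -- Salted intermediate summary sketch; salt is computed client-side,
    -- e.g. hash of some item metadata modulo 50 (illustrative assumption)
    CREATE TABLE intermediate_summary (
        ContainerId       text,
        Item_Type_bucket  text,
        Size_Bucket       text,
        salt              int,
        Owner_bucket      text,
        cDate_Bucket      text,
        mDate_bucket      text,
        aDate_bucket      text,
        items             map<text, bigint>,  -- still the itemId -> size map at this point
        PRIMARY KEY ((ContainerId, Item_Type_bucket, Size_Bucket, salt),
                     Owner_bucket, cDate_Bucket, mDate_bucket, aDate_bucket)
    );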
Compaction Strategy
- Started getting lots of ReadTimeouts during ETL
- nodetool cfhistograms showed many SSTables touched per read
- Updates to data were causing a logical row to spread across many SSTables
- Moved from the default 'SizeTiered' to 'Leveled' compaction (see below), which
  - guarantees that 90% of reads are satisfied from a single SSTable
  - but uses much more I/O than size-tiered compaction
- We tried it and found ingest rates were not impacted, while ETL errors were reduced
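The switch is a table-level setting; a minimal sketch of the change, using the table name as above:

    ALTER TABLE intermediate_summary
      WITH compaction = { 'class': 'LeveledCompactionStrategy' };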
Issue with Collection types
- Collections are read in their entirety
- Collection types are not meant for unbounded use and are limited to 64K
- So we changed the schema to include itemId in the primary key (sketched below)
  – PRIMARY KEY ((container_id, size_bucket, item_type, salt), owner_id, cdate_bucket, mdate_bucket, adate_bucket, item_id)
- We manage client-side how many items belong to the same bucket
- In-memory aggregation of items using bucket combinations
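A sketch of the reworked schema with item_id in the primary key and no collection column; the size column and all types are assumptions:

    -- One row per item instead of a map per bucket combination (sketch)
    CREATE TABLE intermediate_summary (
        container_id  text,
        size_bucket   text,
        item_type     text,
        salt          int,
        owner_id      text,
        cdate_bucket  text,
        mdate_bucket  text,
        adate_bucket  text,
        item_id       text,
        size          bigint,  -- assumed: the item's size as a plain column
        PRIMARY KEY ((container_id, size_bucket, item_type, salt),
                     owner_id, cdate_bucket, mdate_bucket, adate_bucket, item_id)
    );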
Tombstone issue
- Excess deletes are caused because a change in item metadata means deleting and re-inserting the row
- We hit more than 100K tombstones during reads
- Reduced gc_grace_seconds to 5 days on intermediate_summary
- Used a separate 'repair' script for intermediate_summary, run every 3 days
- Changed the schema again (sketched below) to aggregate per partition key and take the date buckets out of the primary key
  – PRIMARY KEY ((container_id, size_bucket, item_type, salt), owner_id, item_id) – the date buckets are just non-key columns
- In-memory aggregation of items using (container_id, size_bucket, item_type, salt) combinations
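A sketch of the final layout with the date buckets demoted to ordinary columns; types are assumed:

    CREATE TABLE intermediate_summary (
        container_id  text,
        size_bucket   text,
        item_type     text,
        salt          int,
        owner_id      text,
        item_id       text,
        cdate_bucket  text,   -- date buckets are now ordinary columns,
        mdate_bucket  text,   -- so a metadata change is an overwrite
        adate_bucket  text,   -- rather than a delete plus re-insert
        size          bigint, -- assumed
        PRIMARY KEY ((container_id, size_bucket, item_type, salt), owner_id, item_id)
    );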