Deep Dive and Best Practices for Windows Azure Storage Services

DESCRIPTION
More info on http://www.techdays.be

TRANSCRIPT
Windows Azure Storage Services
Yves Goeleven (@YvesGoeleven)
Me?
Yves Goeleven: Solution Architect at Capgemini, Windows Azure MVP
Co-founder of Azug, founder of Goeleven.com and Messagehandler.net, contributor to NServiceBus
Agenda
Deep Dive: Architecture, Global Namespace, Front End Layer, Partition Layer, Stream Layer
Best Practices: Table Partitioning Strategy, Change Propagation Strategy, Counting Strategy, Master Election Strategy
Warning! This is a level 400 session!
I assume you know
Storage services: highly scalable, highly available cloud storage
I assume you know
That brings the following data abstractions: Tables, Blobs, Drives, Queues
I assume you know
It is NOT a relational database. Storage services are exposed through a RESTful API, with some sugar on top (aka an SDK). The LINQ-to-tables support might make you think it is a database, but it is not.
This talk is based on an academic whitepaper
http://sigops.org/sosp/sosp11/current/2011-Cascais/printable/11-calder.pdf
Which discusses how storage services defy the rules of the CAP theorem:
A distributed & scalable system that is highly available, strongly consistent, and partition tolerant
All at the same time!
Now how did they do that?

Deep Dive: Ready for a plunge?
Global Namespace
Global namespace
Every request gets sent to http(s)://AccountName.Service.core.windows.net/PartitionName/ObjectName

Global namespace
DNS based service location: http(s)://AccountName.Service.core.windows.net/PartitionName/ObjectName
DNS resolution: datacenter (e.g. Western Europe), then the virtual IP of a stamp
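To make the naming scheme concrete, here is a minimal sketch (not part of any SDK) that decomposes a storage URI into the parts of the global namespace; the URI and account name are illustrative:

using System;

class NamespaceDemo
{
    static void Main()
    {
        var uri = new Uri("https://myaccount.blob.core.windows.net/mycontainer/subdir/file1.jpg");

        // AccountName: the first DNS label, resolved to the virtual IP of a stamp
        string accountName = uri.Host.Split('.')[0];

        // PartitionName/ObjectName: the rest of the path; for blobs, the full
        // container + blob name acts as the partition name
        string partitionAndObject = uri.AbsolutePath.TrimStart('/');

        Console.WriteLine("Account:   " + accountName);        // myaccount
        Console.WriteLine("Partition: " + partitionAndObject); // mycontainer/subdir/file1.jpg
    }
}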
Stamp
Scale unit: a Windows Azure deployment with 3 layers
Typically 20 racks each, roughly 360 XL instances with disks attached
2 petabytes per stamp before June 2012, 30 petabytes thereafter
[Diagram: a stamp; a load balancer (LB) in front of the Front End Layer, Partition Layer, and Stream Layer, with intra-stamp replication at the stream level]
Incoming read request
Arrives at the Front End and is routed to the partition layer via http(s)://AccountName.Service.core.windows.net/PartitionName/ObjectName
Reads are usually served from memory
[Diagram: the load balancer spreads requests over Front End (FE) nodes, which route to Partition Servers (PS)]
Incoming read request
2 nodes per request Blazingly fast Flat “Quantum 10” network topology 50 Tbps of bandwidth
10 Gbps / node Faster than typical ssd drive
Front End Layer
Incoming write request
Written to the stream layer with synchronous replication: 1 primary extent node and 2 secondary extent nodes
[Diagram: a Partition Server (PS) writes to the primary Extent Node (EN), which replicates to two secondaries in the Stream Layer]
Geo replication
[Diagram: two stamps, each with its own load balancer, Front End Layer, Partition Layer, Stream Layer, and intra-stamp replication; inter-stamp replication flows between them, coordinated by the Location Service]
Location Service
Stamp allocation: manages DNS records, determines the active/passive stamp configuration, assigns accounts to stamps
Geo replication, disaster recovery, load balancing, account migration
Front End Layer
Front End Nodes
Stateless role instances Authentication & Authorization
Shared Key (storage key) Shared Key Lite Shared Access Signatures
Routing to Partition Servers Based on PartitionMap (in memory cache)
Versioning x-ms-version: 2011-08-18
Throttling
Front End Layer
FE FE FE FE FE FE FE FE FE
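As an illustration of what these nodes verify, here is a hedged sketch of the Shared Key Lite scheme for the table service, assuming the documented recipe: sign "Date + newline + CanonicalizedResource" with the account key using HMAC-SHA256 (the SDK normally does this for you):

using System;
using System.Security.Cryptography;
using System.Text;

static class SharedKeyLite
{
    public static string Sign(string accountName, string base64AccountKey,
                              string date, string canonicalizedResource)
    {
        // StringToSign for the table service's Shared Key Lite variant
        string stringToSign = date + "\n" + canonicalizedResource;
        using (var hmac = new HMACSHA256(Convert.FromBase64String(base64AccountKey)))
        {
            string signature = Convert.ToBase64String(
                hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
            // Goes into the Authorization header of the request
            return "SharedKeyLite " + accountName + ":" + signature;
        }
    }
}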
Partition Map
Determines the partition server for a request
Used by Front End Nodes, managed by the Partition Manager (Partition Layer)
An ordered dictionary, split across partition servers as range partitions
Key: http(s)://AccountName.Service.core.windows.net/PartitionName/ObjectName
Value: partition server
Practical examples
Blob: each blob is a partition, and the blob URI determines the range; the range only matters for listing
Same container, more likely the same range:
https://myaccount.blob.core.windows.net/mycontainer/subdir/file1.jpg
https://myaccount.blob.core.windows.net/mycontainer/subdir/file2.jpg
Other container, less likely:
https://myaccount.blob.core.windows.net/othercontainer/subdir/file1.jpg
Practical examples
Queue: each queue is a partition, and the queue name determines the range; all messages go to the same partition, so the range only matters for listing
Practical examples
Table: each group of rows with the same PartitionKey is a partition, ordered within that set by RowKey. Here the range matters a lot: expect continuation tokens!
Continuation tokens
Requests across partitions that all live on the same partition server: 1 result
Across multiple ranges (or too many results): sequential queries are required; you get 1 result + a continuation token, and you repeat the query with the token until done (see the sketch below)
This will almost never occur in development, as the dataset is typically small!
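A hedged sketch (storage client 2.x API) of draining continuation tokens: keep re-issuing the query with the returned token until it comes back null.

using System.Collections.Generic;
using Microsoft.WindowsAzure.Storage.Table;

static class ContinuationDemo
{
    public static List<DynamicTableEntity> QueryAll(CloudTable table, TableQuery<DynamicTableEntity> query)
    {
        var results = new List<DynamicTableEntity>();
        TableContinuationToken token = null;
        do
        {
            // Each segment stops at a range-partition boundary or at 1000 entities
            var segment = table.ExecuteQuerySegmented(query, token);
            results.AddRange(segment.Results);
            token = segment.ContinuationToken; // null when the query is exhausted
        } while (token != null);
        return results;
    }
}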
Partition Layer
Partition Layer
Dynamic Scalable Object Index over the Object Tables
System defined data structures: Account, Entity, Blob, Message, Schema, PartitionMap
Dynamic load balancing: the load on each part of the index is monitored to determine hot spots; the index is dynamically split into thousands of Index Range Partitions based on load, and these are automatically load balanced across servers to quickly adapt to changes in load
Updates every 15 seconds
Partition layer
Consists of:
Partition Master: distributes Object Table partitions across partition servers, dynamic load balancing
Partition Server: serves data for exactly 1 range partition, manages streams
Lock Server: master election, partition lock
[Diagram: Partition Servers (PS), Partition Masters (PM), and Lock Servers (LS)]
Dynamic Scalable Object Index
How it works
[Diagram: Front End nodes use the partition map to route requests for rows such as (Harry, Pictures, Sunrise), (Harry, Pictures, Sunset), (Richard, Images, Soccer), and (Richard, Images, Tennis); the Account/Container/Blob key space from Aaaa to Zzzz is split into the ranges A-H, H'-R, and R'-Z, each served by a different partition server]
Dynamic Load Balancing
Not in the critical path!
[Diagram: the Partition Master (PM) moves range partitions between Partition Servers (PS) while the Lock Servers (LS) coordinate; the Stream Layer's extent nodes (EN) and the data they hold are not touched]
Partition Server
Architecture: Log-Structured Merge-Tree
Easy replication: append-only persisted streams
Commit log, metadata log, row data (snapshots), and blob data (the actual blob data)
Caches in memory: memory table (= commit log), row data, index (snapshot location), bloom filter, load metrics
[Diagram: partition server state: memory data (MemoryTable, RowCache, IndexCache, BloomFilters, Load Metrics) on top of persistent data (Commit Log Stream, Metadata Log Stream, Row Data Stream, Blob Data Stream)]
Incoming read request
HOT data: served completely from memory
[Diagram: the read is answered from the MemoryTable and RowCache without touching the persistent streams]
Incoming read request
COLD data: only partially in memory
[Diagram: the index cache and bloom filters locate the row in the persistent Row Data Stream, which is then read from disk]
Incoming write request
Persistent first, followed by memory
[Diagram: the write is appended to the Commit Log Stream, then applied to the MemoryTable]
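A toy sketch (not Microsoft's implementation) of that log-structured write path: append to a commit-log stream first, then apply to an in-memory table; hot reads never leave memory.

using System.Collections.Generic;
using System.IO;

class LsmPartitionSketch
{
    private readonly StreamWriter commitLog;                       // append-only commit log stream
    private readonly SortedDictionary<string, string> memoryTable  // the MemoryTable mirrors the log
        = new SortedDictionary<string, string>();

    public LsmPartitionSketch(string commitLogPath)
    {
        commitLog = new StreamWriter(File.Open(commitLogPath, FileMode.Append));
    }

    public void Write(string key, string value)
    {
        commitLog.WriteLine(key + "\t" + value); // 1. persistent first
        commitLog.Flush();
        memoryTable[key] = value;                // 2. then memory
    }

    public string Read(string key)
    {
        // HOT data is served completely from memory; a real partition server
        // falls back to row-data snapshots (guarded by bloom filters) on a miss
        string value;
        return memoryTable.TryGetValue(key, out value) ? value : null;
    }
}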
Checkpoint process
Not in the critical path!
[Diagram: the MemoryTable is periodically flushed to the Row Data Stream as a snapshot, after which the corresponding entries in the Commit Log Stream can be truncated]
Geo replication process
Not in the critical path!
[Diagram: committed transactions from the commit log are asynchronously shipped to the secondary stamp, whose partition server replays them into its own memory table and streams]
Stream Layer
Stream layer
Append-Only Distributed File System
Streams are very large files with a file-system-like directory namespace
Stream operations: open, close, delete, rename, concatenate streams together, append for writing, random reads
Stream layer
Concepts:
Block: the minimum unit of write/read, checksummed, up to N bytes (e.g. 4MB)
Extent: the unit of replication, a sequence of blocks with a size limit (e.g. 1GB), sealed or unsealed
Stream: a hierarchical namespace entry, an ordered list of pointers to extents, supports append/concatenate
[Diagram: stream //foo/myfile.data holds pointers 1-3 to three extents, each a sequence of blocks; the first two extents are sealed, the last is unsealed]
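A hedged data-model sketch of those three concepts; the type names are illustrative, not the actual internal types.

using System.Collections.Generic;

class Block                 // min unit of read/write, checksummed
{
    public byte[] Data;     // up to ~4MB
    public uint Checksum;
}

class Extent                // unit of replication, append-only until sealed
{
    public List<Block> Blocks = new List<Block>();
    public bool Sealed;     // once sealed, the extent is immutable
}

class StreamFile            // e.g. //foo/myfile.data: ordered pointers to extents
{
    public string Name;
    public List<Extent> Extents = new List<Extent>();

    public void Append(Block block)
    {
        // appends always go to the last, unsealed extent
        Extents[Extents.Count - 1].Blocks.Add(block);
    }
}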
Stream layer
Consists of:
Paxos system: a distributed consensus protocol
Stream Master: manages extents, runs optimization routines
Extent Nodes: with attached disks
[Diagram: Extent Nodes (EN) managed by a set of Stream Masters (SM) coordinated through Paxos]
Extent creation
Allocation process:
The Partition Server requests extent / stream creation
The Stream Master chooses 3 random extent nodes based on available capacity, chooses the primary & secondaries, and allocates the replica set
Random allocation to reduce the Mean Time To Recovery
[Diagram: a Partition Server (PS) asks the Stream Master (SM), which allocates a primary and two secondary Extent Nodes (EN)]
Replication process
Bitwise equality on all nodes, synchronous:
The append request goes to the primary, is written to disk, and checksummed
The offset and data are forwarded to the secondaries, which ack
Journaling: a dedicated disk (spindle/SSD) for writes; the data is simultaneously copied to the data disk (from memory or journal)
[Diagram: the Partition Server appends to the primary EN, which forwards to both secondary ENs]
Dealing with failure
Ack from the primary lost on the way back to the partition layer: a retry from the partition layer can cause multiple blocks to be appended (duplicate records)
Unresponsive/unreachable Extent Node (EN): seal the failed extent, allocate a new extent, and append immediately
[Diagram: stream //foo/myfile.data after a failure: the last extent is sealed and a fresh unsealed extent is appended for new writes]
Replication Failures (Scenario 1)
[Diagram: during sealing, the reachable replicas all report commit length 120; the Stream Master seals the extent at 120, and every replica ends up bitwise identical up to that length]
Replication Failures (Scenario 2)
[Diagram: the reachable replicas report commit lengths 120 and 100; the Stream Master seals at the smallest commit length (100), so blocks beyond offset 100 were never acked to the partition layer and are discarded]
Network partitioning during read
Data streams: the partition layer only reads from offsets returned by successful appends
Such offsets are committed on all replicas, so for the row and blob data streams an offset is valid on any replica and it is safe to read from any of them
[Diagram: the Partition Server can read the same committed block from any of the three replicas]
Network partitioning during read
Log streams: logs are read on partition load
Check the commit length first; only read from an unsealed replica if all replicas have the same commit length, otherwise read from a sealed replica
[Diagram: on a commit-length mismatch the extent is sealed, and the partition load reads from a sealed replica]
Garbage Collection
Downside to this architecture: lots of wasted disk space
'Forgotten' streams, 'forgotten' extents, unused blocks in extents after write errors
Garbage collection is required:
Partition servers mark the streams they use
The stream manager cleans up unreferenced streams and unreferenced extents
The stream manager also performs erasure coding (the Reed-Solomon algorithm), which allows recreation of cold streams after deleting extents
Beating the CAP theorem
Layering & co-design:
Stream Layer: availability with partition/failure tolerance; for consistency, replicas are bit-wise identical up to the commit length
Partition Layer: consistency with partition/failure tolerance; for availability, RangePartitions can be served by any partition server and are moved to available servers if a partition server fails
Designed for the specific classes of partitioning/failures seen in practice: process, disk, node, and rack failures/unresponsiveness; node- and rack-level network partitioning
Best practices
Partitioning Strategy
Tables: great for writing data, horrible for complex querying
Data is automatically spread across extent nodes
Indexes are automatically spread across partition servers
Queries to many servers may be required to list data
[Diagram: the three layers again; listing fans out across Partition Servers and Extent Nodes]
Partitioning Strategy
Insert all aggregates in a dedicated partition
Aggregate: records that belong together as a unit, e.g. an order with its order lines
Dedicated: a unique combination of partition parts (account, table, partition key) gives the best insert performance
Use Entity Group Transactions: all of the aggregate's records are inserted in 1 go, transactionally (see the sketch below the table)
Use the RowKey only for ordering; it can be blank

Account | Table  | PK     | RowKey | Properties
Europe  | Orders | Order1 |        | Order 1 data
Europe  | Orders | Order1 | 01     | Orderline 1 data
Europe  | Orders | Order1 | 02     | Orderline 2 data
Europe  | Orders | Order2 |        | Order 2 data
Europe  | Orders | Order2 | 01     | Orderline 1 data
Europe  | Orders | Order2 | 02     | Orderline 2 data
US      | Orders | Order3 |        | Order 3 data
US      | Orders | Order3 | 01     | Orderline 1 data
US      | Orders | Order3 | 02     | Orderline 2 data
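A hedged sketch (storage client 2.x API) of inserting such an aggregate as an Entity Group Transaction: all rows share the same PartitionKey, so the batch commits atomically on one partition server. The table and key names are illustrative.

using Microsoft.WindowsAzure.Storage.Table;

static class AggregateInsertDemo
{
    public static void InsertOrderAggregate(CloudTable orders)
    {
        var batch = new TableBatchOperation();

        // Aggregate root: blank RowKey so it sorts first within the partition
        batch.Insert(new DynamicTableEntity("Order1", ""));
        // Order lines: RowKey used purely for ordering within the aggregate
        batch.Insert(new DynamicTableEntity("Order1", "01"));
        batch.Insert(new DynamicTableEntity("Order1", "02"));

        // Up to 100 operations on a single PartitionKey commit as one transaction
        orders.ExecuteBatch(batch);
    }
}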
Partitioning Strategy
Query By = Store By
Do not query across partitions, it hurts! Instead store a copy of the data to query, in 1 partition
PartitionKey: a combination of the query-by values or well known identifiers
RowKey: use it for ordering; it must be unique in combination with the PK
This guarantees fast retrieval: every query = 1 partition server (see the sketch below)

Acct | Table  | PK                  | RK   | Properties
Yves | Orders | 0634911534000000000 | 1    | Order 1 data
Yves | Orders | 0634911534000000000 | 1_01 | Orderline 1 data
Yves | Orders | 0634911534000000000 | 1_02 | Orderline 2 data
Yves | Orders | 0634911534000000000 | 3    | Order 3 data
Yves | Orders | 0634911534000000000 | 3_01 | Orderline 1 data
Yves | Orders | 0634911534000000000 | 3_02 | Orderline 2 data
Yves | Orders | 0634911666000000000 | 2    | Order 2 data
Yves | Orders | 0634911666000000000 | 2_01 | Orderline 1 data
Yves | Orders | 0634911666000000000 | 2_02 | Orderline 2 data
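A hedged sketch (storage client 2.x API) of the payoff: the read model is keyed exactly the way it is queried, so a single filter on PartitionKey hits one range partition on one server.

using Microsoft.WindowsAzure.Storage.Table;

static class QueryByStoreByDemo
{
    public static TableQuery<DynamicTableEntity> OrdersInBucket(string reverseTickBucket)
    {
        // Everything for this bucket lives in a single range partition,
        // already ordered by RowKey
        return new TableQuery<DynamicTableEntity>().Where(
            TableQuery.GenerateFilterCondition(
                "PartitionKey", QueryComparisons.Equal, reverseTickBucket));
    }
}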
Partitioning Strategy
DO denormalize! It's not a sin!
Limit the dataset to the values required by the query: do precompute, do format up front, etc.

Acct | Table  | PK                  | RK | Properties
Yves | Orders | 0634911534000000000 | 1  | €200
Yves | Orders | 0634911534000000000 | 3  | $500
Yves | Orders | 0634911666000000000 | 2  | €1000
Bonus Tip
Great partition keys are any combination of properties of the object:
Who: e.g. the current user id
When: string.Format("{0:D19}", DateTime.MaxValue.Ticks - DateTime.UtcNow.Ticks)
Where: the geohash algorithm (geohash.org)
Why: flow context through operations/messages
Add all partition key values as properties to the original aggregate root, or update will be your next nightmare!
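A tiny sketch of that 'When' key: subtracting from DateTime.MaxValue.Ticks reverses the sort order, so the newest rows come first in the range partition.

using System;

class PartitionKeyDemo
{
    static void Main()
    {
        string when = string.Format("{0:D19}",
            DateTime.MaxValue.Ticks - DateTime.UtcNow.Ticks);
        // 19 zero-padded digits, lexicographically sortable, descending in time
        Console.WriteLine(when);
    }
}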
Change Propagation Strategy
Use queued messaging: there is NO TRANSACTION support across partitions
Split a multi-partition update into separate atomic operations
There is a built-in retry mechanism on storage queues; use compensation logic if needed
Example (see the sketch below):
Update the primary records in an entity group transaction
Publish an 'Updated' event through queues
Maintain the list of interested nodes in storage
Secondary nodes update their indexes in response
[Diagram: a publisher (P) in the web roles/websites pushes events through queues to subscriber (S) worker roles]
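A hedged sketch (storage client 2.x API) of that pattern: commit the primary partition first, then enqueue the event; subscribers update the secondary partitions when they receive it. The table, queue, and message format are illustrative.

using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Table;

static class ChangePropagationDemo
{
    public static void UpdateAndPublish(CloudTable orders, CloudQueue updates)
    {
        // 1. Atomic update of the aggregate in its own partition
        var order = new DynamicTableEntity("Order1", "") { ETag = "*" };
        order.Properties["Status"] = EntityProperty.GeneratePropertyForString("Shipped");
        orders.Execute(TableOperation.Merge(order));

        // 2. Publish the change; the queue retries delivery until a consumer
        //    deletes the message, so secondary indexes eventually catch up
        updates.AddMessage(new CloudQueueMessage("OrderUpdated:Order1"));
    }
}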
Change Propagation Strategy
Sounds hard, but don't worry: I've gone through all the hard work already and contributed it back to the community (NServiceBus)
Bonus: it works very well to update web role caches, and is very nice in combination with SignalR!
As A Service: www.messagehandler.net
[Diagram: the same publisher/subscriber topology, now driven by NServiceBus]
Counting
Counting records across partitions is a PITA, because querying across partitions is, and there is no built-in support
Do it up front: I typically count 'created' events in flight; it's just another handler
BUT... +1 is not an idempotent operation
Keep the count atomic: no other operation except +1 & save
Use sharded counters: every thread on every node 'owns' a shard of the counter (preventing concurrency conflicts), all in a single partition
Roll up when querying and/or at a regular interval
[Diagram: a counting handler (C) in the worker roles consumes the events from the queues]
Counting
Counting actions in Goeleven.com:

public class CountActions : IHandleMessages<ActionCreated>
{
    public void Handle(ActionCreated message)
    {
        string identifier = RoleEnvironment.CurrentRoleInstance.Id + "." + Thread.CurrentThread.ManagedThreadId;
        var shard = counterShardRepository.TryGetShard("Actions", identifier);
        if (shard == null)
        {
            shard = new CounterShard { PartitionKey = "Actions", RowKey = identifier, Partial = 1 };
            counterShardRepository.Add(shard);
        }
        else
        {
            shard.Partial += 1;
            counterShardRepository.Update(shard);
        }
        counterShardRepository.SaveAllChanges();
    }
}
Acct | Table    | PK      | RK                      | Properties
My   | Counters | Actions | RollUp                  | 2164
My   | Counters | Actions | Goeleven.Worker_IN_0.11 | 12
My   | Counters | Actions | Goeleven.Worker_IN_1.56 | 15
Counting
Rollup:

public class Counter
{
    public long Value { get; private set; }

    public Counter(IEnumerable<CounterShard> counterShards)
    {
        foreach (var counterShard in counterShards)
        {
            Value += counterShard.Partial;
        }
    }
}
Acct | Table    | PK      | RK                      | Properties
My   | Counters | Actions | RollUp                  | 2164
My   | Counters | Actions | Goeleven.Worker_IN_0.11 | 12
My   | Counters | Actions | Goeleven.Worker_IN_1.56 | 15

Rolled up: Actions = 2181
Master Election
Scheduling in a multi-node environment (like the rollup): all nodes kick in together doing the same thing
Master election is required to prevent conflicts
Remember the lock service? It surfaces through blob storage as a 'Lease', so you can use it too!
[Diagram: the Lock Servers (LS) in the partition layer, alongside the Partition Servers (PS) and Partition Masters (PM)]
Master Election
Try to acquire a lease; if successful start the work, and hold on to the lease until done:

leaseId = blob.TryAcquireLease();
if (leaseId != null)
{
    renewalThread = new Thread(() =>
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(40)); // renew well before the 60 second lease expires
            blob.RenewLease(leaseId);
        }
    });
    renewalThread.Start();

    // do your thing

    renewalThread.Abort();
    blob.ReleaseLease(leaseId);
}

Find the blob lease extension on http://blog.smarx.com/posts/leasing-windows-azure-blobs-using-the-storage-client-library
Wrapup
What we discussed
Deep Dive: Global Namespace, Front End Layer, Partition Layer, Stream Layer
Best Practices: Table Partitioning Strategy, Change Propagation Strategy, Counting Strategy, Master Election Strategy

Resources
Want to know more?
White paper: http://sigops.org/sosp/sosp11/current/2011-Cascais/printable/11-calder.pdf
Video: http://www.youtube.com/watch?v=QnYdbQO0yj4
More slides: http://sigops.org/sosp/sosp11/current/2011-Cascais/11-calder.pptx
Blob extensions: http://blog.smarx.com/posts/leasing-windows-azure-blobs-using-the-storage-client-library
NServiceBus: https://github.com/nservicebus/nservicebus

See it in action: www.goeleven.com and www.messagehandler.net
Q&A