
Replication and Workload Management for In-Memory OLTP Databases

by

Dai Qin

A thesis submitted in conformity with the requirements for the degree of Doctor of Philosophy

The Edward S. Rogers Sr. Department of Electrical & Computer Engineering
University of Toronto

© Copyright by Dai Qin 2021


Abstract

Replication and Workload Management for In-Memory OLTP Databases

Dai Qin
Doctor of Philosophy

The Edward S. Rogers Sr. Department of Electrical & Computer Engineering
University of Toronto

2021

Online transaction processing (OLTP) databases are a critical component of modern computing infrastructure services. As a result, they must be highly available, and they must process requests efficiently for a wide range of workloads. Databases provide high availability by using replication so that when a database replica fails, a backup can take over. They use workload management mechanisms for efficiently supporting different types of workloads with varying levels of skew and contention.

This thesis revisits the challenges of database replication and workload management for in-memory databases. Unlike traditional disk-based databases, in-memory databases are designed for workloads whose entire dataset fits in DRAM memory. These databases are highly scalable, raising challenges for replication and workload management. For example, traditional database replication schemes suffer from network, rather than storage, bottlenecks because in-memory databases have much higher throughput, and traditional workload management solutions significantly limit the performance of in-memory databases.

In this thesis, we propose using deterministic concurrency control as the basis for replication and workload management. Deterministic concurrency control allows transactions to execute concurrently while guaranteeing equivalence to a predetermined serial ordering of transactions. For data replication, we propose a replay-based scheme that executes transactions concurrently and scalably on the backup database in the serial order predetermined by the primary database. Our solution reduces network bandwidth requirements to 10-15% of traditional database replication schemes. For workload management, we propose two optimizations to deterministic concurrency control that help parallelize internal database operations. These optimizations enable handling contention and skewed workloads efficiently and provide 30% to 6× performance improvements.


Acknowledgements

This thesis would not have been possible without my advisors Prof Ashvin Goel and Prof Angela Demke Brown. They put a significant amount of time and effort into supervising this research. Throughout my entire Ph.D. study, they were always supportive both professionally and personally, and I have learned countless invaluable skills from them. Getting a Ph.D. is hard, and I cannot imagine what it would be like without their help and support.

I would like to thank my committee, Prof Michael Stumm, Prof Ding Yuan, and Prof Eddie Kohler, for taking the time to read the thesis in extensive detail. Their constructive advice appreciably improved this thesis. It is my honour to have you all on my committee. I would also like to acknowledge Dr. F. Ryan Johnson and Prof Tianzheng Wang for their help and encouragement in getting me started in the database area. I learned many things about databases from them in the early days of my Ph.D. study.

Finally, I would like to express my gratitude to many friends who helped me during my study. It was a pleasure to know you all on this unforgettable journey.


Contents

Acknowledgements
Table of Contents
List of Tables
List of Figures

1 Introduction
  1.1 Motivation
    1.1.1 Challenges with Database Replication
    1.1.2 Challenges with Workload Management
  1.2 Our Approach
  1.3 Contributions
  1.4 Thesis Organization

2 Background
  2.1 Transactions
    2.1.1 Atomicity
    2.1.2 Isolation
    2.1.3 Durability
  2.2 In-Memory Databases
    2.2.1 Shared-Nothing vs Shared-Memory Architecture
    2.2.2 Programming Model
    2.2.3 Opacity
  2.3 Deterministic Concurrency Control

3 Replay Based Primary-Backup Replication
  3.1 Motivating Incidents
  3.2 Multi-Version Deterministic Concurrency Control
    3.2.1 Observation
    3.2.2 Transaction Model
    3.2.3 Overview of Approach
    3.2.4 Recording on the Primary
    3.2.5 Deterministic Execution on the Backup
  3.3 Implementation
    3.3.1 Recording on the Primary
    3.3.2 Replaying on the Backup
    3.3.3 Failover
    3.3.4 Read-Only Transactions
  3.4 Evaluation
    3.4.1 Workloads
    3.4.2 Network Traffic
    3.4.3 Primary Performance
    3.4.4 Replay Performance
  3.5 Bug Detection
    3.5.1 Ignoring Write-Write Conflicts
    3.5.2 Logical Phantom Detection
  3.6 Summary

4 Workload Management Under Deterministic Concurrency Control
  4.1 The Caracal Design
    4.1.1 Deterministic Concurrency Control
    4.1.2 Initialization Phase: Batch Append
    4.1.3 Execution Phase: Split-on-Demand
    4.1.4 Garbage Collection
    4.1.5 Logging and Recovery
    4.1.6 Limitations of Determinism
  4.2 Implementation
    4.2.1 Serial ID Assignment
    4.2.2 Initialization of the Version Array
    4.2.3 Transaction Scheduler
    4.2.4 Index Search Optimization
    4.2.5 Inlining
    4.2.6 Memory Allocation
    4.2.7 NUMA Scaling
  4.3 Evaluation
    4.3.1 Hardware and Software Platform
    4.3.2 Comparison Databases
    4.3.3 YCSB
    4.3.4 TPC-C Like
    4.3.5 Comparing with Non-Deterministic Databases
    4.3.6 Impact of Optimizations
    4.3.7 Tuning
    4.3.8 Garbage Collection
    4.3.9 Latency
    4.3.10 Scaling Out
  4.4 Summary

5 Related Work
  5.1 Deterministic Concurrency Control
  5.2 Replication
    5.2.1 Log Shipping
    5.2.2 Logging and Recovery in Shared-Nothing Architectures
  5.3 Workload Management
    5.3.1 Shared-Memory Architectures
    5.3.2 Shared-Nothing Architectures

6 Conclusions and Future Work


List of Tables

2.1 Final Outcome of A and B Under Different Isolation Levels
2.2 Example CVEs of PL/SQL and Javascript Runtimes

3.1 Input Size versus Output Size in TPC-C
3.2 Network Traffic
3.3 Concurrent Interleaving in ERMIA that Ignores Write-Write Conflicts
3.4 A Phantom Example
3.5 Logical Phantom

5.1 Comparison with Existing Deterministic Schemes


List of Figures

2.1 T1 and T2 on Two Cores

3.1 An Example of Commit Back In Time
3.2 An Example of Executing a Transaction
3.3 Primary Performance
3.4 Primary Performance, 1 Gb/s Network
3.5 Concurrent Replay Performance
3.6 Throughput With Different Epoch Lengths
3.7 Link List Versions

4.1 The Caracal Architecture
4.2 Load Balancing Split Pieces
4.3 Per-core Buffers for Batched Initialization
4.4 YCSB Performance (8:2 read/write ratio)
4.5 YCSB Contention Performance (7/10 contended keys)
4.6 TPC-C and TPC-C Single Warehouse Performance
4.7 YCSB Performance (8:2 read/write ratio)
4.8 YCSB Contention Performance (7/10 contended keys)
4.9 TPC-C and TPC-C Single Warehouse Performance
4.10 Impact of Optimizations on Different Workloads
4.11 Tuning Thresholds for Split-on-Demand
4.12 Memory Usage
4.13 Latency and Throughput
4.14 Distributed TPC-C Throughput


Chapter 1

Introduction

Databases are popular data management platforms for various types of applications. They store data in storage media and process data by answering application queries. According to StackOverflow’s 2018 Developer Survey [71], 66,270 out of 100,000 developers said that they use some form of a database for work. Amazon AWS has reported [71] that its database service (RDS) grew roughly 40% from 2017 to 2018.

Based on the kind of queries that the database processes, there are two types of databases: Online Analytics Processing (OLAP) databases and Online Transaction Processing (OLTP) databases. OLAP databases are designed to answer ad-hoc and read-only queries. These queries involve scanning a large number of rows and joining many tables. OLAP databases are often used to generate business reports and management forms. In contrast, OLTP databases are designed to process a large number of simple queries efficiently. Each query, which is called a transaction, accesses a few rows and often updates some data. OLTP databases are used by applications that change data dynamically, like websites, mobile apps and banking systems.

OLTP databases process transactions concurrently for efficiency. For correctness, databases coordinate transactions so that their outcome is equivalent to some serial order, which is called concurrency control. Concurrency also introduces work assignment problems. To utilize multiple processors efficiently, transactions need to be assigned to processors evenly, or else load imbalance can degrade performance.

OLTP databases are critical infrastructure services, and in many scenarios, the time it takes for the database to respond can impact business revenue. For example, Akamai reports [1] that every 100 millisecond delay in website load time can hurt revenue conversion rates by 7%. This data shows the importance of ensuring that: (1) databases must be always available; and (2) database requests must be processed efficiently.

However, there are two challenges to achieving these goals. First, databases may fail, but when they fail, the entire application stops working. To overcome this situation, we need a backup server, so that when the primary database fails, the backup database can take over and respond to requests. This requires that the primary database replicates all modifications to the backup in real time, which is commonly called database replication.

The second challenge is that the performance of the database can be significantly affected by the type of workload. When transactions run concurrently, they may access shared data. These accesses are called conflicting accesses when at least one of them updates the data. The database coordinates these conflicting accesses, as part of its concurrency control, to preserve data integrity. Often, databases are designed to process low-conflict workloads efficiently, but their performance collapses under high-conflict workloads. In this case, the database spends significant time coordinating the conflicting accesses, which is called contention. However, in practice, many workloads are unpredictable and may encounter contention at any time. Thus the database must perform contention management to scale these workloads. Similarly, the database must ensure that the workload is well balanced across processors so that all the processors can be utilized efficiently. We refer to these requirements as workload management.

1.1 Motivation

Database replication and workload management challenges are not new in the database community. Many traditional databases use log shipping [61, 45, 32] for database replication. When a transaction commits on the primary database, the primary database logs its data modifications. The log, containing the new data values that the transaction modifies, is sent over the network to the backup database. The backup database replays the log by applying the new values from the log to the database. However, to maintain data integrity, the backup database has to replay the log sequentially.

Many traditional databases also perform workload management. The majority of traditional databases use locking based concurrency control, and so their workload management mechanisms also rely on locking. Under contention, transactions need to wait for a long time when acquiring locks. Thus, many workload management schemes [22, 65] detect contention by measuring the wait time that occurs during locking, and they handle contention by aborting and retrying transactions at a different time in the future.

However, traditional databases were designed when rotating disks were used for storage and when core counts and memory capacities were limited due to price. These database designs focus on optimizing disk IO because they are bottlenecked by disk performance. As a result, replication and workload management did not have a significant impact on database performance.

Today, modern machines are equipped with high core counts and terabytes of memory, allowing many OLTP applications to fit entirely into memory, and improving database storage performance by orders of magnitude. Many commercial database vendors are starting to offer in-memory versions of their databases, such as SAP HANA [19, 69], SQL Server (Hekaton) [10], Oracle TimesTen [57], and Apache/Pivotal Geode [76]. These products are deployed by various customers across many industries [66].

These technology trends have exposed many scaling challenges in the design of traditional disk-based databases, leading to a large body of research in novel designs for in-memory databases [72, 58, 69, 10, 79, 34, 40, 31]. These in-memory databases offer much higher performance, but traditional replication and workload management schemes introduce high overheads in these databases, as discussed below.

1.1.1 Challenges with Database Replication

Traditional replication schemes assume that the network bandwidth is sufficient because the primary database is bottlenecked by storage. However, since in-memory databases run at much faster speeds, without being bottlenecked by the storage hardware, they generate much higher amounts of data for replication. For example, an in-memory database such as Silo [79] can generate close to 1 GB/s of log data [90]. Furthermore, to tolerate regional power failure and other disaster situations, the primary and the backup are often located in different geographic locations. However, network providers generally offer limited bandwidth with relatively high costs for these inter-region networks, and upgrading wide-area networks is an expensive proposition. Therefore, for in-memory databases, network bandwidth is the major bottleneck for long distance replication, and so it is critical to optimize network usage.

On the backup, we also need to ensure that the backup database can keep up with the throughput of the primary database. This wasn’t a challenge for disk-based primary databases because their throughput was bottlenecked by the disk, so sequential log replay works well in these databases. However, for in-memory databases, the primary database is efficient and scalable, and so the backup database must also be efficient and scalable to avoid becoming the bottleneck.

1.1.2 Challenges with Workload Management

Traditional databases generally use locking-based concurrency control schemes [47, 54, 22, 65]. The workload management schemes in these databases rely on lock wait times for detecting contention in the workload. Locking-based concurrency control works well for traditional databases because the disk bottleneck outweighs the cost of locking. However, for in-memory databases, locking-based protocols are not widely used because the cost of locking can limit the performance of the in-memory database [40, 89, 82]. While there has been recent progress on workload management in optimistic concurrency control protocols [40, 82, 31], workload management for other in-memory database protocols remains unexplored.

Since in-memory database concurrency control has low overheads, workload management also needs to be performed efficiently. For example, heavy-weight techniques such as maintaining and analyzing the wait-for graph [22] will not work well with high-performance in-memory databases.

1.2 Our Approach

This thesis revisits the replication and workload management challenges for in-memory databases. We propose low-overhead solutions for both replication and workload management so that the in-memory database can provide these features with minimal impact on its performance.

We propose using deterministic concurrency control for both replication and workload management. A deterministic database uses a concurrency control scheme that guarantees equivalence to a predetermined serial ordering of transactions [77]. Unlike non-deterministic concurrency control schemes, the serial ordering is established before transactions are executed, ensuring that a transaction’s outcome is uniquely determined by the database’s initial state and an ordered set of known previous transactions. This property allows implementing concurrency control efficiently because it converts a global serial order into fine-grained per-row access order. Furthermore, it enables our scalable replication and workload management methods, as described below.

For replication, we observe that the size of the transaction’s input parameters is generally much smaller than the size of the transaction output, which is the amount of data written by a transaction. To optimize network usage and to ensure that the backup can scale with the primary database, we propose a scalable replay-based replication scheme. The primary database sends transaction input parameters to the backup database, which replays transactions concurrently. The backup database uses deterministic concurrency control to ensure that the replay follows the serial order established by the primary, thus guaranteeing that its outcome is consistent with the primary.

For workload management, we design a shared-memory deterministic database. Unlike previous deterministic databases that use partitioning to avoid contention, but can suffer from load imbalance, our shared-memory design provides good load balancing but may suffer from contention due to data sharing. We propose two optimizations for reducing contention. These optimizations reorder and parallelize the internal operations of the database by leveraging the predetermined serial order in a deterministic database. The resulting database provides good performance for both skewed and contended workloads.


Our use of deterministic concurrency control is especially well suited for in-memory databases because the predetermined serial ordering has a smaller impact on database performance. For example, if many fast transactions are ordered after a slow transaction, and all transactions need to update a value at the end, then all the fast transactions need to wait for the slow transaction to end. However, if the slow transaction is ordered after the fast transactions, then while the fast transactions are executing, parts of the slow transaction can execute in parallel, which will improve performance compared to the previous serial order. For this reason, deterministic concurrency control is not used in disk-based databases because disk access latency is highly variable and 10^5 to 10^6 times higher than DRAM latency. However, with in-memory databases, storage is fast and has relatively uniform access latency, and so the gap between the slow and the fast transactions is significantly reduced, limiting the impact of the predetermined serial ordering.

1.3 Contributions

We identify replication and workload management as potential performance bottlenecks as we transition from disk-based databases to in-memory databases. In this thesis, we propose the use of deterministic concurrency control to solve these challenges for in-memory databases.

We propose a scalable, generic replication scheme that works for any serializable primary database. Our scheme reduces network transfer requirements to only 15-20% of the network bandwidth compared to traditional log-based replication. Our evaluation using a fast in-memory primary database running on 32 cores shows that the backup can scale as well as the primary. Our replay-based replication scheme also helps detect arbitrary race conditions that lead to serializability violations, helping database developers detect low-level concurrency bugs. We have found two such bugs in an existing in-memory database.

We are the first to explore tolerating skewed and contended workloads under deterministic concurrency control. We argue that data sharing is the key to enabling load balance, and determinism provides an effective method for handling contention that is introduced by data sharing. We propose optimizations that leverage determinism to batch and parallelize database internal operations.

We implement a shared-memory deterministic database that uses multi-versioning and a novel versioning structure to speed up version lookup during deterministic execution. Our evaluation shows that our optimizations improve performance under highly skewed or contended workloads by 2× to 10× compared to our baseline deterministic database. Our evaluation also shows that these optimizations provide more effective workload management compared to state-of-the-art, non-deterministic in-memory databases.


1.4 Thesis Organization

The thesis is organized as follows. In Chapter 2, we describe basic concepts in databases, then provide background on in-memory database architectures, including the programming interface for these databases. We also introduce the core concepts underlying deterministic concurrency control.

Chapter 3 describes our solution to geographical replication of in-memory databases. We show how our replication protocol saves network traffic, while ensuring that the backup is able to keep up with the primary database. Chapter 4 describes our work towards managing skewed and contended workloads in a deterministic in-memory database.

In Chapter 5, we describe relevant work in the area of deterministic databases, replication and workload management. Some of these systems are used as baselines in our evaluations. Finally, Chapter 6 concludes the thesis and describes avenues for future research.


Chapter 2

Background

In this chapter, Section 2.1 first provides background on database transactions. Next, Section 2.2 provides background on in-memory databases. Finally, Section 2.3 provides background on deterministic concurrency control.

2.1 Transactions

OLTP databases allow grouping query operations together to form a transaction. Transactions are the basic unit of execution in a database, and the database provides several data integrity guarantees to transactions, such as atomicity, isolation and durability.¹ Next, we discuss these properties.

2.1.1 Atomicity

A database ensures that query operations inside a transaction are atomic: either they all succeed, or they all fail. This property is maintained even if the database fails and restarts. Since failures can happen at any time, databases usually implement atomicity with logging. When a transaction commits, the database logs all the modifications to a log on stable storage, so that after a power failure, the database can replay the log to recover the data. If a transaction logs only some of its modifications before the power failure, then the database can ignore its log, thus ensuring failure atomicity. There are many ways to design the logging format and the recovery algorithm. Among them, the ARIES algorithm [49] is a classic logging algorithm designed for disk-based databases.
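To make this concrete, the sketch below shows commit-time redo logging and log replay in miniature. It is an illustrative example only, not the ARIES algorithm or the logging design used later in this thesis; the file name and record format are hypothetical.

# Minimal sketch of failure atomicity via redo logging (illustrative only).
import json, os

LOG_PATH = "redo.log"   # hypothetical log file

def commit(txn_id, writes):
    """Append all of a transaction's modifications as a single record and force
    it to stable storage before replying to the client."""
    record = json.dumps({"txn": txn_id, "writes": writes})
    with open(LOG_PATH, "a") as log:
        log.write(record + "\n")
        log.flush()
        os.fsync(log.fileno())

def recover(db):
    """Replay complete records in order; a truncated trailing record (a crash in
    the middle of a commit) is ignored, which preserves failure atomicity."""
    if not os.path.exists(LOG_PATH):
        return db
    with open(LOG_PATH) as log:
        for line in log:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                break  # partial record from a crash: drop the incomplete transaction
            for key, value in record["writes"]:
                db[key] = value
    return db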

¹ Consistency is often mentioned along with these properties, but it is often considered an application-level property.


Atomicity does not guarantee database availability because it focuses on recovery after the database fails, instead of preventing the failure of the database. Modern infrastructure requires databases to be highly available. Even if the database recovers from power failures, the downtime caused by failures is often unacceptable. For high availability, databases use replication so that when the primary replica fails, the backup replica can take over without causing any downtime.

For high availability, data modifications need to be replicated in real time to other servers over the network. To tolerate regional or data-center level failures, replicas are often deployed in different geographical locations, such as at a different zone or data-center. This is commonly referred to as geographical replication, and data is replicated over the wide-area network in this setting.

Logging can be extended to implement replication. Instead of logging to stable storage, the primary database sends the log over the network. At the same time, the replica on the receiving side replays the log to update and synchronize the backup database with the primary database in real time.

2.1.2 Isolation

Isolation in databases describes how a transaction outcome affects other concurrently executing transactions. Thus isolation defines the notion of data integrity under concurrency. Different isolation mechanisms achieve different levels of integrity guarantees, which are referred to as isolation levels. One common isolation level is serializability or serializable isolation. Serializability guarantees that the final data outcome of concurrently executing transactions is the same as the outcome of the transactions executing serially in some order.

Serializability is an essential requirement for deterministic concurrency control schemes because they require a serial order before transaction execution. Thus we require databases to be serializable for our proposed solutions. For replication, we rely on the primary database to resolve and inform the backup database about the serial order of all transactions. For workload management, we use the predetermined serial order to avoid coordination.

In disk-based databases, serializability is usually implemented by using locks to control the visibility of data modifications. For these databases, the cost of acquiring locks isn’t a performance bottleneck because the disk bottleneck significantly outweighs the locking costs. However, with in-memory databases, the much faster storage exposes locking costs. Thus, in-memory databases use more advanced concurrency control mechanisms for providing serializable isolation, while being able to process millions of transactions per second [72, 58, 18, 79, 35, 34, 40, 31].


Optimistic concurrency control (OCC) is a popular choice among in-memory databases [79, 31, 35, 40]. Instead of acquiring locks when accessing data, OCC accesses data without any coordination. Before commit, the transaction validates all the data it has read or written, and if no such data has been modified since the transaction first accessed it, the transaction commits. Otherwise, OCC determines that the transaction has a conflict and aborts the transaction, leaving no modifications to the database.
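As an illustration of the idea, the sketch below shows a stripped-down, single-node version of OCC validation using per-row version counters. It is a simplification under assumed data structures, not the protocol of any particular system cited above; a complete protocol would also check, for rows that were only read, whether another transaction currently holds their write locks.

# Simplified OCC sketch (illustrative; not a faithful protocol of any cited system).
import threading

class Row:
    def __init__(self, value):
        self.value = value
        self.version = 0                 # bumped on every committed write
        self.lock = threading.Lock()

class Transaction:
    def __init__(self):
        self.read_set = {}               # row -> version seen at first access
        self.write_set = {}              # row -> new value (buffered locally)

    def read(self, row):
        self.read_set.setdefault(row, row.version)
        return self.write_set.get(row, row.value)

    def write(self, row, value):
        self.read_set.setdefault(row, row.version)
        self.write_set[row] = value      # no coordination during execution

    def commit(self):
        locked = sorted(self.write_set, key=id)   # fixed lock order avoids deadlock
        for row in locked:
            row.lock.acquire()
        try:
            # Validation: abort if anything we accessed changed since we first saw it.
            if any(row.version != seen for row, seen in self.read_set.items()):
                return False             # conflict: abort, leaving no modifications
            for row, value in self.write_set.items():
                row.value = value
                row.version += 1
            return True
        finally:
            for row in locked:
                row.lock.release()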

Some in-memory databases [20, 6, 34, 89] improve upon OCC by letting more transactions commit even if they are determined to be conflicting by OCC. Doing so while maintaining serializability is a challenging task. Under OCC, if the validation succeeds, the transaction can commit in a serial order that is equivalent to physical time. When the validation fails, the transaction cannot commit using the physical time. In this case, these databases will try to commit the transaction with a serial order that is older than the current physical time.

To do so, they target a specific type of conflict, often known as write-skew [20, 6]. During validation, if OCC determines that a data read has been overwritten by another transaction, then we call this conflict a write-skew. Instead of aborting the transaction like OCC, these approaches will try to commit the transaction before the overwriting transaction in the serial order, which is older than the physical time. To maintain serializability, the serial order of this transaction must also be after all other transactions that this transaction overwrites. Only when such a serial order is feasible will these databases commit the transaction. These approaches require the database to track write-skew conflicts efficiently.
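The core feasibility test behind this idea can be phrased as an interval check, sketched below. This is a conceptual illustration under assumed inputs (explicit timestamp lists), not the mechanism of the cited systems, which track these bounds with per-row metadata.

# Conceptual sketch of committing "back in time" instead of aborting on write-skew.
def try_backdated_commit(now, overwriter_ts, overwritten_ts):
    """
    now:            current physical commit timestamp
    overwriter_ts:  commit timestamps of transactions that overwrote data we read;
                    we must be serialized before all of them
    overwritten_ts: commit timestamps of transactions whose data we read or overwrote;
                    we must be serialized after all of them
    Returns a feasible (possibly older-than-now) serial position, or None to abort.
    """
    upper = min(overwriter_ts, default=now)
    lower = max(overwritten_ts, default=0.0)
    if lower < upper:
        return (lower + upper) / 2.0   # any point strictly between the bounds works
    return None                        # no consistent place in the serial order: abort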

Aside from serializability, many weaker isolation levels, such as snapshot isolation and read committed, are provided by many commercial systems [47, 54]. These weaker isolation levels allow certain race conditions in the database and sacrifice data integrity for performance or availability [75, 9]. These weaker isolation levels are specified based on the behaviour of their concurrency control algorithms, so comparing them involves classifying what race conditions are allowed among these algorithms, which is a difficult task. Bernstein et al. [4] propose 8 types of anomalies (data races) to compare 6 isolation levels. In this section, we briefly compare read committed, snapshot isolation, and serializability using a simple example.

Read committed provides atomicity at the row level. When a transaction accesses a row, no other transaction can modify that row until the access finishes. Based on this, it also guarantees that any data a transaction accesses must have been committed by other transactions. Accessing data that is generated by an uncommitted transaction is not allowed at this isolation level.

Snapshot isolation provides all the guarantees of read committed, and it has stricter rules on what data transactions can access. A transaction can only access data that was generated by transactions committed before it starts.

Figure 2.1: T1 and T2 on Two Cores. (Core 1 runs T1: A = B + 10; E = 1. Core 2 runs T2: B = A + 10 + E. Rows A, B and E all start with value 0.)

Serializable: A = 10, B = 21 or A = 20, B = 10
Snapshot Isolation: A = 10, B = 21 or A = 20, B = 10 or A = 10, B = 10
Read Committed: A = 10, B = 21 or A = 20, B = 10 or A = 10, B = 10 or A = 10, B = 11

Table 2.1: Final Outcome of A and B Under Different Isolation Levels

Consider the example in Figure 2.1. There are three rows, A, B, and E, with initial value 0 in the database. Transaction T1 reads B and writes A and then writes E. Transaction T2 first reads A and then E and finally writes B. Suppose T1 and T2 start on different cores, with each core executing at arbitrary speed, and then the two transactions commit. Table 2.1 lists the possible outcomes of A and B under different isolation levels.

Under serializability, the database will detect conflicts when accessing A and B, and it will abort or delay either T1 or T2 to achieve a serial schedule. The final outcome of the database is either A = 10, B = 21 or A = 20, B = 10.

Under snapshot isolation, while it is possible to achieve a serializable schedule, there is no such guarantee. For example, if T1 and T2 both start before the other commits, then they will only read the initial values of A and B. With snapshot isolation, no data conflict is detected, because the two transactions write to different rows. Therefore, the final outcome may also be A = 10, B = 10.

Read committed is even weaker than snapshot isolation because it does not create a snapshot view for transactions. So it is possible for T2 to read A, and then for T1 to start and commit, after which T2 can read the updated value of E. Thus the final outcome may also be A = 10, B = 11.

Weaker isolation levels raise serious data integrity risks. Application developers often fail to understand the implications of the weaker guarantees from these isolation levels, leaving applications vulnerable to bugs such as race conditions and attacks. For example, the ACIDRain attack [84] shows that an attacker can carefully construct requests and exploit inconsistent application state, an attack that has caused serious consequences in the wild. Such an attack is easily eliminated by providing serializable isolation. This thesis focuses on serializable isolation because it is most commonly used in in-memory databases.


2.1.3 Durability

Durability is a property that requires that all data modified by a transaction be saved to stable storage before the transaction response is sent back to the client. This property overlaps with atomicity, as both of them have to save data modifications to stable storage. Atomicity defines how to save data and durability defines when to save data.

Databases often delay transactions to improve durability performance. After a transaction commits, its data is not saved to stable storage immediately. Instead, the database will group transactions and save their data to storage in a batch to improve disk IO utilization. This technique is called group commit [28, 70] and is commonly used in many disk-based databases.

Similarly, many in-memory databases batch transactions into epochs for group commit [79, 34, 77]. To guarantee durability, the database does not respond to the client until the epoch is persisted to storage. Similar to group commit, this adds processing latency for transactions, but it ensures that storage performance has no impact on the throughput of the database.
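A rough sketch of epoch-based group commit is shown below, with a single background flusher thread and a placeholder for the persist step. The class and its interface are invented for illustration; they do not correspond to the implementations cited above or described later in this thesis.

# Illustrative epoch-based group commit (names and structure are hypothetical).
import threading, time

class EpochLog:
    def __init__(self, epoch_ms=10):
        self.epoch_ms = epoch_ms
        self.current_epoch = 0
        self.pending = []                # log records of the current epoch
        self.durable_epoch = -1
        self.cond = threading.Condition()
        threading.Thread(target=self._flusher, daemon=True).start()

    def append(self, record):
        """Called at commit time; returns the epoch the caller must wait for."""
        with self.cond:
            self.pending.append(record)
            return self.current_epoch

    def wait_durable(self, epoch):
        """Hold back the client response until the transaction's epoch is persistent."""
        with self.cond:
            while self.durable_epoch < epoch:
                self.cond.wait()

    def _flusher(self):
        while True:
            time.sleep(self.epoch_ms / 1000.0)
            with self.cond:
                batch, self.pending = self.pending, []
                epoch = self.current_epoch
                self.current_epoch += 1
            self._persist(batch)         # one large sequential write + fsync in practice
            with self.cond:
                self.durable_epoch = epoch
                self.cond.notify_all()

    def _persist(self, batch):
        pass                             # placeholder for writing the batch to stable storage

A worker would call append at commit time and wait_durable before replying to the client, so storage latency shows up as added response latency rather than lost throughput.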

Our solutions to both replication and workload management depend on an epoch-based mechanism. For replication, the primary database replicates at the epoch granularity so that data replication provides the same guarantees as durability. For our workload management, we use the epoch as a batching mechanism to perform deterministic execution as well as to reorder and parallelize internal operations. Epoch-based processing also provides a convenient mechanism for garbage collection and memory management.

2.2 In-Memory Databases

In this section, we first describe the types of database architectures used for in-memory databases. Next, we describe the programming model for achieving high performance in in-memory databases. Finally, we explain the notion of opacity, which simplifies programming in-memory databases.

2.2.1 Shared-Nothing vs Shared-Memory Architecture

In-memory OLTP databases are designed for modern hardware with many cores and processors. They are scaled using two types of database architectures: shared-nothing (partitioned) and shared-memory (non-partitioned).

In the shared-nothing architecture [72, 58, 18], data is divided into multiple partitions and each partition can only be accessed by one core. The advantage of the shared-nothing architecture is that concurrent data accesses do not need to be synchronized. This design works well for some types of applications when the data and the workload can be easily partitioned.


However, its performance depends heavily on the partitioning scheme [73, 16]. If the data schema is difficult to partition or workload changes cause a load imbalance across partitions, then the performance degrades significantly. For example, for some skewed workloads, it is difficult to partition the data, which leads to significant imbalance in CPU load across cores. Similarly, if the workload changes from a uniform access pattern to a skewed access pattern, application developers may need to re-partition the database for improving performance.

In contrast, the shared-memory database architecture allows running transactions on arbitrary cores and synchronizes data accesses using the concurrency control protocol. This architecture can easily load balance skewed workloads across cores, e.g., by scheduling transactions in round-robin order, without the involvement of developers. Thus commercial in-memory databases, such as Microsoft Hekaton [10] and SAP HANA [19], primarily use this architecture. However, this architecture can suffer from performance degradation under contended workloads [2, 40, 31]. Thus, the database must perform contention management for handling such workloads. Contention management in in-memory databases usually involves reordering transactions after they abort, which essentially involves searching for another non-contended serial order when the current serial order incurs contention.

2.2.2 Programming Model

Currently, the interactive client-server model is the most widely used database programming model. In this model, the application executes on the database client and connects to the database server over the network. As the application executes, it sends queries over the network and waits for the query results. Eventually, the application sends a “commit” command to commit the transaction.

This programming model is easy to learn because the application simply sends IO requests to the database using interactive queries. Moreover, if the application needs to perform a distributed transaction, it only needs to send these IO requests to multiple database servers.

In this model, the database server waits for the application until the application sends the “commit” command. Since the transaction code executes on the client, while database queries execute on the server, network communication between the client and server can drastically increase the length of the transaction, which increases the potential for conflicting data accesses and thus aborts.

An alternative programming model is the stored procedure model in which transaction code is deployed in the database server. Applications need to split their code into the Data Access Layer and the View Layer. The Data Access Layer only accesses the data while the View Layer code only renders the UI. Code in the Data Access Layer is deployed in the database as transactions, and the View Layer code runs on the client and invokes transactions through remote procedure calls over the network. Instead of providing interactive queries, the database provides data access functions that the Data Access Layer calls directly. With stored procedures, transactions can be executed in “one-shot” style [79, 31], i.e., transaction parameters are provided to the server to start running the transaction, and transactions run to completion without communicating with the database client.
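The sketch below illustrates the one-shot model: transaction code is registered with the server and invoked with all of its parameters up front. The registry, the db interface, and the pay_invoice procedure are invented for this example and are not any particular system's API.

# Illustrative one-shot stored procedure model (all names are hypothetical).
PROCEDURES = {}

def stored_procedure(fn):
    """Deploy transaction code inside the database server."""
    PROCEDURES[fn.__name__] = fn
    return fn

@stored_procedure
def pay_invoice(db, account_id, invoice_id, amount):
    # Runs entirely on the server: no client round trips in the middle of the transaction.
    account = db.get("accounts", account_id)
    if account["balance"] < amount:
        return {"ok": False, "reason": "insufficient funds"}
    db.put("accounts", account_id, {"balance": account["balance"] - amount})
    db.put("invoices", invoice_id, {"paid": True})
    return {"ok": True}

def invoke(db, name, **params):
    """What the client's remote procedure call amounts to on the server: the
    transaction receives its parameters up front and runs to completion."""
    return PROCEDURES[name](db, **params)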

The stored procedure programming model can also support distributed transactions. In this case, the database needs to provide an interface for transactions to access and process remote data. There are several ways to provide this interface. For example, FaRM [13, 14, 68] leverages high performance network devices to enable transactions to access remote memory directly. FaSST [33] provides Remote Procedure Call primitives for transactions to process remote data by invoking code that runs on another machine.

Although this programming model is more intrusive for application developers, it provides better performance and is supported by many traditional databases [48, 55] as well as many in-memory databases [10, 79, 34, 40]. To further speed up transaction processing, some databases, such as Calvin [77], FaunaDB [23] and Redis [63], also require that the stored procedure specify all the primary keys of rows that the procedure will access before the procedure begins execution.

While the interactive programming model is currently more popular, its overheads for in-memory databases are much higher than for disk-based databases [46]. Thus, most in-memory databases advocate using the one-shot programming model for better performance.

2.2.3 Opacity

Serializable isolation defines the integrity of data after transactions commit, but it does not define the consistency of data that a transaction reads during execution. Opacity guarantees that transactions always read a consistent data snapshot relative to the isolation level, immaterial of whether transactions commit or abort.

In this thesis, we aim to provide data replication and workload management for in-memory databases without sacrificing opacity. Our use of deterministic concurrency control makes it relatively easy to provide opacity.

Unfortunately, the importance of opacity is often overlooked. Many databases [79, 40, 31] provide serializability, but sacrifice opacity and allow transactions to view inconsistent data for better performance. In a sense, these designs disable the isolation mechanism temporarily until commit time. At commit, the isolation mechanism is enabled, so that it can validate and abort conflicting transactions. However, the lack of opacity can cause application-level constraints to be violated, leading to crashes or memory corruption. With carefully constructed requests, one can exploit the application, so that it executes arbitrary code and persists bogus data at the operating system layer or in the application runtime. If the application is embedded in the database and needs to abort due to serializability, then it is possible that the rollback routine of the database will not be invoked due to an arbitrary code execution attack. Even if the database aborts the transaction eventually, damage may persist beyond the control of the database, for example in OS-level files or the common libc runtime.

This problem was first described by Dice et al. [11] in the context of transactional memory. These issues raise serious concerns in deployed systems. For example, FaRM [14] has recently been extended with opacity [68].

TransferOwnership(old_owner, new_owner):
    assert(old_owner.level == 3)
    old_owner.level = new_owner.level
    new_owner.level = 3

Listing 2.1: TransferOwnership Transaction

ShowDashBoard(owner, participants):
    assert(owner.level == 3)
    int nr_rev_by_level[3];
    for (user: participants):
        # user.level cannot be 3 (owner)
        # There is only one owner.
        nr_rev_by_level[user.level] += user.nr_rev
    ...

Listing 2.2: ShowDashBoard Transaction

Take an online document collaboration application as an example, where each document can be owned by only one owner. Each participant in the document has a privilege level, represented by an integer: 3 means owner; 2 means editor; 1 means commentator; 0 means read-only. In the database, the ShowDashBoard transaction and the TransferOwnership transaction are programmed as stored procedures, as shown in Listing 2.1 and Listing 2.2. The ShowDashBoard transaction shows the number of revisions contributed by participants other than the owner, aggregated by their privilege levels. Since there is only one owner for each document, no participant should have a privilege level of 3. Thus the ShowDashBoard code uses an array of size 3 for nr_rev_by_level.

Both transactions are correct code because they execute safely when running serially. However, suppose a TransferOwnership transaction commits after a ShowDashBoard transaction has started. Without opacity, a participant may now have a privilege level of 3. This violates the programmer’s assumption because the data it reads is inconsistent. As a consequence, the nr_rev_by_level array will overflow. However, notice that nr_rev_by_level is the first local variable in this stack frame, and so the overflow address stores the return address of this function. Before the database aborts this transaction, the transaction will jump to a user-specified location without invoking the database rollback routine.

A common misperception is that the cause of this problem is that the programming language is not memory-safe, and that by using a memory-safe language like Javascript or PL/SQL (used by MongoDB and Oracle) to write stored procedures, we can eliminate the memory corruption issue introduced by the lack of opacity. However, a memory-safe language does not eliminate the issue for two reasons. First, the compiler and the runtime for a memory-safe language are a large attack surface and often suffer from vulnerabilities that can lead to the same memory corruption issue. Table 2.2 is a non-exhaustive list of vulnerabilities in common memory-safe languages used by various databases. These vulnerabilities skip certain bounds checks and violate memory-safety. Coincidentally, CVE-2017-0234 can trigger the exact array overflow shown in the previous example if the ShowDashBoard transaction were written in Javascript. Second, a memory-safe language often has calls to native library code for performance reasons, for example unicode encoding and decoding in CVE-2004-1362. Often this native code has memory corruption bugs as well.

2.3 Deterministic Concurrency Control

In a deterministic database, the serial ordering of transactions is established before transactions are executed, and the outcome of the database is consistent with serial execution in that order. Thus the database ensures that a transaction’s outcome is uniquely determined by the database’s initial state and an ordered set of previously known transactions. The simplest implementation of a deterministic database involves executing all transactions serially in the predefined order in a single thread. This is clearly correct but unable to utilize the parallelism available on modern multi-core systems.

To enable concurrent transaction execution under deterministic concurrency control, one solution is to employ the shared-nothing architecture. Data is partitioned and each partition is assigned exclusively to a single core.² Before execution, the serial order of each transaction is assigned, and then the transaction is split into one or more pieces so that each piece only accesses a single partition. The pieces of a cross-partition transaction can run in parallel when there is no dataflow dependency between them. However, to satisfy dataflow dependencies, a piece can only start executing when its dependent pieces finish execution on other cores. During execution, each core executes pieces in an order that is equivalent to the predetermined serial order. This approach is simple to implement, but performance is highly workload-dependent. For example, this approach can cause load imbalance, and intra-transaction dataflow dependencies that cross partitions can cause significant coordination delays and poor performance.

² Note that the partitioning scheme should ensure that a scan operation accesses a single partition.


Table 2.2: Example CVEs of PL/SQL and Javascript Runtimes

CVE-2014-6514, CVE-2009-2001 (Oracle PL/SQL): Undisclosed: affect confidentiality, integrity, and availability via unknown vectors.
CVE-2006-0435 (Oracle PL/SQL): Undisclosed: allows attackers to bypass the PLSQLExclusion list and access excluded packages and procedures.
CVE-2004-1362 (Oracle PL/SQL): Does not perform character conversions properly. May bypass access restrictions for certain procedures.
CVE-2003-0634 (Oracle PL/SQL): Stack-based buffer overflow in the PL/SQL EXTPROC functionality. Execute arbitrary code via a long library name.
CVE-2002-0695 (Microsoft T-SQL): Buffer overflow in the Transact-SQL (T-SQL) OpenRowSet component; allows remote attackers to execute arbitrary code via a query that calls the OpenRowSet command.
CVE-2014-4061 (Microsoft T-SQL): Improper control use of stack memory. May cause daemon hang.
CVE-2019-1068 (Microsoft T-SQL): Handles processing of internal functions; an authenticated attacker would need to submit a specially crafted query to an affected SQL server.
CVE-2019-8518 (Webkit Javascript Core): Multiple memory corruption issues. May lead to arbitrary code execution.
CVE-2017-0234 (Microsoft Javascript Engine): The vulnerability could corrupt memory in such a way that an attacker could execute arbitrary code in the context of the current user.
CVE-2018-5188, CVE-2018-5151, CVE-2018-12375 (SpiderMonkey): Memory safety bugs. Some of these bugs showed evidence of memory corruption, and we presume that with enough effort some of these could be exploited to run arbitrary code.
CVE-2016-1953 (SpiderMonkey): Possibly execute arbitrary code via vectors related to js/src/jit/arm/Assembler-arm.cpp.


In a shared-memory architecture, we need to ensure that concurrently running transactions read and write values that are identical to the values read by these transactions executing serially. To do so, these schemes often batch transactions into epochs and require transactions to declare the read-sets and write-sets before execution, i.e., transactions need to declare the rows they will access or modify before they run.

Before executing the transactions in an epoch, the database initializes all the transactions. This involves converting the global serial order of transactions into row-level access order. For example, suppose T1 and T2 have global serial order 1 and 2, and both update row r. Then during initialization, the database needs to convert the global order on row r so that T2 reads r after T1 updates r. There are many ways to represent this row-level order, which we will describe in detail in related work. During concurrent execution, instead of enforcing the global serial order, the database controls the access order for each row. In this way, if two transactions access a disjoint set of rows, they can execute in parallel in either order. Otherwise, their access to common rows is ordered according to the serial order.
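The sketch below makes this per-row ordering concrete for a multi-versioned representation: initialization pre-creates a placeholder version for every declared write in serial-ID order, and a reader looks up the latest version older than its own serial ID, waiting if that version's writer has not produced it yet. The data structures are illustrative assumptions, not the versioning structure used in later chapters.

# Simplified per-row access ordering for deterministic execution (illustrative only).
class Row:
    def __init__(self, value):
        self.versions = {0: value}       # serial id -> value; id 0 holds the initial value
        self.pending = set()             # serial ids whose writes have not happened yet

def initialize_epoch(rows, transactions):
    """transactions: iterable of (serial_id, write_keys), already in the global serial order."""
    for serial_id, write_keys in transactions:
        for key in write_keys:
            rows[key].pending.add(serial_id)       # reserve a slot for this declared write
            rows[key].versions[serial_id] = None

def write(rows, key, serial_id, value):
    rows[key].versions[serial_id] = value
    rows[key].pending.discard(serial_id)

def read(rows, key, serial_id):
    """Return the latest version written before serial_id; in a real system the reader
    would block until that version is produced, here we simply signal it."""
    row = rows[key]
    prior = max(v for v in row.versions if v < serial_id)
    if prior in row.pending:
        raise RuntimeError("reader %d must wait for writer %d on %r" % (serial_id, prior, key))
    return row.versions[prior]

With this row-level order in place, two transactions that touch disjoint rows never interact, while readers and writers of a common row are ordered by their serial IDs alone.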


Chapter 3

Replay Based Primary-Backup Replication

Databases are often a critical part of modern computing infrastructures and hence many real-world database deployments use backup and failover mechanisms to guard against catastrophic failures. For example, many disk-based databases use log shipping to improve database availability [61, 45, 32]. In this scheme, transactions run on a primary server and after they commit on the primary, the database recovery log is transferred asynchronously and replayed on the backup. If the primary fails, incoming requests can be redirected to the backup.

This replication scheme raises several challenges for in-memory databases. These databases support high transaction rates, in the millions of transactions per second, for online transaction processing workloads [72, 38, 79]. These fast databases can generate 50 GB of data per minute on modern hardware [90]. Logging at this rate requires expensive, high-bandwidth storage [79] and leads to significant CPU overheads [42]. For replication, the log transfer requires a 10 Gb/s link for a single database. Failover and disaster recovery in enterprise environments, where the backup is located across buildings (possibly separated geographically), is thus an expensive proposition. These network links are expensive to operate or lease, and upgrades require major investment. A second challenge is that the backup ensures consistency with the primary by replaying the database log in serial order. This replay is hard to perform concurrently, and so the backup performance may not scale with the primary performance [30].

These challenges can lead to the network or the backup becoming a bottleneck for theprimary, which is otherwise scalable. Two trends worsen these issues: 1. the availability ofincreasing numbers of cores, and 2. novel database designs [53, 34], both of which improvedatabase performance even further.

While there is much recent work on optimizing logging and scalable recovery [42, 90, 88, 85], replication for fast databases requires a different set of tradeoffs, for two reasons. First, when logging for recovery, if the storage throughput becomes a bottleneck, storage performance can be easily upgraded. For instance, a 1 TB Intel SSD 750 Series PCIe card costing less than $1000 can provide 1 GB/s sequential write performance, which can sustain the logging requirements described above. Much cheaper SSDs can also provide similar performance in RAID configurations. In comparison, when logging for replication, if a network link becomes a bottleneck, especially high-speed leased lines, an upgrade typically has prohibitive costs and may not even be available.

Second, unlike recovery, which is performed offline after a crash, a backup needs to be able to catch up with the primary, or it directly impacts primary performance [3]. Databases perform frequent checkpointing, so the amount of data to recover is bounded. If the recovery mechanism doesn't scale with the primary, the consequence is a little more time for recovery. However, for replication, if the backup cannot sustain the primary throughput, then it will fall increasingly far behind and may not be able to catch up later.

Our goal is to perform database replication with minimal performance impact on the primary database. We aim to 1. reduce the logging traffic, and 2. perform replay on the backup efficiently so that the backup scales with the primary. To reduce network traffic, we propose using deterministic concurrency control for replicating databases. Deterministic concurrency control guarantees that the execution outcome is consistent with a predetermined serial order. For replication, we can have the primary send the transaction inputs and the serial order to the backup, which can then replay the transactions deterministically. This approach reduces network traffic significantly because, as we show later, transaction inputs for OLTP workloads are much smaller than their output values.

On the primary database, we record and send the transaction write-set so that the backup can determine the records that were written by the transaction. The primary database can employ any concurrency control mechanism as long as it can track the global serial order. On the backup, we use multi-versioning deterministic concurrency control: writers execute concurrently and safely create new versions while readers can access old versions. Our replay uses epoch-based processing, which allows both readers and writers to efficiently determine the correct versions to access. Together, these techniques allow highly concurrent and deterministic replay.

Our main contribution is a generic and scalable replay-based database replication mechanism. By decoupling our replication scheme from the primary database, we enable supporting different primary database designs. Our approach allows the primary to use any concurrency control scheme that supports total ordering of transactions, and imposes no restrictions on the programming model. In addition, the backup is designed so that it makes no assumptions about the workloads, data partitioning, or the load balancing mechanism on the primary. For example, to support different kinds of applications, fast databases often make various design trade-offs, such as data partitioning [72, 17] and a special programming model [18]. Our approach is designed to scale without relying on any specific primary database optimizations, and without requiring any developer effort for tuning the backup for these optimizations.

We have implemented replication for ERMIA [34], a fast in-memory database designed to support heterogeneous workloads. ERMIA supports serializable isolation and opacity. Our backup database is specifically designed and optimized for replaying transactions concurrently. Our experiments with TPC-C workloads show that our approach requires 15-20% of the network bandwidth required by traditional logging. The backup scales well, replaying transactions as fast as the primary, and the primary performance is comparable to its performance with traditional logging. An added reliability benefit of our generic replication mechanism is that it can be used to validate the concurrency control scheme on the primary. We found and helped fix several serious bugs in the ERMIA implementation that could lead to non-serializable schedules.

3.1 Motivating Incidents

An incident [25] at gitlab.com, a popular source code hosting site, on Jan 31, 2017, illustrates the importance of fast and scalable database replication. Early that day, a spam robot created large numbers of read-write transactions on the production database. Due to limited network bandwidth, the backup database lagged far behind the primary and eventually stopped working. The maintenance crew decided to wipe the backup database and set up a new backup instance from scratch. However, while setting up the backup instance, a team member logged into the wrong server and accidentally deleted the production database. The gitlab.com site had to shut down for more than 24 hours to recover from an offline backup and lost 6 hours of data.

Another post by Uber [36] describes the same problem. Log shipping incurs significant network traffic because it sends physical data, and this is expensive on the cloud because backup machines are usually situated across buildings or data centers. Uber's current solution to these problems is to use MySQL's statement level replication [51]. This approach saves some network traffic because the logging granularity is row level. However, this approach can still generate close to 10 Gb/s for a fast database, and the re-execution needs to be performed on a single thread.

3.2 Multi-Version Deterministic Concurrency Control

In this section, we describe our replay-based approach for replicating fast databases. Our aim is to replay transactions on the backup concurrently using deterministic concurrency control, while requiring low network bandwidth.

Transaction     Input Size   Input Size + Write-Set   Output Size (Row Level)   Output Size (Cell Level)
NewOrder (a)    200 bytes    340 bytes                593 bytes                 495 bytes
Delivery        12 bytes     732 bytes                5320 bytes (b)            2620 bytes
Payment         32 bytes     80 bytes                 346 bytes                 103 bytes

(a) We assume that the customer orders 10 items on average.
(b) This output includes the c_data column in the Customer Table, which is a VARCHAR(500) type. We estimate its length to be around 30.

Table 3.1: Input Size versus Output Size in TPC-C

We begin by estimating the network bandwidth that can be saved by our record-replay method. Next, we describe our transaction model, followed by an overview of our approach. Finally, we provide more details about the design of the primary and the backup databases.

3.2.1 Observation

Our design is motivated by the observation that for many OLTP workloads, the transaction input size is smaller than the output size. As a result, replicating transaction inputs to replay the transactions will reduce network traffic compared to replicating the transaction outputs. This approach is often called command logging in partitioned databases [42].

To estimate the likely benefits of our approach, we consider the TPC-C benchmark, designed to be representative of common OLTP applications. TPC-C uses several stored procedures to simulate an online-shopping workload. It contains three read-write transactions. Table 3.1 shows the input and output size of these transactions. For example, in the TPC-C NewOrder transaction, the input parameters are the customer and the products the customer would like to purchase, and the outputs are the rows updated by the NewOrder transaction.

The input size is the total size of the parameters of the stored procedures. In Column 2, we estimate the total size of the parameters and the write-set keys, which are required in our approach. Next, we estimate the output size based on commonly used row-level logging (Column 3) as well as cell-level logging (Column 4).

We observe that the input parameter and write-set key sizes for OLTP transactions are significantly smaller than the output size. This suggests that our replay-based approach can save network traffic by replicating the transaction inputs and re-executing transactions on the backup. As a side effect of reducing network traffic, our approach can help reduce the amount of recent modifications that may be lost after a failure.

Page 31: Replication and Workload Management for In-Memory OLTP

Chapter 3. Replay Based Primary-Backup Replication 22

3.2.2 Transaction Model

We assume the stored procedure programming model for the in-memory primary database. As described in Section 2.2.2, this programming model does not require any client interaction while the transaction is executing, and thus is commonly used among in-memory databases.

For replaying transactions deterministically, we need to handle two sources of non-determinism in transactions. The first is non-deterministic database functions such as NOW() and RAND(). We capture the return values of these function calls on the primary and pass them as input parameters to the transaction during replay. While our current system does not perform this record-replay automatically, the functionality can be implemented at the SQL layer or the C library layer [26], without requiring any changes to the transaction processing code. Some ill-defined SQL queries can also return non-deterministic results, for example "SELECT * FROM Table T1 LIMIT 10" would return 10 arbitrary rows from table T1. Unlike OLAP workloads, OLTP applications avoid these types of queries, as their behavior is not clearly defined. However, to support them, the developer could add "ORDER BY" clauses to ensure determinism, or we can revert to row-level logging for such queries.
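A minimal sketch of how such record-replay of non-deterministic functions could look (the class name NonDetLog and its interface are hypothetical, not part of our prototype): on the primary, each call records its return value into a log that is shipped with the transaction input; on the backup, the same calls return the recorded values in order.

    #include <cassert>
    #include <cstdint>
    #include <cstdlib>
    #include <ctime>
    #include <vector>

    // Hypothetical wrapper around non-deterministic functions. On the primary it
    // records each return value into the transaction's input log; on the backup
    // it replays the recorded values in the same order.
    class NonDetLog {
    public:
        explicit NonDetLog(bool recording) : recording_(recording) {}

        uint64_t Now()  { return Capture(recording_ ? (uint64_t)std::time(nullptr) : 0); }
        uint64_t Rand() { return Capture(recording_ ? (uint64_t)std::rand() : 0); }

        // Values shipped to the backup as part of the transaction input.
        const std::vector<uint64_t>& values() const { return values_; }
        void Load(std::vector<uint64_t> v) { values_ = std::move(v); next_ = 0; }

    private:
        uint64_t Capture(uint64_t fresh) {
            if (recording_) { values_.push_back(fresh); return fresh; }
            assert(next_ < values_.size());
            return values_[next_++];   // deterministic replay of the recorded value
        }
        bool recording_;
        std::vector<uint64_t> values_;
        size_t next_ = 0;
    };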

The second source of non-determinism arises due to concurrently reading and writing data to the database, which presents challenges for concurrent recording and replay, similar to the challenges with replaying other concurrent programs [26] and systems [15]. This is the type of non-determinism that is resolved by deterministic concurrency control.

We assume that data integrity is maintained on the primary using serializable isolation, so that the outcome of the concurrent execution is the same as the transactions executing in some serial order. Serializability is commonly supported by fast, in-memory databases [10, 79, 34, 87]. Although opacity is important for the primary database (see Section 2.2.3), our scheme does not require the primary database to provide opacity.

3.2.3 Overview of Approach

Serializability regulates non-determinism, making it simple to replay transactions correctly. For example, command logging can be used to replay transactions deterministically in a serial order. This serial replay scheme requires minimal network traffic because the primary only needs to send the input parameters and the serial order of the transactions. However, replaying transactions in serial order will not scale, making the backup a bottleneck.

An alternative replay scheme is to send the read-set for each transaction (the data for all records read by the transaction). With this data, transactions can be replayed concurrently because read values are available and reads do not need to be synchronized with writes. A similar approach is used to replay concurrent programs in multi-core systems [15]. However, OLTP transactions have a high read-write ratio and issue large scans, and so read-set based replay will generate significant network traffic.

Our approach lies in between, generating logs that are similar in size to command logging, while allowing concurrent and scalable replay similar to read-set based replay. Besides the input needed for serial replay, we also send the write-set of each transaction to the backup. The write-set contains the keys of all the records written by the transaction, but not the row data. The write-set, together with the serial order, allows transactions to infer the correct data to read during concurrent replay. This approach has lower logging requirements than value logging because keys are generally smaller than row data.

On the backup, we use our multi-versioning deterministic concurrency control to replay transactions. We use multi-versioning for two reasons. First, the different versions provide a row-level ordering constraint, allowing us to replay transactions deterministically on the backup. Second, multi-versioning provides better scalability because transactions can read and write different versions of rows concurrently. In particular, writers can create new row versions safely while readers are accessing old versions.

As deterministic concurrency requires row-level ordering constraints, we need a mechanism to identify the version to read while allowing concurrent replay. If we know about all the possible versions of a record before starting execution, we can infer the correct version to read based on the serial order of transactions. Based on this insight, we replay transactions in epochs, so that all versions of a row that will be created in an epoch are known in advance. This allows the reads of a transaction to select the correct version, without waiting for all previous transactions in an epoch to complete. The result is that replay can proceed concurrently, while requiring synchronization for true read-after-write dependencies only.

As mentioned in Section 2.1.3, grouping transactions for replication does not affect the throughput of the database. However, it affects the latency of transactions. Transactions must be replicated to the backup before results are returned to the client, so the latency of a transaction is the sum of the duration of an epoch and the network latency to the backup database.

3.2.4 Recording on the Primary

On the primary, we record the inputs and the write-set of transactions, which is relatively simple. In addition, we need to determine the serial order of transactions and batch transactions in epochs. Next, we describe these two operations in more detail.


T1                 Write A (A1)   Commit
T2   Read A (A0)                           Write C   Commit

Figure 3.1: An Example of Commit Back In Time

Determining the Serial Order

Our approach allows the primary database to use any concurrency control mechanism that ensures serializability. The primary may be a single or multi-versioned store, and it may or may not partition data. We assign a serial id to each transaction, representing its serial order, and send this serial id with the rest of the transaction information to the backup. The serial id is closely tied to the concurrency control scheme implemented by the database. For example, the traditional methods for implementing serializability are two-phase locking and optimistic concurrency control [38, 79]. For both, when the transaction commits, we use a global timestamp (often maintained by the database implementation) to assign the serial id of the transaction.

While serializable isolation requires the existence of a serial order, in some concurrency control schemes, this order may not be the same as the global timestamp order. For example, multi-version databases often implement more advanced concurrency control schemes such as Serializable Snapshot Isolation (SSI) [20] and the Serial Safety Net (SSN) [81]. These schemes allow transactions to commit "back in time".

Consider the example shown in Figure 3.1. T1 writes a new version of A, while T2 reads A and writes C concurrently. T2 reads A before T1 commits and so it reads A0, the old version of A. Then T1 writes to A and commits, creating A1, a new version of A. Finally, T2 writes to C and commits.

Under two-phase locking, T2 would hold the lock on A and so T1 would not have been able to proceed with writing A until T2 had committed. This schedule would not be valid under optimistic concurrency control either, because T2's read-set has been overwritten, and so T2 would be aborted.

However, in multi-version protocols like SSI and SSN, T1 and T2 can commit successfully. In fact, this concurrent interleaving is serializable, and the serial order is T2, T1. In this case, we say that T2 commits back in time, before T1. Obtaining the serial id is then more involved because the commit order does not indicate the serial order.

Batching Transactions in Epochs

As mentioned in Section 3.2.3, we need to batch transactions into epochs so that the backup can replay the transactions within an epoch concurrently. An epoch must satisfy the constraint that any data read in the epoch must have been written before the end of the epoch. In other words, transactions should never read data from future epochs, or else epoch-based replay could not be performed correctly.

The simplest method for implementing epochs is to batch transactions in actual commit order. Since transactions are serializable, this order satisfies our epoch constraint. Epochs based on commit order work correctly even when transactions can commit back in time in our multi-versioned backup database. A transaction in a later epoch may need to read an older version of a data item, but a transaction never needs to read from a future epoch. Consider the example in Figure 3.1 again. Suppose T1 is batched in an epoch and T2 is batched in the next epoch. T1 will create version A1 during replay, but when T2 is replayed in the next epoch, it will still read the previous version, A0, because its serial order is before T1. The benefit of using the commit order for defining epoch boundaries is that we do not need to wait for transactions to commit back in time. Note that a single-versioned backup design, e.g., based on Calvin [77], cannot easily support such back in time commits across epochs.

It may appear that epochs are expensive to implement, but they can be defined more flexibly than just using the commit order. In practice, databases often use epochs for various purposes such as recovery or resource management. We can reuse the same epochs for batching transactions. For example, ERMIA supports snapshot isolation and uses epochs based on the start times of transactions, which allows more efficient garbage collection of unused versions [34]. Our implementation for ERMIA uses the same epochs because snapshot isolation guarantees that reads see the last committed values at the time the transaction started, hence satisfying our epoch constraint.

3.2.5 Deterministic Execution on the Backup

We designed our backup database from scratch because there are several differences between replay and normal database execution. First, non-deterministic functions need to be handled by replaying the return values of these functions recorded at the primary. Second, we require an initialization phase at each epoch so that transactions in the epoch can be replayed correctly. Third, we replay committed transactions only, and thus require a simple synchronization scheme and no machinery for handling aborts or deadlocks.

To simplify the design of a failover scheme, we implemented the backup database with a structure similar to a fast, in-memory database, such as Silo [79] and ERMIA [34]. We use a B-Tree index to represent a table in the database, with the keys of the B-Tree representing the primary keys of the table. Like many multi-version databases, the values in the B-Tree are pointers to indirect objects that maintain the different versions of the rows. In our case, this object is an array, with each entry containing a version number and a row, for all the versions that are created in an epoch. The version number is the serial id of the transaction that created this row version. It not only indicates the transaction that created this version but also the serial order in which the version was created. The array is kept sorted by the version number.

Initialization

We need to first perform an initialization step before replaying an epoch. For each transaction in the epoch, we process each key in its write-set by creating a new empty version. The version number of the row is the transaction's serial id, and its value is set to an empty value, indicating the value has not yet been produced. This empty value will be overwritten by actual row data when this transaction is replayed later. New empty row versions are added to the version array using insertion sort to maintain sorted order.
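The following C++ sketch illustrates the version array and the initialization step (a simplified illustration, not the actual backup code): each key's indirect object holds versions sorted by serial id, and AddEmptyVersion pre-creates an empty slot under the row-level lock using insertion sort, which degenerates into an append when serial ids arrive mostly in order.

    #include <cstdint>
    #include <mutex>
    #include <utility>
    #include <vector>

    struct Row;  // opaque row payload

    // One slot per version created in the current epoch. data == nullptr means the
    // slot was pre-created during initialization but the writer has not produced
    // the row data yet.
    struct Version {
        uint64_t serial_id;   // serial id of the transaction that creates this version
        Row*     data;
    };

    // Indirect object pointed to by a B-Tree leaf value. Kept sorted by serial_id.
    struct VersionArray {
        std::mutex lock;                 // row-level lock, used only during initialization
        std::vector<Version> versions;

        // Initialization: pre-create an empty version for a writer with this serial
        // id. Incoming data is mostly sorted by serial id, so the insertion sort
        // below usually just appends at the end.
        void AddEmptyVersion(uint64_t serial_id) {
            std::lock_guard<std::mutex> g(lock);
            versions.push_back({serial_id, nullptr});
            size_t i = versions.size() - 1;
            while (i > 0 && versions[i - 1].serial_id > versions[i].serial_id) {
                std::swap(versions[i - 1], versions[i]);
                --i;
            }
        }
    };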

We perform the initialization step concurrently on the backup. Our initialization does not rely on partitioning and uses the shared-memory architecture (see Section 2.2.1). Therefore, when adding new empty versions to a row, we need to acquire the row-level lock for synchronization.

This step scales for two reasons. First, updates are more frequent than inserts in OLTP transactions. Since the values in the B-Tree leaf nodes point to indirect objects, while inserting new keys can modify the B-Tree index, updating existing keys does not modify the B-Tree. As a result, concurrent initialization will generate a read-mostly workload on the B-Tree index, and existing B-Tree implementations scale well under a read-mostly workload [43]. Second, we send transaction data from each primary core to a separate backup core to minimize contention on the sending or receiving path. This data is mostly sorted by serial id. As a result, when new versions are inserted in the version array, they are mostly appended, and insertion sort is efficient in this case.

Execution

After the initialization step, we execute the transactions in the entire epoch concurrently. Any read during execution will see all the possible empty versions created during the epoch. When a transaction with a serial id t reads a key, we simply read from the greatest version that is smaller than t, i.e., the previous version. We use binary search to find this correct version in the version array. If this version contains an empty row, we need to wait until the actual row data is written (by the transaction that creates this version).
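A sketch of the read path, under the same simplified version layout as above (again an illustration, not the prototype's code): binary search locates the greatest version below the reader's serial id, and a null data pointer signals that the reader must wait for the writer.

    #include <cstdint>
    #include <vector>

    struct Row;
    struct Version { uint64_t serial_id; Row* data; };  // as in the previous sketch

    // A transaction with serial id t reads the greatest version smaller than t.
    // Returns the version slot; if slot->data is still null, the reader must wait
    // for the writer (the prototype switches to another transaction's co-routine
    // rather than spinning).
    const Version* FindReadVersion(const std::vector<Version>& versions, uint64_t t) {
        // Binary search for the first slot with serial_id >= t.
        size_t lo = 0, hi = versions.size();
        while (lo < hi) {
            size_t mid = lo + (hi - lo) / 2;
            if (versions[mid].serial_id < t) lo = mid + 1; else hi = mid;
        }
        if (lo == 0) return nullptr;   // no version of this row precedes t in the epoch
        return &versions[lo - 1];
    }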

Figure 3.2 shows an example of executing a transaction (Transaction 6) at the backup. The figure shows that transactions with serial ids 4-7 are batched in Epoch 1. Transaction 6 has an input parameter x, and it reads key A and writes key B. Transactions 4, 5 and 7 write to key A.


Figure 3.2: An Example of Executing a Transaction (Transaction 6 reads key A and writes key B)

On the backup, the index represents a table or an index to the table. We initialize Epoch 1 by creating versions for keys A and B. Each key points to an indirect object containing a sorted array of row versions. For Epoch 1, we initialize versions 4, 5 and 7 for key A, and version 6 for key B. When Transaction 6 is executed, it reads version 5 of key A (the largest version smaller than 6). At this point, this version is still an empty row, and so Transaction 6 will wait until Transaction 5 updates this version. During execution, Transaction 6 will write to version 6 of key B.

One corner case is that a transaction can read its own writes. On the first read, the transaction correctly reads the previous version. However, after a write, we can no longer read the previous version. We resolve this issue with a per-transaction write buffer. On a read, the transaction first searches the buffer for the updates it has made before searching the database. When the transaction commits, it releases the writes from the buffer into the database and frees the entire buffer. Since OLTP transactions are read-dominated, the overhead of using the write buffer is small.
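The write buffer can be sketched as a small per-transaction map consulted before the version arrays (the names below are illustrative, not taken from our implementation):

    #include <cstdint>
    #include <map>
    #include <string>
    #include <utility>

    struct Row;

    // Per-transaction write buffer so a transaction can observe its own writes.
    // Keys are table primary keys; Row* points to the new row image.
    class WriteBuffer {
    public:
        void Put(const std::string& key, Row* row) { buf_[key] = row; }

        // Reads first consult the buffer; on a miss the caller falls back to the
        // version array lookup shown earlier.
        bool Get(const std::string& key, Row** out) const {
            auto it = buf_.find(key);
            if (it == buf_.end()) return false;
            *out = it->second;
            return true;
        }

        // At commit, release buffered writes into the shared version arrays.
        // publish is a stand-in for writing the data into the pre-created slot.
        template <typename PublishFn>
        void ReleaseAll(PublishFn publish) {
            for (auto& entry : buf_) publish(entry.first, entry.second);
            buf_.clear();
        }

    private:
        std::map<std::string, Row*> buf_;
    };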

A delete is equivalent to updating with a null pointer in our multi-versioning scheme. On a delete, instead of allocating a new version, the database writes a null pointer to the version slot. If another transaction happens to read this version, the database will return a null pointer, just like reading a non-existent row, which is often encountered during a scan operation.

There is no need to handle phantoms on the backup. A scan operation will always find the correct set of keys because all inserted keys are initialized in the initialization phase. Similar to other multi-versioning approaches, the scan operation will skip the key if there is no version to read from: either the insertion is serialized after the scan or the key is deleted before the scan.

We do not need to grab any lock or perform any synchronization, other than waiting for empty row versions. This avoids the significant overheads associated with performing concurrency control [27]. Since we only replay committed transactions, there are no aborts, and the execution is deadlock free.

In some workloads, some rows are repeatedly updated within an epoch, which creates many versions for these rows. In a non-deterministic database, the transaction is likely to read the latest version. However, in a deterministic database, the serial order is predefined, and thus, we may need to access older versions. In our design, with binary search, we can efficiently find the correct version to access.

Garbage Collection

We perform garbage collection of old versions of rows at epoch boundaries, when we know that transactions are not executing and thus the old versions are not being accessed concurrently. At initialization, we only perform garbage collection for keys that are overwritten in the epoch. For each such key, our garbage collector requires that the previous versions of these keys will be read from only the last epoch in which they were written. Thus it keeps all the versions of that last epoch, and reclaims all previous versions from preceding epochs. The garbage collector's requirement is met by ERMIA because it only creates new epochs when all cores have entered the current epoch. For example, it will only create Epoch N when all transactions in Epoch N − 2 have committed. As a result, transactions in epoch N will not read a version created in epoch N − 2, if there is an update in epoch N − 1. When we execute epoch N, we need to keep all the versions of epoch N − 1 but can reclaim previous versions.
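A simplified sketch of this per-key collection rule (assuming each version also records the epoch that created it; free_row is a stand-in for returning row memory to the allocator):

    #include <cstdint>
    #include <vector>

    struct Row;
    struct Version { uint64_t serial_id; uint32_t epoch; Row* data; };

    // Epoch-boundary GC for one key that is about to be overwritten in the new
    // epoch: keep every version from the last epoch that wrote the key and
    // reclaim all versions from earlier epochs.
    void CollectOldVersions(std::vector<Version>& versions, void (*free_row)(Row*)) {
        if (versions.empty()) return;
        uint32_t last_epoch = versions.back().epoch;   // versions are sorted by serial id
        size_t keep_from = 0;
        while (keep_from < versions.size() && versions[keep_from].epoch != last_epoch)
            ++keep_from;
        for (size_t i = 0; i < keep_from; ++i) free_row(versions[i].data);
        versions.erase(versions.begin(), versions.begin() + keep_from);
    }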

3.3 Implementation

Our backup database is designed to work with any primary database that provides serializability. To stress our replay server, we chose to use ERMIA [34], a fast in-memory database, as the primary database. ERMIA performs well under heterogeneous workloads such as TPC-E and TPC-C+, in which OLTP transactions run together with some long running transactions. ERMIA also achieves high throughput on traditional OLTP workloads like TPC-C. Next, we describe our replay implementation for ERMIA.


3.3.1 Recording on the Primary

The TPC-C and TPC-C+ benchmarks implemented in ERMIA are not fully deterministic. For benchmarking purposes, these transactions generate random data for the database contents. In practice, these random contents would be specified by the end user, as inputs to transactions. We modified the TPC-C and TPC-C+ benchmarks in ERMIA to pass the randomly generated content as input to the transactions, making the benchmark logic deterministic.

Obtaining Serial Order

ERMIA implements two serializability-enforcing concurrency control protocols, SSI [20] and SSN [81]. Both use timestamps to disallow certain transaction interleavings that may cause serializability violations, and they allow "back in time" commits for additional concurrency. Our prototype supports obtaining the serial order for both SSI and SSN. Next, we describe how the SSI protocol works, and then how we derive the transaction serial order from the protocol.

In SSI, similar to OCC, the transaction checks if its read-set has been overwritten before commit. If not, it acquires a commit timestamp, assigns it to all the tuples in its write-set and then commits. If its read-set has been overwritten, then a write skew is possible, making it dangerous to commit the transaction [4]. SSI determines whether the transaction can commit without violating serializability by looking for dangerous structures, in which two adjacent rw-dependency edges occur between concurrent transactions [20]. ERMIA implements SSI by first determining the minimum commit timestamp among all the overwriters of this transaction.

We call this timestamp the skew_timestamp. Next, the transaction checks the previous versions of the tuples in its write-set. For these tuples, we calculate the maximum of their commit timestamps and their readers' commit timestamps1, and we refer to this timestamp as the predecessor_stamp. If the predecessor_stamp is larger than or equal to the skew_timestamp, the transaction aborts; otherwise it can commit at the skew_timestamp, i.e., commit back in time.

1. Since keeping track of all commit timestamps of readers is expensive, ERMIA only tracks which core has read the data and the latest commit timestamp on that core, which may cause false positives when aborting a transaction.

While the SSI concurrency control protocol is complicated, calculating the serial id is relatively simple. When a transaction commits back in time, we assign its serial id to be 2 × skew_timestamp − 1. Otherwise, the serial id is assigned as 2 × commit_id of the transaction. This assignment ensures that the serial id of a transaction that commits back in time is below the skew timestamp.
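For illustration, the SSI serial id assignment reduces to the following function (a direct transcription of the rule above; the variable names are ours):

    #include <cstdint>

    // A transaction that commits back in time is ordered just before the
    // overwriter whose commit timestamp it skewed to; all other transactions
    // keep their commit order.
    uint64_t SsiSerialId(uint64_t commit_id, bool back_in_time, uint64_t skew_timestamp) {
        return back_in_time ? 2 * skew_timestamp - 1 : 2 * commit_id;
    }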

SSN assigns two timestamps to each tuple: a successor stamp (sstamp) and a predecessor stamp (pstamp). Similar to SSI, the transaction's sstamp keeps track of the minimum of its commit stamp and the sstamp of its overwriters, and the transaction's pstamp keeps track of the maximum pstamp from the transactions of all the overwritten tuples. When the transaction's sstamp is greater than its pstamp, the transaction can commit.

Unlike SSI, SSN allows nested commit back in time. For example, T2 can commit back before T1, and T3 can commit back before T2. However, all three transactions will have the same sstamp, which is T1's commit stamp. To choose the correct serial id in SSN, we need to track the nested commit back level. In addition to sstamp, we add another ε attribute to each tuple. When the transaction commits back in time, ε is inherited from its successor transaction and is incremented by 1. In the above example, all three transactions will have the same sstamp, but T1's ε = 0, T2's ε = 1, and T3's ε = 2. Using this, we can calculate the serial id in SSN as MAX_EPS × sstamp − ε. We set MAX_EPS = 32768. We modified ERMIA's SSN so that if ε is larger than MAX_EPS, the database will abort the transaction. The maximum ε that we have observed in our workloads is less than 50, so we believe our MAX_EPS setting will work in most cases.
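The corresponding SSN assignment, including the nesting level ε, can be written as follows (again a transcription of the rule above, with MAX_EPS = 32768):

    #include <cassert>
    #include <cstdint>

    // Transactions that commit back in time share the same sstamp; epsilon counts
    // how deep the nesting is (0 for a normal commit). MAX_EPS bounds the nesting
    // level; the database aborts a transaction that would exceed it.
    constexpr uint64_t MAX_EPS = 32768;

    uint64_t SsnSerialId(uint64_t sstamp, uint64_t epsilon) {
        assert(epsilon < MAX_EPS);
        return MAX_EPS * sstamp - epsilon;
    }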

Batching Transactions in Epochs

ERMIA uses an epoch manager for tracking various timelines, including for garbage collection of dead versions, and RCU-style memory management. We reuse these epochs to batch transactions. In each epoch, transactions are committed on different cores. We create a TCP connection for each core, and send the transactions committed on that core to a specific core on the backup database, where those transactions are replayed. This allows the backup to perform the replay initialization on different cores concurrently with minimal contention.

We use one worker thread per core for processing an epoch. Each epoch consists of three stages: fetch, initialization and execution. Each worker performs all these stages sequentially and the workers execute concurrently. During fetch, the worker fetches the data from the network socket and parses the packet. During initialization, empty values are inserted for transactions using their serial id. Last, transactions execute deterministically.

We allow the fetch and the initialization phase of the current epoch, and the execution phase of the current epoch and the fetch phase of the next epoch, to run concurrently. However, the initialization stage and the execution stage do not overlap in time (within or across epochs). As a result, during execution, the B-Tree index and the version arrays are read-only, and thus do not need any synchronization. We use a single control thread to synchronize the stages.
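The stage synchronization can be pictured with the following simplified C++20 sketch (it stubs out the real fetch, initialization and execution work, and omits the prototype's overlap of the next epoch's fetch with the current epoch's execution): the barriers ensure that no worker starts executing an epoch before every worker has finished initializing it, and that no worker starts the next epoch's initialization before every worker has finished executing the current one.

    #include <barrier>
    #include <cstdio>
    #include <thread>
    #include <vector>

    int main() {
        const int kWorkers = 4, kEpochs = 3;
        std::barrier sync(kWorkers);

        auto worker = [&](int id) {
            for (int epoch = 0; epoch < kEpochs; ++epoch) {
                std::printf("worker %d: fetch epoch %d\n", id, epoch);
                std::printf("worker %d: init  epoch %d\n", id, epoch);
                sync.arrive_and_wait();   // all workers finish initialization...
                std::printf("worker %d: exec  epoch %d\n", id, epoch);
                sync.arrive_and_wait();   // ...and all finish execution before the next init
            }
        };

        std::vector<std::thread> threads;
        for (int i = 0; i < kWorkers; ++i) threads.emplace_back(worker, i);
        for (auto& t : threads) t.join();
        return 0;
    }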

To handle primary failures, we need to ensure that a transaction's outcome is replicated to the backup before responding to the client application. A strawman approach is to send the transaction inputs to the backup after commit, wait for the backup to acknowledge the transaction and then respond to the client. However, this approach is incorrect because a transaction on the primary may depend on another transaction executing on another core, and this ordering is not necessarily preserved when data is sent over the network. When the primary fails, the backup may only have the transaction but not its dependencies, and so replaying on the backup database will diverge from the primary.

In our current prototype, for simplicity, transactions have to wait for the entire epoch to be sent to the backup. Because epochs are sent in order, our prototype is correct. However, this increases the latency of transactions to the sum of the network RTT (round trip) and the length of an epoch.

We can achieve lower transaction latencies by allowing the backup to replay a partial epoch. The primary sends the transaction after commit and the backup acknowledges the transaction. After that, the transaction waits until all connections have received an acknowledgement that is larger than this transaction's serial id. Then, the transaction can respond to the client. If the primary fails, the backup is left with a partial epoch to replay. The backup should replay all transactions that are smaller than the minimum serial id acknowledged among all connections.

3.3.2 Replaying on the Backup

Our prototype system uses Masstree [43] as the B-Tree index because it scales well for read-mostly workloads. We need to acquire a lock on the indirect object associated with a key when updating the key during replay initialization. Since this lock is unlikely to be contended, as explained in Section 3.2.5, we use a spinlock for its implementation.

During replay, the B-Tree indices and the version arrays are read but their structures are not updated. The only synchronization is while waiting for empty rows. To make the wait efficient, we replay transactions using co-routines. When a transaction needs to wait, it performs a user-space context switch to the next transaction in the run queue, so threads make progress even when a transaction needs to wait.

3.3.3 Failover

When the primary database fails, database requests need to be redirected to the backup database, so that it can take over from the primary. Since our backup database is replay-based, and its internal multi-version storage structures are different from the primary database, we need to migrate the database to a new primary.

Our prototype can export the backup database into an ERMIA checkpoint image and start ERMIA from the image. This implementation is simple and requires no changes to the primary database. We scan the tables and indices and export them in ERMIA's checkpoint format. Our current implementation is single-threaded due to limitations in ERMIA's checkpoint format, thus it takes roughly two minutes to export a 32 warehouse TPC-C workload. This time can be significantly reduced by using concurrent checkpointing methods [90].

A more efficient failover scheme should convert data structures in memory. In our current prototype, all the row data storage formats and indices are compatible with ERMIA, but the multi-version storage structures need conversion. For efficient failover, our replay engine could maintain an ERMIA compatible row structure for only the latest version, because after a failover, the primary database only needs to start from this version. Alternatively, our replay engine could be a plugin for ERMIA, making it aware of our storage structure, allowing it to convert the structures lazily.

Our failover functionality can also be used to ease the process of upgrading primary database software. Upgrading of databases is often a cumbersome process that needs to be performed offline [62]. With our replay-based replication, we can export the backup to the new storage format asynchronously, and then minimize downtime by performing a controlled shutdown of the primary and a replay-based failover to the backup.

3.3.4 Read-Only Transactions

Since our backup database is multi-versioned, it can provide consistent snapshots to read-only transactions. The simplest option is to serve snapshots at epoch boundaries by executing these transactions in the same way as we replay transactions. To reduce staleness, we can also track the serial id before which all transactions have been replayed, and assign this serial id to the read-only transaction. This serial id can be maintained scalably by using per-core counters.
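A sketch of such per-core counters (illustrative only; our prototype does not currently implement this): each replay worker publishes the highest serial id it has finished, and a read-only transaction takes the minimum across cores as its snapshot.

    #include <algorithm>
    #include <atomic>
    #include <cstdint>
    #include <vector>

    // Per-core counters tracking the highest serial id each replay worker has
    // finished. A read-only transaction can be assigned the minimum across cores:
    // every transaction with a smaller serial id has already been replayed.
    class ReplayWatermark {
    public:
        explicit ReplayWatermark(size_t cores) : done_(cores) {
            for (auto& c : done_) c.store(0, std::memory_order_relaxed);
        }

        void Advance(size_t core, uint64_t serial_id) {
            done_[core].store(serial_id, std::memory_order_release);
        }

        uint64_t SnapshotSerialId() const {
            uint64_t min = UINT64_MAX;
            for (const auto& c : done_)
                min = std::min(min, c.load(std::memory_order_acquire));
            return min;
        }

    private:
        std::vector<std::atomic<uint64_t>> done_;
    };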

Currently, we do not support read-only transactions because our garbage collector is unaware of them. To support read-only transactions, we would need to track in-flight read-only transactions to prevent the GC from collecting currently in-use versions.

3.4 Evaluation

In this section, we evaluate the performance of our replication scheme. We use ERMIA as the primary database. We compare our approach against a baseline ERMIA (no log shipping), ERMIA with its log shipping implementation, and our Calvin implementation on the backup [77].

ERMIA uses row-level logging, which consumes less network traffic than traditional ARIES style page-level logging. All our experiments are performed with ERMIA using the SSI concurrency control protocol. We also performed experiments using the SSN protocol; the numbers were similar and are not presented here.


The Calvin primary ships both the read-set and the write-set keys. Our Calvin backup implementation closely mirrors the original design. However, unlike the original implementation that uses a centralized lock manager for deterministic locking during initialization, we reuse the version arrays in our backup to implement a per-tuple lock queue that allows concurrent initialization. As described in Section 3.2.4, Calvin is unable to handle back in time commits across epochs, and our implementation ignores these errors.

Our evaluation uses two metrics. First, we compare the network traffic of the different methods. Second, we measure the throughput and scalability on the primary and the backup databases. We use two network configurations in our log shipping and replay experiments. For most experiments, we use a 10 Gb/s Ethernet network so that the network is not a bottleneck. We also show the impact on the primary when using a slower 1 Gb/s network.

We execute the ERMIA primary database and our backup database on machines with the same hardware configuration: 4-socket Intel Xeon CPU E5-2650 (32 cores total) and 512 GB DRAM. Data sets for all workloads fit into DRAM. Both databases run on Linux 3.10 with glibc 2.17. ERMIA is compiled with gcc 4.8.5 and our backup database is compiled with clang 3.8, both with the -O3 optimization.

Workload        Record     Log Shipping        Calvin              Calvin Optimized
TPC-C           6.988 GB   35.739 GB (5.11x)   8.445 GB (1.21x)    7.257 GB (1.04x)
TPC-C+          5.081 GB   27.398 GB (5.39x)   12.789 GB (2.52x)   11.969 GB (2.36x)
TPC-C Spread    3.600 GB   24.481 GB (6.80x)   4.730 GB (1.31x)    3.814 GB (1.06x)
TPC-C+ Spread   2.832 GB   19.120 GB (6.75x)   8.157 GB (2.88x)    7.486 GB (2.64x)

Table 3.2: Network Traffic

3.4.1 Workloads

We use 4 kinds of workloads in our experiments: TPC-C, TPC-C+, TPC-C Spread and TPC-C+ Spread. TPC-C is a typical OLTP workload that simulates an E-Commerce workload: customers query and purchase items online, and items are delivered from warehouses to customers. There are three types of read-write transactions in TPC-C: NewOrder, Delivery, and Payment. These read-write transactions constitute 92% of the total transactions; the remaining 8% are read-only transactions.

TPC-C+ [6] is designed to evaluate heterogeneous (non-pure OLTP) workloads. It is similar to TPC-C, but it adds CreditCheck, a type of analytic transaction. This transaction scans the customer's order history and account balance to determine the customer's credit level, and it takes significantly longer to run than other TPC-C transactions.


By default, ERMIA partitions the database by warehouse id and associates a worker thread with each warehouse. Since most TPC-C/TPC-C+ transactions tend to operate on a single warehouse, ERMIA runs a transaction on the worker thread serving its warehouse. To evaluate how our approach will perform when data cannot be easily partitioned, we disable this thread pinning in ERMIA by letting transactions run on arbitrary worker threads. We call these the "spread" workloads: TPC-C Spread and TPC-C+ Spread.

During replay, we avoid second-guessing the data partitioning policy. We replay the transaction on the same backup core as the transaction was run on the primary, which helps preserve the partitioning policy of the primary.

3.4.2 Network Traffic

In this section, we measure the network traffic using the 10 Gb/s network so that it is not a bottleneck. We use 16 cores on the primary database, because with 32 cores, log shipping can generate traffic close to 10 Gb/s.

Table 3.2 shows the network traffic generated by each workload in 60 seconds. Compared to our approach, log shipping requires 5x more network bandwidth for the TPC-C and TPC-C+ workloads, and 7x more bandwidth for the spread workloads. Our bandwidth savings are higher for the spread workloads because they commit fewer NewOrder transactions (due to higher abort rates). We save more network traffic on the other transaction types, which make up a larger share of the committed transactions.

For the Calvin backup, the primary sends read-sets as well as write-sets to the backup. This requires 1.2-2.9x network traffic compared to our approach because some transactions issue long table scans. Most of these scans are issued on read-only tables and thus read locks are not needed for these rows during replay. To measure the benefits of optimizing for read-only tables, we modify the Calvin primary so that it specifically does not send the read-set keys for these tables.

With this optimization, the network traffic for Calvin is close to our approach for the TPC-C and TPC-C Spread workloads, but still incurs much higher network traffic on the TPC-C+ workload. For TPC-C, most of the records being read are updated in the transaction, and so the read-set is mostly covered by the write-set. However, for TPC-C+, the CreditCheck transaction issues scans on many read-write tables, and sending the read-set for these scans consumes significant network traffic. By default, the CreditCheck transactions are 4% of all the issued transactions. Increasing this ratio will increase the network traffic as well.

We also evaluate statement level logging in MySQL [51] using the TPC-C implementation from Percona Lab [60]. We configure the benchmark with just 1 warehouse and run the workload for 60 seconds. MySQL is able to commit 7078 transactions and generates a 22 MB replication log. With ERMIA generating 700K transactions/s, MySQL would require 20x network traffic compared to our approach.

Figure 3.3: Primary Performance (throughput in transactions/s versus number of cores, 8 to 32, for TPC-C and TPC-C+, comparing No Replication, Log Shipping, and Record on the regular and Spread workloads).

3.4.3 Primary Performance

In this section, we measure the performance impact on the primary database of recording transaction inputs and write-set keys and sending them to the backup. For the baseline, we use default ERMIA (no log shipping). We also show the performance of ERMIA with log shipping. To avoid any performance bottlenecks on the backup, we discard any packets that it receives during this experiment. We warm up ERMIA by running each workload for 30 seconds and then we measure the average throughput for the next 30 seconds.

Fast databases are designed to scale with the number of cores. Thus, we show the throughput of the workloads with increasing numbers of cores (from 8 to 32). Figure 3.3 shows that the throughput of our recording approach is close to the performance of the original database and scales with the number of cores. While the throughput slows down slightly per core, it has no visible scalability impact. Although our approach requires global serial ordering, it has minimal effect under transactional workloads.

Figure 3.4: Primary Performance, 1 Gb/s Network (throughput in transactions/s versus number of cores, 8 to 32, for TPC-C and TPC-C+, comparing No Replication, Log Shipping, and Record on the regular and Spread workloads).

Log shipping also performs similarly to the original database, but the 10 Gb/s network becomes a bottleneck for the TPC-C workload at 32 cores. Both our approach and log shipping send data in a background thread. Although log shipping sends large amounts of data, it has only about 2% overhead when the network is not the bottleneck. Our prototype has slightly more overhead (3-4%), because it requires copying the transaction input parameters and the write-set keys in the commit path.


Performance Over Slow Network

Table 3.2 shows that replicating fast databases consumes significant network traffic. If the network is slow, then replication will have a significant impact on primary performance. In this section, we compare log shipping and our approach using a 1 Gb/s network. Though expensive, a 1 Gb/s link is practical in the wide area. Depending on the region and the ISP, the estimated cost of such a wide area link is roughly between $100K-500K per year. Both ERMIA's log shipping and our approach use a 512 MB send buffer. If the network is slow, the send buffer will fill up and stall the primary.

Figure 3.4 shows the primary throughput with increasing numbers of cores. Our approach can sustain the primary throughput up to 16 cores for TPC-C and 24 cores for TPC-C+. In contrast, log shipping performs much worse than our approach. For the Spread workloads, our approach scales until 32 cores, while log shipping still performs poorly.

3.4.4 Replay Performance

In this section, we measure the performance of our concurrent replay method on the backup server. To avoid performance artifacts caused by the primary, we collect the packet traces generated by the primary, and send these traces to the backup on the 10 Gb/s network.

We evaluate backup performance by measuring the replay time for a 30 second trace.2 If the backup can finish replaying within 30 seconds, then it will not be a bottleneck on primary performance. We also compare with Calvin's deterministic execution scheme to show the benefits of using multiple versions. Figure 3.5 shows the replay time on the backup with increasing numbers of cores. For each data point, the primary and the backup use the same number of cores. Our numbers are marked using solid lines and the Calvin numbers are marked using dashed lines.

Our approach is able to replay the trace within 30 seconds under all 4 workloads, except TPC-C Spread at 24 cores. For TPC-C and TPC-C+, our approach takes roughly 18-23s. The spread workloads represent a worst-case scenario with no data locality and much higher contention. Even in this extreme setup, our approach can replay the trace in 29-32s.

Calvin shows slightly worse performance (5-10% overhead) than our approach with the TPC-C and TPC-C Spread workloads. As mentioned in Section 3.4.2, TPC-C transactions update most of the records they read. This type of access pattern does not benefit from multiversioning during replay, since most versions will be read only once before being updated. For the TPC-C+ workloads, however, the CreditCheck transaction performs many reads without updating these values. As a result, a single-versioned system like Calvin suffers from write-after-read dependencies, leading to higher overheads for TPC-C+ (up to 50%) compared to our approach. For the challenging spread workloads, Calvin consistently fails to complete the replay within 30s.

2. Similar to previous experiments, this trace is captured after the primary has been warmed up for 30 seconds.

Figure 3.5: Concurrent Replay Performance (time to replay the 30-second trace versus number of cores, for TPC-C/TPC-C+ and their Spread variants, comparing our approach and Calvin against the primary's 30-second trace duration).

Epoch Length

Our approach replicates transactions at epoch granularity. Thus, a failure on the primary risks losing transactions in the current epoch. The epoch length represents a tradeoff between performance and data loss, since shorter epochs lose less data but may have a performance impact on the primary and the backup. We use ERMIA's epoch manager to implement our epochs; our default epoch length is 1 second.

We measure throughput on the primary and the backup while varying the epoch length. We use the TPC-C and TPC-C Spread workloads with 32 cores because this setup is performance sensitive and should show the largest impact. In Figure 3.6, we find that ERMIA's performance is relatively stable when the epoch length is larger than 100 ms. At a 50 ms epoch length, the primary throughput decreases by roughly 15% due to the cost of epoch processing.

Figure 3.6: Throughput With Different Epoch Lengths (read-write transactions/s versus epoch length from 50 ms to 1000 ms, for TPC-C Replay and TPC-C Spread Replay).

We also measured the replay time with different epoch lengths, and found that it is similar to the numbers shown in Figure 3.5. This suggests that the cost of epoch processing has a more significant impact on the primary than on the backup.

Version Array Vs. Linked List

Existing multi-version databases use a linked list for tracking versions because it allows scalable, lock-free access (e.g., removal during aborts). However, this imposes a non-trivial cost for traversing pointers when the linked list is long [17].

As described in Section 3.2.5, we track row versions using a sorted array, which allows using binary search for finding a given version. In our case, the array is created during replay initialization, and can be accessed lock free during replay. We also do not need to handle aborts.

Here, we evaluate the cost of using a linked list implementation by measuring the replay time for the 30-second TPC-C and TPC-C Spread traces with different epoch lengths. With a 50 ms epoch size, the TPC-C replay time is 30.3 seconds (backup about to keep up), and the TPC-C Spread replay time is 29.4 seconds (backup can keep up). When the epoch size is 100 ms or more, the replay cannot keep up with the primary. The slowdown (vs. using an array) for TPC-C ranges from 24% (100 ms epoch) to 2.9x (750 ms epoch). The slowdown for TPC-C Spread ranges from 71.5% (100 ms epoch) to 5.3x (750 ms epoch). However, since epoch lengths below 100 ms impact primary performance, and epoch lengths above 100 ms impact backup performance, we conclude that version arrays are essential for ensuring that the backup can keep up with peak primary throughput.

The main reason for the slowdown is update hotspots [78] in the TPC-C family. Figure 3.7 shows a histogram of the number of key versions that are looked up in the linked list when a key is accessed (note that the X axis has a log scale). While most accesses are cheap (1 or 2 versions), a significant number of accesses skip over 4000 versions, because some keys are updated frequently in TPC-C, resulting in long linked lists within an epoch. These accesses lead to cascading slowdown for dependent transactions.

Figure 3.7: Link List Versions (histogram of the number of versions accessed per key access, separately for reads and writes; the X axis is log scale from 2^0 to 2^13 versions, and the Y axis counts key accesses on the order of 10^8).

Memory Overhead

In this section, we measure the memory overhead imposed by multi-versioning. When the backup database finishes replay and starts generating the ERMIA checkpoint file, we collect the distribution of the number of versions for all keys. The total number of keys in the database is roughly 110M and the total number of versions is 164M. Thus the memory overhead of multi-versioning is roughly 48%. We find that most keys (99.7%) only have 1 or 2 versions; however, the update hotspots in TPC-C lead to some keys having high numbers of versions (e.g., 352 keys had 2048-65536 versions).

3.5 Bug Detection

Our replay scheme is mainly designed for replication, but it can also be used to catch corruption-related bugs in the primary database. The concurrency scheme on the backup is different from the primary, and thus uncorrelated corruption bugs can be detected with a simple checksum scheme. On the primary database, we add a checksum to each committed transaction. The checksum is calculated over the transaction's write-set, containing keys and row data. On the backup, after replaying a transaction, we recalculate the checksum and compare it with the primary's checksum. A checksum mismatch indicates a bug. This scheme imposes 12% overhead on the primary performance, although a faster checksum will help improve performance.


Time   1        2        3       4       5        6       7        8
T1     Write A  Write B  Commit
T2     Read B                                             Write C  Commit
T3                               Read A  Write C  Commit

Table 3.3: Concurrent Interleaving in ERMIA that ignores Write-Write Conflicts
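The checksum computation itself can be as simple as the following sketch (FNV-1a is used here only as a placeholder; the exact checksum function in our prototype may differ):

    #include <cstdint>
    #include <string>
    #include <utility>
    #include <vector>

    // Per-transaction checksum over the write-set (keys and row images), computed
    // on the primary at commit and recomputed on the backup after replay.
    uint64_t WriteSetChecksum(const std::vector<std::pair<std::string, std::string>>& write_set) {
        uint64_t h = 14695981039346656037ULL;             // FNV-1a offset basis
        auto mix = [&](const std::string& bytes) {
            for (unsigned char c : bytes) { h ^= c; h *= 1099511628211ULL; }  // FNV prime
        };
        for (const auto& entry : write_set) { mix(entry.first); mix(entry.second); }
        return h;
    }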


Using the checksum scheme, we have found and fixed two bugs in ERMIA's concurrency control implementation. One of them is related to timestamp tracking in ERMIA's SSI and SSN implementations. We also found a significant flaw in ERMIA's phantom protection protocol. ERMIA reuses Silo's phantom protection protocol, but Silo's protocol only works for a single-versioned database. Both bugs are concurrency bugs that can lead to a non-serializable schedule without crashing the database. Without our system, it would have been hard to identify them.

3.5.1 Ignoring Write-Write Conflicts

ERMIA's original implementation of SSI compared the skew timestamp (see Section 3.2.4) with the reader's commit timestamp but not with the overwritten row's commit timestamp. This can lead to non-serializable interleavings.

Consider the concurrent interleaving shown in Table 3.3. When T2 commits, it notices that B in its read-set has been overwritten by T1. So, T2 gets a skew timestamp of 3, which is equal to T1's commit timestamp. Next, T2 finds C in its write-set and looks for C's last reader but finds no reader for C. So, T2 falsely believes that it is OK to commit. At the same time, before T2 commits, another transaction T3 has already committed and also written to C.

This interleaving, however, is not serializable. On row B, T2 happens before T1 in serial order because T2 has to read row B before T1 updates it. On row C, T2 overwrites row C and thus must happen after T3. However, T3 reads row A after it is written by T1, and so T3 must happen after T1. Thus, there is no feasible serial order for T2: the constraints form a cycle T2 < T1 < T3 < T2.

Our fix for this bug is simple. When a transaction tries to commit back in time, it not only needs to compare the readers' commit timestamps with the skew timestamp, it also needs to compare the commit timestamp of the overwritten row to detect write-write conflicts.
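The following sketch illustrates the shape of this commit-time check. The structures are hypothetical stand-ins; ERMIA's real SSI/SSN code tracks considerably more state, but the added comparison against the overwritten version's commit timestamp is the essence of the fix.

#include <cstdint>
#include <vector>

// Hypothetical state kept per overwritten version, simplified for illustration.
struct OverwrittenVersion {
  uint64_t commit_ts;       // commit timestamp of the version being overwritten
  uint64_t last_reader_ts;  // latest commit timestamp among its readers
};

struct CommitContext {
  uint64_t skew_ts;  // earliest overwrite of this transaction's reads (the skew timestamp)
  std::vector<const OverwrittenVersion *> write_set;
};

// Returns true if the transaction may "commit back in time" below skew_ts.
// The original code compared skew_ts only with the readers' commit timestamps; the
// fix also compares it with the overwritten version's commit timestamp, so that
// write-write conflicts (such as T3's write to C in Table 3.3) are detected.
bool ValidateBackInTimeCommit(const CommitContext &ctx) {
  for (const OverwrittenVersion *v : ctx.write_set) {
    if (v->last_reader_ts >= ctx.skew_ts)
      return false;  // a reader already serialized at or after the skew point
    if (v->commit_ts >= ctx.skew_ts)
      return false;  // write-write conflict: the row was written at or after the skew point
  }
  return true;
}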

3.5.2 Logical Phantom Detection

A phantom read may occur in the database when inserts occur concurrently with range queries. Consider the concurrent interleaving shown in Table 3.4. In this case, T2 performs a scan on table X, looking for a minimum row.


T1                       Insert X2   Write Y   Commit
T2   Scan Min(X) = X3                                    Write Y   Commit

Table 3.4: A Phantom Example

T1            Insert X2   Write Y   Commit
T2   Read A                                  Scan Min(X) = X3   Write Y   Commit

Table 3.5: Logical Phantom

In this example, X2 is less than X3, but it is inserted after the scan operation in T2. Thus the scan in T2 only finds X3, which was the minimum at the beginning. This interleaving is non-serializable. Transaction T2 commits after T1 and has a write-write conflict on row Y, and so T2 happens after T1. However, T2 scans for X and finds X3 instead of X2. So T2 must also happen before T1, and thus the interleaving is non-serializable. However, if the database only tracks reads and writes, it will ignore the fact that T2 must happen before T1. Specifically, T2's read-set only contains X3, which ignores the fact that T2 skipped X2. Databases implementing serializable transactions must detect and prevent such phantom reads.

ERMIA uses a mechanism similar to Silo [79] to detect and prevent phantom reads. It assigns each B-Tree node a version number. When the node changes, the version number increments. When a transaction scans the index, it tracks a list of B-Tree nodes it accesses. Before commit, it checks whether the version numbers of these B-Tree nodes have increased. If so, the transaction aborts. In the example above, T2 aborts because the B-Tree node containing X2 has been changed by T1 after T2's scan. We call this mechanism physical phantom detection. Physical phantom detection works well for Silo because Silo is single-versioned.
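A minimal sketch of this node-set validation is shown below, assuming a simplified BTreeNode type; the real index code interleaves this tracking with the scan itself.

#include <cstdint>
#include <vector>

struct BTreeNode {
  uint64_t version;  // incremented whenever the node's key set changes
};

struct NodeSetEntry {
  const BTreeNode *node;
  uint64_t observed_version;  // version seen while scanning
};

struct ScanContext {
  std::vector<NodeSetEntry> node_set;
};

// Called for every B-Tree node visited by a range scan.
void RecordScannedNode(ScanContext &ctx, const BTreeNode *node) {
  ctx.node_set.push_back({node, node->version});
}

// Called at commit time: if any scanned node changed (for example, because a
// concurrent insert added a key to it), the transaction must abort.
bool ValidateNodeSet(const ScanContext &ctx) {
  for (const auto &entry : ctx.node_set) {
    if (entry.node->version != entry.observed_version)
      return false;  // structural change after the scan: possible phantom
  }
  return true;
}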

This physical phantom detection is not sufficient for correctly detecting phantom reads in ERMIA, which is a multi-version database. We show this problem with the example shown in Table 3.5. ERMIA, like many multi-version databases, guarantees that reads are snapshot consistent, and so a transaction can only read rows that are committed before it starts. In this example, T1 writes X2 and Y. It commits after T2 starts but before the scan in T2. When T2 scans for X, it has to skip X2 because X2 is inserted after T2 started. So T2 finds X3 instead of X2 during the scan. At that time, T1 has already committed, which means that the B-Tree node tracked by ERMIA's physical phantom detection mechanism already contains the latest version (X2) before the scan. As a result, when T2 writes Y and commits, the physical phantom detection mechanism will not detect any phantom, leading to a non-serializable interleaving.

We fixed this bug in ERMIA by adding a sentinel version for each key in the B-Tree. When a transaction inserts a new row and commits, the new row consists of two versions instead of one.


The first version is a sentinel version with timestamp 0 and the second version is the actual new data with the current timestamp. The succeeding scan in T2 will skip the data version of X2 because T2's start timestamp is smaller than the data version's timestamp. However, T2 does not skip the sentinel version, as the sentinel version has timestamp 0, and so T2 adds the sentinel version into its read-set for future validation. Because the sentinel version contains no data, T2 continues the scan and finds X3. At commit time, T2 realizes that the sentinel version of X2 has been updated by T1, so T2 must be serialized before T1. However, T2 and T1 update Y in the reverse order, and so T2 must abort.

We call this mechanism logical phantom detection. Combined with physical phantom detection, we can correctly detect and prevent phantoms in multi-version databases.
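The sketch below illustrates the read path of the sentinel idea under assumed, simplified types: an insert commits a sentinel plus a data version, and a snapshot scan that skips the too-new data version still records the sentinel in its read-set for commit-time validation.

#include <cstdint>
#include <vector>

struct Version {
  uint64_t ts;       // commit timestamp; 0 marks the sentinel
  const void *data;  // nullptr for the sentinel, row image otherwise
};

struct KeyVersions {
  std::vector<Version> chain;  // kept newest-first in this sketch
};

// An insert that commits adds two versions for the new key: the data version at the
// inserting transaction's commit timestamp and a sentinel at timestamp 0.
void CommitInsert(KeyVersions &kv, uint64_t commit_ts, const void *row) {
  kv.chain.push_back({commit_ts, row});  // newest
  kv.chain.push_back({0, nullptr});      // sentinel, visible to every snapshot
}

// Snapshot read of one key during a scan. A data version newer than the reader's
// start timestamp is skipped, but the visible version (possibly the sentinel) is
// recorded in the read-set, so commit-time validation notices that the key was
// logically skipped.
const Version *SnapshotReadKey(const KeyVersions &kv, uint64_t start_ts,
                               std::vector<const Version *> &read_set) {
  for (const Version &v : kv.chain) {  // newest first
    if (v.ts > start_ts) continue;     // too new for this snapshot
    read_set.push_back(&v);
    return v.data ? &v : nullptr;      // nullptr: only the sentinel is visible
  }
  return nullptr;
}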

3.6 Summary

In this chapter, we present a primary-backup replication scheme for providing reliability for high-throughput, in-memory databases. Our approach uses deterministic replay for replication to minimize the network traffic requirements due to logging. We have shown that recording the keys in the write-set requires 15-20% of the network bandwidth needed by traditional logging for OLTP workloads. The write-set keys can be used to perform deterministic replay concurrently, using epoch-based processing and a multi-version database. We have shown that this approach allows the backup to scale as well as ERMIA, a fast in-memory database.


Chapter 4

Workload Management Under Deterministic Concurrency Control

Today, modern web applications rely critically on high-performance databases, but these applications frequently encounter unpredictable demand spikes that lead to skewed and contended data accesses. For example, many web applications are subject to unpredictable spikes in demand due to events such as holiday sales or social reviews that make specific data items highly popular. The database needs to perform workload management to support these workloads.

One approach for managing contended accesses is to partition the workload so that contended accesses are serialized on each core. Existing deterministic databases [8, 17, 18] employ this approach for scaling and for handling contended accesses. However, this approach can suffer from load imbalance with skewed accesses. Skewed accesses can make it difficult to partition the data evenly, and so transactions accessing the popular items can degrade throughput dramatically. Long-term load spikes and imbalance require repartitioning data, which is both expensive and may require rewriting the application code to ensure locality under the new partitioning.

To manage skewed accesses, databases can employ a shared-memory architecture and distribute transactions across cores. This approach allows load balancing transaction execution among all cores. However, the performance of a shared-memory database can degrade significantly with contended accesses. Thus, the database needs to perform workload management for managing contention. Much recent work on in-memory databases has adopted this approach for OCC-based systems [40, 31]. These schemes are designed for non-deterministic databases and they sacrifice opacity for performance (see Section 2.2.3).

In this thesis, we explore using deterministic concurrency control for managing database performance under skewed and contended workloads. Deterministic databases are attractive for several reasons. The predetermined ordering of transactions ensures serializable execution while avoiding concurrency-control-related deadlocks and aborts.


The lack of aborts helps ensure opacity [68] because transactions always read consistent data. Determinism reduces the need for two-phase commit for maintaining consistency when transactions access data spanning more than one machine, thus allowing distributed transaction throughput to scale [77]. The previous chapter also shows that it simplifies replication and failure recovery since transactions can be replayed deterministically. A workload management scheme for deterministic databases will provide benefits for all these applications.

This work presents Caracal, a novel shared-memory (i.e., non-partitioned), deterministic database that performs well under both skewed and contended workloads. Transactions running on different cores in Caracal access shared data and thus concurrency control is required to ensure deterministic ordering. To achieve parallelism, Caracal batches transactions into epochs, and similar to previous deterministic concurrency control schemes [77, 17], it initializes the concurrency control operations for the batch before transaction execution, allowing transactions to be executed concurrently while still maintaining deterministic output. While this shared-memory approach makes it possible to maintain load balance across cores, it can lead to poor performance with contended workloads.

We propose two novel optimizations called batch append and split-on-demand for managing contention. Both these optimizations are enabled by deterministic execution. Batch append executes the concurrency control initialization operations in arbitrary order, which allows batching them and running them scalably. Split-on-demand detects contended transactions without depending on accurate historical data. Then it splits contended transactions into contended and uncontended pieces. These pieces are run independently and without requiring synchronization for conflicts or aborts, which helps reduce the serial component of contended execution.

We evaluate Caracal using the YCSB and TPC-C benchmarks and compare it with three deterministic concurrency control schemes that use various partitioning methods: Granola [8], Bohm [17] and PWV [18]. Caracal's optimizations enable it to outperform existing schemes for most workloads. For uniform uncontended YCSB, Caracal's throughput at 32 cores is 1.64× higher than the best-performing alternative. When the YCSB workload is skewed, Caracal's advantage increases to 2.7× to 9.7×, with and without contention, respectively. For a contended single-warehouse TPC-C workload, Caracal outperforms the best alternative by up to 92%. We also compare the performance of Caracal against several in-memory, non-deterministic databases. In this case, Caracal provides 2× to 12× higher throughput under high contention.

This work makes several contributions. Caracal is the first deterministic database that provides comprehensive workload management for handling skewed and contended workloads. We provide a detailed comparison of our approach against previous deterministic and non-deterministic schemes. We also found that by leveraging determinism, our optimizations can achieve more efficient workload management than both deterministic and non-deterministic approaches.

4.1 The Caracal Design

In this section, we describe our shared-memory, deterministic concurrency control scheme in the Caracal database and then present our optimizations for managing skewed and contended workloads.

4.1.1 Deterministic Concurrency Control

Caracal reduces the impact of skewed accesses by performing concurrent, shared initialization. Transactions are assigned to cores in round-robin order and each core creates the rows or row versions for the transactions assigned to it. This approach balances load since OLTP transactions are short and have relatively small write sets.

Similar to our deterministic replay method for replication (described in Section 3.2.5), Caracal batches and executes transactions in epochs. An epoch has two phases, initialization and execution. Recall that transaction initialization requires the transaction's write-set. With replication, deterministic replay is performed on a backup database, and so the write-set of a transaction can be obtained from the primary database while it is executing the transaction. In contrast, Caracal is a primary database, and so it relies on the application developer to infer the write-set before transaction execution. This requirement can be met for many applications. However, for some applications, such as TPC-C's Delivery transaction, Caracal needs to support range updates. Inferring the write-set keys accurately in this case is difficult, especially when transactions are initialized concurrently.

Consider the following example. There are three transactions T1, T2, and T3, and their serial order is T1 < T2 < T3. T1 inserts row R1 and T3 inserts row R3 in parallel, and these two rows belong to the same range. Now, if T2 performs a range update on this range, it should update R1, but ignore R3. Thus, we require that T2's write-set contain R1 but not R3. One method for handling range updates is to run the initialization phase serially so that we can perform a range scan during initialization. This scan would determine that T2's write-set contains R1 but not R3, since the insert of R3 would not have been initialized yet. However, this serial initialization, such as in Calvin [77], will not scale well. Bohm [17] partitions the initialization phase and runs initialization serially in each partition. However, as we show in our evaluation, this approach only scales when the workload is partitionable and can be load balanced.


Caracal runs the operations in the initialization phase in parallel for scalability. We handle range update operations by splitting the initialization phase into two steps, insert and append, as shown in Figure 4.1. During insert, we create rows and their initial versions for all newly inserted (but not updated) keys. This step ensures that all rows that are inserted in the epoch will be visible in the next step. During append, we insert row versions for updated keys. In addition, we acquire the write-set keys for any range updates by performing a range scan. If a key is present in the range, we check the first version of the key, and only append a new version if the first version of the key is smaller than the serial id of the range update. In the example above, this step ensures that T2's write-set contains R1 but not R3.
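The following sketch illustrates the append-step check for a range update. The Row and Index types are simplified stand-ins (a std::map replaces the real index), and the real implementation keeps each version array sorted with insertion sort, which is omitted here.

#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct VersionEntry {
  uint64_t serial_id;    // transaction that will produce this version
  void *data = nullptr;  // filled in during the execution phase
};

struct Row {
  std::vector<VersionEntry> versions;  // non-empty: the insert step creates the first version
  uint64_t FirstVersionId() const { return versions.front().serial_id; }
  void AppendPlaceholder(uint64_t sid) { versions.push_back({sid, nullptr}); }
};

using Index = std::map<std::string, Row>;  // stand-in for the B-Tree index

// During the append step, a range update scans the index and appends a placeholder
// version only to rows whose first version (the insert) has a smaller serial id than
// the range update. Rows inserted by later transactions (such as R3 in the example)
// are skipped, so T2's write-set ends up containing R1 but not R3.
void AppendRangeUpdate(Index &index, const std::string &low, const std::string &high,
                       uint64_t range_update_sid) {
  for (auto it = index.lower_bound(low); it != index.end() && it->first <= high; ++it) {
    Row &row = it->second;
    if (row.FirstVersionId() < range_update_sid)
      row.AppendPlaceholder(range_update_sid);
  }
}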

Similar to our previous approach, as described in Section 3.2.5, we do not allow concurrent processing between phases. Each phase has to wait for the previous phase to finish before it can start. Caracal currently does not support durability. As we mentioned in Section 2.1.3, this has an impact on transaction latencies but not on the throughput of the database, which is our main metric in this work.

A side effect of splitting the initialization phase into separate insert and append steps is that when the read set or the read range is known, index read and scan operations can be performed during the append step. As shown in Section 4.3.6, this optimization can help reduce contention by reducing index operations during execution.

Figure 4.1: The Caracal Architecture. (A new epoch starts with the initialization phase, consisting of an insert step and an append step separated by waits, followed by the execution phase and the end of the epoch. In the example, T1: Insert R1 and T3: Insert R3 run in the insert step, while T2: Range Update R0 to R100 runs in the append step.)

During the execution phase, a transaction needs to find the latest version to read that satisfies its serial order. The typical method for implementing a multi-versioned database [10, 53, 34, 85, 68] is to use a linked list of versions for each row object. This design assumes that each row has a small number of versions and so searching the list will be inexpensive. This assumption is usually true for non-deterministic databases, but with batching in deterministic systems, some hot keys may have many versions and transactions in the epoch need to access all of these versions, making it expensive to traverse a linked list.

We observe that epoch-based concurrency control allows Caracal to use arrays for efficient versioning. Each element in the version array is a version id and the version pointer. The version id is the serial id of the transaction that created this version.


To accelerate lookups during execution, the version array is kept sorted by version ids. Instead of traversing all versions during execution, a transaction does a binary search to find the latest version to read with respect to its serial order. To further optimize this procedure, Caracal first tries to search around the latest updated version since readers are more likely to read the latest version. If we can find the required version around the latest update, we can avoid the cost of a full binary search.

In the append step, Caracal appends new row versions to the sorted version arrays using insertion sort. Since transactions are processed in roughly increasing transaction serial id order across cores, each append is generally close to the end of the version array, and insertion sort is efficient in this case.
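A simplified version-array sketch is shown below: the append path uses insertion sort, and the read path uses a binary search for the newest version with a smaller serial id. The probe around the latest written version mentioned above is omitted, and the structure names are ours.

#include <algorithm>
#include <cstdint>
#include <vector>

struct Version {
  uint64_t id;  // serial id of the writing transaction
  void *data;   // row image, written during the execution phase
};

struct VersionArray {
  std::vector<Version> versions;  // kept sorted by id

  // Append step: insert an empty version using insertion sort. Because serial ids
  // arrive in roughly increasing order, the new entry is usually near the end.
  void Append(uint64_t serial_id) {
    versions.push_back({serial_id, nullptr});
    size_t i = versions.size() - 1;
    while (i > 0 && versions[i - 1].id > versions[i].id) {
      std::swap(versions[i - 1], versions[i]);
      --i;
    }
  }

  // Execution phase: a reader with serial id `sid` reads the newest version whose
  // id is smaller than sid.
  Version *ReadVersion(uint64_t sid) {
    auto it = std::lower_bound(versions.begin(), versions.end(), sid,
                               [](const Version &v, uint64_t s) { return v.id < s; });
    if (it == versions.begin()) return nullptr;  // nothing visible to this reader
    return &*(it - 1);
  }
};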

Caracal supports get, scan, insert, delete and update operations for stored procedures. In non-deterministic databases, insert, delete and update operations can fail. The insert operation fails if the row already exists in the database, and both the delete and the update operations fail if the row does not exist. Usually, non-deterministic databases handle this failure by aborting the transaction. In Caracal, none of the operations cause aborts. When a row exists, the insert operation updates the row with the newer value. Similarly, when a row does not exist, the delete and the update operations do nothing and return false. These semantics are consistent with the INSERT REPLACE, UPDATE WHERE and DELETE WHERE statements in SQL. In the Caracal programming model, if the application needs to abort the transaction, it should check for the existence of the row via get operations at the beginning of the stored procedure before issuing any write operations, as described in Section 4.1.6.

Caracal supports read-your-writes and duplicate writes [75] by using a per-epoch write buffer. During initialization, if Caracal detects a duplicate write in a transaction, then it creates a private write buffer associated with the transaction's serial id. During execution, duplicate writes are first stored in the write buffer. If the duplicate write is the last write for a row in the transaction, then it is written to the corresponding row version. Transactions always attempt to read from their write buffer. When the write buffer does not exist or the value in the write buffer isn't ready, the transaction reads from the row version. The write buffers are cleared at the end of each epoch.

Next, we describe two optimizations, batch append and split-on-demand, that Caracal uses for contention management during the initialization and execution phases, respectively.


4.1.2 Initialization Phase: Batch Append

Based on our replication scheme, Caracal supports concurrent initialization by synchronizing accesses to the shared row version arrays with per-row locks. For uncontended workloads, acquiring the row lock before inserting a new version scales well. However, under contention, this locking operation limits scalability, similar to other shared-memory databases.

Our batch append optimization helps manage contention during the initialization phase. We observe that in our deterministic scheme, newly added versions do not need to be immediately visible during the initialization phase. Within an epoch, as long as all the row versions are created before the execution phase, transactions will execute deterministically and the final outcome will strictly follow the serial order. Thus row locks can be acquired in any order, which enables batching the append operations for the row versions and running them scalably with fewer locking operations.

Instead of waiting to acquire a row lock for each version that is inserted, we can simply append the new row version to a per-row, per-core, fixed-size buffer, without acquiring any locks. When this buffer is full, or at the end of the initialization phase, each version in the buffer is appended to the corresponding row's version array using insertion sort, which lowers overall lock contention. The buffer is then cleared and reused. Note that the versions in a row buffer may not belong at the end of the version array, which could increase the cost of insertion sort. We evaluate this cost in Section 4.3.6.

A naive implementation of this approach would require an excessive amount of memory for the per-core buffers. To use memory more efficiently, we only assign per-core row buffers to contended rows. If a row doesn't have a row buffer, then Caracal initially tries to acquire the row lock to directly insert the row version into the version array. However, if acquiring the row lock fails, indicating contention, a per-core row buffer is created. Thereafter, versions are appended to the slots in the row buffer. We describe our memory-efficient implementation of the batch append optimization in Section 4.2.2.
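Conceptually, the fallback from locking to buffering looks like the sketch below. For simplicity it gives every row eagerly allocated per-core buffers; the memory-efficient, index-based layout that avoids this cost is described in Section 4.2.2.

#include <array>
#include <cstdint>
#include <mutex>
#include <vector>

constexpr int kCores = 32;         // assumed core count for this sketch
constexpr int kSlotsPerCore = 4;   // fixed buffer size per core

struct Row {
  std::mutex lock;                      // per-row lock guarding the version array
  std::vector<uint64_t> version_array;  // sorted version ids (version data omitted)
  std::array<std::vector<uint64_t>, kCores> per_core_buffer;

  void InsertSorted(uint64_t sid) {
    auto it = std::lower_bound(version_array.begin(), version_array.end(), sid);
    version_array.insert(it, sid);
  }

  // Initialization phase: try the lock once; if it is contended, buffer the append
  // locally instead of waiting, since versions only need to be visible before the
  // execution phase starts.
  void AppendVersion(int core, uint64_t sid) {
    if (lock.try_lock()) {
      InsertSorted(sid);
      lock.unlock();
      return;
    }
    auto &buf = per_core_buffer[core];
    buf.push_back(sid);
    if (buf.size() >= kSlotsPerCore) FlushCore(core);
  }

  // Called when a per-core buffer fills up and again at the end of initialization.
  void FlushCore(int core) {
    std::lock_guard<std::mutex> g(lock);
    for (uint64_t sid : per_core_buffer[core]) InsertSorted(sid);
    per_core_buffer[core].clear();
  }
};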

4.1.3 Execution Phase: Split-on-Demand

The execution phase synchronizes data accesses by ensuring that a transaction reading a row version waits until the version is written. Under heavy contention, this wait synchronization occurs frequently, leading to significant cache coherence traffic among processors. We aim to reduce this contention during the execution phase.

We observe that a small number of rows are updated repeatedly during contention. To reduce contention due to cache coherence, we would like to cluster and schedule these updates on a small set of cores. However, along with performing these contended updates, transactions also perform other uncontended operations.


These uncontended operations should be scheduled on all cores for scaling. We resolve this conflict by introducing a fine-grained piece-wise interface in Caracal that allows an application developer to specify rows that may contend. When such a row is identified as contended, its update operation is dynamically split from the transaction into a separate piece. This new piece is run independently from the rest of the transaction code on a core that will minimize contention while maintaining load balance.

This approach works well in a deterministic database because all the pieces in the transaction will eventually commit, and commit in an order that is consistent with the predefined serial order. In contrast, in a non-deterministic database, the serial order is not predefined, so the main transaction would need to synchronize with its split pieces to determine a serial order. For example, under 2PL, the main transaction would need to wait for its split pieces to finish before it can release locks.

Specifying Contended Operations

Application developers specify that a row may be contended by using the UpdateForKey(Row, CallbackFunction, Weight) row update function. This interface takes three arguments: (1) the row to update, (2) a callback function that specifies how the row values should be updated, and (3) an estimate of the amount of work needed by the callback function, for scheduling purposes. When the row is likely to be contended, Caracal splits the callback into a separate piece and invokes the piece independently of the transaction. Otherwise, the callback function is invoked serially within the transaction. Splitting uncontended operations provides no performance benefits. Instead, it introduces memory and cache miss overheads associated with running an extra transaction piece. Also, developers do not specify the core on which to run the piece, since Caracal automatically schedules it. Caracal does not currently allow specifying contention for reads for two reasons: (1) reads are less contended due to multi-versioning, and (2) we do not require the read sets of transactions and so cannot estimate contention due to reads.

The callback function must handle data dependencies with the row update in the split piece. For example, say an update to a contended row B depends on row A's value. The callback function for row B can track this dependency by waiting for the access to row A to complete in the main transaction, an approach similar to rendezvous points in PWV [18]. However, this dependency tracking is expensive since it needs inter-processor communication for synchronizing the pieces. In our shared-memory approach, a more efficient solution is to access row A and update row B within the callback function so that the data dependency is preserved. Currently, we assume that the callback function will update at most one contended row, and so the programmer needs to specify UpdateForKey() for each potentially contended row. Note that the callback function performs an update and cannot issue aborts (see Section 4.1.6).


We normally assign a callback weight based on the number of row updates in the callback function. However, if the callback function performs read-heavy operations, such as scanning many rows, then this weight is increased, which may require some tuning.
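As an illustration, a Payment-style stored procedure might use the interface as follows. The Transaction, WarehouseRow and CustomerRow types are made up for this example, and the inline UpdateForKey stub shown here omits the actual splitting logic.

#include <functional>

struct WarehouseRow { double ytd = 0; };
struct CustomerRow { double balance = 0; };

struct Transaction {
  // When `row` is contended in this epoch, Caracal would split the callback into a
  // separate piece and schedule it; this sketch simply runs it inline.
  template <typename RowT>
  void UpdateForKey(RowT *row, std::function<void(RowT *)> callback, int weight) {
    (void)weight;
    callback(row);
  }
};

// The warehouse YTD counter is a classic hotspot, so its update is declared as a
// potentially contended piece with weight 1 (one row update, no heavy reads).
void Payment(Transaction &txn, WarehouseRow *warehouse, CustomerRow *customer,
             double amount) {
  txn.UpdateForKey<WarehouseRow>(
      warehouse, [amount](WarehouseRow *w) { w->ytd += amount; }, /*weight=*/1);
  customer->balance -= amount;  // uncontended, executed inline in the main transaction
}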

Caracal's split-on-demand introduces a parallel programming model similar to PWV and Granola for reducing contention. However, while PWV splits transactions and executes them in parallel to speed up execution and reduce waiting time for readers, Caracal splits transactions and executes the split pieces on fewer cores to reduce contention. Unlike partitioning, Caracal can assign pieces to arbitrary cores, which provides more flexibility for scheduling and load balancing. For example, Caracal can avoid splitting parts of a transaction that have data dependencies, thus avoiding any synchronization costs. Also, Caracal only splits contended pieces, thus reducing the overhead of splitting uncontended pieces. Finally, the split-on-demand programming model is optional: developers are encouraged to use it when contention is a concern.

Scheduling Contended Operations

Our approach requires identifying contended rows accurately and efficiently so that transactions can be split dynamically when they access the contended rows. Fortunately, in our batched deterministic model, we know the number of row versions that are created for any given row in an epoch by the end of the initialization phase. When the number of versions exceeds a certain threshold, we predict the row will contend during execution.

During execution, Caracal needs to schedule all the contended pieces on the multiple cores. To reduce contention, all the pieces updating a contended row should be clustered to run on a small set of cores. Furthermore, to balance load among all cores, each core should be assigned pieces based on their weights, so that the piece weights are balanced. Unlike partitioning, Caracal does not require that all the pieces accessing a contended row be assigned to a single core. As shown below, this simplifies load balancing in Caracal, and it also allows Caracal to leverage potential parallelism when running these contended pieces since each core is also running other uncontended pieces.

OnInitializationPhaseFinish():
    sum = 0
    for row in contended_rows:
        assert(row.nr_empty_versions >= threshold)
        row.start = sum
        sum += row.sum_weights

UpdateForKey(row, callback):
    if row.nr_empty_versions < threshold:
        # do not split
        ...
    rand = RandomNumberFrom(row.start, row.start + row.sum_weights)
    core = NR_CORES * rand / sum
    RunPieceOnCore(core, new Piece(row, callback))

Listing 4.1: Probabilistic Assignment of Pieces in Caracal


Figure 4.2: Load Balancing Split Pieces. (Four contended rows with 5k, 10k, 4k and 5k versions are assigned across Cores 0-3 so that each core handles 6k versions: Row 1 goes entirely to Core 0; Row 2 is split across Cores 0, 1 and 2 at 10%, 60% and 30%; Row 3 is split across Cores 2 and 3 at 75% and 25%; Row 4 goes entirely to Core 3.)


Caracal fulfills our clustering and load balancing requirements described above by probabilistically packing pieces onto one or more cores, while ensuring that cores process a similar amount of work. Listing 4.1 shows our piece assignment algorithm. row.sum_weights is the sum of the weights of all the pieces associated with this row. After the initialization phase finishes, but before the execution phase, we sum up the weights from all contended rows and keep the partial sum in row.start. While issuing transactions in the execution phase, UpdateForKey detects if the row is contended in the current epoch, and if so, we assign the piece to a core according to a random number between row.start and row.start + row.sum_weights. The probabilistic assignment algorithm is inspired by lottery scheduling [80] in operating systems.

Figure 4.2 shows an example assignment. Four rows are contended and have a total of 24,000 versions, so each core is assigned 6,000 versions. All updates to Row 1 and Row 4 are assigned to Core 0 and Core 3, respectively. However, updates to Row 2 are probabilistically split across Cores 0, 1 and 2, in the ratio 10%, 60% and 30%. Similarly, updates to Row 3 are split across Cores 2 and 3. Our evaluation shows that our probabilistic partitioning scheme has performance comparable to or better than an offline optimal partitioning scheme that assigns each contended row to a single core.



4.1.4 Garbage Collection

Caracal is multi-versioned and so it needs to perform garbage collection (GC) to reclaim unused versions in the version array. We use two types of garbage collectors: a fast foreground collector (which we refer to as a minor collector, since it resembles a generational collector in many programming languages), and a background collector (which we refer to as a major collector). The difference between these two garbage collectors lies in when they are invoked, and the rows from which they collect versions.

The design motivation of Caracal's garbage collection scheme is to improve temporal locality. Consider, for example, that there are 100 rows to garbage collect at the end of an epoch: rows 1 . . . 100. After garbage collection, say rows 81 . . . 100 remain in the CPU cache, while rows 1 . . . 80 have been evicted. If these evicted rows need to be updated in the next epoch then the initialization phase needs to bring them back into the cache. With our minor/major garbage collection design, the garbage collection of rows 1 . . . 80 occurs along with the initialization of the next epoch, which reduces cache misses.

The minor collector operates during the initialization phase and frees unused versions only from the rows that are being initialized. Since the database has to access these rows and their versions during initialization anyway, collecting unused versions at this point has better temporal cache locality. The drawback of this policy is that it only collects versions from the rows that will be updated in this epoch. For rows that stop being updated, their unused versions will never be reclaimed by the minor GC.

For major GC, we need to track all the rows that are updated in an epoch. During initialization, the core that first creates a new version for a row also adds that row to a per-core garbage queue. When an epoch finishes, while preparing for the next epoch, all cores in the system perform major garbage collection by freeing unused versions from rows in the garbage queue. This collector is expensive because it operates on older versions, which are likely to have poor cache locality. It also pollutes the CPU cache with cold rows and version arrays, which affects the performance of the initialization phase in the next epoch.

Caracal combines both garbage collection schemes to provide fast and accurate garbage collection. Our hypothesis is that most active rows will be updated within a few epochs and so having the minor GC alone should be able to reclaim most unused versions. To ensure accurate garbage collection, major GC is used as a fallback for rows that stop being updated. To do so, when major GC is invoked at the end of each epoch, it only collects rows that have not been updated for more than K epochs, wagering that the rows updated within the last K epochs will be updated again and can be collected by the minor GC. The K parameter is user-configurable. A larger value reduces major garbage collection but increases the memory footprint. By default, K is set to 8 epochs.
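The decision logic can be sketched as follows; the Row type and the way versions are collected are simplified, but the split between the in-cache minor pass and the K-epoch-delayed major pass is the point.

#include <cstdint>
#include <vector>

constexpr uint64_t kMajorGcEpochs = 8;  // the user-configurable K parameter

struct Row {
  uint64_t last_update_epoch = 0;
  std::vector<uint64_t> version_ids;  // versions, oldest first (data omitted)

  // Keep only the newest version; older ones can no longer be read in later epochs.
  void CollectOldVersions() {
    if (version_ids.size() > 1)
      version_ids.erase(version_ids.begin(), version_ids.end() - 1);
  }
};

// Minor GC: runs while a row is being initialized for the current epoch, so the row
// is already hot in the cache.
void MinorGc(Row &row, uint64_t current_epoch) {
  row.CollectOldVersions();
  row.last_update_epoch = current_epoch;
}

// Major GC: fallback at epoch end for rows that have stopped being updated. Rows
// touched within the last K epochs are skipped, wagering that the minor GC will
// collect them soon anyway.
void MajorGc(const std::vector<Row *> &garbage_queue, uint64_t current_epoch) {
  for (Row *row : garbage_queue) {
    if (current_epoch - row->last_update_epoch > kMajorGcEpochs)
      row->CollectOldVersions();
  }
}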


Caracal does not perform GC during transaction execution. We expect that aggressive GC of versions not needed by active transactions would improve Caracal's multi-versioning performance further [40, 5]. Caracal's GC currently does not shrink version arrays, so the space in these arrays can be reused in the next epoch.

To perform concurrent major garbage collection, a simple design is to use a per-core garbage queue and assign a collector thread to each queue. However, in our experience, this approach can lead to significant load imbalance across the per-core queues, especially with skewed accesses.

Caracal solves the load imbalance problem by merging the per-core queues into one global queue at the end of the epoch before starting major garbage collection. The global queue is organized as a linked list of blocks and each block has at most 512 rows. The initialization phase adds rows into a per-core block at first, and when the block is full, it publishes the block into the global linked list. During garbage collection, each thread dequeues a block at a time from the global queue, which reduces contention on the queue. Since contended rows may contain many old versions, garbage collection may cause straggler effects. While collecting a block, each collector thread also periodically checks the global list. If the list is empty, suggesting that all other threads are finished (or nearly finished) collecting, the thread postpones garbage collecting the straggler block to the next epoch by adding it back into the garbage queue.

4.1.5 Logging and Recovery

Non-deterministic databases usually log the outputs of committed transactions to disk for recovery. With determinism, transactions always generate the same outcome when executing with the same serial order. Leveraging this insight, Calvin [77] proposed logging the transaction inputs before executing the transactions. When the database fails and restarts, it replays the log by re-executing the transactions. Caracal uses a similar logging method. As transactions execute, Caracal logs the transaction inputs to disk in parallel. When the epoch finishes, Caracal waits for the entire epoch to be logged before responding to application clients. On recovery, Caracal replays all transactions until the last complete epoch.

To reduce the time of replay during recovery, Caracal needs to perform periodic checkpointing. Since our logging format is logical, Caracal's checkpointing needs to be atomic. Whenever a failure occurs, the checkpoint image that Caracal generates must contain consistent and complete data up to a certain epoch, so that on recovery, Caracal only needs to replay transactions from later epochs. This can be implemented using copy-on-write features in modern file systems, such as reflink (the FICLONERANGE ioctl). Caracal currently implements logging, but checkpointing is left as future work.
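Since checkpointing is future work, the following is only a sketch of how a snapshot of a checkpoint file could be taken cheaply with a reflink on a copy-on-write file system such as XFS or Btrfs; the file names are hypothetical.

#include <fcntl.h>
#include <linux/fs.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <cstdio>

// Clone the working checkpoint image into a published image using FICLONERANGE.
bool PublishCheckpoint(const char *working_image, const char *published_image) {
  int src = open(working_image, O_RDONLY);
  int dst = open(published_image, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (src < 0 || dst < 0) {
    perror("open");
    if (src >= 0) close(src);
    if (dst >= 0) close(dst);
    return false;
  }

  struct file_clone_range range {};
  range.src_fd = src;
  range.src_offset = 0;
  range.src_length = 0;   // zero length clones up to the end of the source file
  range.dest_offset = 0;

  // The clone shares extents with the source instead of copying data, so it captures
  // the source file's contents at this point in time at very low cost.
  bool ok = ioctl(dst, FICLONERANGE, &range) == 0;
  if (!ok) perror("ioctl(FICLONERANGE)");
  close(src);
  close(dst);
  return ok;
}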


4.1.6 Limitations of Determinism

Similar to many deterministic databases, Caracal inherits some limitations of determinism. Next, we describe these limitations and how some of them have been addressed by previous work.

Aborts

There are two types of aborts in databases: system-level aborts due to concurrency control, and application-level aborts in transaction logic. As an example of the latter, a transaction placing an order should abort if there is not enough stock left. Caracal's deterministic concurrency control eliminates all system aborts. For application aborts, PWV [18] and Granola [8] require the application developer to issue any aborts before the transaction issues any writes. In this case, the rest of the transaction code stops executing. To support this type of abort in Caracal, the transaction writes an ignore marker in an empty version. When a transaction reads an ignore marker, it skips to a previous version that does not have an ignore marker.

Conditional Updates

In Caracal, we assume all rows in the write-set are updated. However, in practice, some rows in the write-set may only be updated when a certain condition is met. This condition may depend on the data that is read from the database. These conditional updates can also be handled using the ignore marker. If the transaction decides not to update a certain row in its write-set, it can write an ignore marker.

Unknown Write-Sets

For some transactions, it may not be possible to infer the write-set from the transaction inputs before execution. Calvin [77] proposes using reconnaissance transactions to deal with this issue. First, the transaction is run as a read-only transaction and it performs all the necessary reads to infer the write-set. Then, in the next epoch, the transaction can declare its write-set and execute as a normal transaction. During this execution phase, a transaction first checks if the rows read in the read-only phase have changed, and if so, the transaction aborts using the mechanism described above.

We believe certain reconnaissance transactions can be implemented entirely within web applications, without requiring any database support. Web applications can issue the first-phase read-only transaction and render the values as form data. When the user submits the form, the transaction will validate these values by re-reading from the database.


Advanced applications may also use the ETag feature [21] in the HTTP protocol to perform this validation. This approach works when the data that the read-only reconnaissance transaction accesses changes infrequently, and so the validation will rarely fail. However, for some workloads, the reconnaissance transaction may read stale data when the data changes frequently, and so the validation may fail constantly. Thus the performance impact of reconnaissance transactions depends heavily on the workload. We plan to investigate alternate mechanisms to overcome this limitation.

4.2 Implementation

Caracal is an in-memory, multi-core database that can run well on a single node or scale out to multiple nodes. Our implementation uses C++17 and leverages language features such as lambda functions and templates to provide a developer-friendly API for writing stored procedures. Caracal's stored procedure API supports distributed transactions. Transactions are split into pieces represented by lambda functions, and these pieces can be dispatched to multiple cores or multiple machines.

Below, we describe our implementation strategy and several optimizations that we have applied to our epoch-based, deterministic, multi-versioned design.

4.2.1 Serial ID Assignment

Caracal assigns a serial id and a core id to each transaction using a distributed timestamp. The serial id is a globally unique 64-bit integer, consisting of the epoch number stored in the most significant bits, a per-node sequence counter, and a node id stored in the least significant bits. When the sequence counter crosses a batch size threshold, the epoch number is incremented and the sequence counter is reset. Then, Caracal assigns all transactions with the previous epoch number to cores based on their core id. On each node, by default, the core id is generated in a round-robin manner, ensuring that load is roughly balanced. Currently, Caracal performs these operations with a single thread on each node. In a real deployment, these operations would be carried out by the event scheduler that processes incoming network requests, but for our experiments in Section 4.3 we do them while generating transactions.
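For illustration, the serial id layout could be packed as follows; the exact field widths are assumptions, since the thesis only fixes the epoch in the most significant bits and the node id in the least significant bits.

#include <cstdint>

constexpr int kNodeBits = 8;    // assumed: up to 256 nodes
constexpr int kSeqBits = 24;    // assumed: per-node sequence counter width
constexpr int kEpochBits = 32;  // assumed: epoch number in the most significant bits
static_assert(kNodeBits + kSeqBits + kEpochBits == 64, "fields must pack into 64 bits");

uint64_t MakeSerialId(uint64_t epoch, uint64_t sequence, uint64_t node_id) {
  return (epoch << (kSeqBits + kNodeBits)) | (sequence << kNodeBits) | node_id;
}

uint64_t EpochOf(uint64_t serial_id) { return serial_id >> (kSeqBits + kNodeBits); }
uint64_t NodeOf(uint64_t serial_id) { return serial_id & ((1ull << kNodeBits) - 1); }

// Because the epoch occupies the most significant bits, comparing two serial ids with
// plain integer comparison also respects epoch ordering.
bool SerializedBefore(uint64_t a, uint64_t b) { return a < b; }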

4.2.2 Initialization of the Version Array

Caracal uses sorted arrays for row versions, as mentioned in Section 4.1.1. During the initialization of an epoch, cores insert empty versions into the version array using insertion sort.


Transactions are initialized in serial order on each core, making insertion sort efficient in this case because the newly inserted empty version is usually at, or very close to, the last element in the array. If the version array fills up, we re-allocate it with a larger capacity.

During garbage collection, we free the version data pointed to by the unused array entries, clear the version number, and set the pointer to NULL (this is to distinguish a free entry from an initialized empty version). The GC does not shrink the version array itself, to reduce repeated reallocations in subsequent epochs. We expect that the version array for a row will quickly grow to the appropriate capacity for a particular workload, and then remain at that size. As a future memory optimization, we could periodically shrink the version array for rows that are no longer actively updated.

Our batch append optimization requires a per-core buffer object that is associated with each row to accumulate empty versions when there is contention on the row-level lock. The simplest design is to add N buffer pointers to the row object, where N is the number of cores. Each buffer pointer points to a preallocated per-core buffer. However, this design impacts cache utilization because the row object has many buffer pointers. For example, on a 32-core system, each row object will need an extra 256 bytes for these pointers. In addition, creating per-core buffers for every row in the database would increase the memory footprint dramatically without much benefit, since most rows are uncontended and these buffers would never be used at all. In Caracal, each row object only needs a single index value to represent the per-core buffers. The key idea is to pre-allocate memory for the per-core buffers, but assign buffers only to contended rows during initialization.

Figure 4.3: Per-core Buffers for Batched Initialization. (Each core has a pre-allocated chunk of buffer objects in its local memory; a row buffer, identified by an index value such as Index=0, 1 or 2, consists of one buffer object per core, and each buffer object has a header plus a fixed number of slots holding buffered versions.)

On database startup, Caracal pre-allocates memory for each core from the local NUMA zone, as shown in Figure 4.3 (each core's chunk is marked with a different color in the figure). With this design, an index value represents a row buffer, consisting of per-core buffer objects.


Each buffer object has a header and a fixed number of slots for the row versions (e.g., 4 slots in Figure 4.3). The header tracks the number of filled slots (shown as colored boxes in Figure 4.3).

If a row doesn't have a row buffer (indicated by a special empty index value), Caracal attempts to acquire the row-level lock and add the empty version to the row object's version array. Acquiring the row lock will fail if there is contention on the row, which triggers the assignment of a pre-allocated per-core row buffer by incrementing the global index value. The index of the assigned row buffer is stored in the row object, and empty versions are subsequently appended to the slots in the buffer objects.

At the end of the initialization phase, all the versions in the per-core buffer objects are batch-appended to the version arrays of the corresponding rows. Since there may be many pending buffers, we parallelize this process by having each core perform this operation for its own slice of each row buffer. A core's batch-append operations can occur while other cores are still running the initialization phase and possibly allocating new row buffers. This is safe because the newly allocated row buffers will not contain any row versions for the core that has finished its initialization phase. For example, in Figure 4.3, say Core 1 finishes initialization and batch-appends the versions in the red buffers until Index 1. If another core allocates a row buffer at Index 2, then that row buffer will not have any versions for Core 1, since Core 1 has already finished initialization. During this process, there is a small chance that all cores finish the initialization at exactly the same time, and may contend on some rows. If this happens, Caracal will delay batch-appending the contending row and process other rows first.

4.2.3 Transaction Scheduler

Caracal executes transactions using per-core kernel threads. Each kernel thread implements a user-level thread scheduler for dispatching and executing transactions on the core (to avoid confusion, we refer to the kernel thread as a core). Each scheduler runs two types of threads: a dispatch thread and one or more worker threads. The scheduler also maintains a piece queue, consisting of transaction pieces that worker threads execute on that core.

The dispatch thread splits incoming transaction code into multiple pieces (e.g., for our split-on-demand optimization), and then adds these pieces to the piece queue. Due to on-demand splitting, pieces may be added to the piece queues of other cores. To reduce contention, each core keeps a small buffer for each per-core piece queue, and appends pieces in a batch.

Each time a worker thread chooses a piece to run, it will pick the one with the smallest serial id from its piece queue. This approach may deadlock since pieces may be dispatched from other cores concurrently. For example, say the worker thread on Core 2 is running a piece of transaction T2 since it has the smallest serial id.


While this transaction is running, the dispatch thread on Core 1 could add a piece of transaction T1 (serialized before T2) to Core 2's piece queue. If T2's read depends on T1's write, then T2 will wait forever even though T1 is in the piece queue.

We address this issue by using a preemptive scheduler. While a worker thread waits for a read during the execution of a piece, it spins and periodically checks the piece queue for a piece with a smaller serial id. If such a piece exists, then Caracal suspends the current worker thread and creates a new worker thread to run the piece with the smallest serial id. This worker thread executes pieces with serial ids smaller than the suspended thread's serial id. When it finishes, it resumes the suspended thread. Alternatively, when a worker thread waits, we could aggressively preempt it to run transaction pieces with a larger serial id as well. However, these pieces may also depend on pieces with a smaller id, and so the repeated thread creation and preemption may degrade performance. Thus, Caracal only preempts pieces to run a piece with a smaller serial id.

Caracal runs the dispatch and worker threads in parallel in each of the phases shown in Figure 4.1, and uses a barrier to synchronize phases. Each dispatch thread tracks the number of pieces it has dispatched and increments a global piece counter by this value. When a worker thread finds that its piece queue is empty, it decrements the piece counter by the number of pieces it has executed. While the piece counter is larger than zero, the worker thread waits until pieces are added to the piece queue. Otherwise, the phase has finished, so all the worker threads exit, and a single, light-weight control thread restarts the dispatch threads on all cores for the next phase. We use epoll to coordinate user-level threads, so a worker thread waits using epoll_wait() on an eventfd. Using epoll also enables our user-level thread library to multiplex network IO for our multi-node Caracal deployment.

The ApplyRowUpdate(Row, CallbackFunction) call dynamically splits the callback function into a separate piece if the row was marked as contended during the initialization phase. It returns a Future object representing the callback result. If the application depends on this result, then it uses the future's wait function to wait for callback completion. When the callback function is split, the wait function is implemented in the same way as a piece waiting on a read, as described previously. Otherwise, the wait function simply invokes the callback directly. In this case, the overhead of using the ApplyRowUpdate interface is the data stored for the closure of the callback function, which is less than a cache line in size.

4.2.4 Index Search Optimization

Caracal's two-phase execution model leads to the same keys being searched twice, once in each phase. The initialization phase uses the keys in the write-set for searching the index to find the corresponding rows in which versions are appended.


Then, during transaction execution, we need to search for these keys in the index again. Our experiments showed that this additional search can lower throughput significantly because it is expensive for some workloads.

We mitigate this issue by allocating a row cache for each transaction. During initialization, after searching the index for a row, we store a pointer to the row in the row cache. During execution, transactions read and write data by first looking up the row cache. All accesses to the write-set are found in the row cache.
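A minimal sketch of such a per-transaction row cache, with Row as a placeholder type:

#include <string>
#include <unordered_map>

struct Row;  // placeholder for the database row object

struct RowCache {
  std::unordered_map<std::string, Row *> cached;  // key -> row pointer

  // Initialization phase: remember the row found by the index search.
  void Remember(const std::string &key, Row *row) { cached[key] = row; }

  // Execution phase: write-set keys (and any cached reads) skip the index search.
  Row *Lookup(const std::string &key) const {
    auto it = cached.find(key);
    return it == cached.end() ? nullptr : it->second;
  }
};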

4.2.5 Inlining

Similar to Cicada [40], Caracal inlines the version array and the version data for tables that have small rows and infrequent updates. For these tables, Caracal reserves 192 bytes (3 cache lines) after the row object, and it will allocate the version array and the version data from these 192 bytes, if possible.

We found that the main advantage of this optimization is that it reduces unnecessary major GC. Since these versions are embedded in the row object, they do not need to be collected by the major garbage collector unless the row is deleted.

4.2.6 Memory Allocation

Row data is immutable in a multi-versioned database. Every row update creates a new version, requiring memory allocation and putting pressure on the memory allocator. Our experience with Caracal is that generic memory allocators are a poor fit for our main-memory database because they constantly generate page faults. The reason is that they aggressively return memory back to the operating system, e.g., by calling madvise(MADV_DONTNEED). This policy specifically impacts Caracal's epoch-based memory allocation that goes through cycles of allocating and freeing memory.

As a result, Caracal implements its own memory allocator. On startup, the database is allocated a fixed, configurable amount of memory, and this memory is pre-faulted (via mlock()). Caracal manages memory in 2MB pages. For multi-core scaling, we assign per-core slabs for each object type, so allocation from slabs requires no synchronization. Each slab manages at least 2MB of memory, and memory is allocated and freed from the global page allocator in 2MB pages.


4.2.7 NUMA Scaling

While Caracal does not partition the database or assume data locality, its internal architecture exploits locality for scaling and achieving high performance on NUMA systems. Several key data structures in Caracal are per-core structures that require no locks, and they are allocated from the local NUMA zone. These structures include the buffered version array (Section 4.2.2), and the transaction pieces and worker threads executing these pieces (Section 4.2.3).

4.3 Evaluation

We compare Caracal against three deterministic approaches: Granola [8], Bohm [17], and PWV [18]. These baselines all use partitioning to improve performance and to eliminate contention. They are all designed for uniform, partitionable workloads and don't tolerate skewed workloads well. Our evaluation shows that Caracal significantly outperforms all these previous deterministic databases when the workload is both contended and skewed.

We do not compare against Calvin [77] since its single-threaded initialization phase makes it non-scalable, and because Bohm already improves on Calvin.

We use the YCSB benchmark and a TPC-C-like benchmark to evaluate all databases. For both benchmarks, we first evaluate how Caracal performs against partitioned systems for uncontended workloads. Then, we show results for contended workloads, and how Caracal's optimizations benefit these workloads. All baselines run the same benchmarks.

We also compare Caracal with recent, non-deterministic, shared-memory databases that provide robust performance under skewed and contended workloads: ERMIA [34], FOEDUS [35] and STO [31] (OSTO/MSTO). This evaluation shows how determinism helps with contention management.

4.3.1 Hardware and Software Platform

We evaluate Caracal's single-node performance on an HP ProLiant DL560 Gen8 with four Xeon E5-4620 processors. Each processor has eight physical cores with a 16MB last level cache and one NUMA zone, for a total of four NUMA zones on the machine. Each NUMA zone has eight 16GB DIMMs of DDR3 DRAM, for a total of 512GB DRAM. Each DIMM has two ranks and operates at 1333 MT/s.

For the distributed experiments in Section 4.3.10, we use 8 HP ProLiant DL160 Gen8 machines connected using commodity 10Gb Ethernet. Each machine has two Xeon E5-2650 processors with eight physical cores, a 20MB last level cache and 32GB DRAM.


All machines run CentOS 7.6 with the 1062.el7 kernel (released on Aug 7, 2019). We compile Caracal, Granola, Bohm, and PWV using Clang 9, and ERMIA, FOEDUS, and STO using GCC 8.4, all with -O3 optimization.

4.3.2 Comparison Databases

We implement three deterministic databases, Granola, Bohm, and PWV, using our codebase to make the comparison as fair as possible. We process batches of transactions in epochs for all implementations so that they all have bounded latencies for transactions, similar to Caracal.

Bohm partitions the initialization phase, so our implementation does not acquire locks on rows during initialization. We also include several optimizations compared to the original Bohm, such as a version array and binary search to speed up searching versions.

Granola is single-versioned. We implement it by eliminating the initialization phase, so each epoch contains just the execution phase. Granola does not perform any concurrency control and so each partition executes transaction pieces in serial order. Our Granola implementation supports dependent transactions via PWV-style rendezvous points [18]. However, dependent transactions may stall the serial execution.

PWV is single-versioned. It uses a per-partition dependency graph to track conflicts within the partition. If a piece depends on a piece that has not finished execution on another partition, the dependency graph allows PWV to schedule other non-conflicting pieces (unlike our Granola implementation). Our implementation constructs the dependency graph using row or table access information during the initialization phase. Then, we schedule pieces based on the dependency graph in the execution phase and support early write visibility. There is no synchronization required between the initialization and execution phases since they are both partitioned.

To compare with non-deterministic approaches, we use the OSTO and MSTO implementations from Huang et al. [31], which are built on STOv2 (an improved version of the STO framework [29]).

OSTO is an OCC-based, single-versioned database. It does not provide opacity through mechanisms such as locking or snapshots, and so transactions may read inconsistent data (although such transactions will eventually abort). OSTO uses a backoff retry scheme to regulate contention.

MSTO is an OCC-based, multi-versioned database based on the design of Cicada [40]. Although both MSTO and Cicada provide snapshots for read-only transactions, they do not provide opacity. In our evaluation, we show the performance of OSTO and MSTO with dashed lines to indicate that they do not provide opacity.


While the lack of opacity helps improve performance under contention, it imposes security risks and can cause application-level bugs, as described in Section 2.2.3.

ERMIA is a multi-versioned database that provides serializability and opacity by using the Serial Safety Net (SSN) protocol instead of OCC. SSN allows certain transactions to commit even when they conflict under OCC. We use the latest version of ERMIA, which incorporates Cicada's back-off retry scheme and a new hashtable index [41], and provides fast aborts by avoiding the use of C++ exceptions [31].

FOEDUS is a single-versioned database that includes backends for both OCC (without opacity) and 2PL (with opacity). It supports snapshots for read-only transactions, but read-write transactions operate using single-versioning. We use the 2PL backend of FOEDUS to compare with a system that provides opacity, since we already compare with OSTO, a non-opaque OCC approach that is faster than FOEDUS's OCC.

We port our benchmarks to these databases. All experiments with multi-versioning are performed with garbage collection. We discuss the throughput at maximum scale (32 cores) when comparing systems unless otherwise stated. Our codebase requires specifying the number of transactions (rather than a runtime), and we run all deterministic baselines with ∼5,000,000 transactions. Each epoch lasts 40-60 ms, and so the maximum transaction latency is roughly 100 ms. For STO, ERMIA, and FOEDUS, we run the experiment for 10 seconds.

4.3.3 YCSB

We use a transactional YCSB microbenchmark. The original YCSB is a key-value store benchmark [7] that does not specify transactions. In our evaluation, we group 10 unique key accesses into a single transaction. Each key access is either a read or an update operation. We use a total of 10,000,000 keys, and each row is 1000 bytes. When the key access is an update operation, the transaction will update the first 100 bytes of the total 1000 bytes in the row, representing an update to a subset of the database columns in a row. Otherwise, the transaction will read the entire 1000 bytes. There are no dataflow dependencies within a transaction. For Caracal, we use our split-on-demand optimization for all update operations in YCSB. Granola, Bohm (only the initialization phase), and PWV evenly partition the keys among all of the given cores.

We compare Caracal's performance with the other databases under low contention by using a uniform distribution to choose keys in a transaction and a read-write ratio of 8:2. Figure 4.4a shows that Caracal achieves 2.15 MTxn/s under low contention. Bohm, Granola, and PWV achieve 1.31 MTxn/s, 934 KTxn/s, and 722 KTxn/s. Caracal outperforms partitioning because Caracal does not need to split transactions into pieces, while the other three databases need to split each transaction into pieces based on the key accesses.


[Figure 4.4: YCSB Performance (8:2 read/write ratio). Throughput vs. number of cores (0-32) for Caracal, Bohm, PWV, and Granola. (a) YCSB Uniform; (b) YCSB Skew, s = 0.9.]

The mean time to create pieces and schedule them is roughly 1.35 µs for Caracal, 11.1 µs for Bohm, 23 µs for Granola, and 28 µs for PWV. The rest of the transaction execution time lies between 13∼17 µs for all databases, showing that partitioning has the most significant impact on performance in this workload. Bohm outperforms Granola because Bohm does not need to split pieces during execution. Granola outperforms PWV because PWV constructs the dependency graph, but YCSB does not have any rendezvous points, and so the dependency graph provides no benefits in this workload.

We evaluate performance under skew by using a Zipfian distribution with s = 0.9 to choose the keys in a transaction. Figure 4.4b shows that Caracal's shared-memory design is not affected by skew and it achieves 2.06 MTxn/s throughput. The other deterministic approaches use partitioning to eliminate contention but suffer under this skewed workload. Bohm uses shared-memory execution, but its partitioned initialization becomes a bottleneck: Bohm takes 544 ms and Caracal takes 19 ms per epoch during initialization, while both their execution times are roughly 34 ms.

To evaluate Caracal's performance under higher contention levels, we create a highly contended YCSB workload. First, we set the read-write ratio to 0:10, so that all 10 key accesses are update operations. Second, while 3 of the rows are chosen from the entire database, the remaining 7 rows are chosen from a small set of 77 rows that are spaced 2¹⁷ apart in the 10M keyspace. The 3 keys and the 7 keys are chosen using either a uniform or a skewed distribution from their respective set. These contended workloads trigger Caracal's contention optimizations.
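For illustration, the sketch below generates the key set of one such contended transaction; the helper names and samplers are assumptions (the actual benchmark harness may differ), with hot_index standing in for either the uniform or the Zipfian choice over the 77 hot rows.

```cpp
#include <cstdint>
#include <functional>
#include <random>
#include <unordered_set>
#include <vector>

constexpr uint64_t kKeys = 10'000'000;       // total YCSB keyspace
constexpr uint64_t kHotStride = 1u << 17;    // 77 hot rows spaced 2^17 apart

// Build 10 distinct update keys: 3 drawn from the whole keyspace and 7 drawn
// from the 77-row hot set. hot_index() returns a value in [0, 77).
std::vector<uint64_t> MakeContendedTxnKeys(std::mt19937_64 &rng,
                                           const std::function<int()> &hot_index) {
  std::unordered_set<uint64_t> keys;
  std::uniform_int_distribution<uint64_t> any_key(0, kKeys - 1);
  while (keys.size() < 3) keys.insert(any_key(rng));                        // cold keys
  while (keys.size() < 10) keys.insert(uint64_t(hot_index()) * kHotStride); // hot keys
  return {keys.begin(), keys.end()};
}
```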


[Figure 4.5: YCSB Contention Performance (7/10 contended keys). Throughput vs. number of cores for Caracal, Bohm, PWV, and Granola. (a) YCSB Uniform Contention; (b) YCSB Contention+Skew, s = 0.9.]

Figure 4.5a shows performance under contention for the uniform distribution. Bohm does not scale because its execution phase is contended: it spends 34 ms for initialization and 115 ms for execution, while Caracal spends 13 ms for initialization and 41 ms for execution each epoch. This uniformly contended workload, with no data dependencies across keys, is the best case for a partitioned database like Granola (868 KTxn/s), which outperforms Caracal (822 KTxn/s) by 5.5%. PWV's dependency graph construction and scheduling impose significant overheads for this workload.

Figure 4.5b shows the results when the contended keys are chosen using the Zipfian distribution. With this challenging skewed and contended workload, the performance of the partitioned databases is lower than in the uniform contended case, but Caracal's performance still scales. With 32 cores, Caracal (424 KTxn/s) achieves ∼2.5× the performance of Granola (173 KTxn/s). Bohm spends 162 ms for initialization and 75 ms for execution each epoch. In contrast, Caracal spends 20 ms for initialization and 34 ms for execution each epoch. For this workload, our split-on-demand optimization must handle one heavily contended key, and it updates this key using multiple cores, which achieves higher throughput than using a single core due to the scheduling overhead of pieces.

4.3.4 TPC-C Like

Next, we use the TPC-C OLTP benchmark to evaluate Caracal. Unlike YCSB, TPC-C specifies transaction behaviour in detail and simulates the activities of a wholesale supplier. The supplier stocks items in multiple warehouses. Customers place new orders, pay for the orders, and then items are eventually delivered to the customers.


TPC-C models this process using 3 read-write transactions (NewOrder – 45%, Payment – 43%, Delivery – 4%) and 2 read-only transactions (OrderStatus – 4%, StockLevel – 4%). Although Delivery is only 4% of the transaction mix, it has a significant impact on performance because it is heavy-weight compared to NewOrder and Payment, and it can create conflicts with these transaction types. By default, the number of warehouses in TPC-C is the same as the number of cores in our experiments.

Our TPC-C-like benchmark is based on TPC-C. We make a few modifications to the TPC-C benchmark because Caracal requires the write-sets of transactions. The NewOrder transaction inserts new order items with an order id, and in TPC-C, this id is generated by reading and incrementing a field in the District table. This prevents us from inferring the write-set of the NewOrder transaction before execution. We resolve this issue by using auto-increment to generate the order id. When the NewOrder transaction inserts new order items, it acquires and increments an atomic counter inside the database. Silo refers to this optimization as “FastIds” [79]. We also remove the customer name lookup feature in the Payment transaction. This lookup provides a customer ID by scanning a read-only index when the customer only provides a last name for payment. We remove the read-only index and limit the Payment transaction to using only the customer ID. For the two read-only transactions in TPC-C, we perform index lookups in the OrderLine table and its secondary index in the initialization phase, which helps reduce contention. Finally, we ignore the scaling requirement in TPC-C, similar to all the baselines in our evaluation, as that imposes a throughput limit at each warehouse.1
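A minimal sketch of the FastIds idea is shown below, assuming a hypothetical per-district atomic counter inside the database; Caracal's and Silo's actual implementations differ in details.

```cpp
#include <array>
#include <atomic>
#include <cstdint>

// One order-id counter per (warehouse, district). Because the next id comes
// from a fetch_add instead of a read-increment of District.next_o_id, the keys
// that NewOrder will insert are known before the transaction executes.
struct OrderIdAllocator {
  static constexpr int kWarehouses = 32;   // illustrative sizing
  static constexpr int kDistricts = 10;
  std::array<std::atomic<uint64_t>, kWarehouses * kDistricts> next_id{};

  uint64_t NextOrderId(int warehouse, int district) {
    return next_id[warehouse * kDistricts + district]
        .fetch_add(1, std::memory_order_relaxed);
  }
};
```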

We can use reconnaissance transactions, as described in Section 4.1.6, to run full TPC-C without our modifications. For the Payment transaction, the reconnaissance transaction would need to read the CustomerName index. Since this index is read-only, the reconnaissance transaction would always validate successfully, and the rest of the transaction can then be performed using Caracal's deterministic protocol. However, for the NewOrder transaction, the reconnaissance transaction needs to read the next_o_id field, which is updated frequently by NewOrder transactions. In this case, validations will fail frequently and few NewOrder transactions will commit, as explained in Section 4.1.6.

Granola, Bohm and PWV require a partitioning plan to scale. By default, TPC-C is easy to partition by warehouse. Caracal does not need partitioning to scale TPC-C. However, Caracal pins transactions to cores based on their home warehouses for better performance.

Default TPC-C has low contention. For higher contention, we use a single-warehouse TPC-C workload [31, 18], which causes contention on the Warehouse, District, Customer and Stock tables. For this workload, the PWV paper [18] proposes partitioning all tables by district_id, except the Stock and the Warehouse tables. The latter two tables are assigned their own cores.

1See Section 4.1.3 in the TPC-C specification.


[Figure 4.6: TPC-C and TPC-C Single Warehouse Performance. Throughput vs. number of cores for Caracal, Bohm, PWV, and Granola. (a) TPC-C; (b) TPC-C Single Warehouse.]

Since there are 10 districts, the District, NewOrder, Customer, Orders and OrderLine tables each have 10 partitions. These 50 partitions are assigned to the rest of the cores in round-robin order. Since we use auto-increment for the order id, we do not have a rendezvous point in the NewOrder transaction, but we do have rendezvous points in the Delivery and the StockLevel transactions.

In Caracal, the NewOrder, Payment and Delivery transactions can cause contention in the Stock, Warehouse, District and Customer tables in the single-warehouse TPC-C workload. We simply update these tables using our UpdateForKey API. The Delivery transaction contains a data dependency: it scans the OrderLine table for items in an order to calculate the total and to mark these items as delivered, and then uses the calculated total to increment the customer balance. To handle contended updates to the Customer table, we merge the OrderLine range update and the Customer update in the same callback function.
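In Caracal, this merged callback body would be registered via the UpdateForKey API on the contended Customer row. The sketch below is a self-contained illustration of the merged update itself, with simplified row types and keys that are not Caracal's actual ones.

```cpp
#include <cstdint>
#include <map>
#include <tuple>

// Illustration (not Caracal's API): one piece that performs the OrderLine range
// update for an order and then applies the total to the contended Customer row.
struct OrderLineRow { int64_t amount; uint64_t delivery_d; };
struct CustomerRow  { int64_t balance; };

using OrderLineKey = std::tuple<int, int, uint64_t, int>;  // (w, d, o_id, line)

void DeliveryPiece(std::map<OrderLineKey, OrderLineRow> &order_lines,
                   CustomerRow &customer, int w, int d, uint64_t o_id,
                   uint64_t now) {
  int64_t total = 0;
  auto lo = order_lines.lower_bound(std::make_tuple(w, d, o_id, 0));
  auto hi = order_lines.upper_bound(std::make_tuple(w, d, o_id, 15));
  for (auto it = lo; it != hi; ++it) {   // OrderLine range update
    total += it->second.amount;
    it->second.delivery_d = now;         // mark the item as delivered
  }
  customer.balance += total;             // merged, contended Customer update
}
```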

Our PWV implementation uses coarse-grained dependencies [18], in which accesses to the Stock table are treated as a single edge in the graph, instead of an edge per row. This coarse-grained tracking reduces the graph size and the scheduling overheads significantly. It also avoids the need to declare the Stock table read-set in PWV, which is not feasible for the StockLevel transaction.

Figure 4.6a shows Caracal's throughput on default TPC-C compared to the other databases. Granola outperforms Caracal by 38% for several reasons. First, Granola is single-versioned and so it incurs fewer cache misses compared to multi-versioning schemes such as Caracal and Bohm. Second, default TPC-C is easily partitionable and its transactions can be run as independent transactions, making it well suited for Granola [8].


[Figure 4.7: YCSB Performance (8:2 read/write ratio). Throughput vs. number of cores for Caracal, MSTO, OSTO, ERMIA, and FOEDUS (2PL). (a) YCSB Uniform; (b) YCSB Skew, s = 0.9.]

Finally, most TPC-C transactions are processed in a single piece, so unlike with YCSB (Figure 4.4a), there is minimal overhead of partitioning. Thus, PWV and Bohm also benefit from partitioning. PWV performs better than Bohm since it partitions transaction execution, but its dependency tracking and scheduling add overheads compared to Granola.

Figure 4.6b shows the result for single-warehouse TPC-C. Granola, PWV and Bohm do not scale for this workload, while Caracal shows much better scaling. Caracal outperforms Granola by 89% at 32 cores. Due to skew, Bohm takes 92 ms while Caracal takes 12 ms for initialization, while both take roughly 18 ms for execution per epoch. With partitioned execution in Granola and PWV, the Stock table partition becomes the bottleneck at 8 cores. The Warehouse table partition has 50% utilization, while the rest of the cores are 60-70% utilized. Compared to Granola, PWV's dependency-based scheduling reduces wait times due to a rendezvous point on the Customer table, but not the Stock table, which has coarse-grained dependencies, and so accesses to that table cannot be scheduled out of order.

4.3.5 Comparing with Non-Deterministic Databases

In this section, we compare Caracal with non-deterministic shared-memory databases. We use the same YCSB and TPC-C workloads as in the previous evaluation.

Figure 4.7a shows performance for the uniform, non-contended YCSB workload. Caracal performs much better than ERMIA and 2PL but worse than OSTO, and it has roughly the same throughput as MSTO. OSTO, which is single-versioned, achieves almost 4 MTxn/s, which is much faster than any multi-versioning scheme.


[Figure 4.8: YCSB Contention Performance (7/10 contended keys). Throughput vs. number of cores for Caracal, MSTO, OSTO, ERMIA, and FOEDUS (2PL). (a) YCSB Uniform Contention; (b) YCSB Contention+Skew, s = 0.9.]

Among databases that provide opacity, Caracal has the best performance.

Figure 4.7b shows performance for the skewed, non-contended YCSB workload. This workload has no impact on Caracal (compared to Figure 4.7a). Although STO is shared-memory, we observe that both OSTO and MSTO suffer under skew, and their throughput remains flat with additional processors. ERMIA and FOEDUS are less impacted by skew, but their overall throughput is significantly lower than Caracal's throughput.

Figure 4.8a shows performance for the uniform, contended workload. Caracal has the best performance on 32 cores. ERMIA's unique concurrency control mechanism allows some transactions to commit even when there are conflicts, making it the only other database that can scale up in this workload. OSTO and MSTO have the best performance for uniform and uncontended workloads, but they are unable to scale for this workload. 2PL is the worst performing because this workload is heavily contended and 2PL's performance degrades as more cores are added.

Figure 4.8b shows the performance of the databases when the contended keys are chosen from the Zipfian distribution. With this challenging skewed and contended workload, Caracal's performance still scales, albeit slowly; compared to the uniform contended case, Caracal's performance is impacted. For this workload, our split-on-demand optimization must handle one heavily contended key, and it updates this key using multiple cores for load balancing. No other database can scale at all for this workload.

Under TPC-C, Figure 4.9a shows Caracal's throughput compared to the other databases. Caracal is slightly worse than FOEDUS (12% lower) and MSTO (23% lower).


[Figure 4.9: TPC-C and TPC-C Single Warehouse Performance. Throughput vs. number of cores for Caracal, MSTO, OSTO, ERMIA, and FOEDUS (2PL). (a) TPC-C; (b) TPC-C Single Warehouse.]

Our investigation suggests the lower performance is due to two reasons: (1) Caracal converts transaction code into its pieces framework, which enables splitting distributed transactions across nodes; and (2) Caracal's multi-versioning scheme is less optimized. In contrast, MSTO is more cache-friendly because it reduces its memory footprint with frequent garbage collection, and FOEDUS uses a single-version design. OSTO is still the best-performing database for TPC-C, due to its single-version design. OSTO's and MSTO's lack of opacity can also contribute to their performance because it allows more transactions to commit under conflict. Overall, Caracal remains comparable to the other databases that provide opacity for uncontended workloads like TPC-C.

Figure 4.9b shows performance for single-warehouse TPC-C. In this case, ERMIA's and FOEDUS's performance drops significantly, suggesting that this workload is highly contended. OSTO provides good overall performance but does not scale up. This is because 8% of the TPC-C transactions are scan-heavy read-only transactions, and when we use a single warehouse, these read-only transactions affect the read-write transactions in a single-version design. This is why multi-version databases, such as MSTO and Caracal, perform much better than OSTO. Caracal outperforms MSTO by 46% due to our optimizations.

Overall, Caracal manages contention and skew more effectively than MSTO and OSTO. Also, MSTO and OSTO sacrifice opacity to achieve better performance under contention, while Caracal provides opacity by design.


4.3.6 Impact of Optimizations

We have demonstrated Caracal's ability to manage contention. In this section, we show the impact of Caracal's contention optimizations by comparing Caracal's performance with three variations: 1) Caracal with both optimizations disabled, 2) Caracal with only batch append, and 3) Caracal with best-effort partitioning. The last variant enables Caracal's batch append and splits transactions based on the number of versions of the row. However, rather than probabilistically placing pieces onto cores, this variant performs best-effort partitioning using an offline bin packing algorithm; the rows are partitioned and the pieces updating a row are placed on one core exclusively. Note that we do not show the cost of offline bin packing, which is roughly 1-2 seconds for a 50 ms epoch.
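For reference, one simple greedy placement of this kind is sketched below: rows are sorted by their per-epoch version counts and repeatedly assigned to the least-loaded core. This is an illustrative heuristic for the offline placement, not necessarily the exact bin packing algorithm used in the evaluation.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Input: (row key, number of versions in the epoch). Output: (row key, core).
// Every piece that updates a given row is then placed on that row's core.
std::vector<std::pair<uint64_t, int>>
PlaceRowsOnCores(std::vector<std::pair<uint64_t, int>> rows, int num_cores) {
  std::sort(rows.begin(), rows.end(),
            [](const auto &a, const auto &b) { return a.second > b.second; });
  // Min-heap of (current load, core id).
  std::priority_queue<std::pair<int, int>, std::vector<std::pair<int, int>>,
                      std::greater<>> cores;
  for (int c = 0; c < num_cores; c++) cores.push({0, c});

  std::vector<std::pair<uint64_t, int>> placement;
  placement.reserve(rows.size());
  for (const auto &[key, versions] : rows) {
    auto [load, core] = cores.top();   // least-loaded core so far
    cores.pop();
    placement.emplace_back(key, core);
    cores.push({load + versions, core});
  }
  return placement;
}
```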

For the uniformly contended YCSB workload, Figure 4.10a shows that Caracal with no optimizations experiences contention in both the initialization and execution phases; batch append improves throughput by 33%, and when combined with split-on-demand, Caracal achieves 3× the performance of the unoptimized version. At 32 cores, with no optimization, initialization takes 60 ms and execution takes 115 ms per epoch. Our batch append reduces initialization to 13 ms, and split-on-demand reduces the execution time to 41 ms. Our probabilistic placement of pieces has no visible overhead compared to using offline bin-packing partitioning (which is infeasible to use in a high-performance database).

For the skewed contended YCSB workload, Figure 4.10b shows that batch append almost doubles throughput (248 KTxn/s) compared to no optimizations (141 KTxn/s). Among all the contended rows, the distribution of the number of versions is also highly skewed. To maintain load balance, Caracal assigns a few highly contended rows to multiple cores. In this case, bin packing performs 9.6% worse than Caracal because it is hard to partition the skewed keys while maintaining load balance.

Figure 4.10c shows the impact of our contention optimizations for single-warehouse TPC-C. In this case, batch append provides good scalability and reduces the initialization phase from 32 ms to 13 ms per epoch at 32 cores. To understand why this optimization works, we measured lock wait times. Without batch append, the average and the 99.9th percentile lock wait times are 1 µs and 31.5 µs. With batch append, the corresponding numbers are 40 ns and 1.5 µs. While the median number of element move operations in insertion sort is 0 both with and without batch append, surprisingly, batch append reduces the average number of element move operations in insertion sort from 13 to 2. Without batch append, the long tail in lock wait times blocks thread progress, causing more insertions to happen out of order.

In Figure 4.10c, the split-on-demand optimization adds some overhead due to splitting pieces while its contention reduction benefits are small, and so it increases the execution phase from 17 ms to 18 ms per epoch.


[Figure 4.10: Impact of optimizations on different workloads. Throughput vs. number of cores for Caracal, No Optimization, Batch Append, and Bin Packing. (a) YCSB Uniform Contention; (b) YCSB Contention+Skew; (c) Single Warehouse TPC-C; (d) Single Warehouse TPC-C (index lookup during execution).]

This suggests that the workload stresses index lookup and concurrency control during initialization, while transaction conflicts do not cause much contention during the execution phase. We illustrate this by performing the index lookups for the two read-only transactions in the execution phase instead, which increases contention in the execution phase. In this case, Figure 4.10d shows that the batch append optimization improves performance by 20% and reduces initialization time from 32 ms to 11 ms. The split-on-demand optimization improves performance by another 20% and reduces the execution time from 49 ms to 32 ms.


[Figure 4.11: Tuning Thresholds for split-on-demand. Throughput vs. threshold value. (a) YCSB Uniform Contention; (b) YCSB Contention+Skew; (c) Single Warehouse TPC-C; (d) Single Warehouse TPC-C (index lookup during execution).]

4.3.7 Tuning

In this section, we show the performance impact of tuning the split-on-demand threshold. Figure 4.11 shows the throughput for 4 workloads at the different threshold values shown on the X-axis.

For YCSB Contention (Figure 4.11a) and YCSB Contention+Skew (Figure 4.11b), there is a large range of optimal thresholds: from 1 version to 32 versions, Caracal always maintains optimal performance. For thresholds larger than 8K versions, performance decreases because split-on-demand is deactivated.
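Conceptually, the threshold is a per-row trigger evaluated during initialization: only rows whose buffered update count reaches the threshold have their updates split into separately scheduled pieces, and more heavily contended rows can be spread over more cores. The sketch below is a hypothetical form of that decision; the proportional rule is an assumption for illustration.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical split-on-demand decision: rows below the threshold are updated
// inline by their transactions; rows at or above it are split, with the number
// of cores growing with the row's per-epoch version count.
int CoresForContendedRow(uint32_t versions_this_epoch, uint32_t threshold,
                         int max_cores) {
  if (threshold == 0 || versions_this_epoch < threshold) return 1;  // no split
  int cores = static_cast<int>(versions_this_epoch / threshold);
  return std::min(std::max(cores, 2), max_cores);
}
```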

With TPC-C Single Warehouse (Figure 4.11c), the range of optimal thresholds is relatively narrow: from 2 to 8 versions. For thresholds larger than 16 versions, Caracal splits fewer update operations and the performance drops by up to 7.5%.


If we instead perform the index search for read-only transactions in the execution phase (Figure 4.11d), the range of optimal thresholds shrinks to 2 to 4 versions. For thresholds larger than 8 versions, Caracal's performance drops by 19% in the worst case.

We conclude that it is best to set a relatively low threshold when the database administrator expects contention. In that case, the worst performance penalty is merely 10% to 20%.

4.3.8 Garbage Collection

Caracal uses a multi-versioned design and employs a generational garbage collector to free unused versions. Since Caracal prefers collecting using the fast minor garbage collector, the major garbage collector only collects unused versions when the row has not been updated for 8 epochs by default, which temporarily leaves some unfreed garbage in our system.
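A minimal sketch of this policy, with hypothetical row metadata, is shown below; the 8-epoch idle window is the default mentioned above.

```cpp
#include <cstdint>

constexpr uint64_t kMajorGCIdleEpochs = 8;   // default idle window

// Hypothetical per-row metadata consulted by the collectors.
struct RowGCInfo {
  uint64_t last_update_epoch;   // epoch of the most recent committed update
};

// The minor collector reclaims old versions of recently updated rows; the
// major collector additionally reclaims versions of rows that have been idle
// for at least kMajorGCIdleEpochs epochs.
bool MajorGCShouldCollect(const RowGCInfo &row, uint64_t current_epoch) {
  return current_epoch - row.last_update_epoch >= kMajorGCIdleEpochs;
}
```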

To evaluate the memory overhead of our garbage collection design, we plot Caracal's memory usage over time in Figure 4.12. For these experiments, we increase the number of epochs to 200, rather than the default 50 epochs used in the previous experiments. Each epoch contains 100,000 transactions, so these runs are significantly larger than our previous setup. The X-axis represents time in terms of the epoch in which the database is running. The red line shows Caracal's memory usage, measured after a major garbage collection in each epoch. The blue line shows Caracal's optimal memory usage after each epoch (the major garbage collector is modified to collect all garbage), and the green line shows memory consumption when we disable the major garbage collection completely (minor collection only).

Figure 4.12 shows that the memory overhead of Caracal's multi-versioning is relatively small when major GC is enabled. At the beginning of a run, as rows are accessed, Caracal extends the version arrays, but after that both the optimal and Caracal's memory consumption increase slowly. Even with the major garbage collection disabled, Caracal is still able to stabilize memory usage over time. These results validate our hypothesis that most rows will be updated again in the next few epochs, and so running with the major garbage collection disabled will not rapidly leak memory. One could run the major garbage collector only under memory pressure if the environment can tolerate higher memory overheads.

We also perform the same evaluation for TPC-C (not shown). In this case, both minor-only and minor+major GC achieve close to optimal memory consumption. This suggests that with TPC-C, all garbage can be collected by the minor garbage collector.

4.3.9 Latency

Caracal uses epoch-based concurrency control, and so the commit latency (the time that a transaction stays inside the database) is the execution time of the entire epoch.


[Figure 4.12: Memory Usage. Memory usage vs. epochs for Minor+Major GC, No Major GC, and Optimal. (a) YCSB; (b) YCSB Skew (s = 0.9, 8:2 r/w ratio); (c) YCSB Contention; (d) YCSB Contention+Skew.]

In this section, we evaluate the latency-throughput trade-off in Caracal by varying the epoch size. Figure 4.13 shows Caracal's throughput versus latency. For this experiment, we vary the epoch size from 5,000 transactions to 100,000 transactions to derive the corresponding latency numbers shown on the X-axis.

For sufficiently large epochs, changing the epoch size has a minor effect on throughput. Specifically, when the latency increases above 50 ms, the throughput increases by at most 6%. When the epoch size is decreased so that the latency is below 50 ms, Caracal's throughput begins to drop. Uncontended workloads like YCSB, YCSB Skew and TPC-C are affected the most. With the smallest epoch size, the latency drops to 8 ms but the throughput drops by 30% to 40%.
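Since the commit latency is essentially the time to process one epoch, the latency on the X-axis follows directly from the epoch size and the sustained throughput. As a rough example, assuming a workload running at about 2 MTxn/s:

    latency ≈ epoch size / throughput ≈ 100,000 txns / (2 × 10⁶ txns/s) = 50 ms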

Contended workloads are less sensitive to smaller epochs.


[Figure 4.13: Latency and Throughput. Throughput vs. latency (ms). (a) YCSB workloads: YCSB, YCSB+Contention, YCSB+Skew, YCSB+Contention+Skew; (b) TPC-C workloads: TPC-C, TPC-C Single Warehouse.]

However, we see that YCSB Contention has a sharp throughput drop when the latency is reduced to less than 50 ms. Our investigation suggests the major garbage collector's overhead increases for YCSB Contention as we shrink the epoch size. This may indicate that we are running the major garbage collection too frequently.

4.3.10 Scaling Out

Caracal leverages determinism to scale out to multiple nodes easily. We evaluate Caracal's throughput on a cluster of up to 8 machines running TPC-C. In this experiment, we use our default TPC-C configuration with one warehouse per core. For comparison, OSTO and MSTO reach 1.40 MTxn/s and 750 KTxn/s under this hardware configuration.

[Figure 4.14: Distributed TPC-C Throughput. Caracal throughput vs. number of machines (1-8).]

Figure 4.14 shows that Caracal achieves 610 KTxn/s using a single machine.


As we add more machines, Caracal's throughput scales linearly, with no drop-off. With 2 nodes, Caracal achieves 900 KTxn/s, and with all 8 nodes, Caracal reaches 3.35 MTxn/s. As a rough comparison with a state-of-the-art distributed database, FaRMv2 [68] reports 12 MTxn/s for TPC-C using 90 machines and an expensive, high-speed InfiniBand network. Caracal achieves a quarter of FaRMv2's performance with just 8 (slightly slower) machines using a commodity 10Gb Ethernet network.

4.4 Summary

We have described the design of Caracal, a high-performance, in-memory database designed for handling skewed and contended workloads. Caracal uses a shared-memory, deterministic concurrency control scheme that enables reordering and parallelizing transaction execution with low overhead. We present two optimizations that allow scaling under highly contended workloads. The batch append optimization groups concurrency control operations during the epoch initialization phase because the operations are commutative. The split-on-demand optimization splits contending updates, which reduces the serial component of contended execution, and performs these updates on fewer cores, thus reducing contention. Compared to previous deterministic schemes, Caracal scales well under contention, while providing strong serializability and opacity guarantees.


Chapter 5

Related Work

In this chapter, we provide context for our work by comparing it with related work. In Section 5.1, we first describe various deterministic concurrency control schemes. Then, Section 5.2 describes related work on database replication. Finally, Section 5.3 describes various workload management methods in databases.

5.1 Deterministic Concurrency Control

Table 5.1 compares Caracal with existing deterministic concurrency control schemes.

            Requires                   Version  Initialization    Execution    Scaling Challenge
Calvin      Read-set, Write-set        Single   Single-threaded   Shared       Single-threaded initialization; contention during execution.
Bohm        Read-set, Write-set        Multi    Partitioned       Shared       Skewed accesses; contention during execution.
Granola     Independent transactions   Single   N/A               Partitioned  Skewed accesses.
PWV         Read-set, Write-set        Single   Partitioned       Partitioned  Skewed accesses.
PWV-Multi   Read-set, Write-set        Multi    Partitioned       Partitioned  Skewed accesses.
Caracal     Write-set                  Multi    Shared            Shared       Contention during initialization and execution (addressed by Caracal).

Table 5.1: Comparison with Existing Deterministic Schemes

Calvin is an early, single-version, shared-memory deterministic database [77]. It partitions transactions into epochs and executes them an epoch at a time. For each epoch, it requires the read-set and write-set (the rows that will be read or written) of transactions. During the initialization phase, Calvin uses the predefined serial order and the read-set and write-set to establish a lock order for each row that will be accessed by the transactions in the epoch.


For example, if T1 and T2 both update key k, then deterministic locking for key k will ensure that T2 acquires the lock after T1, so rows are accessed according to the predefined serial order. Calvin uses a centralized lock manager during initialization, which can become a performance bottleneck on modern multi-core hardware, and its shared execution can lead to contention.
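To illustrate the idea (this is not Calvin's actual code), a deterministic row lock can be modelled as a per-row FIFO queue that is filled in the predefined serial order during initialization; during execution, a transaction acquires the row only when it reaches the head of the queue.

```cpp
#include <cstdint>
#include <deque>
#include <mutex>

// Illustrative deterministic row lock: grants strictly follow the order in
// which requests were enqueued during the initialization phase.
class DeterministicRowLock {
 public:
  // Called during initialization, in the predefined serial order.
  void EnqueueInSerialOrder(uint64_t txn_id) {
    std::lock_guard<std::mutex> g(m_);
    waiters_.push_back(txn_id);
  }
  // Called during execution: the lock is held by txn_id once it is at the head.
  bool TryAcquire(uint64_t txn_id) {
    std::lock_guard<std::mutex> g(m_);
    return !waiters_.empty() && waiters_.front() == txn_id;
  }
  void Release(uint64_t txn_id) {
    std::lock_guard<std::mutex> g(m_);
    if (!waiters_.empty() && waiters_.front() == txn_id) waiters_.pop_front();
  }

 private:
  std::mutex m_;
  std::deque<uint64_t> waiters_;
};
```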

Bohm [17] improves upon Calvin by partitioning and parallelizing the initialization phase and by using multi-versioning. Bohm needs the write-set for initializing the row versions in the initialization phase. However, its multi-version implementation uses a linked list and may have to traverse a long list of versions during execution, which has a significant impact on performance. Bohm uses the read-set to speed up searching the row versions. Bohm partitions the keys across cores during initialization, thus eliminating synchronization costs when initializing row versions. However, this partitioning can lead to poor performance under skewed workloads. Unlike initialization, Bohm uses a shared-memory model during execution, which can lead to scaling bottlenecks under contention.

While Calvin and Bohm execute transactions using a shared-memory model, Granola [8] uses a shared-nothing architecture for its single-versioned, partitioned database. Granola supports two modes of operation: timestamp and locking. The locking mode uses a non-deterministic two-phase commit protocol, while the timestamp mode supports a class of transactions called independent transactions using deterministic concurrency control [8, 39]. An independent transaction consists of pieces that do not have any cross-partition data-flow dependencies, i.e., the inputs of a piece do not depend on the execution of pieces in other partitions. As a result, the pieces of a transaction can be run on the different partitions independently. Granola executes these pieces serially on different cores in the predefined serial order. Since the pieces execute in the predefined serial order, no row-level synchronization is required. Thus Granola does not require an initialization phase, and so it does not need to batch transactions in epochs. However, Granola's partitioned execution can lead to significant performance degradation with skewed workloads.

The piece-wise visibility (PWV) work [18] is a single-version, partitioned database, similar to Granola. However, it adds support for intra-transaction data-flow dependencies via rendezvous points. These dependencies across pieces running on different cores limit parallelism since a piece needs to wait until its dependencies are satisfied before it can start execution. To reduce the impact of these cross-partition dependencies, PWV builds a separate dependency graph on each partition during the initialization phase. This graph tracks all conflicts between pieces in the partition, allowing non-conflicting pieces to execute in any order. PWV requires the read-set and the write-set for creating the dependency graphs. Similar to Granola, PWV's partitioned execution (and initialization) can lead to performance degradation with skewed workloads.


As mentioned in the PWV paper, PWV can also be implemented using multi-versioning, which helps ensure that reads never block writes [18]. We implemented PWV-Multi using Bohm's multi-versioned design.

The PWV work observes that transactions abort under a restricted set of conditions with deterministic concurrency control. In particular, a transaction can commit right after all transaction logic-related abort code has finished executing, since the rest of the transaction is executed deterministically and will not abort. Thus transactions can make their writes visible after this commit point but before the transaction finishes executing. This early write visibility improves the performance of the PWV concurrency control protocol.

Caracal’s deterministic concurrency control builds on Bohm’s multi-version concurrencycontrol and provides PWV’s early-write visibility. Caracal’s multi-version design is similarto Bohm’s placeholder approach for tracking row versions [17]. However, Caracal’s multi-versioning implementation is more efficient than Bohm’s and thus Caracal only requires thewrite-set for initialization. This is especially beneficial for our replay-based replication schemesince we only need to send the write-set to the backup. Any scheme that requires the read-setwould need to send much more data since the read-set is generally much larger than the write-setfor many transactional workloads.

Caracal runs the initialization and execution phases using a shared-memory model. This design avoids the drawbacks of partitioning. Any partitioning scheme requires application developers to partition their transactions based on the data partitioning scheme. Developers need to partition their transactions into pieces carefully to reduce multi-partition transactions and cross-partition dependencies across transaction pieces, both of which reduce performance. Furthermore, skewed workloads can lead to load imbalance across partitions. Long-term load imbalance requires repartitioning the database, which is expensive and potentially requires developer involvement to rewrite the transaction code. In comparison, load balancing in Caracal can be achieved by simply scheduling transactions on cores so that all cores remain busy.

The main challenge in Caracal is that shared-memory execution can lead to degraded performance under contention. This is an issue for all schemes shown in Table 5.1 that use shared initialization or execution. Caracal's batch append and split-on-demand optimizations are designed to enable performance scaling even under highly contended workloads and should be applicable to other shared-memory deterministic schemes.

5.2 Replication

We have demonstrated that deterministic concurrency control can be used to provide scalable database replication.


In this section, we compare other logging and replication approaches for in-memory databases.

5.2.1 Log Shipping

Primary-backup replication based on traditional log shipping is easy to implement and commonly available in many traditional databases such as SQL Server [45], DB2 [32], MySQL [56], and PostgreSQL [61]. However, recent, high-performance in-memory database designs raise challenges for traditional log shipping. On the backup database, most log shipping implementations replay the log serially. While this log replay can be executed relatively efficiently, Hong et al. suggest that serially replaying the recovery log on the backup database can become a bottleneck with the increasing number of cores on the primary database [30]. To enable concurrent log replay, their KuaFu system constructs a dependency graph on the backup, based on tracking write-write dependencies in the log. It then uses topological ordering to concurrently apply the logs of non-conflicting transactions. However, KuaFu's dependency tracking and transaction ordering still need to be performed serially [30], and so it is unclear whether the scheme will scale for fast in-memory databases. Even if it scales, another issue with log shipping is the amount of network traffic that is needed to ship the logs to the backup. While network bandwidth is not a bottleneck within modern data centers, it becomes a bottleneck for geographically separated data centers.
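As a hedged sketch of the idea (not KuaFu's implementation), such a write-write dependency graph can be built in one pass over the log by remembering, for each key, the last transaction that wrote it:

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// One log record: a transaction id and the keys it wrote.
struct LogRecord {
  uint64_t txn_id;
  std::vector<uint64_t> write_keys;
};

// Add an edge prev -> curr whenever curr overwrites a key last written by prev.
// Transactions with no path between them can then be replayed concurrently,
// applying the graph in topological order.
std::unordered_map<uint64_t, std::vector<uint64_t>>
BuildWriteWriteGraph(const std::vector<LogRecord> &log) {
  std::unordered_map<uint64_t, uint64_t> last_writer;          // key -> txn id
  std::unordered_map<uint64_t, std::vector<uint64_t>> edges;   // txn -> successors
  for (const LogRecord &rec : log) {
    for (uint64_t key : rec.write_keys) {
      auto it = last_writer.find(key);
      if (it != last_writer.end() && it->second != rec.txn_id) {
        edges[it->second].push_back(rec.txn_id);
      }
      last_writer[key] = rec.txn_id;
    }
  }
  return edges;
}
```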

Write-behind logging [3] is an efficient logging and recovery scheme for in-memory multi-version databases that use NVM for durable storage. This logging scheme logs the timestamp ranges of uncommitted transactions, requires minimal logging, and provides very fast recovery because it reuses the multi-versioned data in the database. However, since the log only contains timestamps and not the data, replication still requires traditional log shipping.

5.2.2 Logging and Recovery in Shared-Nothing Architectures

In this section, we describe several recent logging and recovery approaches for shared-nothing architectures and then discuss their applicability for replication.

Malviya et al. show that logging in high-throughput databases imposes significant processing overheads [42]. They propose recovering databases using command logging and show that their approach improves database performance by 50%. Command logging logs the transaction inputs and replays the transactions deterministically for recovery. In their non-deterministic, shared-nothing VoltDB database, command logs of different nodes are merged at the master node during recovery. A command logging based approach would send the minimal amount of data to the backup for replication. However, aside from load imbalance under skewed workloads, this approach also suffers when the workload contains many cross-partition transactions.


This is because transactions are processed in serial order based on their timestamps to guarantee correctness during recovery, and significant synchronization is needed for cross-partition transactions. As a result, a pure command-logging based approach on the backup will not scale with primary performance, as shown by the adaptive logging work [88].

Adaptive Logging [88] improves recovery for command logging by generating a graph based on data dependencies between transactions. Its focus is on single-node failure in a distributed database. When a node fails, all the transactions on the failed node and any transactions on other nodes that they depend on are replayed concurrently, in topological order, similar to KuaFu [30]. Adaptive logging reduces recovery time further by logging rows for some transactions, thus avoiding serial replay for long chains of transactions. Adaptive logging is designed for recovery and thus generates its dependency graph offline. Unfortunately, the paper does not evaluate how long it takes to generate this graph, whether it can be generated scalably, or the size of the graph. For replication, this graph would need to be generated online, impacting primary performance, and sent to the backup, requiring significant network bandwidth.

Pacman [86] proposes a novel method for parallelizing recovery for in-memory databases that use command logging. It uses a combination of static and dynamic analysis to parallelize transaction replay during recovery. Pacman does not depend on data partitioning because the static analysis can discover a partitioning scheme for parallelizing the replay. Pacman performs intra-procedure and inter-procedure analysis to create a global dependency graph based on coarse-grained, table-level dependencies. The dynamic analysis performs fine-grained dependency analysis for nodes in the global graph, allowing concurrent execution of pieces of code with no conflicts. This approach can be applied for a backup, and we believe its dynamic analysis can scale well for replication. However, its static analysis assumes that the transaction logic is simple enough to determine the read-set and write-set easily, limiting the programming model on the primary database.

While a shared-nothing database design works well for certain applications on a primary, we believe that this design is ill-suited for a backup database because it limits the types of applications that a primary database can support. In particular, a shared-nothing backup would need to decide on a partitioning scheme, and improper partitioning will lead to poor scaling. Our database backup uses a shared-memory design, which allows supporting any high-performance primary that provides serializability.


5.3 Workload Management

There is a large body of work in workload management because it is a long-standing and fundamental issue in database concurrency control. Early work focused on contention management for locking-based, shared-memory databases because these protocols were commonly used for disk-oriented databases. Ryu et al. [65] analyzed how contention affects locking performance. Franaszek et al. [22] proposed using transaction wait information to improve throughput under contention for locking protocols.

In the late 2000s, massively parallel server systems became available and the growth of main memory capacity enabled many datasets to fit entirely in memory [72]. These trends have led to a rich body of research on scaling in-memory databases using non-locking concurrency control protocols, such as Silo [79], ERMIA [34], TicToc [89] and Cicada [40], and a resurgence in work on workload management for supporting such protocols.

Next, we describe workload management schemes for shared-memory and shared-nothing architectures. Shared-memory architectures can handle skew naturally, and so the focus of workload management is contention management. In contrast, in shared-nothing architectures, the focus of workload management is load balancing.

5.3.1 Shared-Memory Architectures

Silo uses optimistic concurrency control (OCC), which is generally believed to work well when transaction conflicts are unlikely [37]. Fekete et al. [20, 6] proposed Serializable Snapshot Isolation and later ERMIA [34] proposed the Serial Safety Net (SSN), both of which leverage multi-versioning to commit certain transactions even when they conflict under OCC. TicToc [89] proposes a timestamp-based, single-versioning design that reorders transactions to achieve a similar goal. Cicada [40] proposes several optimizations for multi-versioning and achieves good performance for both uncontended and contended workloads. Both TicToc and Cicada avoid using a global timestamp for better scaling, but this design leads to sacrificing opacity. As we mentioned in Section 2.2.3, the lack of opacity can help improve performance by allowing more transactions to commit, but it can lead to hard-to-write, buggy or vulnerable applications [68, 11].

OCC-based systems need to use backoff-based abort-retry schemes under contention to avoid performance collapse [31]. When a transaction aborts, instead of retrying immediately, the database delays the transaction for a short period before retrying. TicToc [89] delays transactions for a random period between 0 and 100 µs. Cicada [40] monitors the change in throughput and uses feedback control to choose an appropriate delay every 5 ms. STO [31] uses a randomized exponential backoff [44] algorithm to choose the delay.
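A typical randomized exponential backoff looks like the sketch below; the constants are illustrative and not taken from STO.

```cpp
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <random>
#include <thread>

// Sleep for a random delay whose upper bound doubles with each consecutive
// abort of the same transaction, capped at a maximum.
void BackoffBeforeRetry(int consecutive_aborts, std::mt19937 &rng) {
  constexpr uint64_t kBaseUs = 1, kMaxUs = 1000;          // illustrative bounds
  uint64_t cap = std::min(kMaxUs, kBaseUs << std::min(consecutive_aborts, 10));
  std::uniform_int_distribution<uint64_t> delay_us(0, cap);
  std::this_thread::sleep_for(std::chrono::microseconds(delay_us(rng)));
}
```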


Ding et al. [12] propose batching transactions to reduce aborts in OCC under contention. Transactions within a batch are analyzed based on read-write dependencies and reordered during validation to maximize the number of transaction commits in the batch.

STO [31] proposes commit-time update (CU) and timestamp splitting (TS) optimizations for scaling under contention. The CU optimization provides a programming interface that has similarities with our split-on-demand technique. This optimization reduces conflicts by representing certain read-modify-write operations in a transaction as blind writes that are evaluated at commit time. This optimization is orthogonal to split-on-demand, which schedules pieces on different cores to reduce contention while maintaining load balance. With timestamp splitting (TS), STO separates the columns in a table into many subsets. If a transaction only accesses columns in one subset, it does not need to evaluate pieces from another subset. As a result, when combining CU and TS, STO's performance under contention improves dramatically. The TS optimization assumes a columnar organization in tables, which may not be applicable for document or key-value model databases.

Doppel [52] integrates commutative data types into the database. Application developers can utilize these types to reduce contention when updates are commutative, such as when updating counters or determining the maximum value. Similar to Caracal, Doppel uses phased execution, with three phases: joined, split and reconciliation. In the joined phase, the database uses OCC to commit transactions. As soon as the database detects contention, it marks the contended row and switches into the split phase. During the split phase, if a row is identified as contended, transactions update per-core slices of the row instead of the actual row, which eliminates contention. To switch back to the joined phase, the database needs to go through the reconciliation phase, in which each core stops processing transactions and merges its per-core slices into the row. Doppel's design shows significant performance improvement for commutative operations. In Caracal, our batch append optimization uses the same design because appending an empty version in Caracal is commutative. However, Doppel specifically targets contention for commutative datatypes, while Caracal can handle contention regardless of the datatype.
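For a commutative counter, the split and reconciliation phases can be sketched as follows (illustrative, not Doppel's code): each core increments only its own slice during the split phase, and reconciliation folds the slices back into the row once all cores have stopped processing transactions.

```cpp
#include <array>
#include <cstdint>

// Per-core slices for one contended, commutative counter row.
struct SplitCounter {
  static constexpr int kMaxCores = 32;
  std::array<int64_t, kMaxCores> slice{};  // each entry written by one core only
  int64_t value = 0;                       // the actual row value

  // Split phase: a transaction running on `core` updates its local slice.
  void Add(int core, int64_t delta) { slice[core] += delta; }

  // Reconciliation phase: merge the slices back into the row; no per-row
  // synchronization is needed because transaction processing is paused.
  void Reconcile() {
    for (int64_t &s : slice) { value += s; s = 0; }
  }
};
```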

IC3 [83] proposes a piece-based design that is similar to our split-on-demand optimization. In IC3, pieces in a transaction commit separately using OCC. If a piece aborts due to contention, IC3 only retries the piece instead of the entire transaction. This significantly reduces the cost of aborts under contention. If a piece commits, IC3 tracks dependencies among transactions, and succeeding pieces in the transaction can only execute when the dependency is fulfilled. Unlike split-on-demand, IC3 does not allow developers to specify pieces freely because certain transaction splits may cause cyclic dependencies and potential deadlocks when scheduling pieces. To solve this issue, IC3 performs static analysis among all stored procedures and merges all pieces that can form cycles into one piece.


In practice, IC3 produces pieces at a coarse granularity, where each piece accesses one or more tables. Although IC3 works well under contention, its optimizations are only applicable for workloads with many tables.

DRP (Deferred Runtime Pipeline) [50] proposes an interesting extension to 2PL. Instead of eliminating locks, it eliminates deadlock and reduces the critical section by using inter-transaction dependency tracking. Similar to our approach, it assumes a transaction's read-set and write-set are known ahead of execution. During execution, transactions acquire all locks in a fixed order (rank) to eliminate deadlock. Whenever a transaction finishes accessing an object, it immediately relaxes the lock so that another transaction can acquire it. To maintain serializability, whenever a transaction tries to acquire any lock, it needs to make sure all its dependent transactions have acquired locks with larger ranks. All locks are released when a transaction commits. However, with the relaxed locks, a transaction could finish before its dependencies, so it must wait for all its dependencies to commit before it releases all its locks. Similar to our split-on-demand, DRP provides an interface for transactions to split off a deferred piece. However, deferred pieces are not executed eagerly using a contention-aware schedule. Instead, they are executed lazily during a future read. Unlike our batch append, DRP does not perform any optimizations for locking. For transactions that cannot declare their write-set ahead of time, DRP uses a technique similar to reconnaissance transactions. However, since DRP does not need epochs, reconnaissance transactions can read recent data and are thus less prone to aborts.

Unlike all the schemes described above, Caracal's deterministic scheme avoids aborts altogether, and its batching mechanism is designed to optimize database execution under contention while executing transactions in the predefined serial order.

Hybrid concurrency control combines concurrency control protocols that are known to be good at low and high contention levels and aims to provide good performance across a range of contention levels. MOCC [83] combines OCC and 2PL, while CormCC [74] combines multiple concurrency control schemes. When contention is detected, both MOCC and CormCC fall back to 2PL to reduce transaction aborts. However, recent studies [31, 40] suggest 2PL can be more expensive than OCC under heavy contention, which is also confirmed in our evaluation in this thesis.

Our work on Caracal is the only shared-memory database that leverages determinism to perform contention management. We have demonstrated that our solution handles contention more effectively than various OCC-based solutions.


5.3.2 Shared-Nothing Architectures

The alternative to the shared-memory architecture is a shared-nothing architecture in which application developers partition the database across cores and each partition is accessed by a single core. Pioneering examples of this architecture are H-Store [72], DORA [58] and Granola [8]. The shared-nothing architecture eliminates synchronization costs for data accesses and so it performs well under contention [2], but it can suffer from load imbalance with skewed workloads.

As a result, there have been several proposals for load balancing in these architectures. Pavlo et al. [59] propose a skew-aware partitioning algorithm. E-Store [73] shows that the partitioning problem is equivalent to bin packing, which is a well-known NP-complete problem [24]. For database schemas that are tree-shaped, E-Store proposes an approximate placement algorithm that uses historical metrics collected from the database. Clay [67] extends E-Store by making fewer assumptions about the database schemas.

Many shared-memory systems also use partitioning to handle contention, making them a hybrid architecture. Orthrus [64] shows that performance degradation in the shared-memory architecture under contention is due to processor state pollution. The authors propose decoupling the lock manager from transaction execution; in this way, developers only have to partition the lock manager, without requiring partitioned transaction execution. Bohm [17] adopts a similar design in its initialization phase. However, as a result, their partitioned concurrency control operations suffer under skewed workloads, a problem fundamental to all partitioned systems.
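
The following sketch illustrates the idea of partitioning only the lock manager while leaving execution unpartitioned: each record lock lives in a lock-table shard chosen by key hash, and any execution thread may run any transaction. It is illustrative only; Orthrus uses dedicated lock-manager threads and message passing, which this sketch does not model.

    #include <functional>
    #include <mutex>
    #include <unordered_map>

    constexpr int kLockShards = 16;

    struct LockShard {
      std::mutex table_latch;                     // protects the shard's lock table
      std::unordered_map<int, std::mutex> locks;  // key -> record lock
    };

    LockShard g_shards[kLockShards];

    // Execution threads are not partitioned; only the lock table is sharded.
    std::mutex& lock_for(int key) {
      LockShard& s = g_shards[std::hash<int>{}(key) % kLockShards];
      std::lock_guard<std::mutex> g(s.table_latch);  // short shard-local latch
      return s.locks[key];                           // created on first use
    }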

Chapter 6

Conclusions and Future Work

Databases are critical components of computing infrastructure services, and thus they need to be always available and must process transactions efficiently under various types of workloads. Meeting these availability and workload management requirements is challenging for in-memory databases because they provide significantly higher performance than traditional disk-based databases.

This thesis has demonstrated that both availability and workload management can be addressed by leveraging determinism. Deterministic concurrency control guarantees equivalence to a predetermined serial order. Our replication scheme utilizes this guarantee to provide reliable fail-over without being limited by the network bandwidth or the backup replay performance. We describe a generic replication scheme that only requires serializability from the primary database. Our replay-based scheme can check whether the primary and the backup make identical updates, which helps detect arbitrary and subtle race-condition bugs that can lead to serious data corruption in the database. Thus our approach can help improve the quality of the primary database.
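
As an illustration of this kind of divergence checking (not the thesis implementation), both replicas could fold the committed updates of an epoch, in their deterministic serial order, into a digest and compare the results; the hash choice and data structures below are assumptions.

    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <vector>

    struct Update { uint64_t key; std::string value; };

    // Fold the epoch's committed updates, in their deterministic serial order,
    // into a 64-bit FNV-1a digest.
    uint64_t epoch_digest(const std::vector<Update>& updates) {
      uint64_t h = 1469598103934665603ull;
      auto mix = [&h](const void* p, std::size_t n) {
        const unsigned char* bytes = static_cast<const unsigned char*>(p);
        for (std::size_t i = 0; i < n; i++) {
          h ^= bytes[i];
          h *= 1099511628211ull;
        }
      };
      for (const Update& u : updates) {
        mix(&u.key, sizeof(u.key));
        mix(u.value.data(), u.value.size());
      }
      return h;
    }

    // The backup recomputes the digest from its own replayed updates and compares
    // it against the digest shipped by the primary; a mismatch signals divergence.
    bool epoch_matches(uint64_t primary_digest, const std::vector<Update>& replayed) {
      return primary_digest == epoch_digest(replayed);
    }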

We are the first to describe a workload management scheme for deterministic databases that efficiently handles both skewed and contended workloads. We propose two novel optimizations that leverage determinism to reorder and parallelize the internal operations of the database. Our evaluation shows that our scheme outperforms partitioning under skewed or contended workloads, and suggests that, by leveraging determinism, our approach is more effective than non-deterministic approaches.

There are many open problems in database replication, workload management, and deterministic concurrency control. This thesis proposes replay-based replication for all transactions. In practice, however, it may be better to use log shipping for compute-intensive transactions: we could identify them on the primary and replicate their computed results, so that the backup does not need to recompute them. This hybrid approach would significantly speed up replay on the backup database. Similarly, one could detect transactions whose outputs are smaller than their inputs and use log shipping for them, which would reduce network traffic because the transaction inputs would not need to be replicated.
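
A sketch of such a hybrid policy is shown below; the thresholds, field names, and cost estimates are hypothetical and would need to be measured or tuned in a real system.

    #include <cstddef>

    enum class ReplicationMode { ReplayInput, ShipResult };

    struct TxnProfile {
      std::size_t input_bytes;   // bytes of inputs shipped for replay-based replication
      std::size_t output_bytes;  // bytes of the write-set the transaction produced
      double cpu_ms;             // measured or estimated execution cost on the primary
    };

    // Ship the computed result (log shipping) for compute-heavy transactions and
    // for transactions whose outputs are smaller than their inputs; replay otherwise.
    ReplicationMode choose_mode(const TxnProfile& t, double cpu_budget_ms = 1.0) {
      if (t.cpu_ms > cpu_budget_ms) return ReplicationMode::ShipResult;
      if (t.output_bytes < t.input_bytes) return ReplicationMode::ShipResult;
      return ReplicationMode::ReplayInput;
    }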

Our replication work can detect race-condition bugs inside the primary database, but there are no debugging tools yet to further help database developers. It would be valuable to pinpoint the data race and the execution trace that led to it. Such a tool would not only help database developers but may also help users fix data corruption.

In our current implementation, we assume that application developers use C++ to write stored procedures, and in our workload management work, they need to explicitly call our split-on-demand API. It would be preferable for application developers to express transactions in high-level sequential code; static analysis could then automatically insert calls to the split-on-demand API in the transaction code.
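
The sketch below illustrates what such a rewrite might look like. The SplitOnDemand name and signature are hypothetical stand-ins, not Caracal's actual API; the point is only that a static-analysis pass could wrap the update of a hot record in a split call so that the developer writes plain sequential code.

    #include <functional>

    // Hypothetical stand-in for a split-on-demand call: hand a contended piece
    // to the runtime, which can schedule it with contention awareness. This
    // stub simply runs the piece inline.
    void SplitOnDemand(int contended_key, const std::function<void()>& piece) {
      (void)contended_key;
      piece();
    }

    // A transaction written as plain sequential code. A static-analysis pass
    // could detect that the warehouse row is hot and wrap the update below in
    // a SplitOnDemand call automatically, instead of the developer doing it by hand.
    void PayTxn(int warehouse_id, double amount,
                const std::function<void(int, double)>& add_to_ytd) {
      // ... uncontended reads and writes run inline ...
      SplitOnDemand(warehouse_id, [=] {
        add_to_ytd(warehouse_id, amount);  // the contended piece
      });
    }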

Our workload management work mainly focuses on a single-node, shared-memory database. In a distributed environment, databases are sharded for performance, which creates a load-balancing problem similar to partitioning. In a distributed cluster, nodes can easily become overloaded if the workload is skewed, so we need a mechanism for identifying and migrating data and transactions away from hotspot partitions. We believe that our epoch-based deterministic concurrency control will simplify this load-balancing problem in distributed databases because data migration can be performed at epoch granularity.
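
One speculative way to exploit epoch granularity is to make the placement of key ranges visible to workers only at epoch boundaries, as in the hypothetical sketch below: the routing table is immutable within an epoch and is swapped atomically between epochs.

    #include <iterator>
    #include <map>
    #include <memory>

    // Maps the first key of each range to the node that owns the range.
    struct RoutingTable {
      std::map<int, int> range_start_to_node;
      int node_for(int key) const {
        auto it = range_start_to_node.upper_bound(key);
        return std::prev(it)->second;  // assumes a sentinel range covering the smallest keys
      }
    };

    // Workers read the current table; it is only replaced between epochs, so the
    // placement every worker sees is stable for the duration of an epoch.
    std::shared_ptr<const RoutingTable> g_routing =
        std::make_shared<const RoutingTable>();

    void install_placement_for_next_epoch(std::shared_ptr<const RoutingTable> next) {
      std::atomic_store(&g_routing, std::move(next));
    }

    std::shared_ptr<const RoutingTable> current_placement() {
      return std::atomic_load(&g_routing);
    }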

One future direction involves improving the applicability of deterministic concurrency control. As mentioned in the limitations of Caracal, we have found that deterministic concurrency control still lacks generality. Although Caracal tackles this problem for certain types of range updates, most transactions still need to declare their write-set keys or ranges before execution, which imposes a burden on application developers. It would be useful to infer the write-set keys or ranges automatically from generic stored procedures using static analysis and, if that fails, to run the affected transactions using non-deterministic concurrency control methods such as 2PL or OCC. More broadly, hybrid schemes that combine deterministic and non-deterministic concurrency control may also be beneficial.

Non-volatile memory is another future direction for in-memory databases in general. Because non-volatile memory has higher storage density and lower cost per GB than DRAM, it can significantly reduce the cost of in-memory databases. Its non-volatility could also help Caracal support efficient durability, which it does not currently provide.

Bibliography

[1] Akamai. Akamai Online Retail Performance Report. https://www.akamai.com/uk/en/about/news/press/2017-press/akamai-releases-spring-2017-state-of-online-retail-performance-report.jsp, 2017.

[2] Raja Appuswamy, Angelos C. Anadiotis, Danica Porobic, Mustafa K. Iman, and Anastasia Ailamaki. Analyzing the impact of system architecture on the scalability of OLTP engines for high-contention workloads. Proceedings of the VLDB Endowment, October 2017, volume 11, issue 2, pages 121–134.

[3] Joy Arulraj, Matthew Perron, and Andrew Pavlo. Write-behind logging. Proceedings of the VLDB Endowment, November 2016, volume 10, issue 4, pages 337–348.

[4] Hal Berenson, Phil Bernstein, Jim Gray, Jim Melton, Elizabeth O'Neil, and Patrick O'Neil. A critique of ANSI SQL isolation levels. ACM SIGMOD Record, 1995, volume 24, issue 2, pages 1–10.

[5] Jan Böttcher, Viktor Leis, Thomas Neumann, and Alfons Kemper. Scalable garbage collection for in-memory MVCC systems. Proceedings of the VLDB Endowment, October 2019, volume 13, issue 2, pages 128–141.

[6] Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. Serializable isolation for snapshot databases. ACM Transactions on Database Systems, December 2009, volume 34, issue 4, pages 1–42.

[7] Brian F. Cooper, Adam Silberstein, Erwin Tam, Raghu Ramakrishnan, and Russell Sears. Benchmarking cloud serving systems with YCSB. In Proceedings of the Symposium on Cloud Computing - SoCC, 2010, pages 143–154, Indianapolis, Indiana, USA. ACM.

[8] James Cowling and Barbara Liskov. Granola: Low-Overhead Distributed Transaction Coordination. In USENIX Annual Technical Conference - ATC, 2012, pages 21–33. USENIX Association.

[9] Giuseppe DeCandia, Deniz Hastorun, Madan Jampani, Gunavardhan Kakulapati, Avinash Lakshman, Alex Pilchin, Swaminathan Sivasubramanian, Peter Vosshall, and Werner Vogels. Dynamo: Amazon's Highly Available Key-value Store. In Proceedings of the Symposium on Operating Systems Principles - SOSP, 2007, pages 205–220. ACM.

[10] Cristian Diaconu, Craig Freedman, Erik Ismert, Per-Åke Larson, Pravin Mittal, Ryan Stonecipher, Nitin Verma, and Mike Zwilling. Hekaton: SQL Server's Memory-Optimized OLTP Engine. In Proceedings of the International Conference on Management of Data - SIGMOD, 2013, pages 1243–1254. ACM.

[11] Dave Dice, Timothy L. Harris, and Alex Kogan. Pitfalls of lazy subscription. In Proceedings of the Workshop on the Theory of Transactional Memory, 2014, pages 1–4. ACM.

[12] Bailu Ding, Lucja Kot, and Johannes Gehrke. Improving optimistic concurrency control through transaction batching and operation reordering. Proceedings of the VLDB Endowment, October 2018, volume 12, issue 2, pages 169–182.

[13] Aleksandar Dragojević, Dushyanth Narayanan, Miguel Castro, and Orion Hodson. FaRM: Fast Remote Memory. In Proceedings of the 11th USENIX Symposium on Networked Systems Design and Implementation (NSDI 14), 2014, pages 401–414.

[14] Aleksandar Dragojević, Dushyanth Narayanan, Edmund B. Nightingale, Matthew Renzelmann, Alex Shamis, Anirudh Badam, and Miguel Castro. No compromises: Distributed transactions with consistency, availability, and performance. In Proceedings of the Symposium on Operating Systems Principles - SOSP, 2015, pages 54–70. ACM.

[15] George W. Dunlap, Dominic G. Lucchetti, Michael A. Fetterman, and Peter M. Chen. Execution replay of multiprocessor virtual machines. In Proceedings of the Fourth ACM SIGPLAN/SIGOPS International Conference on Virtual Execution Environments - VEE, 2008, pages 121–130.

[16] Aaron J. Elmore, Vaibhav Arora, Rebecca Taft, Andrew Pavlo, Divyakant Agrawal, and Amr El Abbadi. Squall: Fine-Grained Live Reconfiguration for Partitioned Main Memory Databases. In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data - SIGMOD, 2015, pages 299–313, Melbourne, Victoria, Australia. ACM.

[17] Jose M. Faleiro and Daniel J. Abadi. Rethinking serializable multiversion concurrency control. Proceedings of the VLDB Endowment, July 2015, volume 8, issue 11, pages 1190–1201.

[18] Jose M. Faleiro, Daniel J. Abadi, and Joseph M. Hellerstein. High performance transactions via early write visibility. Proceedings of the VLDB Endowment, January 2017, volume 10, issue 5, pages 613–624.

[19] Franz Färber, Sang Kyun Cha, Jürgen Primsch, Christof Bornhövd, Stefan Sigg, and Wolfgang Lehner. SAP HANA database: Data management for modern business applications. ACM SIGMOD Record, January 2012, volume 40, issue 4, pages 45–51.

[20] Alan Fekete, Dimitrios Liarokapis, Elizabeth O'Neil, Patrick O'Neil, and Dennis Shasha. Making snapshot isolation serializable. ACM Transactions on Database Systems, 2005, volume 30, issue 2, pages 492–528.

[21] R. Fielding, J. Gettys, J. C. Mogul, H. Frystyk, L. Masinter, P. Leach, and T. Berners-Lee. Hypertext Transfer Protocol – HTTP/1.1. RFC 2616, RFC Editor, June 1999.

[22] Peter Franaszek, John Robinson, and Alexander Thomasian. Concurrency Control for High Contention Environments. ACM Transactions on Database Systems, 1992, volume 17, issue 2, pages 304–345.

[23] Matt Freels. Consistency without Clocks: The FaunaDB Distributed Transaction Protocol. https://fauna.com/blog/consistency-without-clocks-faunadb-transaction-protocol, 2018.

[24] Michael R. Garey and David S. Johnson. Proving NP-Completeness Results. In Computers and Intractability: A Guide to the Theory of NP-Completeness, pages 45–74. San Francisco: W. H. Freeman, 1979, ISBN 0-7167-1045-5.

[25] GitLab. GitLab.com database incident. https://about.gitlab.com/blog/2017/02/01/gitlab-dot-com-database-incident/, 2017.

[26] Zhenyu Guo, Xi Wang, Jian Tang, Xuezheng Liu, Zhilei Xu, Ming Wu, M. Frans Kaashoek, and Zheng Zhang. R2: An Application-Level Kernel for Record and Replay. In Operating Systems Design and Implementation - OSDI, 2008, pages 193–208. USENIX Association.

[27] Stavros Harizopoulos, Daniel J. Abadi, Samuel Madden, and Michael Stonebraker. OLTP Through the Looking Glass, and What We Found There. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2008, pages 981–992. ACM.

[28] Pat Helland, Harald Sammer, Jim Lyon, Richard Carr, Phil Garrett, and Andreas Reuter. Group Commit Timers and High-Volume Transaction Systems. Technical Report 88.1, Tandem Computers Incorporated, 1988.

[29] Nathaniel Herman, Jeevana Priya Inala, Yihe Huang, Lillian Tsai, Eddie Kohler, Barbara Liskov, and Liuba Shrira. Type-aware transactions for faster concurrent code. In Proceedings of the European Conference on Computer Systems - EuroSys, 2016, pages 1–16, London, United Kingdom. ACM.

[30] Chuntao Hong, Dong Zhou, Mao Yang, Carbo Kuo, Lintao Zhang, and Lidong Zhou. KuaFu: Closing the parallelism gap in database replication. In Proceedings of the International Conference on Data Engineering, 2013, pages 1186–1195.

[31] Yihe Huang, William Qian, Eddie Kohler, Barbara Liskov, and Liuba Shrira. Opportunities for optimism in contended main-memory multicore transactions. Proceedings of the VLDB Endowment, January 2020, volume 13, issue 5, pages 629–642.

[32] IBM. The Basics of DB2 Log Shipping. http://www.ibm.com/developerworks/data/library/techarticle/0304mcinnis/0304mcinnis.html, April 2003.

[33] Anuj Kalia, David G. Andersen, and Michael Kaminsky. FaSST: Fast, Scalable and Simple Distributed Transactions with Two-Sided (RDMA) Datagram RPCs. In 12th USENIX Symposium on Operating Systems Design and Implementation (OSDI 16), 2016, pages 185–201.

[34] Kangnyeon Kim, Tianzheng Wang, Ryan Johnson, and Ippokratis Pandis. ERMIA: Fast Memory-Optimized Database System for Heterogeneous Workloads. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2016, pages 1675–1687.

[35] Hideaki Kimura. FOEDUS: OLTP Engine for a Thousand Cores and NVRAM. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2015, pages 691–706, Melbourne, Victoria, Australia. ACM Press.

[36] Evan Klitzke. Why Uber Engineering Switched from Postgres to MySQL. https://eng.uber.com/postgres-to-mysql-migration/, July 2016.

[37] H. T. Kung and John T. Robinson. On Optimistic Methods for Concurrency Control. ACM Transactions on Database Systems, 1981, volume 6, issue 2, pages 213–226.

[38] Per-Åke Larson, Spyros Blanas, Cristian Diaconu, Craig Freedman, Jignesh M. Patel, and Mike Zwilling. High-Performance Concurrency Control Mechanisms for Main-Memory Databases. Proceedings of the VLDB Endowment, 2011, volume 5, issue 4, pages 298–309.

[39] Jialin Li, Ellis Michael, and Dan R. K. Ports. Eris: Coordination-Free Consistent Transactions Using In-Network Concurrency Control. In Proceedings of the Symposium on Operating Systems Principles - SOSP, 2017, pages 104–120, Shanghai, China. ACM.

[40] Hyeontaek Lim, Michael Kaminsky, and David G. Andersen. Cicada: Dependably Fast Multi-Core In-Memory Transactions. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2017, pages 21–35, Chicago, Illinois, USA. ACM.

[41] Baotong Lu, Xiangpeng Hao, Tianzheng Wang, and Eric Lo. Dash: Scalable hashing on persistent memory. Proceedings of the VLDB Endowment, April 2020, volume 13, issue 8, pages 1147–1161.

[42] Nirmesh Malviya, Ariel Weisberg, Samuel Madden, and Michael Stonebraker. Rethinking main memory OLTP recovery. In Proceedings of the International Conference on Data Engineering, 2014, pages 604–615.

[43] Yandong Mao, Eddie Kohler, and Robert Tappan Morris. Cache craftiness for fast multicore key-value storage. In Proceedings of the ACM European Conference on Computer Systems - EuroSys, 2012, pages 183–196, Bern, Switzerland. ACM.

[44] John M. Mellor-Crummey and Michael L. Scott. Algorithms for scalable synchronization on shared-memory multiprocessors. ACM Transactions on Computer Systems (TOCS), February 1991, volume 9, issue 1, pages 21–65.

[45] Microsoft. About Log Shipping (SQL Server) - SQL Server. https://docs.microsoft.com/en-us/sql/database-engine/log-shipping/about-log-shipping-sql-server, 2016.

[46] Microsoft. Introduction to Memory-Optimized Tables - SQL Server. https://docs.microsoft.com/en-us/sql/relational-databases/in-memory-oltp/introduction-to-memory-optimized-tables, 2016.

[47] Microsoft. Understanding isolation levels - SQL Server. https://docs.microsoft.com/en-us/sql/connect/jdbc/understanding-isolation-levels, 2019.

[48] Microsoft. Transact-SQL Reference (Database Engine). https://github.com/MicrosoftDocs/sql-docs, 2020.

[49] C. Mohan, Don Haderle, Bruce Lindsay, Hamid Pirahesh, and Peter Schwarz. ARIES: A transaction recovery method supporting fine-granularity locking and partial rollbacks using write-ahead logging. ACM Transactions on Database Systems, March 1992, volume 17, issue 1, pages 94–162.

[50] Shuai Mu, Sebastian Angel, and Dennis Shasha. Deferred Runtime Pipelining for contentious multicore software transactions. In Proceedings of the Fourteenth EuroSys Conference 2019, March 2019, pages 1–16, Dresden, Germany. ACM.

[51] MySQL. MySQL :: MySQL 8.0 Reference Manual :: 17.2.1 Replication Formats. https://dev.mysql.com/doc/refman/8.0/en/replication-formats.html, 2020.

[52] Neha Narula, Cody Cutler, Eddie Kohler, and Robert Morris. Phase reconciliation for contended in-memory transactions. In USENIX Symposium on Operating Systems Design and Implementation - OSDI, October 2014, pages 511–524, Broomfield, CO. USENIX Association.

[53] Thomas Neumann, Tobias Mühlbauer, and Alfons Kemper. Fast Serializable Multi-Version Concurrency Control for Main-Memory Database Systems. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2015, pages 677–689, Melbourne, Victoria, Australia. ACM.

[54] Oracle. Data Concurrency and Consistency - 11g Release 2 (11.2). https://docs.oracle.com/cd/E25054_01/server.1111/e25789/consist.htm, 2009.

[55] Oracle. Database PL/SQL Language Reference. https://docs.oracle.com/en/database/oracle/oracle-database/18/lnpls/index.html, 2019.

[56] Oracle. MySQL :: MySQL 5.7 Reference Manual :: 16 Replication. https://dev.mysql.com/doc/refman/5.7/en/replication.html, 2020.

[57] Oracle. TimesTen In-Memory Database FAQ. https://www.oracle.com/database/technologies/timesten-faq.html, 2021.

[58] Ippokratis Pandis, Ryan Johnson, Nikos Hardavellas, and Anastasia Ailamaki. Data-oriented transaction execution. Proceedings of the VLDB Endowment, 2010, volume 3, issue 1-2, pages 928–939.

[59] Andrew Pavlo, Carlo Curino, and Stanley Zdonik. Skew-aware automatic database partitioning in shared-nothing, parallel OLTP systems. In Proceedings of the International Conference on Management of Data - SIGMOD, 2012, page 61, Scottsdale, Arizona, USA. ACM.

[60] Percona Lab. Percona-Lab/tpcc-mysql. Percona-Lab, January 2017.

[61] PostgreSQL. PostgreSQL: Documentation: 12: 26.2. Log-Shipping Standby Servers. https://www.postgresql.org/docs/current/warm-standby.html, 2020.

[62] PostgreSQL. PostgreSQL: Documentation: 12: Pg_upgrade. https://www.postgresql.org/docs/current/pgupgrade.html, 2020.

[63] RedisLabs. Transactions – Redis. https://redis.io/topics/transactions.

[64] Kun Ren, Jose M. Faleiro, and Daniel J. Abadi. Design Principles for Scaling Multi-core OLTP Under High Contention. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2015, pages 1583–1598.

[65] In Kyung Ryu and Alexander Thomasian. Analysis of database performance with dynamic locking. Journal of the ACM (JACM), July 1990, volume 37, issue 3, pages 491–523.

[66] SAP SE. SAP HANA Customer Stories. https://www.sap.com/products/hana/customer-finder.html, 2021.

[67] Marco Serafini, Rebecca Taft, Aaron J. Elmore, Andrew Pavlo, Ashraf Aboulnaga, and Michael Stonebraker. Clay: Fine-Grained Adaptive Partitioning for General Database Schemas. Proceedings of the VLDB Endowment, 2016, volume 10, issue 4, pages 445–456.

[68] Alex Shamis, Matthew Renzelmann, Stanko Novakovic, Georgios Chatzopoulos, Aleksandar Dragojević, Dushyanth Narayanan, and Miguel Castro. Fast General Distributed Transactions with Opacity. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2019, pages 433–448, Amsterdam, Netherlands. ACM.

[69] Vishal Sikka, Franz Färber, and Wolfgang Lehner. Efficient transaction processing in SAP HANA database: The end of a column store myth. In Proceedings of the 2012 ACM SIGMOD International Conference on Management of Data, 2012.

[70] M. Spiro, Ashok M. Joshi, and T. K. Rengarajan. Designing an Optimized Transaction Commit Protocol. Digital Technical Journal, 1991, volume 3, pages 70–78.

[71] StackOverflow. The Incredible Growth of Amazon RDS. https://d1.awsstatic.com/whitepapers/RDS/Stack-Overflow-The-Incredible-Growth-of-Amazon-RDS.pdf, 2018.

[72] Michael Stonebraker, Samuel Madden, Daniel J. Abadi, Stavros Harizopoulos, Nabil Hachem, and Pat Helland. The End of an Architectural Era (It's Time for a Complete Rewrite). In Proceedings of the VLDB Endowment, 2007, pages 1150–1160.

[73] Rebecca Taft, Essam Mansour, Marco Serafini, Jennie Duggan, Aaron J. Elmore, Ashraf Aboulnaga, Andrew Pavlo, and Michael Stonebraker. E-Store: Fine-Grained Elastic Partitioning for Distributed Transaction Processing Systems. Proceedings of the VLDB Endowment, 2014, volume 8, issue 3, pages 245–256.

[74] Dixin Tang and Aaron J. Elmore. Toward Coordination-free and Reconfigurable Mixed Concurrency Control. In USENIX Annual Technical Conference - ATC, 2018, pages 809–822.

[75] D. B. Terry, A. J. Demers, K. Petersen, M. J. Spreitzer, M. M. Theimer, and B. B. Welch. Session guarantees for weakly consistent replicated data. In Proceedings of the International Conference on Parallel and Distributed Information Systems, 1994, pages 140–149, Austin, TX, USA. IEEE Computer Society Press.

[76] The Apache Software Foundation. Apache Geode. https://geode.apache.org/, 2021.

[77] Alexander Thomson, Thaddeus Diamond, Shu-Chun Weng, Kun Ren, Philip Shao, and Daniel J. Abadi. Calvin: Fast distributed transactions for partitioned database systems. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2012, pages 1–12.

[78] Pınar Tözün, Ippokratis Pandis, Cansu Kaynak, Djordje Jevdjic, and Anastasia Ailamaki. From A to E: Analyzing TPC's OLTP benchmarks: The obsolete, the ubiquitous, the unexplored. In Proceedings of the 16th International Conference on Extending Database Technology - EDBT '13, 2013, pages 17–28, Genoa, Italy. ACM Press.

[79] Stephen Tu, Wenting Zheng, Eddie Kohler, Barbara Liskov, and Samuel Madden. Speedy transactions in multicore in-memory databases. In Proceedings of the ACM Symposium on Operating Systems Principles - SOSP, 2013, pages 18–32.

[80] Carl A. Waldspurger and William E. Weihl. Lottery Scheduling: Flexible Proportional-Share Resource Management. In Proceedings of the USENIX Conference on Operating Systems Design and Implementation, 1994, pages 1–11.

[81] Tianzheng Wang, Ryan Johnson, Alan Fekete, and Ippokratis Pandis. The Serial Safety Net: Efficient Concurrency Control on Modern Hardware. In Proceedings of the 11th International Workshop on Data Management on New Hardware, 2015, pages 1–8.

[82] Tianzheng Wang and Hideaki Kimura. Mostly-optimistic concurrency control for highly contended dynamic workloads on a thousand cores. Proceedings of the VLDB Endowment, October 2016, volume 10, issue 2, pages 49–60.

[83] Zhaoguo Wang, Shuai Mu, Yang Cui, Han Yi, Haibo Chen, and Jinyang Li. Scaling Multicore Databases via Constrained Parallel Execution. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2016, pages 1643–1658.

[84] Todd Warszawski and Peter Bailis. ACIDRain: Concurrency-Related Attacks on Database-Backed Web Applications. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2017, pages 5–20, Chicago, Illinois, USA. ACM.

[85] Yingjun Wu, Joy Arulraj, Jiexi Lin, Ran Xian, and Andrew Pavlo. An empirical evaluation of in-memory multi-version concurrency control. Proceedings of the VLDB Endowment, March 2017, volume 10, issue 7, pages 781–792.

[86] Yingjun Wu, Wentian Guo, Chee-Yong Chan, and Kian-Lee Tan. Fast Failure Recovery for Main-Memory DBMSs on Multicores. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2017, pages 267–281, Chicago, Illinois, USA. ACM.

[87] Yingjun Wu and Kian-Lee Tan. Scalable In-Memory Transaction Processing with HTM. In USENIX Annual Technical Conference - ATC, 2016, pages 365–377. USENIX Association.

[88] Chang Yao, Divyakant Agrawal, Gang Chen, Beng Chin Ooi, and Sai Wu. Adaptive Logging: Optimizing Logging and Recovery Costs in Distributed In-memory Databases. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2016, pages 1119–1134, San Francisco, California, USA. ACM.

[89] Xiangyao Yu, Andrew Pavlo, Daniel Sanchez, and Srinivas Devadas. TicToc: Time Traveling Optimistic Concurrency Control. In Proceedings of the ACM International Conference on Management of Data - SIGMOD, 2016, pages 1629–1642, San Francisco, California, USA. ACM.

[90] Wenting Zheng, Stephen Tu, and Eddie Kohler. Fast Databases with Fast Durability and Recovery Through Multicore Parallelism. In USENIX Symposium on Operating Systems Design and Implementation - OSDI, 2014, pages 465–477. USENIX Association.