1
Database System ArchitecturesDatabase System Architectures
Client--Server Systems
Parallel Systems
Distributed Systems
Network Types
2
Client-Server SystemsClient-Server Systems
Server systems satisfy requests generated at m client systems, whose general structure is shown below:
3
Client-Server Systems Client-Server Systems
Advantages of replacing mainframes with networks of workstations or personal computers connected to back-end server machines: better functionality for the cost
flexibility in locating resources and expanding facilities
better user interfaces
easier maintenance
Server systems can be broadly categorized into two kinds: transaction servers which are widely used in relational database
systems, and
data servers, used in object-oriented database systems
4
Transaction ServersTransaction Servers
Also called query server systems or SQL server systems; clients send requests to the server system where the transactions are executed, and results are shipped back to the client.
Requests specified in SQL, and communicated to the server through a remote procedure call (RPC) mechanism.
Transactional RPC allows many RPC calls to collectively form a transaction.
Open Database Connectivity (ODBC) is a C language application program interface standard from Microsoft for connecting to a server, sending SQL requests, and receiving results.
JDBC standard similar to ODBC, for Java
5
Transaction Server Process StructureTransaction Server Process Structure
A typical transaction server consists of multiple processes accessing data in shared memory.
Server processes These receive user queries (transactions), execute them and send
results back
Processes may be multithreaded, allowing a single process to execute several user queries concurrently
Typically multiple multithreaded server processes
Lock manager process
Database writer process Output modified buffer blocks to disks continually
6
Transaction Server Processes (Cont.)Transaction Server Processes (Cont.)
Log writer process Server processes simply add log records to log record buffer
Log writer process outputs log records to stable storage.
Checkpoint process Performs periodic checkpoints
Process monitor process Monitors other processes, and takes recovery actions if any of the other
processes fail
E.g. aborting any transactions being executed by a server process and restarting it
7
Transaction System Processes (Cont.)Transaction System Processes (Cont.)
8
Transaction System Processes (Cont.)Transaction System Processes (Cont.)
Shared memory contains shared data Buffer pool
Lock table
Log buffer
Cached query plans (reused if same query submitted again)
All database processes can access shared memory
To ensure that no two processes are accessing the same data structure at the same time, databases systems implement mutual exclusion using either Operating system semaphores
Atomic instructions such as test-and-set
9
Lock Manager Process Lock Manager Process
To avoid overhead of interprocess communication for lock request/grant, each database process operates directly on the lock table data structure instead of sending requests to lock manager process Mutual exclusion ensured on the lock table using semaphores
If a lock can be obtained, the lock table is updated directly in shared memory
If a lock cannot be immediately obtained, a lock request is noted in the lock table and the process (or thread) then waits for lock to be granted
When a lock is released, releasing process updates lock table to record release of lock, as well as grant of lock to waiting requests (if any)
Process/thread waiting for lock may either:
Continually scan lock table to check for lock grant, or
Use operating system semaphore mechanism to wait on a semaphore.
– Semaphore identifier is recorded in the lock table
– When a lock is granted, the releasing process signals the semaphore to tell the waiting process/thread to proceed
Lock manager process still used for deadlock detection
10
Data ServersData Servers
Used in LANs, where there is a very high speed connection between the clients and the server, the client machines are comparable in processing power to the server machine, and the tasks to be executed are compute intensive.
Ship data to client machines where processing is performed, and then ship results back to the server machine.
This architecture requires full back-end functionality at the clients.
Used in many object-oriented database systems
Issues: Locking
Data Caching
Lock Caching
11
Data Servers (Cont.)Data Servers (Cont.)
Locking Overhead of requesting and getting locks from server is high due to
message delays
Can grant locks on requested and prefetched items; with page shipping, transaction is granted lock on whole page.
Locks on a prefetched item can be P{called back} by the server, and returned by client transaction if the prefetched item has not been used.
Locks on the page can be deescalated to locks on items in the page when there are lock conflicts. Locks on unused items can then be returned to server.
12
Data Servers (Cont.)Data Servers (Cont.)
Data Caching Data can be cached at client even in between transactions
But check that data is up-to-date before it is used (cache coherency)
Check can be done when requesting lock on data item
Lock Caching Locks can be retained by client system even in between transactions
Transactions can acquire cached locks locally, without contacting server
Server calls back locks from clients when it receives conflicting lock request. Client returns lock once no local transaction is using it.
Similar to deescalation, but across transactions.
13
Parallel SystemsParallel Systems
Parallel database systems consist of multiple processors and multiple disks connected by a fast interconnection network.
A coarse-grain parallel machine consists of a small number of powerful processors
A massively parallel or fine grain parallel machine utilizes thousands of smaller processors.
Two main performance measures: throughput --- the number of tasks that can be completed in a
given time interval
response time --- the amount of time it takes to complete a single task from the time it is submitted
14
Speed-Up and Scale-UpSpeed-Up and Scale-Up
Speedup: a fixed-sized problem executing on a small system is given to a system which is N-times larger. Measured by:
speedup = small system elapsed time
large system elapsed time
Speedup is linear if equation equals N.
Scaleup: increase the size of both the problem and the system N-times larger system used to perform N-times larger job
Measured by:
scaleup = small system small problem elapsed time
big system big problem elapsed time
Scale up is linear if equation equals 1.
15
SpeedupSpeedup
Speedup
16
ScaleupScaleup
Scaleup
17
Factors Limiting Speedup and ScaleupFactors Limiting Speedup and Scaleup
Speedup and scaleup are often sublinear due to:
Startup costs: Cost of starting up multiple processes may dominate computation time, if the degree of parallelism is high.
Interference: Processes accessing shared resources (e.g.,system bus, disks, or locks) compete with each other, thus spending time waiting on other processes, rather than performing useful work.
Skew: Increasing the degree of parallelism increases the variance in service times of parallely executing tasks. Overall execution time determined by slowest of parallely executing tasks.
18
Interconnection Network ArchitecturesInterconnection Network Architectures
Bus. System components send data on and receive data from a single communication bus; Does not scale well with increasing parallelism.
Mesh. Components are arranged as nodes in a grid, and each component is connected to all adjacent components Communication links grow with growing number of components, and
so scales better. But may require 2n hops to send message to a node (or n with
wraparound connections at edge of grid).
Hypercube. Components are numbered in binary; components are connected to one another if their binary representations differ in exactly one bit. n components are connected to log(n) other components and can
reach each other via at most log(n) links; reduces communication delays.
19
Interconnection ArchitecturesInterconnection Architectures
20
Parallel Database ArchitecturesParallel Database Architectures
Shared memory -- processors share a common memory
Shared disk -- processors share a common disk
Shared nothing -- processors share neither a common memory nor common disk
Hierarchical -- hybrid of the above architectures
21
Parallel Database ArchitecturesParallel Database Architectures
22
Shared MemoryShared Memory
Processors and disks have access to a common memory, typically via a bus or through an interconnection network.
Extremely efficient communication between processors — data in shared memory can be accessed by any processor without having to move it using software.
Downside – architecture is not scalable beyond 32 or 64 processors since the bus or the interconnection network becomes a bottleneck
Widely used for lower degrees of parallelism (4 to 8).
23
Shared DiskShared Disk
All processors can directly access all disks via an interconnection network, but the processors have private memories. The memory bus is not a bottleneck
Architecture provides a degree of fault-tolerance — if a processor fails, the other processors can take over its tasks since the database is resident on disks that are accessible from all processors.
Examples: IBM Sysplex and DEC clusters (now part of Compaq) running Rdb (now Oracle Rdb) were early commercial users
Downside: bottleneck now occurs at interconnection to the disk subsystem.
Shared-disk systems can scale to a somewhat larger number of processors, but communication between processors is slower.
24
Shared NothingShared Nothing
Node consists of a processor, memory, and one or more disks. Processors at one node communicate with another processor at another node using an interconnection network. A node functions as the server for the data on the disk or disks the node owns.
Examples: Teradata, Tandem, Oracle-n CUBE
Data accessed from local disks (and local memory accesses) do not pass through interconnection network, thereby minimizing the interference of resource sharing.
Shared-nothing multiprocessors can be scaled up to thousands of processors without interference.
Main drawback: cost of communication and non-local disk access; sending data involves software interaction at both ends.
25
HierarchicalHierarchical
Combines characteristics of shared-memory, shared-disk, and shared-nothing architectures.
Top level is a shared-nothing architecture – nodes connected by an interconnection network, and do not share disks or memory with each other.
Each node of the system could be a shared-memory system with a few processors.
Alternatively, each node could be a shared-disk system, and each of the systems sharing a set of disks could be a shared-memory system.
26
Distributed SystemsDistributed Systems
Data spread over multiple machines (also referred to as sites or nodes.
Network interconnects the machines
Data shared by users on multiple machines
27
Distributed DatabasesDistributed Databases
Homogeneous distributed databases Same software/schema on all sites, data may be partitioned among
sites
Goal: provide a view of a single database, hiding details of distribution
Heterogeneous distributed databases Different software/schema on different sites
Goal: integrate existing databases to provide useful functionality
Differentiate between local and global transactions A local transaction accesses data in the single site at which the
transaction was initiated.
A global transaction either accesses data in a site different from the one at which the transaction was initiated or accesses data in several different sites.
28
Trade-offs in Distributed SystemsTrade-offs in Distributed Systems
Sharing data – users at one site able to access the data residing at some other sites.
Autonomy – each site is able to retain a degree of control over data stored locally.
Higher system availability through redundancy — data can be replicated at remote sites, and system can function even if a site fails.
Disadvantage: added complexity required to ensure proper coordination among sites. Software development cost.
Greater potential for bugs.
Increased processing overhead.
29
Implementation Issues for Distributed Implementation Issues for Distributed DatabasesDatabases
Atomicity needed even for transactions that update data at multiple site Transaction cannot be committed at one site and aborted at another
The two-phase commit protocol (2PC) used to ensure atomicity Basic idea: each site executes transaction till just before commit, and
the leaves final decision to a coordinator
Each site must follow decision of coordinator: even if there is a failure while waiting for coordinators decision
2PC is not always appropriate: other transaction models based on persistent messaging, and workflows, are also used
Distributed concurrency control (and deadlock detection) required
Replication of data items required for improving data availability
30
Network TypesNetwork Types
Local-area networks (LANs) – composed of processors that are distributed over small geographical areas, such as a single building or a few adjacent buildings.
Wide-area networks (WANs) – composed of processors distributed over a large geographical area.
Discontinuous connection – WANs, such as those based on periodic dial-up (using, e.g., UUCP), that are connected only for part of the time.
Continuous connection – WANs, such as the Internet, where hosts are connected to the network at all times.
31
Homogeneous Distributed DatabasesHomogeneous Distributed Databases
In a homogeneous distributed database All sites have identical software Are aware of each other and agree to cooperate in processing user
requests. Each site surrenders part of its autonomy in terms of right to change
schemas or software Appears to user as a single system
In a heterogeneous distributed database Different sites may use different schemas and software
Difference in schema is a major problem for query processing Difference in softwrae is a major problem for transaction
processing Sites may not be aware of each other and may provide only
limited facilities for cooperation in transaction processing
32
Distributed Data StorageDistributed Data Storage
Assume relational data model
Replication System maintains multiple copies of data, stored in different sites,
for faster retrieval and fault tolerance.
Fragmentation Relation is partitioned into several fragments stored in distinct sites
Replication and fragmentation can be combined Relation is partitioned into several fragments: system maintains
several identical replicas of each such fragment.
33
Data ReplicationData Replication
A relation or fragment of a relation is replicated if it is stored redundantly in two or more sites.
Full replication of a relation is the case where the relation is stored at all sites.
Fully redundant databases are those in which every site contains a copy of the entire database.
34
Data Replication Data Replication
Advantages of Replication Availability: failure of site containing relation r does not result in
unavailability of r is replicas exist.
Parallelism: queries on r may be processed by several nodes in parallel.
Reduced data transfer: relation r is available locally at each site containing a replica of r.
Disadvantages of Replication Increased cost of updates: each replica of relation r must be updated.
Increased complexity of concurrency control: concurrent updates to distinct replicas may lead to inconsistent data unless special concurrency control mechanisms are implemented.
One solution: choose one copy as primary copy and apply concurrency control operations on primary copy
35
Data FragmentationData Fragmentation
Division of relation r into fragments r1, r2, …, rn which contain sufficient information to reconstruct relation r.
Horizontal fragmentation: each tuple of r is assigned to one or more fragments
Vertical fragmentation: the schema for relation r is split into several smaller schemas All schemas must contain a common candidate key (or superkey) to
ensure lossless join property.
A special attribute, the tuple-id attribute may be added to each schema to serve as a candidate key.
Example : relation account with following schema
Account-schema = (branch-name, account-number, balance)
36
Horizontal Fragmentation of Horizontal Fragmentation of accountaccount Relation Relation
branch-name account-number balance
HillsideHillsideHillside
A-305A-226A-155
50033662
account1=branch-name=“Hillside”(account)
branch-name account-number balance
ValleyviewValleyviewValleyviewValleyview
A-177A-402A-408A-639
205100001123750
account2=branch-name=“Valleyview”(account)
37
Vertical Fragmentation of Vertical Fragmentation of employee-info employee-info RelationRelation
branch-name customer-name tuple-id
HillsideHillsideValleyviewValleyviewHillsideValleyviewValleyview
LowmanCampCampKahnKahnKahnGreen
deposit1=branch-name, customer-name, tuple-id(employee-info)
1234567
account number balance tuple-id
50033620510000621123750
1234567
A-305A-226A-177A-402A-155A-408A-639
deposit2=account-number, balance, tuple-id(employee-info)
38
Advantages of FragmentationAdvantages of Fragmentation
Horizontal: allows parallel processing on fragments of a relation
allows a relation to be split so that tuples are located where they are most frequently accessed
Vertical:
allows tuples to be split so that each part of the tuple is stored where it is most frequently accessed
tuple-id attribute allows efficient joining of vertical fragments
allows parallel processing on a relation
Vertical and horizontal fragmentation can be mixed. Fragments may be successively fragmented to an arbitrary depth.
39
Data TransparencyData Transparency
Data transparency: Degree to which system user may remain unaware of the details of how and where the data items are stored in a distributed system
Consider transparency issues in relation to: Fragmentation transparency
Replication transparency
Location transparency
40
Naming of Data Items - CriteriaNaming of Data Items - Criteria
1. Every data item must have a system-wide unique name.
2. It should be possible to find the location of data items efficiently.
3. It should be possible to change the location of data items transparently.
4. Each site should be able to create new data items autonomously.
41
Centralized Scheme - Name ServerCentralized Scheme - Name Server
Structure: name server assigns all names
each site maintains a record of local data items
sites ask name server to locate non-local data items
Advantages: satisfies naming criteria 1-3
Disadvantages: does not satisfy naming criterion 4
name server is a potential performance bottleneck
name server is a single point of failure
42
Use of AliasesUse of Aliases
Alternative to centralized scheme: each site prefixes its own site identifier to any name that it generates i.e., site 17.account. Fulfills having a unique identifier, and avoids problems associated
with central control.
However, fails to achieve network transparency.
Solution: Create a set of aliases for data items; Store the mapping of aliases to the real names at each site.
The user can be unaware of the physical location of a data item, and is unaffected if the data item is moved from one site to another.
43
Distributed TransactionsDistributed Transactions
Transaction may access data at several sites.
Each site has a local transaction manager responsible for: Maintaining a log for recovery purposes
Participating in coordinating the concurrent execution of the transactions executing at that site.
Each site has a transaction coordinator, which is responsible for: Starting the execution of transactions that originate at the site.
Distributing subtransactions at appropriate sites for execution.
Coordinating the termination of each transaction that originates at the site, which may result in the transaction being committed at all sites or aborted at all sites.
44
Transaction System ArchitectureTransaction System Architecture
45
System Failure ModesSystem Failure Modes
Failures unique to distributed systems: Failure of a site. Loss of massages
Handled by network transmission control protocols such as TCP-IP
Failure of a communication link Handled by network protocols, by routing messages via
alternative links Network partition
A network is said to be partitioned when it has been split into two or more subsystems that lack any connection between them
– Note: a subsystem may consist of a single node
Network partitioning and site failures are generally indistinguishable.
46
Commit ProtocolsCommit Protocols
Commit protocols are used to ensure atomicity across sites a transaction which executes at multiple sites must either be
committed at all the sites, or aborted at all the sites.
not acceptable to have a transaction committed at one site and aborted at another
The two-phase commit (2 PC) protocol is widely used
47
Two Phase Commit Protocol (2PC)Two Phase Commit Protocol (2PC)
Assumes fail-stop model – failed sites simply stop working, and do not cause any other harm, such as sending incorrect messages to other sites.
Execution of the protocol is initiated by the coordinator after the last step of the transaction has been reached.
The protocol involves all the local sites at which the transaction executed
Let T be a transaction initiated at site Si, and let the transaction coordinator at Si be Ci
48
Phase 1: Obtaining a DecisionPhase 1: Obtaining a Decision
Coordinator asks all participants to prepare to commit transaction Ti.
Ci adds the records <prepare T> to the log and forces log to stable storage
sends prepare T messages to all sites at which T executed
Upon receiving message, transaction manager at site determines if it can commit the transaction if not, add a record <no T> to the log and send abort T message to
Ci
if the transaction can be committed, then:
add the record <ready T> to the log
force all records for T to stable storage
send ready T message to Ci
49
Phase 2: Recording the DecisionPhase 2: Recording the Decision
T can be committed of Ci received a ready T message from all the participating sites: otherwise T must be aborted.
Coordinator adds a decision record, <commit T> or <abort T>, to the log and forces record onto stable storage. Once the record stable storage it is irrevocable (even if failures occur)
Coordinator sends a message to each participant informing it of the decision (commit or abort)
Participants take appropriate action locally.
50
Handling of Failures - Site FailureHandling of Failures - Site Failure
When site Si recovers, it examines its log to determine the fate of
transactions active at the time of the failure.
Log contain <commit T> record: site executes redo (T)
Log contains <abort T> record: site executes undo (T)
Log contains <ready T> record: site must consult Ci to determine the fate of T. If T committed, redo (T)
If T aborted, undo (T)
The log contains no control records concerning T replies that Sk failed before responding to the prepare T message from Ci
since the failure of Sk precludes the sending of such a response C1 must abort T
Sk must execute undo (T)
51
Handling of Failures- Coordinator FailureHandling of Failures- Coordinator Failure
If coordinator fails while the commit protocol for T is executing then participating sites must decide on T’s fate:
1. If an active site contains a <commit T> record in its log, then T must be committed.
2. If an active site contains an <abort T> record in its log, then T must be aborted.
3. If some active participating site does not contain a <ready T> record in its log, then the failed coordinator Ci cannot have decided to commit T. Can therefore abort T.
4. If none of the above cases holds, then all active sites must have a <ready T> record in their logs, but no additional control records (such as <abort T> of <commit T>). In this case active sites must wait for Ci to recover, to find decision.
Blocking problem : active sites may have to wait for failed coordinator to recover.
52
Handling of Failures - Network PartitionHandling of Failures - Network Partition
If the coordinator and all its participants remain in one partition, the failure has no effect on the commit protocol.
If the coordinator and its participants belong to several partitions: Sites that are not in the partition containing the coordinator think the
coordinator has failed, and execute the protocol to deal with failure of the coordinator.
No harm results, but sites may still have to wait for decision from coordinator.
The coordinator and the sites are in the same partition as the coordinator think that the sites in the other partition have failed, and follow the usual commit protocol.
Again, no harm results
53
Recovery and Concurrency ControlRecovery and Concurrency Control
In-doubt transactions have a <ready T>, but neither a <commit T>, nor an <abort T> log record.
The recovering site must determine the commit-abort status of such transactions by contacting other sites; this can slow and potentially block recovery.
Recovery algorithms can note lock information in the log. Instead of <ready T>, write out <ready T, L> L = list of locks held by
T when the log is written (read locks can be omitted).
For every in-doubt transaction T, all the locks noted in the <ready T, L> log record are reacquired.
After lock reacquisition, transaction processing can resume; the commit or rollback of in-doubt transactions is performed concurrently with the execution of new transactions.
54
Alternative Models of Transaction Alternative Models of Transaction ProcessingProcessing
Notion of a single transaction spanning multiple sites is inappropriate for many applications E.g. transaction crossing an organizational boundary No organization would like to permit an externally initiated
transaction to block local transactions for an indeterminate period
Alternative models carry out transactions by sending messages Code to handle messages must be carefully designed to ensure
atomicity and durability properties for updates Isolation cannot be guaranteed, in that intermediate stages are
visible, but code must ensure no inconsistent states result due to concurrency
Persistent messaging systems are systems that provide transactional properties to messages Messages are guaranteed to be delivered exactly once Will discuss implementation techniques later
55
Distributed Concurrency ControlDistributed Concurrency Control
Modify concurrency control schemes for use in distributed environment.
We assume that each site participates in the execution of a commit protocol to ensure global transaction automicity.
We assume all replicas of any item are updated Will see how to relax this in case of site failures later
56
Single-Lock-Manager ApproachSingle-Lock-Manager Approach
System maintains a single lock manager that resides in a single chosen site, say Si
When a transaction needs to lock a data item, it sends a lock request to Si and lock manager determines whether the lock can be granted immediately If yes, lock manager sends a message to the site which initiated the
request
If no, request is delayed until it can be granted, at which time a message is sent to the initiating site
57
Single-Lock-Manager Approach (Cont.)Single-Lock-Manager Approach (Cont.)
The transaction can read the data item from any one of the sites at which a replica of the data item resides.
Writes must be performed on all replicas of a data item
Advantages of scheme: Simple implementation
Simple deadlock handling
Disadvantages of scheme are: Bottleneck: lock manager site becomes a bottleneck
Vulnerability: system is vulnerable to lock manager site failure.
58
Distributed Lock ManagerDistributed Lock Manager
In this approach, functionality of locking is implemented by lock managers at each site Lock managers control access to local data items
But special protocols may be used for replicas
Advantage: work is distributed and can be made robust to failures
Disadvantage: deadlock detection is more complicated Lock managers cooperate for deadlock detection
Several variants of this approach Primary copy Majority protocol Biased protocol Quorum consensus
59
Primary CopyPrimary Copy
Choose one replica of data item to be the primary copy. Site containing the replica is called the primary site for that data
item
Different data items can have different primary sites
When a transaction needs to lock a data item Q, it requests a lock at the primary site of Q. Implicitly gets lock on all replicas of the data item
Benefit Concurrency control for replicated data handled similarly to
unreplicated data - simple implementation.
Drawback If the primary site of Q fails, Q is inaccessible even though other
sites containing a replica may be accessible.
60
Majority ProtocolMajority Protocol
Local lock manager at each site administers lock and unlock requests for data items stored at that site.
When a transaction wishes to lock an unreplicated data item Q residing at site Si, a message is sent to Si ‘s lock manager.
If Q is locked in an incompatible mode, then the request is delayed until it can be granted.
When the lock request can be granted, the lock manager sends a message back to the initiator indicating that the lock request has been granted.
61
Majority Protocol (Cont.)Majority Protocol (Cont.)
In case of replicated data If Q is replicated at n sites, then a lock request message must be
sent to more than half of the n sites in which Q is stored.
The transaction does not operate on Q until it has obtained a lock on a majority of the replicas of Q.
When writing the data item, transaction performs writes on all replicas.
Benefit Can be used even when some sites are unavailable
Drawback Requires 2(n/2 + 1) messages for handling lock requests, and (n/2 +
1) messages for handling unlock requests.
Potential for deadlock even with single item - e.g., each of 3 transactions may have locks on 1/3rd of the replicas of a data.
62
Quorum Consensus ProtocolQuorum Consensus Protocol
Each site is assigned a weight. Let S be the total of all site weights
Choose two values read quorum Qr and write quorum Qw
Such that Qr + Qw > S and 2 * Qw > S
Quorums can be chosen (and S computed) separately for each item
Each read must lock enough replicas that the sum of the site weights is >= Qr
Each write must lock enough replicas that the sum of the site weights is >= Qw
For now we assume all replicas are written Extensions to allow some sites to be unavailable described later
63
Deadlock HandlingDeadlock Handling
Consider the following two transactions and history, with item X and transaction T1 at site 1, and item Y and transaction T2 at site 2:
T1: write (X)write (Y)
T2: write (Y)write (X)
X-lock on Xwrite (X) X-lock on Y
write (Y)wait for X-lock on X
Wait for X-lock on Y
Result: deadlock which cannot be detected locally at either site
64
Centralized ApproachCentralized Approach
A global wait-for graph is constructed and maintained in a single site; the deadlock-detection coordinator Real graph: Real, but unknown, state of the system.
Constructed graph:Approximation generated by the controller during the execution of its algorithm .
the global wait-for graph can be constructed when: a new edge is inserted in or removed from one of the local wait-for
graphs.
a number of changes have occurred in a local wait-for graph.
the coordinator needs to invoke cycle-detection.
If the coordinator finds a cycle, it selects a victim and notifies all sites. The sites roll back the victim transaction.
65
Example Wait-For Graph for False CyclesExample Wait-For Graph for False Cycles
Initial state:
66
False Cycles (Cont.)False Cycles (Cont.)
Suppose that starting from the state shown in figure,
1. T2 releases resources at S1 resulting in a message remove T1 T2 message from the
Transaction Manager at site S1 to the coordinator)
2. And then T2 requests a resource held by T3 at site S2 resulting in a message insert T2 T3 from S2 to the coordinator
Suppose further that the insert message reaches before the delete message this can happen due to network delays
The coordinator would then find a false cycle
T1 T2 T3 T1
The false cycle above never existed in reality. False cycles cannot occur if two-phase locking is used.
67
TimestampingTimestamping
Timestamp based concurrency-control protocols can be used in distributed systems
Each transaction must be given a unique timestamp
Main problem: how to generate a timestamp in a distributed fashion Each site generates a unique local timestamp using either a logical
counter or the local clock.
Global unique timestamp is obtained by concatenating the unique local timestamp with the unique identifier.
68
Timestamping (Cont.)Timestamping (Cont.)
A site with a slow clock will assign smaller timestamps Still logically correct: serializability not affected
But: “disadvantages” transactions
To fix this problem
Define within each site Si a logical clock (LCi), which generates the unique local timestamp
Require that Si advance its logical clock whenever a request is received from a transaction Ti with timestamp < x,y> and x is greater that the current value of LCi.
In this case, site Si advances its logical clock to the value x + 1.
69
Replication with Weak ConsistencyReplication with Weak Consistency
Many commercial databases support replication of data with weak degrees of consistency (I.e., without a guarantee of serializabiliy)
E.g.: master-slave replication: updates are performed at a single “master” site, and propagated to “slave” sites. Propagation is not part of the update transaction: its is decoupled
May be immediately after transaction commits May be periodic
Data may only be read at slave sites, not updated No need to obtain locks at any remote site
Particularly useful for distributing information E.g. from central office to branch-office
Also useful for running read-only queries offline from the main database
70
Replication with Weak Consistency (Cont.)Replication with Weak Consistency (Cont.)
Replicas should see a transaction-consistent snapshot of the database That is, a state of the database reflecting all effects of all
transactions up to some point in the serialization order, and no effects of any later transactions.
E.g. Oracle provides a create snapshot statement to create a snapshot of a relation or a set of relations at a remote site snapshot refresh either by recomputation or by incremental update
Automatic refresh (continuous or periodic) or manual refresh
71
Multimaster ReplicationMultimaster Replication
With multimaster replication (also called update-anywhere replication) updates are permitted at any replica, and are automatically propagated to all replicas Basic model in distributed databases, where transactions are
unaware of the details of replication, and database system propagates updates as part of the same transaction
Coupled with 2 phase commit
Many systems support lazy propagation where updates are transmitted after transaction commits
Allow updates to occur even if some sites are disconnected from the network, but at the cost of consistency
72
Lazy Propagation (Cont.)Lazy Propagation (Cont.)
Two approaches to lazy propagation Updates at any replica translated into update at primary site, and then propagated
back to all replicas
Updates to an item are ordered serially
But transactions may read an old value of an item and use it to perform an update, result in non-serializability
Updates are performed at any replica and propagated to all other replicas
Causes even more serialization problems:
– Same data item may be updated concurrently at multiple sites!
Conflict detection is a problem Some conflicts due to lack of distributed concurrency control can be detected
when updates are propagated to other sites (will see later, in Section 23.5.4)
Conflict resolution is very messy Resolution may require committed transactions to be rolled back
Durability violated
Automatic resolution may not be possible, and human intervention may be required
73
Read One Write All (Available)Read One Write All (Available)
Biased protocol is a special case of quorum consensus Allows reads to read any one replica but updates require all replicas
to be available at commit time (called read one write all)
Read one write all available (ignoring failed sites) is attractive, but incorrect If failed link may come back up, without a disconnected site ever
being aware that it was disconnected
The site then has old values, and a read from that site would return an incorrect value
If site was aware of failure reintegration could have been performed, but no way to guarantee this
With network partitioning, sites in each partition may update same item concurrently
believing sites in other partitions have all failed
74
Site ReintegrationSite Reintegration
When failed site recovers, it must catch up with all updates that it missed while it was down Problem: updates may be happening to items whose replica is
stored at the site while the site is recovering
Solution 1: halt all updates on system while reintegrating a site
Unacceptable disruption
Solution 2: lock all replicas of all data items at the site, update to latest version, then release locks
Other solutions with better concurrency also available
75
Coordinator SelectionCoordinator Selection
Backup coordinators site which maintains enough information locally to assume the role
of coordinator if the actual coordinator fails
executes the same algorithms and maintains the same internal state information as the actual coordinator fails executes state information as the actual coordinator
allows fast recovery from coordinator failure but involves overhead during normal processing.
Election algorithms used to elect a new coordinator in case of failures
Example: Bully Algorithm - applicable to systems where every site can send a message to every other site.
76
Distributed Query ProcessingDistributed Query Processing
For centralized systems, the primary criterion for measuring the cost of a particular strategy is the number of disk accesses.
In a distributed system, other issues must be taken into account: The cost of a data transmission over the network.
The potential gain in performance from having several sites process parts of the query in parallel.
77
Query TransformationQuery Transformation
Translating algebraic queries on fragments. It must be possible to construct relation r from its fragments
Replace relation r by the expression to construct relation r from its fragments
Consider the horizontal fragmentation of the account relation into
account1 = branch-name = “Hillside” (account)
account2 = branch-name = “Valleyview” (account)
The query branch-name = “Hillside” (account) becomes
branch-name = “Hillside” (account1 account2)
which is optimized into
branch-name = “Hillside” (account1) branch-name = “Hillside” (account2)
78
Example Query (Cont.)Example Query (Cont.)
Since account1 has only tuples pertaining to the Hillside branch, we can eliminate the selection operation.
Apply the definition of account2 to obtain
branch-name = “Hillside” ( branch-name = “Valleyview” (account)
This expression is the empty set regardless of the contents of the account relation.
Final strategy is for the Hillside site to return account1 as the result of the query.
79
Simple Join ProcessingSimple Join Processing
Consider the following relational algebra expression in which the three relations are neither replicated nor fragmented
account depositor branch
account is stored at site S1
depositor at S2
branch at S3
For a query issued at site SI, the system needs to produce the result at site SI
80
Possible Query Processing StrategiesPossible Query Processing Strategies
Ship copies of all three relations to site SI and choose a strategy for processing the entire locally at site SI.
Ship a copy of the account relation to site S2 and compute temp1 = account depositor at S2. Ship temp1 from S2 to S3, and compute temp2 = temp1 branch at S3. Ship the result temp2 to SI.
Devise similar strategies, exchanging the roles S1, S2, S3
Must consider following factors: amount of data being shipped
cost of transmitting a data block between sites
relative processing speed at each site
81
Semijoin StrategySemijoin Strategy
Let r1 be a relation with schema R1 stores at site S1
Let r2 be a relation with schema R2 stores at site S2
Evaluate the expression r1 r2 and obtain the result at S1.
1. Compute temp1 R1 R2 (r1) at S1.
2. Ship temp1 from S1 to S2.
3. Compute temp2 r2 temp1 at S2
4. Ship temp2 from S2 to S1.
5. Compute r1 temp2 at S1. This is the same as r1 r2.
82
Formal DefinitionFormal Definition
The semijoin of r1 with r2, is denoted by:
r1 r2
it is defined by:
R1 (r1 r2)
Thus, r1 r2 selects those tuples of r1 that contributed to r1 r2.
In step 3 above, temp2=r2 r1.
For joins of several relations, the above strategy can be extended to a series of semijoin steps.
83
Join Strategies that Exploit ParallelismJoin Strategies that Exploit Parallelism
Consider r1 r2 r3 r4 where relation ri is stored at site Si. The
result must be presented at site S1.
r1 is shipped to S2 and r1 r2 is computed at S2: simultaneously r3 is
shipped to S4 and r3 r4 is computed at S4
S2 ships tuples of (r1 r2) to S1 as they produced;
S4 ships tuples of (r3 r4) to S1
Once tuples of (r1 r2) and (r3 r4) arrive at S1 (r1 r2) (r3 r4) is
computed in parallel with the computation of (r1 r2) at S2 and the
computation of (r3 r4) at S4.
84
Heterogeneous Distributed DatabasesHeterogeneous Distributed Databases
Many database applications require data from a variety of preexisting databases located in a heterogeneous collection of hardware and software platforms
Data models may differ (hierarchical, relational , etc.)
Transaction commit protocols may be incompatible
Concurrency control may be based on different techniques (locking, timestamping, etc.)
System-level details almost certainly are totally incompatible.
A multidatabase system is a software layer on top of existing database systems, which is designed to manipulate information in heterogeneous databases Creates an illusion of logical database integration without any
physical database integration
85
AdvantagesAdvantages
Preservation of investment in existing hardware
system software
Applications
Local autonomy and administrative control
Allows use of special-purpose DBMSs
Step towards a unified homogeneous DBMS Full integration into a homogeneous DBMS faces
Technical difficulties and cost of conversion
Organizational/political difficulties
– Organizations do not want to give up control on their data
– Local databases wish to retain a great deal of autonomy
86
Unified View of DataUnified View of Data
Agreement on a common data model Typically the relational model
Agreement on a common conceptual schema Different names for same relation/attribute Same relation/attribute name means different things
Agreement on a single representation of shared data E.g. data types, precision, Character sets
ASCII vs EBCDIC Sort order variations
Agreement on units of measure Variations in names
E.g. Köln vs Cologne, Mumbai vs Bombay
87
Query ProcessingQuery Processing
Several issues in query processing in a heterogeneous database
Schema translation Write a wrapper for each data source to translate data to a global
schema
Wrappers must also translate updates on global schema to updates on local schema
Limited query capabilities Some data sources allow only restricted forms of selections
E.g. web forms, flat file data sources
Queries have to be broken up and processed partly at the source and partly at a different site
Removal of duplicate information when sites have overlapping information Decide which sites to execute query
Global query optimization
88
Mediator SystemsMediator Systems
Mediator systems are systems that integrate multiple heterogeneous data sources by providing an integrated global view, and providing query facilities on global view Unlike full fledged multidatabase systems, mediators generally do
not bother about transaction processing
But the terms mediator and multidatabase are sometimes used interchangeably
The term virtual database is also used to refer to mediator/multidatabase systems
89
Parallel DatabasesParallel Databases
Introduction
I/O Parallelism
Interquery Parallelism
Intraquery Parallelism
Intraoperation Parallelism
Interoperation Parallelism
Design of Parallel Systems
90
IntroductionIntroduction
Parallel machines are becoming quite common and affordable Prices of microprocessors, memory and disks have dropped sharply
Databases are growing increasingly large large volumes of transaction data are collected and stored for later
analysis.
multimedia objects like images are increasingly stored in databases
Large-scale parallel database systems increasingly used for: storing large volumes of data
processing time-consuming decision-support queries
providing high throughput for transaction processing
91
Parallelism in DatabasesParallelism in Databases
Data can be partitioned across multiple disks for parallel I/O.
Individual relational operations (e.g., sort, join, aggregation) can be executed in parallel data can be partitioned and each processor can work independently
on its own partition.
Queries are expressed in high level language (SQL, translated to relational algebra) makes parallelization easier.
Different queries can be run in parallel with each other.Concurrency control takes care of conflicts.
Thus, databases naturally lend themselves to parallelism.
92
I/O ParallelismI/O Parallelism
Reduce the time required to retrieve relations from disk by partitioning
the relations on multiple disks.
Horizontal partitioning – tuples of a relation are divided among many disks such that each tuple resides on one disk.
Partitioning techniques (number of disks = n):
Round-robin:
Send the ith tuple inserted in the relation to disk i mod n.
Hash partitioning:
Choose one or more attributes as the partitioning attributes.
Choose hash function h with range 0…n - 1
Let i denote result of hash function h applied to the partitioning attribute value of a tuple. Send tuple to disk i.
93
I/O Parallelism (Cont.)I/O Parallelism (Cont.)
Partitioning techniques (cont.):
Range partitioning: Choose an attribute as the partitioning attribute.
A partitioning vector [vo, v1, ..., vn-2] is chosen.
Let v be the partitioning attribute value of a tuple. Tuples such that vi vi+1 go to disk I + 1. Tuples with v < v0 go to disk 0 and tuples with v vn-2 go to disk n-1.
E.g., with a partitioning vector [5,11], a tuple with partitioning attribute value of 2 will go to disk 0, a tuple with value 8 will go to disk 1, while a tuple with value 20 will go to disk2.
94
Comparison of Partitioning TechniquesComparison of Partitioning Techniques
Evaluate how well partitioning techniques support the following types of data access:
1.Scanning the entire relation.
2.Locating a tuple associatively – point queries. E.g., r.A = 25.
3.Locating all tuples such that the value of a given attribute lies within a specified range – range queries. E.g., 10 r.A < 25.
95
Comparison of Partitioning Techniques (Cont.)Comparison of Partitioning Techniques (Cont.)
Round robin:
Advantages Best suited for sequential scan of entire relation on each query.
All disks have almost an equal number of tuples; retrieval work is thus well balanced between disks.
Range queries are difficult to process No clustering -- tuples are scattered across all disks
96
Comparison of Partitioning Techniques(Cont.)Comparison of Partitioning Techniques(Cont.)
Hash partitioning:
Good for sequential access Assuming hash function is good, and partitioning attributes form a
key, tuples will be equally distributed between disks
Retrieval work is then well balanced between disks.
Good for point queries on partitioning attribute Can lookup single disk, leaving others available for answering other
queries.
Index on partitioning attribute can be local to disk, making lookup and update more efficient
No clustering, so difficult to answer range queries
97
Comparison of Partitioning Techniques (Cont.)Comparison of Partitioning Techniques (Cont.)
Range partitioning: Provides data clustering by partitioning attribute value. Good for sequential access Good for point queries on partitioning attribute: only one disk
needs to be accessed. For range queries on partitioning attribute, one to a few disks
may need to be accessed Remaining disks are available for other queries. Good if result tuples are from one to a few blocks. If many blocks are to be fetched, they are still fetched from one
to a few disks, and potential parallelism in disk access is wasted Example of execution skew.
98
Partitioning a Relation across DisksPartitioning a Relation across Disks
If a relation contains only a few tuples which will fit into a single disk block, then assign the relation to a single disk.
Large relations are preferably partitioned across all the available disks.
If a relation consists of m disk blocks and there are n disks available in the system, then the relation should be allocated min(m,n) disks.
99
Handling of SkewHandling of Skew
The distribution of tuples to disks may be skewed — that is, some disks have many tuples, while others may have fewer tuples.
Types of skew: Attribute-value skew.
Some values appear in the partitioning attributes of many tuples; all the tuples with the same value for the partitioning attribute end up in the same partition.
Can occur with range-partitioning and hash-partitioning. Partition skew.
With range-partitioning, badly chosen partition vector may assign too many tuples to some partitions and too few to others.
Less likely with hash-partitioning if a good hash-function is chosen.
100
Handling Skew in Range-PartitioningHandling Skew in Range-Partitioning
To create a balanced partitioning vector (assuming partitioning attribute forms a key of the relation): Sort the relation on the partitioning attribute.
Construct the partition vector by scanning the relation in sorted order as follows.
After every 1/nth of the relation has been read, the value of the partitioning attribute of the next tuple is added to the partition vector.
n denotes the number of partitions to be constructed.
Duplicate entries or imbalances can result if duplicates are present in partitioning attributes.
Alternative technique based on histograms used in practice
101
Handling Skew using HistogramsHandling Skew using Histograms
Balanced partitioning vector can be constructed from histogram in a relatively straightforward fashion
Assume uniform distribution within each range of the histogram
Histogram can be constructed by scanning relation, or sampling (blocks containing) tuples of the relation
102
Handling Skew Using Virtual Processor Handling Skew Using Virtual Processor Partitioning Partitioning
Skew in range partitioning can be handled elegantly using virtual processor partitioning: create a large number of partitions (say 10 to 20 times the number
of processors)
Assign virtual processors to partitions either in round-robin fashion or based on estimated cost of processing each virtual partition
Basic idea: If any normal partition would have been skewed, it is very likely the
skew is spread over a number of virtual partitions
Skewed virtual partitions get spread across a number of processors, so work gets distributed evenly!
103
Interquery ParallelismInterquery Parallelism
Queries/transactions execute in parallel with one another.
Increases transaction throughput; used primarily to scale up a transaction processing system to support a larger number of transactions per second.
Easiest form of parallelism to support, particularly in a shared-memory parallel database, because even sequential database systems support concurrent processing.
More complicated to implement on shared-disk or shared-nothing architectures Locking and logging must be coordinated by passing messages
between processors.
Data in a local buffer may have been updated at another processor.
Cache-coherency has to be maintained — reads and writes of data in buffer must find latest version of data.
104
Intraquery ParallelismIntraquery Parallelism
Execution of a single query in parallel on multiple processors/disks; important for speeding up long-running queries.
Two complementary forms of intraquery parallelism : Intraoperation Parallelism – parallelize the execution of each
individual operation in the query.
Interoperation Parallelism – execute the different operations in a query expression in parallel.
the first form scales better with increasing parallelism becausethe number of tuples processed by each operation is typically more than the number of operations in a query
105
Parallel Processing of Relational OperationsParallel Processing of Relational Operations
Our discussion of parallel algorithms assumes: read-only queries
shared-nothing architecture
n processors, P0, ..., Pn-1, and n disks D0, ..., Dn-1, where disk Di is associated with processor Pi.
If a processor has multiple disks they can simply simulate a single disk Di.
Shared-nothing architectures can be efficiently simulated on shared-memory and shared-disk systems. Algorithms for shared-nothing systems can thus be run on shared-
memory and shared-disk systems.
However, some optimizations may be possible.
106
Parallel SortParallel Sort
Parallel External Sort-Merge
Assume the relation has already been partitioned among disks D0, ..., Dn-1 (in whatever manner).
Each processor Pi locally sorts the data on disk Di.
The sorted runs on each processor are then merged to get the final sorted output.
Parallelize the merging of sorted runs as follows:
The sorted partitions at each processor Pi are range-partitioned across the processors P0, ..., Pm-1.
Each processor Pi performs a merge on the streams as they are received, to get a single sorted run.
The sorted runs on processors P0,..., Pm-1 are concatenated to get the final result.
107
Parallel JoinParallel Join
The join operation requires pairs of tuples to be tested to see if they satisfy the join condition, and if they do, the pair is added to the join output.
Parallel join algorithms attempt to split the pairs to be tested over several processors. Each processor then computes part of the join locally.
In a final step, the results from each processor can be collected together to produce the final result.
108
Partitioned JoinPartitioned Join
For equi-joins and natural joins, it is possible to partition the two input relations across the processors, and compute the join locally at each processor.
Let r and s be the input relations, and we want to compute r r.A=s.B s.
r and s each are partitioned into n partitions, denoted r0, r1, ..., rn-1 and s0, s1, ..., sn-1.
Can use either range partitioning or hash partitioning.
r and s must be partitioned on their join attributes r.A and s.B), using the same range-partitioning vector or hash function.
Partitions ri and si are sent to processor Pi,
Each processor Pi locally computes ri ri.A=si.B si. Any of the standard
join methods can be used.
109
Partitioned Join (Cont.)Partitioned Join (Cont.)
110
Fragment-and-Replicate JoinFragment-and-Replicate Join
Partitioning not possible for some join conditions e.g., non-equijoin conditions, such as r.A > s.B.
For joins were partitioning is not applicable, parallelization can be accomplished by fragment and replicate technique Depicted on next slide
Special case – asymmetric fragment-and-replicate: One of the relations, say r, is partitioned; any partitioning technique
can be used. The other relation, s, is replicated across all the processors.
Processor Pi then locally computes the join of ri with all of s using any join technique.
111
Depiction of Fragment-and-Replicate JoinsDepiction of Fragment-and-Replicate Joins
a. Asymmetric Fragment and Replicate b. Fragment and Replicate
112
Fragment-and-Replicate Join (Cont.)Fragment-and-Replicate Join (Cont.)
General case: reduces the sizes of the relations at each processor.
r is partitioned into n partitions,r0, r1, ..., r n-1;s is partitioned into m partitions, s0, s1, ..., sm-1.
Any partitioning technique may be used.
There must be at least m * n processors.
Label the processors as
P0,0, P0,1, ..., P0,m-1, P1,0, ..., Pn-1m-1.
Pi,j computes the join of ri with sj. In order to do so, ri is replicated to Pi,0, Pi,1, ..., Pi,m-1, while si is replicated to P0,i, P1,i, ..., Pn-1,i
Any join technique can be used at each processor Pi,j.
113
Fragment-and-Replicate Join (Cont.)Fragment-and-Replicate Join (Cont.)
Both versions of fragment-and-replicate work with any join condition, since every tuple in r can be tested with every tuple in s.
Usually has a higher cost than partitioning, since one of the relations (for asymmetric fragment-and-replicate) or both relations (for general fragment-and-replicate) have to be replicated.
Sometimes asymmetric fragment-and-replicate is preferable even though partitioning could be used. E.g., say s is small and r is large, and already partitioned. It may be
cheaper to replicate s across all processors, rather than repartition r and s on the join attributes.
114
Partitioned Parallel Hash-JoinPartitioned Parallel Hash-Join
Parallelizing partitioned hash join:
Assume s is smaller than r and therefore s is chosen as the build relation.
A hash function h1 takes the join attribute value of each tuple in s and maps this tuple to one of the n processors.
Each processor Pi reads the tuples of s that are on its disk Di, and sends each tuple to the appropriate processor based on hash function h1. Let si denote the tuples of relation s that are sent to processor Pi.
As tuples of relation s are received at the destination processors, they are partitioned further using another hash function, h2, which is used to compute the hash-join locally. (Cont.)
115
Partitioned Parallel Hash-Join (Cont.)Partitioned Parallel Hash-Join (Cont.)
Once the tuples of s have been distributed, the larger relation r is redistributed across the m processors using the hash function h1
Let ri denote the tuples of relation r that are sent to processor Pi.
As the r tuples are received at the destination processors, they are repartitioned using the function h2
(just as the probe relation is partitioned in the sequential hash-join algorithm).
Each processor Pi executes the build and probe phases of the hash-join algorithm on the local partitions ri and s of r and s to produce a partition of the final result of the hash-join.
Note: Hash-join optimizations can be applied to the parallel case e.g., the hybrid hash-join algorithm can be used to cache some of
the incoming tuples in memory and avoid the cost of writing them and reading them back in.
116
Parallel Nested-Loop JoinParallel Nested-Loop Join
Assume that relation s is much smaller than relation r and that r is stored by
partitioning.
there is an index on a join attribute of relation r at each of the partitions of relation r.
Use asymmetric fragment-and-replicate, with relation s being replicated, and using the existing partitioning of relation r.
Each processor Pj where a partition of relation s is stored reads the tuples of relation s stored in Dj, and replicates the tuples to every other processor Pi.
At the end of this phase, relation s is replicated at all sites that store tuples of relation r.
Each processor Pi performs an indexed nested-loop join of relation s with the ith partition of relation r.
117
Cost of Parallel Evaluation of OperationsCost of Parallel Evaluation of Operations
If there is no skew in the partitioning, and there is no overhead due to the parallel evaluation, expected speed-up will be 1/n
If skew and overheads are also to be taken into account, the time taken by a parallel operation can be estimated as
Tpart + Tasm + max (T0, T1, …, Tn-1)
Tpart is the time for partitioning the relations
Tasm is the time for assembling the results
Ti is the time taken for the operation at processor Pi
this needs to be estimated taking into account the skew, and the time wasted in contentions.
118
Interoperator ParallelismInteroperator Parallelism
Pipelined parallelism Consider a join of four relations
r1 r2 r3 r4
Set up a pipeline that computes the three joins in parallel
Let P1 be assigned the computation of temp1 = r1 r2
And P2 be assigned the computation of temp2 = temp1 r3
And P3 be assigned the computation of temp2 r4
Each of these operations can execute in parallel, sending result tuples it computes to the next operation even as it is computing further results
Provided a pipelineable join evaluation algorithm (e.g. indexed nested loops join) is used
119
Factors Limiting Utility of Pipeline Factors Limiting Utility of Pipeline ParallelismParallelism
Pipeline parallelism is useful since it avoids writing intermediate results to disk
Useful with small number of processors, but does not scale up well with more processors. One reason is that pipeline chains do not attain sufficient length.
Cannot pipeline operators which do not produce output until all inputs have been accessed (e.g. aggregate and sort)
Little speedup is obtained for the frequent cases of skew in which one operator's execution cost is much higher than the others.
120
Independent ParallelismIndependent Parallelism
Independent parallelism Consider a join of four relations
r1 r2 r3 r4
Let P1 be assigned the computation of temp1 = r1 r2
And P2 be assigned the computation of temp2 = r3 r4
And P3 be assigned the computation of temp1 temp2 P1 and P2 can work independently in parallel P3 has to wait for input from P1 and P2
– Can pipeline output of P1 and P2 to P3, combining independent parallelism and pipelined parallelism
Does not provide a high degree of parallelism useful with a lower degree of parallelism. less useful in a highly parallel system,
121
Design of Parallel SystemsDesign of Parallel Systems
Some issues in the design of parallel systems:
Parallel loading of data from external sources is needed in order to handle large volumes of incoming data.
Resilience to failure of some processors or disks. Probability of some disk or processor failing is higher in a parallel
system.
Operation (perhaps with degraded performance) should be possible in spite of failure.
Redundancy achieved by storing extra copy of every data item at another processor.
122
Design of Parallel Systems Design of Parallel Systems
On-line reorganization of data and schema changes must be supported. For example, index construction on terabyte databases can take
hours or days even on a parallel system.
Need to allow other processing (insertions/deletions/updates) to be performed on relation even as index is being constructed.
Basic idea: index construction tracks changes and ``catches up'‘ on changes at the end.
Also need support for on-line repartitioning and schema changes (executed concurrently with other processing).