U.S. patent application number 14/159640 was filed with the patent office on 2014-01-21 and published on 2014-07-10 as publication number 20140195489 for logical replication in clustered database system with adaptive cloning.
This patent application is currently assigned to MICROSOFT CORPORATION. The applicant listed for this patent is Microsoft Corporation. Invention is credited to Peter Byrne, Robin D. Dhamankar, Qun Guo, Michael E. Habben, Xiaowei Jiang, Vishal Kathuria, Mahesh K. Sreenivas, Rui Wang, Yixue Zhu.
Application Number: 14/159640
Publication Number: 20140195489
Family ID: 44745513
Publication Date: 2014-07-10

United States Patent Application 20140195489
Kind Code: A1
Wang; Rui; et al.
July 10, 2014
LOGICAL REPLICATION IN CLUSTERED DATABASE SYSTEM WITH ADAPTIVE CLONING
Abstract
Architecture that addresses an end-to-end solution for logical
transactional replication from a shared-nothing clustered database
management system, which uses adaptive cloning for high
availability. This can be time based using a global logical
timestamp. The disclosed architecture, used for refreshing stale
clones, does not preserve user transaction boundaries, which is a
more complex situation than where the boundaries are preserved. In
such a scenario it is probable that for a given data segment no
clone of the segment may contain the complete user transaction
history, and hence, the history has to be pieced together from the
logs of multiple different clones. This is accomplished such that
log harvesting is coordinated with the clone state transitions to
ensure the correctness of logical replication.
Inventors: Wang; Rui (Redmond, WA); Habben; Michael E. (Sammamish, WA); Guo; Qun (Bellevue, WA); Byrne; Peter (Redmond, WA); Dhamankar; Robin D. (Bellevue, WA); Kathuria; Vishal (Woodinville, WA); Sreenivas; Mahesh K. (Sammamish, WA); Zhu; Yixue (Sammamish, WA); Jiang; Xiaowei (Bellevue, WA)
Applicant: Microsoft Corporation, Redmond, WA, US
Assignee: MICROSOFT CORPORATION, Redmond, WA

Family ID: 44745513
Appl. No.: 14/159640
Filed: January 21, 2014
Related U.S. Patent Documents

Application Number   Filing Date    Patent Number
12758045             Apr 12, 2010   8671074
14159640
Current U.S. Class: 707/634
Current CPC Class: G06F 9/466 20130101; G06F 16/219 20190101; G06F 11/2097 20130101; G06F 2201/82 20130101; G06F 16/273 20190101; G06F 2201/80 20130101; G06F 11/2041 20130101; G06F 11/2048 20130101
Class at Publication: 707/634
International Class: G06F 17/30 20060101 G06F017/30
Claims
1. A computer-implemented database management method performed by a
computer system executing machine-readable instructions, the method
comprising acts of: storing database segments of a database as
primary clones across cluster nodes; maintaining redundant copies
of the database segments as secondary clones; and when refreshing a
stale secondary clone, only when it is determined that a degree of
staleness of the stale secondary clone is above a predetermined
threshold, rebuilding the stale secondary clone in its
entirety.
2. The method of claim 1, wherein when refreshing the stale
secondary clone, when it is determined that a degree of staleness
of the stale secondary clone is not above a predetermined
threshold, refreshing of the stale secondary clone is not
accomplished by replaying an associated transaction log.
3. The method of claim 1, wherein when refreshing the stale
secondary clone, when it is determined that a degree of staleness
of the stale secondary clone is not above a predetermined
threshold, refreshing of the stale secondary clone is performed
without preserving user transaction boundaries.
4. The method of claim 1, further comprising: receiving multiple
transaction log streams based on global logical timestamps of
commit processes employed in the cluster; and merging the received
multiple transaction log streams into a single change stream which
is then provided as a single stream of ordered changes to a
destination data system.
5. The method of claim 4, wherein at least one of the transaction
log streams lacks a complete record of changes.
6. The method of claim 5, wherein merging the received multiple
transaction log streams is coordinated with clone state transitions
to facilitate correctness of logical replication.
7. The method of claim 1, wherein one and only one clone of each
data segment is assigned as the primary clone.
8. A computer-implemented database management system having
computer readable media that store executable instructions executed
by a processor, comprising: a database storing database segments as
primary clones across cluster nodes and redundant copies of the
database segments maintained as secondary clones; and a replication
component that refreshes a stale secondary clone and only when it
is determined that a degree of staleness of the stale secondary
clone is above a predetermined threshold, rebuilds the stale
secondary clone in its entirety.
9. The system of claim 8, wherein when refreshing the stale
secondary clone, when it is determined that a degree of staleness
of the stale secondary clone is not above a predetermined
threshold, refreshing of the stale secondary clone is not
accomplished by replaying an associated transaction log.
10. The system of claim 8, wherein when refreshing the stale
secondary clone, when it is determined that a degree of staleness
of the stale secondary clone is not above a predetermined
threshold, refreshing of the stale secondary clone is performed
without preserving user transaction boundaries.
11. The system of claim 8, the replication component further
receives multiple transaction log streams based on global logical
timestamps of commit processes employed in the cluster; and merges
the received multiple transaction log streams into a single change
stream which is then provided as a single stream of ordered changes
to a destination data system.
12. The system of claim 11, wherein at least one of the transaction
log streams lacks a complete record of changes.
13. The system of claim 12, wherein the replication component
merges the received multiple transaction log streams by
coordinating with clone state transitions to facilitate correctness
of logical replication.
14. The system of claim 8, wherein one and only one clone of each
data segment is assigned as the primary clone.
15. A computer-readable storage memory having computer-executable
instructions, which when executed perform actions, comprising:
storing database segments of a database as primary clones across
cluster nodes; maintaining redundant copies of the database
segments as secondary clones; and when refreshing a stale secondary
clone, only when it is determined that a degree of staleness of the
stale secondary clone is above a predetermined threshold,
rebuilding the stale secondary clone in its entirety.
16. The computer-readable storage memory of claim 15, wherein when
refreshing the stale secondary clone, when it is determined that a
degree of staleness of the stale secondary clone is not above a
predetermined threshold, refreshing of the stale secondary clone is
not accomplished by replaying an associated transaction log.
17. The computer-readable storage memory of claim 15, wherein when
refreshing the stale secondary clone, when it is determined that a
degree of staleness of the stale secondary clone is not above a
predetermined threshold, refreshing of the stale secondary clone is
performed without preserving user transaction boundaries.
18. The computer-readable storage memory of claim 15, further
comprising computer-executable instructions, which when executed
perform actions, comprising: receiving multiple transaction log
streams based on global logical timestamps of commit processes
employed in the cluster; and merging the received multiple
transaction log streams into a single change stream which is then
provided as a single stream of ordered changes to a destination
data system.
19. The computer-readable storage memory of claim 18, wherein at
least one of the transaction log streams lacks a complete record of
changes.
20. The computer-readable storage memory of claim 19, wherein
merging the received multiple transaction log streams is
coordinated with clone state transitions to facilitate correctness
of logical replication.
Description
CROSS-REFERENCE TO RELATED APPLICATIONS
[0001] This application claims priority to and is a continuation of
co-pending U.S. patent application Ser. No. 12/758,045 entitled
"Logical Replication in Clustered Database System with Adaptive
Cloning" and filed Apr. 12, 2010, which is incorporated herein by
reference.
BACKGROUND
[0002] In an effort to make database management systems more
scalable, multiple server processes can be clustered in such a way
that together they function as a single logical system. Databases
may be hosted on multiple nodes (servers), each of which hosts zero
or more segments of the database. To distribute the data, an object (e.g., a table or an index on the table) can be logically sub-divided into data segments. The physical manifestation of a data segment is called a clone. Thus, by dividing an entity (e.g., a table or index) into one or more logical data segments and distributing the data corresponding to the logical segments (the clones) across various nodes, scalability for the system can be achieved. While doing so,
in order to make the data highly available and the system as a
whole more fault-tolerant, multiple redundant copies of the data
(multiple clones for each data segment) may be maintained. When
nodes move between online and offline states due to failure or
intentional administrative purposes, these clones can become stale
due to updates that occur while the clones are offline
(inaccessible).
[0003] Disaster Recovery (DR) is a key requirement for
enterprise-class database systems. Facilitating disaster recovery
typically involves maintaining up-to-date copies of databases at
multiple locations that are separated geographically. One
rudimentary approach to replicate a database distributed on
multiple nodes to a remote site is to shutdown the database and
copy data associated with the database onto the remote site,
thereby making more efficient and effective techniques for disaster
recovery desirable. Although this technique is able to provide a
consistent image of the database to the remote site, it involves
database downtime. Depending on the size of the database, this
process is time consuming, inefficient, resource intensive, and
incurs data loss most of the time. Moreover, taking a database
offline, even for short periods of time, can be prohibitively
costly and problematic. Thus, more effective and efficient
replication strategies are desired for disaster recovery.
SUMMARY
[0004] The following presents a simplified summary in order to
provide a basic understanding of some novel embodiments described
herein. This summary is not an extensive overview, and it is not
intended to identify key/critical elements or to delineate the
scope thereof. Its sole purpose is to present some concepts in a
simplified form as a prelude to the more detailed description that
is presented later.
[0005] The disclosed architecture addresses an end-to-end solution
for logical transactional replication in a shared-nothing (SN)
clustered database management system, which uses adaptive cloning
for high availability. Logical replication can be established
between two databases associated with two different database
management systems. Since at the logical level, replication is
agnostic to how the data is stored on the source and the target.
Thus, the source can be the shared-nothing cluster and the target
can be a standalone (non-clustered) instance. The logical
replication can be time based using a global logical timestamp.
[0006] Cloning is the process of maintaining redundant copies of
data within the cluster. Cloning can employ a primary clone and one
or more secondary clones. Adaptive cloning means that when a
primary clone becomes unavailable (e.g., due to server
failure/shutdown) an up-to-date secondary clone is switched online
to become the primary clone. Since this switching is typically a
very fast operation, availability of data is not compromised. In addition, clones can be split and/or merged to dynamically load balance the system.
[0007] The disclosed architecture, used for refreshing stale
clones, does not preserve user transaction boundaries, which is a
more complex situation than where the boundaries are preserved. In
such a scenario it is probable that for a given data segment no
clone of the data segment may contain the complete user transaction
history, and hence, the history has to be pieced together from the
logs of multiple different clones. This is accomplished such that
log harvesting is coordinated with the clone state transitions to
ensure the correctness of logical replication.
[0008] To the accomplishment of the foregoing and related ends,
certain illustrative aspects are described herein in connection
with the following description and the annexed drawings. These
aspects are indicative of the various ways in which the principles
disclosed herein can be practiced and all aspects and equivalents
thereof are intended to be within the scope of the claimed subject
matter. Other advantages and novel features will become apparent
from the following detailed description when considered in
conjunction with the drawings.
BRIEF DESCRIPTION OF THE DRAWINGS
[0009] FIG. 1 illustrates a computer-implemented database management system in accordance with the disclosed architecture.
[0010] FIG. 2 illustrates additional details of a database
management system.
[0011] FIG. 3 illustrates a time and activity table with respect to
harvesting multiple log streams in the presence of adaptive
cloning.
[0012] FIG. 4 illustrates a computer-implemented database
management method in accordance with the disclosed
architecture.
[0013] FIG. 5 illustrates further aspects of the method of FIG.
4.
[0014] FIG. 6 illustrates further aspects of the method of FIG.
4.
[0015] FIG. 7 illustrates a block diagram of a computing system
that executes log replication in a shared-nothing cluster with
adaptive cloning in accordance with the disclosed architecture.
DETAILED DESCRIPTION
[0016] The disclosed architecture addresses the problem of logical
transactional replication from a shared-nothing (SN) clustered
database management system that employs adaptive cloning for high
availability of the database. An architectural goal is to provide a
single system image. Each node (server or brick) in the SN cluster
operates autonomously. That is, each node has sole ownership of the
data on that node. There is no node that shares data with another
node in the cluster. When utilizing multiple servers in the
cluster, the data is divided across the multiple servers. For
example, if the data on a single server comprises users, orders,
and products, the SN cluster approach employs three separate
machines for partitioning the data--a first node hosting users
data, a second node hosting orders data, and a third node hosting
products data. Moreover, data can be partitioned horizontally
across the cluster nodes.
[0017] The database management system can be a cluster system that
comprises a collection of servers or instances. An instance of the
server linked together with the necessary cluster infrastructure is
referred to as a brick (also a node). Thus, the cluster is a
collection of one or more bricks (also referred to as nodes). Data
is distributed across multiple bricks in the system to achieve
scale-out and performance. A database object, such as a table or
index, is divided into segments. Segments are logical entities. The
physical manifestation of a data segment is referred to as a clone.
Each data segment has one or more clones associated with
it--typically more than one, to provide high availability of the
data. Clones are transactionally consistent redundant copies of
data maintained in an up-to-date state and managed by the database
management system that enables a higher degree of availability,
dynamic load balancing, fault tolerance, and other designated
purposes. Clones have roles--primary and secondary. One and only
one clone of each data segment is assigned the primary role. Other
redundant clones of a given data segment are assigned the secondary
role.
[0018] When a brick goes offline (e.g., due to a failure or
intentional shutdown triggered by the user/administrator) all the
clones physically hosted on that brick go to offline state. Offline
clones are inaccessible, and hence, updates do not get propagated
while the clones remain inaccessible. A clone that misses one or more updates becomes stale. Stale clones need to be refreshed to an up-to-date state when the brick on which the stale clone is physically located comes back online (rejoins). Note that the brick can also be online while the stale clone is offline; in that case the stale clone needs to be refreshed when the stale clone becomes accessible again. Updates can complete successfully as long as at least the
primary clone(s) of the data segment(s) is available. (Some
distributed systems require a quorum of clones.) Updates to each
segment get applied on the primary clone and then propagated from
the primary clone to the corresponding secondary clones within the
same transaction. A primary clone can never be in the stale state.
When the brick rejoins, the clones on that brick become accessible
again.
[0019] The process of bringing a stale clone back to transactional
consistency with a primary clone is called clone refresh. Clone
refresh is not accomplished by replaying the associated transaction
log (missed updates). Rather, the system adopts a technique that
identifies mismatches in the stale clones and updates those records
in no particular order and without preserving the user transaction
boundaries. Such algorithms do not typically suffer from the
problem of having long catch-up phases.
[0020] However, a system using such a scheme for clone refresh can
get into a state where for a given segment of data, no clone of the
segment contains the full user transaction history in the
associated log. This makes logical replication a more complex task,
especially where transaction boundaries are not preserved. As is
described in greater detail herein, the solution is to coordinate
clone state transition with log harvesting to ensure the
correctness of logical replication.
[0021] Reference is now made to the drawings, wherein like
reference numerals are used to refer to like elements throughout.
In the following description, for purposes of explanation, numerous
specific details are set forth in order to provide a thorough
understanding thereof. It may be evident, however, that the novel
embodiments can be practiced without these specific details. In
other instances, well known structures and devices are shown in
block diagram form in order to facilitate a description thereof.
The intention is to cover all modifications, equivalents, and
alternatives falling within the spirit and scope of the claimed
subject matter.
[0022] FIG. 1 illustrates a computer-implemented database management system 100 in accordance with the disclosed architecture. The
system 100 includes a shared-nothing cluster 102 that employs
adaptive cloning to store database segments of a database across
cluster nodes. Each database segment is associated with transaction
log records that track changes of a transaction. The transaction
log records can be formed into a transaction log stream. Thus, the
cluster 102 can include multiple log streams 104 of changes that
occur for the clones on the cluster nodes. A replication component
106 receives a set of the multiple transaction log streams 108 of a
database to create a change history of the changes. The replication
component 106 merges the set of multiple log streams into a single
change stream 110 and then logically replicates the changes to one
or more destination data system(s) 112 as a single stream of
changes.
[0023] The replication component 106 maintains an original data
operation order of the changes when receiving the multiple
transaction log records of different transactions. The replication
component 106 adheres to an original data change order of
transaction log records within a transaction generated in the
multiple transaction log streams. The replication component 106
receives the multiple transaction log records based on global
logical timestamps of commit processes employed in the cluster.
[0024] The replication component 106 creates the change history as
a single ordered stream of the changes, and the single ordered
stream created from the multiple transaction log streams, where
each log stream is potentially incomplete (lacks a complete record
of all the changes).
[0025] FIG. 2 illustrates additional details of a database
management system 200. The system 200 includes the shared-nothing
cluster 102 of nodes 202 that employs adaptive cloning to store
data segments 204 as clones (206, 208, and 210) on the nodes 202. A
transaction coordination component 212 coordinates distributed
transaction processing of multiple transaction log records in
association with changes to the clones (206, 208, and 210). Each of
the data segments 204 is cloned across some of the nodes 202. For
example, a first segment 214 is associated with a Clone.sub.1 that
is distributed to nodes in the cluster 102. Similarly, a second
segment 216 is associated with a Clone.sub.2 that is distributed to
nodes in the cluster 102, and so on for other segments and clones
as shown. When a node goes offline it may or may not affect all
clones of a data segment or other data segments.
[0026] The replication component 106 harvests the transaction log
records from cluster nodes 202 as log streams. A data
virtualization management component 218 is responsible for clone
state transitions, clone placement, dynamic load balancing,
etc.
[0027] The transaction coordination component 212 can function as a
distributed transaction coordination manager that, among other
functions and operations, receives a commit request, receives
transaction orders from the nodes, and broadcasts a global logical
timestamp to the nodes to commit the transactions.
[0028] The global logical timestamp value is a monotonically
increasing value. Each commit/abort broadcast from the transaction
coordination manager results in an increase in the global logical
timestamp value. Individual transactions are associated with a
commit-global logical timestamp value that corresponds to the
global logical timestamp message in which the commit decision was
hardened and broadcast. The commit-global logical timestamp value
represents the logical time associated with the transaction commit.
Given two transactions that committed in different global logical
timestamp values, the transaction associated with a lower
commit-global logical timestamp value is said to have committed
earlier. The global logical timestamp mechanism therefore provides
a global clock to partially order data modifications in the
cluster.
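By way of illustration only, the following minimal Python sketch models how commit-global logical timestamp values partially order transactions; the names and structures are hypothetical and are not drawn from the disclosure.

    from dataclasses import dataclass

    @dataclass
    class CommittedTxn:
        txn_id: str
        commit_glts: int  # GLTS of the broadcast that hardened the commit

    def committed_earlier(a: CommittedTxn, b: CommittedTxn) -> bool:
        # A lower commit-GLTS means the transaction committed earlier;
        # transactions sharing a GLTS were hardened in the same broadcast
        # and are only partially ordered relative to one another.
        return a.commit_glts < b.commit_glts

    txns = [CommittedTxn("T2", 7), CommittedTxn("T1", 5), CommittedTxn("T3", 7)]
    replication_order = sorted(txns, key=lambda t: t.commit_glts)  # T1 first
    assert committed_earlier(replication_order[0], replication_order[1])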
[0029] Refreshing of a stale clone (as well as clone state
transition) is performed by the data virtualization component 218,
and starts with a transition to the in-refresh state. Updates are propagated to in-refresh clones and selectively get applied depending on the phase the refresh operation is in (some updates on the in-refresh clone can be skipped/discarded if the ongoing refresh activity will catch them up anyway). The clone is
eligible to be designated the primary role when the refreshing
process is completed.
[0030] As previously described, a primary clone can never be in a
stale state. However, the primary and secondary role assignments
can be changed. Stale clones are refreshed from the primary clone
of the data segment. A decision can be made to rebuild the clone in
its entirety from another source rather than refresh the clone, if
the degree of staleness is above a predetermined threshold.
[0031] Specifically, each record has a key and a system-defined
clone update identifier (CUID) column that is present in both
primary and secondary clones. The CUID is used to detect if the
record is up-to-date. Each table has a CUID "high watermark" value
that is stored in metadata. Suitable data manipulation language
operation(s) detect this CUID value, treat the value as a runtime
constant, and update every row having the associated value.
[0032] More specifically, when a clone goes offline, the language
operation that is accessing the clone will abort. The CUID high
watermark value for the table is updated in metadata concurrently
with marking in metadata that this clone should no longer be
updated (in an offline state). Any new operation that starts will
not update the offline secondary clones, and puts the new value of
CUID in the rows the operation inserts or updates on the primary
clone.
[0033] When a secondary clone is brought back online, the metadata
is updated to indicate that this clone is now to be maintained by
the language. This occurs by marking the beginning of the transition of the stale clone to the in-refresh state. The operation that
was already running continues to run without any changes, and is
unaware of the new clone.
[0034] All new operations that start after this point pick up the new metadata (at query startup time, not compile time) when the schema locks on the table are obtained, and keep the secondary clone up-to-date (updates are propagated to secondary clones that are in the in-refresh or online state). Inserts are propagated as long as there is no duplicate record. Updates and deletes are only propagated if the CUID column value before the update/delete compares successfully between the primary and secondary clone. That is, update or delete operations are performed on the row in the secondary clone(s) only if the clone is up-to-date; otherwise the row is skipped on the secondary and updated/deleted on the primary only. When all old operations have completed, the system detects that the old metadata version is no longer in use, deletes that metadata, and sends notification that the clone is now officially in the in-refresh state.
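The CUID comparison rule can be modeled with the following Python sketch; the in-memory dictionaries and names are hypothetical and serve only to illustrate the rule, not the patented implementation.

    def propagate_update(primary, secondary, key, new_values, new_cuid):
        # primary/secondary map key -> row dict carrying a 'cuid' column.
        before_cuid = primary[key]["cuid"]   # primary's CUID before update
        primary[key].update(new_values)      # the primary is always updated
        primary[key]["cuid"] = new_cuid
        sec_row = secondary.get(key)
        # Propagate only if the secondary row's CUID matches the primary's
        # pre-update CUID (the row is up-to-date); otherwise skip it and
        # let clone refresh repair the row later.
        if sec_row is not None and sec_row["cuid"] == before_cuid:
            sec_row.update(new_values)
            sec_row["cuid"] = new_cuid

    primary = {"k1": {"val": 1, "cuid": 3}}
    secondary = {"k1": {"val": 1, "cuid": 3}}
    propagate_update(primary, secondary, "k1", {"val": 2}, new_cuid=4)
    assert secondary["k1"] == {"val": 2, "cuid": 4}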
[0035] At this point, a refresh algorithm runs and performs an outerjoin, in a read committed mode, between the primary clone and the secondary clone, finding the keys of the rows that may be out-of-date (in other words, the key is not present in both the primary and secondary clone, or the key is present in both clones but the CUID values do not match). This set of keys is inserted into a temporary table. Because all new operations propagate to the secondary, the outerjoin is guaranteed to find all out-of-date rows; however, it may also find some rows that are actually consistent.
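A minimal sketch of this scan, assuming (hypothetically, for illustration) that each clone is modeled as a mapping from key to CUID value:

    def find_suspect_keys(primary, secondary):
        # primary/secondary map key -> CUID value.
        suspects = set()
        for key in primary.keys() | secondary.keys():
            if key not in primary or key not in secondary:
                suspects.add(key)              # present in only one clone
            elif primary[key] != secondary[key]:
                suspects.add(key)              # CUID mismatch
        return suspects

    # 'a' is missing from the secondary, 'b' has a stale CUID, and 'c' is
    # gone from the primary; all three keys are flagged for repair.
    assert find_suspect_keys({"a": 3, "b": 2}, {"b": 1, "c": 2}) == {"a", "b", "c"}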
[0036] Next, a series of batch transactions are run in a repeatable
read mode. The operations lock a set of keys on the primary clone,
delete all those keys from the secondary clone, insert the data row
from primary clone into the secondary clone, and commit the
transaction. When the batch completes, the set of records is
consistent, and will forever stay consistent because the language
operations continually update the records. The process continues,
making a single pass through the table. The secondary clone is
up-to-date when the table pass completes.
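One possible shape of a repair batch is sketched below in Python; locking and transaction scoping are elided to comments, and the structures are hypothetical, so this is a hedged illustration rather than the disclosed implementation.

    def repair_batch(primary, secondary, batch_keys):
        # Conceptually: begin a repeatable-read transaction and lock
        # batch_keys on the primary clone, then...
        for key in batch_keys:
            secondary.pop(key, None)                 # delete stale versions
            if key in primary:
                secondary[key] = dict(primary[key])  # reinsert primary's row
        # ...commit. After the commit, these rows stay consistent because
        # the language operations continually propagate new updates.

    primary = {"b": {"val": 9, "cuid": 2}}
    secondary = {"b": {"val": 5, "cuid": 1}, "c": {"val": 7, "cuid": 2}}
    repair_batch(primary, secondary, {"b", "c"})
    assert secondary == {"b": {"val": 9, "cuid": 2}}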
[0037] An algorithm such as described herein attempts to avoid the long catch-up (refreshing of a clone) typically associated with schemes that play back transaction logs. However,
updates involved in refreshing the clone do not preserve the
original user transaction boundaries. Thus, over time it is
possible to have data segments for which no single clone completely
has the entire user transaction history in the associated log. A
solution is to coordinate log harvesting with clone state
transition.
[0038] The following description is a general summary of the
lifetime of a distributed transaction in the SN clustered database.
A user starts a distributed transaction. The brick on which the
transaction is started is designated as the root brick of the
transaction. The user then issues a query under this transaction.
Part of the query may need to run on other bricks. As parts of the query are sent to other bricks, the transaction information is also sent. Separate transaction branches are set up in the new bricks that are involved in the transaction. After the query is finished, the
transaction branch stays in that brick until the transaction has
been committed or rolled back.
[0039] After the remote brick finishes using the transaction for
the current request, a summary of the activity of the transaction
is sent back to a root transaction branch. The root branch tracks
all the bricks that participated in the transaction and the time
when the last transaction log record is written in each brick.
[0040] The user requests to commit the transaction through the root
branch. As the root branch tracks all participating bricks and the
work performed by the transaction in each of the participating
bricks, it can determine when the transaction is prepared as soon
as it has been ascertained that the relevant portions of the log
have been persisted on each participant brick.
[0041] When a transaction is prepared, the root branch sends a
commit request to the distributed transaction manager (coordination
component 212). Each brick batches individual transaction commit
requests locally and then sends the complete batch as a transaction
order to the transaction manager. The distributed transaction
manager collects the transaction orders from all bricks and groups
the orders. At some predetermined time interval, the transaction
manager sends out a global logical timestamp broadcast which
includes all transaction orders collected during this interval.
Each broadcast is associated with a monotonically increasing global
logical timestamp. Only after the root brick receives this
broadcast is the root transaction branch committed locally.
Similarly, when the broadcast reaches remote participating bricks
of the transactions, the remote footprints are cleaned up.
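The root-branch bookkeeping in this lifetime can be modeled by the following illustrative Python sketch (hypothetical names; a simplified model, not the disclosed implementation): the root tracks the participating bricks and the last log write on each, and declares the transaction prepared once every participant has persisted its portion of the log.

    class RootBranch:
        def __init__(self, root_brick):
            self.participants = {root_brick}  # bricks touched by the txn
            self.last_log_time = {}           # brick -> last log record time

        def record_activity(self, brick, log_time):
            # Summary of remote-branch activity reported back to the root.
            self.participants.add(brick)
            self.last_log_time[brick] = log_time

        def is_prepared(self, persisted_through):
            # persisted_through: brick -> log position durably persisted.
            return all(persisted_through.get(b, -1) >= self.last_log_time.get(b, -1)
                       for b in self.participants)

    root = RootBranch("B1")
    root.record_activity("B3", log_time=42)
    assert root.is_prepared({"B1": 100, "B3": 42})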
[0042] The locks held by the transaction branches are not released
until commit has been processed, which occurs only after the
transaction commit has been included in a broadcast. This ensures
that two transactions that have data conflicts (operate on the same
piece of data) are serialized and cannot commit with the same
global timestamp.
[0043] A time-based scheme can be employed for the coordination and
commit of distributed transactions. Each brick batches individual
transaction commit requests locally and sends the complete batch as
a transaction order to the transaction manager.
[0044] The distributed transaction coordination component 212
collects transaction orders from all bricks for a period of time
and then groups the orders. At the end of the time period, the
coordination component 212 broadcasts these commit/abort decisions
for this group of transactions to all bricks in the cluster (after
decisions have been persisted).
[0045] To assign a unique timestamp to this group, a global logical
timestamp is associated with this group of transactions. The global
logical timestamp can be a number that increases monotonically.
Each commit/abort broadcast results in an increase in the global
logical timestamp. Individual transactions are associated with a
commit global logical timestamp corresponding to a group message in
which the commit decision was hardened and broadcast. The commit
global logical timestamp represents the logical time associated
with the transaction commit. Given two transactions that committed with different commit global logical timestamps, the transaction associated with the lower commit global logical timestamp is said to have committed earlier. The global logical timestamp mechanism therefore provides a global clock to partially order data modifications in the cluster.
[0046] The order of replication of log records of dependent
transactions across multiple log streams is maintained. The
harvesting of log records of different transactions from the
multiple log streams is maintained to keep the original operation
order. This order is utilized to maintain logical consistency of
data (e.g., foreign key relations). Consider the following schema
as the running example:
TABLE-US-00001
    CREATE TABLE pk_tab(
        pKey1 INT PRIMARY KEY,
        sKey1 INT,   -- Non-clustered index on sKey
        value INT)
    CREATE TABLE fk_tab(
        pKey2 INT PRIMARY KEY,
        sKey2 INT REFERENCES pk_tab (pKey1))   -- FOREIGN KEY
where transaction T1 inserts a row (p1, s1, v1) into table pk_tab, and transaction T2 inserts a row (fp2, p1) into table fk_tab, and where p1 references the row inside pk_tab.
[0047] In order for replication to be consistent, transaction T1
must be applied to subscribers before transaction T2. If the order
is not maintained, there can be foreign key violations as changes
are applied at the subscriber.
[0048] Due to the batched commit processing, transactions having
data conflicts commit in different commit broadcasts. Replicating
transactions in the order of the associated commit broadcast
ensures that the replication preserves the order, and hence, the
constraints.
[0049] The order of the replication of log records within a
transaction generated in multiple log streams is followed. The log
records within a transaction may be distributed in multiple logs.
Log-based replication extracts data changes from multiple logs, and
merges the data changes into a single data change stream. The
original data change order within a transaction is followed when
the data changes are applied to the remote database server;
otherwise, data consistency can be violated.
[0050] Continuing with the example above, consider that the
database includes two database segments, where a database is a
collection of one or more database segments. Further, consider that
pk_tab is located in dbsegment1 and fk_tab is located in
dbsegment2. Consider the following transaction:
TABLE-US-00002
    begin tran T
        insert a row (p1, s1, v1) into table pk_tab
        insert a row (fp2, p1) into table fk_tab
    commit tran T
In dbsegment1 transaction log:
    begin-tran-Tx
    insert-row-(p1, s1, v1)
    commit-tran-Tx
In dbsegment2 transaction log:
    begin-tran-Tx
    insert-row-(fp2, p1)
    commit-tran-Tx
[0051] Although these logs can be harvested independently, the
extracted changes, "insert row (p1, s1, v1) into table pk_tab" and
"insert a row (fp2, p1) into table fk_tab" are applied in order;
otherwise, the foreign key constraints in fk_tab will be
violated.
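A hedged Python sketch of such a merge, assuming (hypothetically) that each harvested change carries its transaction's commit global logical timestamp and its command sequence identifier:

    import heapq

    def merge_streams(*streams):
        # Each stream yields (commit_glts, command_seq, change) tuples in
        # order; merging on that key produces one globally ordered stream
        # that respects both cross-transaction and intra-transaction order.
        return heapq.merge(*streams, key=lambda c: (c[0], c[1]))

    dbsegment1_log = [(5, 1, "insert (p1, s1, v1) into pk_tab")]
    dbsegment2_log = [(5, 2, "insert (fp2, p1) into fk_tab")]
    for _, _, change in merge_streams(dbsegment1_log, dbsegment2_log):
        print(change)  # the pk_tab insert precedes the fk_tab insert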
[0052] Each transaction branch has a seed for command sequence
identifiers. When a transaction's root branch starts, the root
branch initializes the seed to one. At a transaction branch, after
a new command sequence identifier value has been generated using
the seed, the seed is incremented. When a transaction branch sends
a data change request to another branch of the same transaction,
the current seed of the source transaction branch is transmitted
with the request. After a transaction branch receives a request
with seed x2, its new seed becomes max(x1, x2), where x1 is its
current seed. When a transaction branch sends a response, the current seed is transmitted as well. After a transaction branch receives a response, the recipient's seed is changed in the same way: the new seed becomes max(x1, x2), where x2 is the received seed and x1 is the current seed.
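The seed rule can be modeled with the following illustrative Python sketch (a hypothetical class capturing the rule as described, not the disclosed implementation):

    class TxnBranch:
        def __init__(self):
            self.seed = 1  # the root branch initializes the seed to one

        def next_command_seq(self):
            # Generate a command sequence identifier, then increment the seed.
            csid, self.seed = self.seed, self.seed + 1
            return csid

        def on_message(self, remote_seed):
            # On receiving a request or response carrying a peer's seed,
            # advance the local seed to max(local, received).
            self.seed = max(self.seed, remote_seed)

    root, remote = TxnBranch(), TxnBranch()
    c1 = root.next_command_seq()    # change on the root branch
    remote.on_message(root.seed)    # the request carries the root's seed
    c2 = remote.next_command_seq()  # causally later change on the remote
    assert c1 < c2                  # order-dependent changes stay ordered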
[0053] The rationale is that if the relative order of two changes within a transaction is relevant for logical consistency, the change command sequence identifiers must be in the right order; otherwise, the two data changes within a transaction can occur in parallel in different database segments and the values of the command sequence identifiers are irrelevant. Command sequence identifier generation does not guarantee that all data changes within a transaction will be totally ordered; it merely guarantees partial ordering for logical consistency.
[0054] FIG. 3 illustrates a time and activity table 300 with respect to harvesting multiple log streams in the presence of adaptive cloning. As described above, clone refresh does not retain
a complete transactional history of the updates when bringing stale
clones up-to-date. Using the same schema as above, consider the
following scenario between three log streams associated with three
independent failure units:
[0055] Data segment pk_tab has clones pk_tab_c1 and pk_tab_c2 on bricks B1 and B2, respectively; the clone on brick B1 is a primary clone and the clone on brick B2 is a secondary clone. The fk_tab segment's primary clone, fk_tab_c1, is on brick B3.
[0056] Log harvesting and applying (replicating) changes to the
subscriber is performed in the order of the commit global logical
timestamp of the associated transactions. Consider the following
harvesting of the log for pk_tab:
[0057] Harvest the log from brick B1 for all changes to the data
segment pk_tab up to global logical timestamp N-1 and harvest the
remaining set of changes from brick B2 after the clone pk_tab_c2
has been brought online.
[0058] This order violates consistency. The two updates, marked (1) at brick B1 and global logical timestamp N+1 and (2) at brick B3 and global logical timestamp N+2, are order dependent. Although the update in (1) has been applied by the clone refresh process before clone pk_tab_c2 is brought online, this update was made in a transaction that committed after the dependent transaction that inserted a foreign key reference. If the log of brick B2 is harvested between global logical timestamps N and N+6, then during the application of the changes at the subscriber, the foreign key constraint will be violated.
[0059] In general, log records generated during clone refresh are unsafe to harvest. Instead, only log records generated against online clones can be harvested. In order to ensure that a consistent history of all user transactions is accounted for during log harvesting and application to the subscriber, clone refresh and the clone switch to the online state abide by the following constraint:
[0060] If a clone is brought to in-refresh state at global logical
timestamp N and the refresh finishes at global logical timestamp
N+x, the clone can be brought back to the online state only after
the log reader has harvested log records for all global logical
timestamps up to and including global logical timestamp N+x.
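This constraint reduces to a simple gate, sketched below in Python with hypothetical names (a minimal model of the stated rule, not the disclosed implementation):

    def can_bring_online(refresh_end_glts, harvested_through_glts):
        # refresh_end_glts: GLTS at which the clone's refresh finished (N+x).
        # harvested_through_glts: highest GLTS the log reader has fully
        # harvested. The clone may return to the online state only once
        # the reader has caught up through N+x.
        return harvested_through_glts >= refresh_end_glts

    assert can_bring_online(12, 12)
    assert not can_bring_online(12, 11)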
[0061] Consider that a table segment has two clones--one primary (online) and the other a secondary in the in-refresh state--and right after the secondary clone finishes refresh at global logical timestamp N+x, the brick hosting the primary clone goes offline. At this point, although the brick containing the clone that just finished refresh is ready to serve this table segment for users, its log records are
unsafe for harvesting. Instead, the offline brick needs to be
brought online and recovered first so that its log up to global
logical timestamp N+x is available for harvesting. With the above
constraint, it is guaranteed that as long as a table segment is
online, its historic data changes are available from the logs of
bricks that contain its online clones.
[0062] Put another way, a database management system is provided
that performs replication of data from a shared-nothing cluster
that employs adaptive cloning. The cluster stores database segments
of a database across cluster nodes, each data segment having
transaction log records that track changes of a transaction, the
transaction log records formed into a transaction log stream. The
replication component receives multiple transaction log streams
from online clones based on the global logical timestamps and
command sequence identifiers to create an ordered change history of
the changes, merges the multiple log streams into a single change
stream, and replicates the changes to a destination in the single
change stream according to the change history.
[0063] The replication component maintains an original data
operation order of the changes when harvesting the multiple log
streams, each of which lacks a complete record of all the changes.
The replication component coordinates log harvesting with a data
virtualization management component of the shared-nothing cluster,
so that a refreshed clone is not switched to the online state until log harvesting has progressed through the point at which the clone refresh finished.
[0064] Included herein is a set of flow charts representative of
exemplary methodologies for performing novel aspects of the
disclosed architecture. While, for purposes of simplicity of
explanation, the one or more methodologies shown herein, for
example, in the form of a flow chart or flow diagram, are shown and
described as a series of acts, it is to be understood and
appreciated that the methodologies are not limited by the order of
acts, as some acts may, in accordance therewith, occur in a
different order and/or concurrently with other acts from that shown
and described herein. For example, those skilled in the art will
understand and appreciate that a methodology could alternatively be
represented as a series of interrelated states or events, such as
in a state diagram. Moreover, not all acts illustrated in a
methodology may be required for a novel implementation.
[0065] FIG. 4 illustrates a computer-implemented database
management method in accordance with the disclosed architecture. At
400, log streams of changes to data clones in a shared-nothing
cluster that employs adaptive cloning are received. At 402, the log
streams are merged into a single stream of ordered changes. At 404,
the ordered changes are replicated to a destination data
system.
[0066] FIG. 5 illustrates further aspects of the method of FIG. 4.
At 500, an original data operation order of the changes is
maintained when merging the log streams. At 502, an original data
change order of the log streams within a transaction generated in
the log streams is maintained. At 504, replication of transaction
log records is restricted to online clones. At 506, the changes are replicated according to a global logical timestamp.
[0067] FIG. 6 illustrates further aspects of the method of FIG. 4.
At 600, the single log stream of ordered changes is created for
replication according to a merged log history of the changes. At
602, the single log stream is created from clone transaction logs
each of which lacks an entire change history. At 604, the changes
are replicated based on a commit global logical timestamp.
[0068] As used in this application, the terms "component" and
"system" are intended to refer to a computer-related entity, either
hardware, a combination of software and tangible hardware,
software, or software in execution. For example, a component can
be, but is not limited to, tangible components such as a processor,
chip memory, mass storage devices (e.g., optical drives, solid
state drives, and/or magnetic storage media drives), and computers,
and software components such as a process running on a processor,
an object, an executable, module, a thread of execution, and/or a
program. By way of illustration, both an application running on a
server and the server can be a component. One or more components
can reside within a process and/or thread of execution, and a
component can be localized on one computer and/or distributed
between two or more computers. The word "exemplary" may be used
herein to mean serving as an example, instance, or illustration.
Any aspect or design described herein as "exemplary" is not
necessarily to be construed as preferred or advantageous over other
aspects or designs.
[0069] Referring now to FIG. 7, there is illustrated a block
diagram of a computing system 700 that executes log replication in
a shared-nothing cluster with adaptive cloning in accordance with
the disclosed architecture. In order to provide additional context
for various aspects thereof, FIG. 7 and the following description
are intended to provide a brief, general description of the
suitable computing system 700 in which the various aspects can be
implemented. While the description above is in the general context
of computer-executable instructions that can run on one or more
computers, those skilled in the art will recognize that a novel
embodiment also can be implemented in combination with other
program modules and/or as a combination of hardware and
software.
[0070] The computing system 700 for implementing various aspects
includes the computer 702 having processing unit(s) 704, a
computer-readable storage such as a system memory 706, and a system
bus 708. The processing unit(s) 704 can be any of various
commercially available processors such as single-processor,
multi-processor, single-core units and multi-core units. Moreover,
those skilled in the art will appreciate that the novel methods can
be practiced with other computer system configurations, including
minicomputers, mainframe computers, as well as personal computers
(e.g., desktop, laptop, etc.), hand-held computing devices,
microprocessor-based or programmable consumer electronics, and the
like, each of which can be operatively coupled to one or more
associated devices.
[0071] The system memory 706 can include computer-readable storage
(physical storage media) such as a volatile (VOL) memory 710 (e.g.,
random access memory (RAM)) and non-volatile memory (NON-VOL) 712
(e.g., ROM, EPROM, EEPROM, etc.). A basic input/output system
(BIOS) can be stored in the non-volatile memory 712, and includes
the basic routines that facilitate the communication of data and
signals between components within the computer 702, such as during
startup. The volatile memory 710 can also include a high-speed RAM
such as static RAM for caching data.
[0072] The system bus 708 provides an interface for system
components including, but not limited to, the system memory 706 to
the processing unit(s) 704. The system bus 708 can be any of
several types of bus structure that can further interconnect to a
memory bus (with or without a memory controller), and a peripheral
bus (e.g., PCI, PCIe, AGP, LPC, etc.), using any of a variety of
commercially available bus architectures.
[0073] The computer 702 further includes machine readable storage
subsystem(s) 714 and storage interface(s) 716 for interfacing the
storage subsystem(s) 714 to the system bus 708 and other desired
computer components. The storage subsystem(s) 714 (physical storage
media) can include one or more of a hard disk drive (HDD), a
magnetic floppy disk drive (FDD), and/or optical disk storage drive
(e.g., a CD-ROM drive or DVD drive), for example. The storage
interface(s) 716 can include interface technologies such as EIDE,
ATA, SATA, and IEEE 1394, for example.
[0074] One or more programs and data can be stored in the memory
subsystem 706, a machine readable and removable memory subsystem
718 (e.g., flash drive form factor technology), and/or the storage
subsystem(s) 714 (e.g., optical, magnetic, solid state), including
an operating system 720, one or more application programs 722,
other program modules 724, and program data 726.
[0075] The one or more application programs 722, other program
modules 724, and program data 726 can include the entities and
components of the system 100 of FIG. 1, the entities and components
of the system 200 of FIG. 2, the state and activities in the table
300 of FIG. 3, and the methods represented by the flowcharts of
FIGS. 4-6, for example.
[0076] Generally, programs include routines, methods, data
structures, other software components, etc., that perform
particular tasks or implement particular abstract data types. All
or portions of the operating system 720, applications 722, modules
724, and/or data 726 can also be cached in memory such as the
volatile memory 710, for example. It is to be appreciated that the
disclosed architecture can be implemented with various commercially
available operating systems or combinations of operating systems
(e.g., as virtual machines).
[0077] The storage subsystem(s) 714 and memory subsystems (706 and
718) serve as computer readable media for volatile and non-volatile
storage of data, data structures, computer-executable instructions,
and so forth. Such instructions, when executed by a computer or
other machine, can cause the computer or other machine to perform
one or more acts of a method. The instructions to perform the acts
can be stored on one medium, or could be stored across multiple
media, so that the instructions appear collectively on the one or
more computer-readable storage media, regardless of whether all of
the instructions are on the same media.
[0078] Computer readable media can be any available media that can
be accessed by the computer 702 and includes volatile and
non-volatile internal and/or external media that is removable or
non-removable. For the computer 702, the media accommodate the
storage of data in any suitable digital format. It should be
appreciated by those skilled in the art that other types of
computer readable media can be employed such as zip drives,
magnetic tape, flash memory cards, flash drives, cartridges, and
the like, for storing computer executable instructions for
performing the novel methods of the disclosed architecture.
[0079] A user can interact with the computer 702, programs, and
data using external user input devices 728 such as a keyboard and a
mouse. Other external user input devices 728 can include a
microphone, an IR (infrared) remote control, a joystick, a game
pad, camera recognition systems, a stylus pen, touch screen,
gesture systems (e.g., eye movement, head movement, etc.), and/or
the like. The user can interact with the computer 702, programs,
and data using onboard user input devices 730 such as a touchpad,
microphone, keyboard, etc., where the computer 702 is a portable
computer, for example. These and other input devices are connected
to the processing unit(s) 704 through input/output (I/O) device
interface(s) 732 via the system bus 708, but can be connected by
other interfaces such as a parallel port, IEEE 1394 serial port, a
game port, a USB port, an IR interface, etc. The I/O device
interface(s) 732 also facilitate the use of output peripherals 734
such as printers, audio devices, camera devices, and so on, such as
a sound card and/or onboard audio processing capability.
[0080] One or more graphics interface(s) 736 (also commonly
referred to as a graphics processing unit (GPU)) provide graphics
and video signals between the computer 702 and external display(s)
738 (e.g., LCD, plasma) and/or onboard displays 740 (e.g., for
portable computer). The graphics interface(s) 736 can also be
manufactured as part of the computer system board.
[0081] The computer 702 can operate in a networked environment
(e.g., IP-based) using logical connections via a wired/wireless
communications subsystem 742 to one or more networks and/or other
computers. The other computers can include workstations, servers,
routers, personal computers, microprocessor-based entertainment
appliances, peer devices or other common network nodes, and
typically include many or all of the elements described relative to
the computer 702. The logical connections can include
wired/wireless connectivity to a local area network (LAN), a wide
area network (WAN), hotspot, and so on. LAN and WAN networking
environments are commonplace in offices and companies and
facilitate enterprise-wide computer networks, such as intranets,
all of which may connect to a global communications network such as
the Internet.
[0082] When used in a networking environment the computer 702
connects to the network via a wired/wireless communication
subsystem 742 (e.g., a network interface adapter, onboard
transceiver subsystem, etc.) to communicate with wired/wireless
networks, wired/wireless printers, wired/wireless input devices
744, and so on. The computer 702 can include a modem or other means
for establishing communications over the network. In a networked
environment, programs and data relative to the computer 702 can be
stored in the remote memory/storage device, as is associated with a
distributed system. It will be appreciated that the network
connections shown are exemplary and other means of establishing a
communications link between the computers can be used.
[0083] The computer 702 is operable to communicate with
wired/wireless devices or entities using the radio technologies
such as the IEEE 802.xx family of standards, such as wireless
devices operatively disposed in wireless communication (e.g., IEEE
802.11 over-the-air modulation techniques) with, for example, a
printer, scanner, desktop and/or portable computer, personal
digital assistant (PDA), communications satellite, any piece of
equipment or location associated with a wirelessly detectable tag
(e.g., a kiosk, news stand, restroom), and telephone. This includes
at least Wi-Fi (or Wireless Fidelity) for hotspots, WiMax, and
Bluetooth.TM. wireless technologies. Thus, the communications can
be a predefined structure as with a conventional network or simply
an ad hoc communication between at least two devices. Wi-Fi
networks use radio technologies called IEEE 802.11x (a, b, g, etc.)
to provide secure, reliable, fast wireless connectivity. A Wi-Fi
network can be used to connect computers to each other, to the
Internet, and to wire networks (which use IEEE 802.3-related media
and functions).
[0084] What has been described above includes examples of the
disclosed architecture. It is, of course, not possible to describe
every conceivable combination of components and/or methodologies,
but one of ordinary skill in the art may recognize that many
further combinations and permutations are possible. Accordingly,
the novel architecture is intended to embrace all such alterations,
modifications and variations that fall within the spirit and scope
of the appended claims. Furthermore, to the extent that the term
"includes" is used in either the detailed description or the
claims, such term is intended to be inclusive in a manner similar
to the term "comprising" as "comprising" is interpreted when
employed as a transitional word in a claim.
* * * * *