U.S. patent application number 13/412083 was filed with the patent office on 2012-03-05 and published on 2012-06-28 for an apparatus for elastic database processing with heterogeneous data.
This patent application is currently assigned to ParElastic Corporation. Invention is credited to Mrithyunjaya Annapragada.
Publication Number | 20120166424 |
Application Number | 13/412083 |
Family ID | 45556876 |
Publication Date | 2012-06-28 |
United States Patent Application 20120166424
Kind Code: A1
Annapragada; Mrithyunjaya
June 28, 2012
Apparatus for Elastic Database Processing with Heterogeneous Data
Abstract
A database management system implemented in a cloud computing
environment. Operational nodes are assigned as groups of
controller-nodes, compute-nodes or storage-nodes. Queries specify
one or more tables for an associated database operation, with each
table being assigned to respective storage nodegroup(s). The number
of nodes executing a given query may change, by (a) changing the
compute-nodes associated with a connection, or (b) adding or
removing nodes associated with a connection; and/or distributing
data to a storage nodegroup based on a Distribution Method which
may be either data dependent or data independent. A controller node
further executes a Dynamic Query Planner (DQP) process that
develops a query plan.
Inventors: Annapragada; Mrithyunjaya; (Bolton, MA)
Assignee: ParElastic Corporation, Waltham, MA
Family ID: 45556876
Appl. No.: 13/412083
Filed: March 5, 2012
Related U.S. Patent Documents
Application Number | Filing Date | Patent Number |
13252742 | Oct 4, 2011 | |
13412083 | | |
61406816 | Oct 26, 2010 | |
Current U.S. Class: 707/718; 707/E17.132
Current CPC Class: G06F 16/2471 20190101; G06F 16/27 20190101
Class at Publication: 707/718; 707/E17.132
International Class: G06F 17/30 20060101
Claims
1. A database management system comprising: a network interface,
for receiving database queries from two or more client application
processes as a network database service, the client application
processes originating from two different users, the system
providing at least one connection into the system for each such
client application process; a group of two or more operational
nodes for executing the queries as database operations, each
operational node implemented as a logical collection of software
components that execute on one or more physical machines; where the
number of physical machines is not necessarily the same as the
number of operational nodes; with the operational nodes assigned as
controller-nodes, compute-nodes or storage-nodes, and groups of
controller-nodes forming controller nodegroups, and groups of
compute-nodes forming compute nodegroups, and groups of storage
nodes forming storage nodegroups; the number of operational nodes,
and their available assignment as compute-nodes or storage-nodes
varying during execution of the queries; each client connection
being assigned to an associated compute nodegroup; the queries also
specifying one or more tables for an associated database operation,
with each such table being assigned to a respective storage
nodegroup; the operational nodes further: operating in parallel;
with the number of operational nodes executing a given query or
queries changing during a given time interval by at least one of:
(a) changing the compute-nodegroup associated with a connection, or
(b) adding or removing nodes from the compute nodegroup associated
with a connection; and distributing data from the tables among the
nodes in the storage nodegroup to which the table is assigned based
on a Distribution Method which may be either data dependent or data
independent; and at least one of the controller nodes further:
executing a Dynamic Query Planner (DQP) process that transforms
queries received from the client into a query plan that includes an
ordered series of steps that are executed in parallel on multiple
operational nodes where possible, the query plan further
stipulating, for each step, which compute node it must be performed
on, which storage nodes it must access, and other steps that
this step depends on.
2. The apparatus of claim 1 wherein the operational nodes include
two or more Query Execution Engines, a Data Distribution Manager,
and a Persistent Data Store.
3. The apparatus of claim 1 wherein the software components
comprising the operational nodes execute on some combination of
physical machines and virtual machines.
4. The apparatus of claim 1 wherein the storage nodes further
provide a Persistent Data Store as at least one of relational
database, flat files, or non-relational databases.
5. The apparatus of claim 1 wherein the storage and compute nodes
contain multiple potentially heterogeneous data stores that can be
presented and queried as if they were one database; and wherein the
steps in the query plan generated by the DQP process are expressed
in a form that is understood by an underlying data store on the
node(s) executing the step(s); and wherein the form in which the
client communicates the query to the system may or may not be in
the same form used to communicate with the data stores on the
compute and storage nodes.
6. The apparatus of claim 1 wherein each nodegroup may store more
than one table.
7. The apparatus of claim 1 wherein transient tables created during
query execution are stored on a compute nodegroup associated with
the connection, and unless otherwise specified a nodegroup
designated as the default compute nodegroup is used for storing the
transient tables.
8. The apparatus of claim 1 wherein a query may specify a nodegroup
on which to store a table, but if a query does not, a storage
nodegroup designated as the default storage nodegroup is used for
storing the table.
9. The apparatus of claim 1 wherein compute and storage nodegroups
are further assigned reference counters and table counters, with
(a) a reference counter being incremented when a nodegroup is
assigned as the default storage or default compute nodegroup; (b) a
reference counter being incremented when a nodegroup is assigned as
the compute nodegroup for a connection; (c) the reference counter
being decremented when a nodegroup is no longer the default storage
or default compute nodegroup; (d) the reference counter being
decremented when a nodegroup is no longer the compute nodegroup for
a connection; and (e) the table counter being incremented each time
a table is associated with the nodegroup; and (f) the table counter
being decremented each time a table is disassociated from a
nodegroup.
10. The apparatus of claim 9 wherein the reference counter and
table counter are initialized to an initial-value, and wherein a
nodegroup can be deleted when both its associated reference counter
and table counter reach the initial value.
11. The apparatus of claim 1 wherein the data independent
distribution methods are broadcast, round-robin and random.
12. The apparatus of claim 1 wherein the data dependent
distribution methods are either inelastic (IDD) methods or elastic
(EDD) methods.
13. The apparatus of claim 1 wherein the DQP process further
identifies groups of tables in the query, and assigns database
operations that can be executed in parallel and at the same time to
different selected storage and/or compute nodes, such that a final
result will be as expected by the client application
process(es).
14. The apparatus of claim 1 wherein steps in the plan may operate
on sets of tables, and such that steps in the plan may (a) produce
intermediate result tables for later use; (b) return results to the
client; (c) delete some or all of the intermediate result tables;
or (d) perform some other operation specified by the client
application process(es).
15. The apparatus of claim 1 wherein the system further maintains a
reference counter associated with each table referenced in a query,
and the table can be deleted when it is no longer being referenced
by any query.
16. The apparatus of claim 1 wherein a Query Execution Engine
receives a plan from the DQP and then identifies the number of
other Query Execution Engines required to execute the plan,
identifies other tables addressed in the query, and identifies
physical or virtual machines where Query Execution Engines may be
used for the plan, and contacts Machine Managers to provision Query
Execution Engines from the operational nodes, and where appropriate
additional physical or virtual machines are provisioned for the
same.
17. The apparatus of claim 1 wherein the operational nodes are
provisioned as one or more Query Execution Engines, and are
provided with client connection state information prior to
executing a step, and the connection state information is saved for
later use at the completion of execution of a step.
18. The apparatus of claim 17 wherein after query execution, some
or all of the Query Execution Engines may be released, and if any
physical or virtual machines were provisioned, they may be freed as
well.
19. The system of claim 1 wherein intermediate processing of
queries is decoupled from activities that are closely tied to where
the data is stored by: storing intermediate tables generated as
part of a query plan on the compute-nodegroup associated with the
connection; and storing persistent user data on a
storage-nodegroup; and wherein client connection state is decoupled
from the connection and provided to the query execution engine that
executes each step in the query plan.
20. A database management system comprising: a network interface,
for receiving database queries from two or more client application
processes as a network database service, the client application
processes originating from two different users, the system
providing at least one connection into the system for each such
client application process; a group of two or more operational
nodes for executing the queries as database operations, each
operational node implemented as a logical collection of software
components that execute on one or more physical machines; where the
number of physical machines is not necessarily the same as the
number of operational nodes; with the operational nodes assigned as
controller-nodes, compute-nodes or storage-nodes, and groups of
controller-nodes forming controller nodegroups, and groups of
compute-nodes forming compute nodegroups, and groups of storage
nodes forming storage nodegroups; wherein the storage nodes further
provide a Persistent Data Store as at least one of relational
database, flat files, or non-relational databases; with one of the
compute nodegroups assigned as a default compute nodegroup and one
of the storage nodegroups defined as a default storage nodegroup;
the number of operational nodes, and their available assignment as
compute-nodes or storage-nodes varying during execution of the
queries; each client connection being assigned to an associated
compute nodegroup; the queries also specifying one or more tables
for an associated database operation, with each such table being
assigned to a respective storage nodegroup; the operational nodes
further: operating in parallel; with the number of operational
nodes executing a given query or queries changing during a given
time interval by at least one of: (a) changing the
compute-nodegroup associated with a connection, or (b) adding or
removing nodes from the compute nodegroup associated with a
connection; and distributing data from the tables among the nodes
in the storage nodegroup to which the table is assigned based on a
Distribution Method which may be either data dependent or data
independent; and at least one of the controller nodes further:
executing a Dynamic Query Planner (DQP) process that transforms
queries received from the client into a query plan that includes an
ordered series of steps that are executed in parallel on multiple
operational nodes where possible, the query plan further
stipulating, for each step, which compute node it must be performed
on, which storage nodes it must access, and other steps that this
step depends on; and wherein nodegroups are further assigned
reference counters and table counters, with (a) a reference counter
being incremented when a nodegroup is assigned as the default
storage or default compute nodegroup; (b) a reference counter being
incremented when a nodegroup is assigned as the compute nodegroup
for a connection; (c) the reference counter being decremented when
a nodegroup is no longer the default storage or default compute
nodegroup; and (d) the reference counter being decremented when a
nodegroup is no longer the compute nodegroup for a connection;
and (e) the table counter being incremented each time a table is
associated with the nodegroup; and (f) the table counter being
decremented each time a table is disassociated from a nodegroup,
wherein the reference counter and table counter are initialized to
an initial-value; and wherein a nodegroup can be deleted when both
its associated reference counter and table counter reach the
initial value; and intermediate processing of queries is decoupled
from activities that are closely tied to where the data is stored
by: storing intermediate tables generated as part of a query plan
on the compute-nodegroup associated with the connection; and
storing persistent user data on a storage-nodegroup; and wherein
client connection state is decoupled from the connection and
provided to the query execution engine that executes each step in
the query plan.
Description
RELATED APPLICATION
[0001] This application is a continuation of U.S. application Ser.
No. 13/252,742, filed Oct. 4, 2011, which claims the benefit of
U.S. Provisional Application No. 61/406,816, filed on Oct. 26,
2010. The entire teachings of the above applications are
incorporated herein by reference.
TECHNICAL FIELD
[0002] This patent application relates generally to data management
systems. More specifically, it relates to a parallel shared-nothing
relational database management system.
BACKGROUND
[0003] With the proliferation of technology, we face an exponential
growth of information and data that must be stored, analyzed and
acted upon by computers. It is generally believed that data volumes
grow at a compound annual growth rate (CAGR) of approximately
sixty percent. This growth results in a doubling of data volumes
in under two years (roughly every eighteen months).
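The doubling time implied by a compound annual growth rate r follows from solving (1 + r)^T = 2, which gives T = ln 2 / ln(1 + r). A minimal sketch that evaluates this for the sixty-percent figure quoted above; the function name is illustrative only.

```python
import math


def doubling_time_years(cagr: float) -> float:
    """Years for a quantity growing at `cagr` per year (0.60 == 60%) to double."""
    # Solve (1 + cagr) ** t == 2 for t.
    return math.log(2) / math.log(1 + cagr)


print(f"{doubling_time_years(0.60):.2f} years")  # 1.47 years
```

At 60% per year the result is about 1.47 years, so volumes double somewhat faster than every two years.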
[0004] Computers and computer related infrastructures have evolved
to keep pace with this exponential data growth. Over the past three
decades, it has been shown that large collections of inexpensive
computers can be assembled, and their collective power can be
brought to bear on large and complex problems.
[0005] These kinds of assemblages of computers are often based on
the "Shared Nothing" (SN) architecture. In this architecture, a
collection of individual computers, called nodes, each containing
CPUs, Disk Storage, Dynamic Memory, Network Interface
Controller(s), and some software programs, is first assembled. The
CPU on each node, and any software programs that are run on that
node have complete and direct access to all information that is on
that node but have no direct access to any information that is
resident on another node.
[0006] It has been demonstrated that SN architectures can be
efficiently scaled up to hundreds, thousands and tens of thousands
of nodes. For some kinds of data processing, these architectures
can demonstrate linear or very close to linear scalability. In
other words, if a system consisted of M identical nodes, and
another system consisted of N identical nodes, and M>N, the
system with M nodes could perform M/N times as much work in a given
interval of time compared to the system with N nodes. In some cases
this means that the system with M nodes could complete a piece of
work M/N times faster than the system with N nodes.
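The two consequences of linear scalability described above (M/N times the throughput, M/N times shorter elapsed time for a fixed job) can be written as two one-line formulas. A minimal sketch, assuming perfectly linear scaling with no coordination or data-movement overhead, which real systems only approach:

```python
def ideal_throughput(work_per_interval_n: float, n: int, m: int) -> float:
    """Work an M-node system completes per interval, given the N-node rate."""
    # Linear scalability: throughput is proportional to the number of nodes.
    return work_per_interval_n * (m / n)


def ideal_elapsed_time(elapsed_n: float, n: int, m: int) -> float:
    """Time for M nodes to finish a job that takes `elapsed_n` on N nodes."""
    # The same job spread over M/N times as many nodes finishes M/N times faster.
    return elapsed_n * (n / m)


print(ideal_throughput(100.0, n=10, m=40))   # 400.0
print(ideal_elapsed_time(60.0, n=10, m=40))  # 15.0
```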
[0007] SN database systems, called "Parallel Database Management
Systems" (PDBMS), achieve their scalability and performance by
having a large number of nodes each perform a part of the
processing, on a subset of the problem, in parallel, and at the
same time.
[0008] In such systems, tuples of each relation in the database are
partitioned (declustered) across disk storage units attached
directly to each node. Partitioning allows multiple processors to
scan large relations in parallel without needing any exotic I/O
devices. Such architectures were pioneered by Teradata in the late
seventies, by Netezza in the 2000s, and by several research
projects.
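As a generic illustration of declustering (not the specific scheme used by any product named above), the sketch below hash-partitions tuples on a key column across a fixed number of nodes so that each node can scan its share in parallel. The column name, row contents, and node count are hypothetical.

```python
import zlib
from collections import defaultdict


def decluster(rows, key, num_nodes):
    """Hash-partition rows (dicts) across num_nodes on the value of `key`."""
    partitions = defaultdict(list)
    for row in rows:
        # crc32 gives the same result in every process, unlike Python's
        # salted built-in hash() for strings.
        node = zlib.crc32(str(row[key]).encode("utf-8")) % num_nodes
        partitions[node].append(row)
    return dict(partitions)


rows = [{"order_id": i, "amount": 10 * i} for i in range(8)]
parts = decluster(rows, key="order_id", num_nodes=3)
for node, part in sorted(parts.items()):
    print(node, [r["order_id"] for r in part])
```

Because placement depends only on the key value, every tuple with a given key lands on the same node.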
[0009] SN architectures minimize interference by minimizing
resource sharing and contention. They also exploit commodity
processors and memory without needing an incredibly powerful
interconnection network.
SUMMARY
[0010] In pertinent aspects, a database management apparatus
executes queries in a way that provides parallelism and elasticity
through the concept of storage, compute, and control "nodes". Nodes
are an abstraction consisting of a collection of software programs
that execute on one or more virtual or physical machines. The nodes
may provide functions such as Query Execution Engines, a Data
Distribution Manager, and a Persistent Data Store. The Data
Distribution Manager determines how and where to store data needed
by the queries. The Persistent Data Stores are software programs
that store and retrieve data, and provide some interface through
which the data can be accessed.
[0011] Elasticity is provided by allowing the number and
composition of the nodes, and machines in the system to be changed
while queries are being executed. Parallelism is provided by
allowing a query to be executed such that more than one activity
related to the execution may occur at the same time, and by having
more than one node execute parts of the query at the same time.
[0012] Nodes are arranged in functional groups called nodegroups. A
nodegroup is a non-empty, ordered list of nodes. Nodegroups consist
of nodes of the same type. Storage nodes make up storage
nodegroups, compute nodes make up compute nodegroups, and control
nodes make up control nodegroups.
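The nodegroup rules above (non-empty, ordered, and a single node type) can be captured in a small data structure. A minimal sketch; the class and field names are hypothetical and are not the system's actual types.

```python
from dataclasses import dataclass
from enum import Enum


class NodeType(Enum):
    STORAGE = "storage"
    COMPUTE = "compute"
    CONTROL = "control"


@dataclass(frozen=True)
class Node:
    name: str
    node_type: NodeType


class Nodegroup:
    """A non-empty, ordered list of nodes that all share one node type."""

    def __init__(self, name, nodes):
        nodes = list(nodes)  # keep the caller's ordering
        if not nodes:
            raise ValueError("a nodegroup must contain at least one node")
        types = {n.node_type for n in nodes}
        if len(types) != 1:
            raise ValueError("all nodes in a nodegroup must be of the same type")
        self.name = name
        self.nodes = nodes
        self.node_type = types.pop()


sg = Nodegroup("storage", [Node("s1", NodeType.STORAGE), Node("s2", NodeType.STORAGE)])
print(sg.node_type, [n.name for n in sg.nodes])
```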
[0013] Tables in the system are associated with storage nodegroups
or compute nodegroups. Each table is associated with one nodegroup.
Zero or more tables may be associated with a storage or compute
nodegroup. Each system preferably has a default-compute-nodegroup
and a default-storage-nodegroup. The default nodegroup isn't
necessarily a fixed nodegroup, and the system may assign the
default nodegroups and/or assist the user to do so when none is
specified.
[0014] Each client process accessing the system may do so through
multiple connections.
[0015] Data in the tables is stored on one or more of the nodes
associated with the table's nodegroup. Data are distributed based
on Distribution Methods which may be either data dependent or data
independent.
[0016] Data independent distribution methods may include broadcast,
round-robin or random. Data dependent distribution methods are
either inelastic (IDD) methods or elastic (EDD) methods.
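The three data-independent methods named above choose target nodes without looking at the row's contents. A minimal sketch, assuming a nodegroup is represented simply as a list of node names; the function and class names are hypothetical, and the data-dependent IDD and EDD methods are not modeled here. Round-robin is a class because it must remember where the previous row went.

```python
import itertools
import random


def broadcast(row, nodes):
    """Broadcast: every node in the nodegroup receives a copy of the row."""
    return list(nodes)


class RoundRobin:
    """Round-robin: successive rows go to successive nodes, wrapping around."""

    def __init__(self, nodes):
        self._cycle = itertools.cycle(nodes)

    def __call__(self, row):
        return [next(self._cycle)]


def random_placement(row, nodes, rng=random):
    """Random: each row goes to one node chosen uniformly at random."""
    return [rng.choice(nodes)]


nodes = ["n1", "n2", "n3"]
rr = RoundRobin(nodes)
print([rr(r)[0] for r in range(5)])  # ['n1', 'n2', 'n3', 'n1', 'n2']
print(broadcast({"id": 1}, nodes))   # ['n1', 'n2', 'n3']
print(random_placement({"id": 1}, nodes, random.Random(0)))
```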
[0017] The available compute capacity in the system is changed
through Elastic Compute Capacity (ECC) by either (a) changing the
compute-nodegroup associated with a connection, or (b) adding or
removing nodes from the compute-nodegroup associated with a
connection.
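The two ECC mechanisms can be illustrated with a registry that maps each connection to its compute nodegroup. A minimal sketch with hypothetical names: `reassign` models option (a), and `add_node`/`remove_node` model option (b). Nodegroups are plain lists of node names.

```python
class ComputeRegistry:
    """Tracks the compute nodegroup used by each connection."""

    def __init__(self, nodegroups, default):
        self.nodegroups = {name: list(ns) for name, ns in nodegroups.items()}
        self.default = default
        self.connection_group = {}

    def connect(self, conn_id):
        # New connections start on the default compute nodegroup.
        self.connection_group[conn_id] = self.default

    def capacity(self, conn_id):
        return len(self.nodegroups[self.connection_group[conn_id]])

    def reassign(self, conn_id, group):
        # ECC option (a): point the connection at a different compute nodegroup.
        self.connection_group[conn_id] = group

    def add_node(self, group, node):
        # ECC option (b): grow the nodegroup the connection already uses.
        self.nodegroups[group].append(node)

    def remove_node(self, group, node):
        # ECC option (b): shrink it; a nodegroup must stay non-empty.
        if len(self.nodegroups[group]) == 1:
            raise ValueError("cannot remove the last node of a nodegroup")
        self.nodegroups[group].remove(node)


reg = ComputeRegistry({"small": ["c1"], "large": ["c1", "c2", "c3", "c4"]}, default="small")
reg.connect("conn-1")
print(reg.capacity("conn-1"))  # 1
reg.reassign("conn-1", "large")
print(reg.capacity("conn-1"))  # 4
reg.add_node("large", "c5")
print(reg.capacity("conn-1"))  # 5
```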
[0018] The available storage capacity on a storage nodegroup can be
dynamically changed by adding nodes to or removing nodes from the
nodegroup.
[0019] The system also provides a mechanism by which multiple
potentially heterogeneous databases can be presented and queried as
if they were one database. One example of a persistent data store
is a relational database. Other examples include flat files,
non-relational databases, etc.
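One way to present heterogeneous stores as a single database is to put each behind a common interface and let the query layer talk only to that interface. A minimal sketch, assuming a hypothetical `scan(table)` interface: an in-memory SQLite database stands in for a relational store, and CSV text stands in for a flat file.

```python
import csv
import io
import sqlite3
from typing import Iterable, Iterator, Protocol


class DataStore(Protocol):
    def scan(self, table: str) -> Iterator[tuple]: ...


class SqliteStore:
    """Relational store backed by SQLite."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn

    def scan(self, table: str) -> Iterator[tuple]:
        # Table names cannot be bound as parameters; assume they are trusted here.
        yield from self.conn.execute(f"SELECT * FROM {table}")


class FlatFileStore:
    """Flat-file store: one block of CSV text per table."""

    def __init__(self, files: dict):
        self.files = files

    def scan(self, table: str) -> Iterator[tuple]:
        for record in csv.reader(io.StringIO(self.files[table])):
            yield tuple(record)


def scan_everywhere(stores: Iterable[DataStore], table: str) -> Iterator[tuple]:
    """Union the rows of `table` from every store, as if it were one table."""
    for store in stores:
        yield from store.scan(table)


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE t (id TEXT, name TEXT)")
db.execute("INSERT INTO t VALUES ('1', 'alpha')")
files = {"t": "2,beta\n3,gamma\n"}
print(list(scan_everywhere([SqliteStore(db), FlatFileStore(files)], "t")))
```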
[0020] User tables (that is, those tables created by a client
process for storing user data) are associated with storage
nodegroups. Temporary tables or transient tables created during
query execution are associated with compute nodegroups.
[0021] Queries may specify the nodegroup on which to store a table,
however, if a nodegroup is not specified, a default is used.
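The placement rules in the two paragraphs above reduce to a small decision. A minimal sketch with hypothetical parameter names, assuming an explicit nodegroup in the query applies to either kind of table; otherwise user tables go to the default storage nodegroup and transient tables to the connection's compute nodegroup (which itself falls back to the default compute nodegroup).

```python
from typing import Optional


def placement_nodegroup(is_transient: bool,
                        requested: Optional[str],
                        default_storage: str,
                        default_compute: str,
                        connection_compute: Optional[str] = None) -> str:
    """Return the nodegroup on which a new table should be stored."""
    if requested is not None:
        # The query named a nodegroup explicitly.
        return requested
    if is_transient:
        # Transient tables go to the connection's compute nodegroup,
        # falling back to the system's default compute nodegroup.
        return connection_compute or default_compute
    # User tables with no explicit nodegroup use the default storage nodegroup.
    return default_storage


print(placement_nodegroup(False, None, "ng_storage", "ng_compute"))           # ng_storage
print(placement_nodegroup(True, None, "ng_storage", "ng_compute", "ng_big"))  # ng_big
print(placement_nodegroup(False, "ng_archive", "ng_storage", "ng_compute"))   # ng_archive
```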
[0022] Storage and compute nodegroups can be provided with
reference counters and table counters. The reference counters are
incremented when a nodegroup is assigned as the default storage or
default compute nodegroup. The reference counter is incremented
when a nodegroup is assigned as the compute nodegroup for a
connection. The reference counters are decremented when a nodegroup
ceases to be a default storage or default compute nodegroup, or
when the nodegroup ceases to be the compute nodegroup for a
connection. The table counter is incremented each time a table is
associated with the nodegroup and decremented each time a table is
disassociated from the nodegroup. Counters are initialized to some
initial-value when a nodegroup is created. A nodegroup can only be
deleted when both its reference counter and its table counter have
returned to the initial value.
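The counter rules above can be sketched as a small bookkeeping class. A minimal sketch with hypothetical method names, assuming an initial value of zero (the text says only "some initial-value"):

```python
from dataclasses import dataclass

INITIAL_VALUE = 0  # assumption: the text does not fix the initial value


@dataclass
class NodegroupCounters:
    reference_count: int = INITIAL_VALUE
    table_count: int = INITIAL_VALUE

    # Reference counter: default-nodegroup and per-connection assignments.
    def assign_as_default(self):
        self.reference_count += 1

    def unassign_as_default(self):
        self.reference_count -= 1

    def assign_to_connection(self):
        self.reference_count += 1

    def release_from_connection(self):
        self.reference_count -= 1

    # Table counter: tables associated with this nodegroup.
    def add_table(self):
        self.table_count += 1

    def drop_table(self):
        self.table_count -= 1

    def can_delete(self) -> bool:
        # Deletable only when both counters are back at their initial value.
        return (self.reference_count == INITIAL_VALUE
                and self.table_count == INITIAL_VALUE)


ng = NodegroupCounters()
ng.assign_as_default()
ng.add_table()
print(ng.can_delete())  # False
ng.unassign_as_default()
ng.drop_table()
print(ng.can_delete())  # True
```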
[0023] A Dynamic Query Planner (DQP) transforms queries received
from a client into an ordered series of operations. This ordered
series of operations is called a query plan. The query plan
stipulates, for each operation, which node it must be performed on,
and what other operations (plan steps) this operation depends on.
When these operations are executed in the specified order on the
specified nodes, they produce the result the client expects in
response to the query it submitted in the predefined grammar. The
plan steps may be
expressed by the system in a grammar and form that is understood by
the persistent data stores running on the nodes where the steps are
being executed. This grammar may or may not be the same as the
predefined grammar in which the client expresses the query to the
system.
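A query plan as described above is an ordered list of steps, each naming the node it runs on and the steps it depends on. A minimal sketch with hypothetical field names, node names, and SQL text; `validate_order` checks the ordering property that every step's prerequisites appear earlier in the plan.

```python
from dataclasses import dataclass, field


@dataclass
class PlanStep:
    step_id: str
    node: str        # node on which the step must be performed
    operation: str   # text in the grammar of that node's data store
    depends_on: list = field(default_factory=list)


def validate_order(plan):
    """Raise ValueError unless every dependency appears earlier in the plan."""
    seen = set()
    for step in plan:
        missing = [d for d in step.depends_on if d not in seen]
        if missing:
            raise ValueError(f"step {step.step_id} depends on later or unknown steps {missing}")
        seen.add(step.step_id)


plan = [
    PlanStep("s1", "storage-1", "SELECT id, amount FROM orders WHERE amount > 100"),
    PlanStep("s2", "storage-2", "SELECT id, amount FROM orders WHERE amount > 100"),
    PlanStep("s3", "compute-1", "SELECT SUM(amount) FROM tmp_orders", depends_on=["s1", "s2"]),
]
validate_order(plan)  # passes: s3's prerequisites come first
```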
[0024] Steps in a plan may have dependencies on other steps called
plan dependencies; a step that depends on another step cannot be
executed until the prerequisite step has been successfully
completed. The query is executed in parallel where possible, based
on plan dependencies. DQP does this by identifying groups of tables
in the query and constructing operations that can be executed in
parallel and at the same time on different groups of nodes, such
that the final result is the one expected by the client
application. Each operation may operate on some set of tables, and
may produce some intermediate results for later use in the query
plan. A step in the plan returns results to the client, and further
steps delete some or all of the intermediate result tables created
as part of the query plan.
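Plan dependencies determine which steps may run at the same time. One common way to derive this (a sketch, not necessarily how the DQP schedules steps) is to group the dependency graph into waves using Kahn's topological sort: every step in a wave has all of its prerequisites in earlier waves, so the steps of one wave can run in parallel. The step names are hypothetical, and every step is assumed to appear as a key in `deps`.

```python
def parallel_waves(deps):
    """Group steps into waves; deps maps each step to the steps it depends on."""
    indegree = {step: len(prereqs) for step, prereqs in deps.items()}
    dependents = {step: [] for step in deps}
    for step, prereqs in deps.items():
        for p in prereqs:
            dependents[p].append(step)

    waves = []
    ready = sorted(s for s, d in indegree.items() if d == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for step in ready:
            for child in dependents[step]:
                indegree[child] -= 1
                if indegree[child] == 0:  # all prerequisites have completed
                    next_ready.append(child)
        ready = sorted(next_ready)

    if sum(len(w) for w in waves) != len(deps):
        raise ValueError("plan dependencies contain a cycle")
    return waves


deps = {
    "scan_a": [], "scan_b": [],
    "join": ["scan_a", "scan_b"],
    "return_results": ["join"],
    "drop_temps": ["return_results"],
}
print(parallel_waves(deps))
# [['scan_a', 'scan_b'], ['join'], ['return_results'], ['drop_temps']]
```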
[0025] The DQP can increment a reference counter associated with
each table referenced in a query. The reference counter is
initialized to an initial-value when the table is created, and the
table can only be deleted by a query when it is not being
referenced by any other query.
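A sketch of the table reference counter, assuming an initial value of zero and hypothetical method names: a query holds a reference while it uses a table, the reference is released even if the query fails, and deletion is refused while any reference is outstanding.

```python
from contextlib import contextmanager


class TableRefs:
    INITIAL_VALUE = 0  # assumption: the text says only "an initial-value"

    def __init__(self):
        self.counts = {}

    def create(self, table):
        self.counts[table] = self.INITIAL_VALUE

    @contextmanager
    def referenced(self, table):
        # Increment while a query uses the table; always decrement afterwards.
        self.counts[table] += 1
        try:
            yield
        finally:
            self.counts[table] -= 1

    def drop(self, table):
        if self.counts[table] != self.INITIAL_VALUE:
            raise RuntimeError(f"{table} is still referenced by a running query")
        del self.counts[table]


refs = TableRefs()
refs.create("orders")
with refs.referenced("orders"):
    try:
        refs.drop("orders")
    except RuntimeError as exc:
        print(exc)  # orders is still referenced by a running query
refs.drop("orders")  # succeeds once no query references it
```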
[0026] The Query Execution Engine receives a plan from the DQP and
then identifies the number of query execution engines required to
execute the plan, the nodes addressed in the query, and the machines
where query execution engines may be used for the plan. It then
contacts machine managers to provision query execution engines;
where appropriate, additional engines are provisioned.
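The provisioning step can be sketched as matching the number of engines a plan needs against the free capacity reported by each machine manager, and bringing up another machine when that capacity runs out. A minimal sketch: `MachineManager`, its `free_slots` field, the `new_machine` hook, and the greedy allocation are all assumptions for illustration, not the system's actual interface.

```python
import itertools
from dataclasses import dataclass, field

_engine_ids = itertools.count(1)


@dataclass
class MachineManager:
    machine: str
    free_slots: int  # engines this machine can still host
    engines: list = field(default_factory=list)

    def provision(self, count):
        """Launch `count` engines on this machine and return their ids."""
        if count > self.free_slots:
            raise ValueError("not enough free slots")
        self.free_slots -= count
        new = [f"{self.machine}/qee-{next(_engine_ids)}" for _ in range(count)]
        self.engines.extend(new)
        return new


def provision_engines(needed, managers, new_machine):
    """Greedily provision `needed` engines, adding machines when capacity runs out."""
    engines = []
    for mgr in managers:
        take = min(mgr.free_slots, needed - len(engines))
        if take > 0:
            engines.extend(mgr.provision(take))
    while len(engines) < needed:
        mgr = new_machine()  # e.g. start an additional virtual machine
        if mgr.free_slots <= 0:
            raise RuntimeError("new machine has no capacity")
        managers.append(mgr)
        engines.extend(mgr.provision(min(mgr.free_slots, needed - len(engines))))
    return engines


managers = [MachineManager("m1", free_slots=2), MachineManager("m2", free_slots=1)]
extra = itertools.count(3)
got = provision_engines(5, managers, lambda: MachineManager(f"m{next(extra)}", free_slots=4))
print(len(got), [m.machine for m in managers])  # 5 ['m1', 'm2', 'm3']
```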
[0027] The Query Execution Engines are provided with client
connection state prior to executing a step, and state is saved
after the step for later use. After query execution, some or all of
the query execution engines may be released, and if any machines
were provisioned, they may be freed up as well.
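Because connection state is handed to whichever engine runs a step and saved again afterwards, successive steps of one connection can run on different engines. A minimal sketch, assuming connection state is a small dictionary of session settings; all names, including the `search_path` setting, are hypothetical.

```python
import copy


class ConnectionStateStore:
    """Holds each connection's session state between plan steps."""

    def __init__(self):
        self._state = {}

    def load(self, conn_id):
        return copy.deepcopy(self._state.get(conn_id, {}))

    def save(self, conn_id, state):
        self._state[conn_id] = copy.deepcopy(state)


class QueryExecutionEngine:
    def __init__(self, name):
        self.name = name
        self.session = None

    def run_step(self, step, store, conn_id):
        self.session = store.load(conn_id)  # restore state before the step
        step(self.session)                  # the step may read or change it
        store.save(conn_id, self.session)   # persist it for the next step
        self.session = None                 # the engine keeps no connection state


store = ConnectionStateStore()
e1, e2 = QueryExecutionEngine("qee-1"), QueryExecutionEngine("qee-2")
e1.run_step(lambda s: s.update(search_path="sales"), store, "conn-1")
e2.run_step(lambda s: print(s["search_path"]), store, "conn-1")  # sales
```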
BRIEF DESCRIPTION OF THE DRAWINGS
[0028] The foregoing will be apparent from the following more
particular description of example embodiments of the invention, as
illustrated in the accompanying drawings in which like reference
characters refer to the same parts throughout the different views.
The drawings are not necessarily to scale, emphasis instead being
placed upon illustrating embodiments of the present invention.
[0029] FIG. 1 is a high level block diagram of the Elastic Parallel
Database Management System (EPDBMS) and database client application
software according to one embodiment.
[0030] FIG. 2 is a high level block diagram of a Parallel Elastic
database management system according to one embodiment.
[0031] FIG. 3 is a block diagram depicting the high level program
flow during query execution according to one embodiment.
[0032] FIG. 4 is a high level block diagram depicting various
components and elements that are part of query execution according
to one embodiment.
[0033] FIG. 5 is a high level block diagram depicting various
logical groupings of nodes in a system according to one
embodiment.
[0034] FIG. 6 is a detailed block diagram of the dynamic query
planning process according to one embodiment.
[0035] FIG. 7 is a schema to illustrate the query planning process
according to one embodiment.
[0036] FIG. 8 is an illustration of the steps involved in adding
compute capacity to the system according to one embodiment.
[0037] FIG. 9 is an illustration of the steps involved in reducing
compute capacity in the system according to one embodiment.
[0038] FIG. 10 is an illustration of the steps involved in dynamic
capacity expansion according to one embodiment.
[0039] FIG. 11 illustrates the steps performed during capacity
expansion according to one embodiment.
[0040] FIG. 12 is a high level block diagram illustrating where
tuples are stored on EDD tables to ensure co-location.
[0041] FIG. 13 is another high level block diagram illustrating
where tuples are stored on EDD tables to ensure co-location.
[0042] FIG. 14 is a detailed block diagram illustrating where
tuples are stored on EDD tables to ensure co-location according to
one embodiment.
[0043] FIG. 15 illustrates the steps performed during capacity
expansion according to one embodiment.
[0044] FIG. 16 illustrates the steps performed in determining where
to store a new row in an EDD table according to one embodiment.
[0045] FIG. 17 illustrates the steps performed in determining where
to store a new row in an EDD table according to one embodiment.
[0046] FIG. 18 illustrates the steps performed in determining where
to store a new row in an EDD table according to one embodiment.
[0047] FIG. 19 illustrates the process of distributing the first
six rows of data on a newly created nodegroup, `storage` with four
nodes into an EDD table according to one embodiment.
[0048] FIG. 20 continues the illustration from FIG. 19 and shows
the addition of a node to the nodegroup and the placement of the
next six rows of data according to one embodiment.
[0049] FIG. 21 continues the illustration from FIG. 19 and FIG. 20
and shows the placement of the next six rows of data after the
administrator changes the allocation strategy according to one
embodiment.
[0050] FIG. 22 is a high level block diagram of a parallel elastic
database management system illustrating heterogeneous persistent
data stores according to one embodiment.
[0051] FIG. 23 is a high level block diagram of a process model in
the elastic parallel database management system showing the
relationship between machines, nodes, and software processes
according to one embodiment.
[0052] FIG. 24 is a high level block diagram of a process model in
the elastic parallel database management system showing the
relationship between machines, nodes, and software processes
according to one embodiment.
[0053] FIG. 25 is a high level block diagram of a process model in
the elastic parallel database management system showing the
relationship between machines, nodes, and software processes
according to one embodiment.
[0054] FIG. 26 is an illustration of some aspects of a machine
manager on a single machine according to one embodiment.
[0055] FIG. 27 is an illustration of the steps involved in
launching a new execution engine on a machine according to one
embodiment.
[0056] FIG. 28 is a high level block diagram of the processing
steps involved in query execution as they relate to the process
architecture according to one embodiment.
DETAILED DESCRIPTION OF THE INVENTION
[0057] A description of example embodiments of the invention
follows.
[0058] Methods and apparatus to build an Elastic Parallel Database
Management System (EPDBMS) are described.
[0059] Preferred embodiments of an Elastic Parallel Database
Management System (EPDBMS) are described here.
[0060] FIG. 1 provides a high level block diagram of an EPDBMS. A
database client application (101) submits queries (102) to the
EPDBMS (103) in some language. The industry standard Structured
Query Language, or a variant thereof, may be used for this purpose.
The EPDBMS processes the query and returns the results (104) of the
query to the client application program.
[0061] Data in the system are distributed across a plurality of
nodes within the system. The EPDBMS consists of an assemblage of
nodes. FIG. 2 provides a high level block diagram of an EPDBMS. The
Client Application Software (230) communicates with a Controller
Node (201) that is part of the EPDBMS. The Controller Node includes
various components of the EPDBMS such as a Dynamic Query Planner
(202), a System Topology Manager (203), a Client Connection Manager
(204), a Transaction Coordinator (205), a Catalog Metadata manager
(206), a Data Distribution Manager (207) and a persistent Metadata
repository (208). The Controller Node communicates with a number of
Database Nodes (250, 260, 270, 280 and 290), each of which consists
of one or more Query Execution Engines (251, 261, 271, 281 and
291), and a Data Distribution Manager (252, 262, 272, 282 and 292).
Each node is also associated with a persistent data store where the
database data resides.
[0062] In various embodiments, the persistent data store may be:
[0063] a relational database management system (RDBMS);
[0064] not a relational database management system, but instead some
other non-relational system;
[0065] the same database management system on all nodes; or
[0066] different database management systems on different nodes.
[0067] When a connection is initiated by the Client Application
Software (230) to the EPDBMS, the Client Connection Manager (204)
verifies authentication credentials and determines whether to
accept the connection or not. Subsequently, if there is a
communication failure between the EPDBMS and the Client Application
Software, it is the responsibility of the Client Connection Manager
to notify all other relevant components within the EPDBMS and
handle the connection failure. A specific case of such processing
is the handling of a disconnection while a database transaction is
in progress. The Client Connection Manager (204) communicates the
disconnection information to the Transaction Coordinator (205)
which handles the interruption to the transaction and ensures data
consistency. The operation of the Dynamic Query Planner (202) is
described in detail in the section on Dynamic Query Planning
later.
[0068] FIG. 3 is a high level block diagram depicting the query
execution process in EPDBMS. When a query is presented to the
system (301), it is first analyzed to determine syntactical
validity (302), and a failure results in a suitable message being
delivered to the user (310). If the syntax and grammar are valid, a
parallel query plan is constructed and executed (303). The
execution of the parallel query plan will result in either results
or a suitable message being returned to the user, depending on the
nature of the query being executed (304), which ends the process
(305).
Nodes and Nodegroups
[0069] The EPDBMS consists of a plurality of nodes. Nodes in the
system consist of a logical assemblage of some computing resources
(CPU, Network, Memory, and Storage), and some associated software
components.
[0070] Nodes in the system are of four types:
[0071] Controller Node
[0072] Storage Node
[0073] Compute Node
[0074] Un-enrolled Nodes
[0075] Storage and compute nodes in the system are grouped into
"nodegroups". A nodegroup is a non-empty collection of nodes. Each
storage or compute node in the system may be a member of none, one,
or more nodegroups. A node that is a member of no nodegroups is an
un-enrolled node. A node that is a member of a nodegroup is an
enrolled node. The EPDBMS maintains a group counter associated with
each node, and the group counter is incremented whenever a node is
added to a nodegroup. Therefore, an un-enrolled node has a group
counter value of zero, and enrolled nodes have a non-zero group
counter value. The group counter on a node is decremented when the
node is removed from a nodegroup.
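A minimal sketch of the group-counter bookkeeping described above, assuming an in-memory registry. The `Topology` class and its method names are hypothetical and are not taken from the EPDBMS. Membership is tracked as a set so that adding a node to the same nodegroup twice does not inflate its counter, which is an assumption the text does not address.

```python
class Topology:
    """Hypothetical registry of nodegroup membership and per-node group counters."""

    def __init__(self, nodes):
        self.group_counter = {node: 0 for node in nodes}   # 0 => un-enrolled
        self.nodegroups = {}                               # name -> set of nodes

    def add_to_nodegroup(self, group, node):
        members = self.nodegroups.setdefault(group, set())
        if node not in members:                            # count each membership once
            members.add(node)
            self.group_counter[node] += 1

    def remove_from_nodegroup(self, group, node):
        members = self.nodegroups.get(group, set())
        if node in members:
            members.remove(node)
            self.group_counter[node] -= 1

    def is_enrolled(self, node):
        return self.group_counter[node] > 0


topo = Topology(["n1", "n2", "n3"])
topo.add_to_nodegroup("ng1", "n1")
topo.add_to_nodegroup("ng2", "n1")
topo.add_to_nodegroup("ng2", "n2")
print(topo.group_counter)  # {'n1': 2, 'n2': 1, 'n3': 0}
```

Node `n3` was never added to a nodegroup, so its counter stays at zero and it remains an un-enrolled node.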
[0076] Each EPDBMS shall contain at least one nodegroup which will
be used for all intermediate computational purposes during query
execution. This nodegroup is referred to as the
default-compute-nodegroup. When a query begins execution, the
system determines where intermediate tables generated during the
execution of the query are to be placed. By default, this is the
default-compute-nodegroup but the administrator may specify an
alternate location. The location to be used to store these
intermediate tables is called the compute-nodegroup for that
connection.
[0077] In one embodiment, an inventory of Nodes and Nodegroups in
the system is stored in a System Topology Manager (203).
[0078] FIG. 4 further illustrates the roles of the various nodes in
the system during the execution of a query. A query (402) is
received from a Client Application (401). It is received on the
Controller Node (470) and the Dynamic Query Planning (DQP) process
(403) results in a query plan (404). The query plan when executed
utilizing the Storage and Compute Nodes results in Query Results
(405) that are returned to the client application (401). As
depicted in this figure, this system consists of seven nodes, five
of which are Storage Nodes (410) and two of which are Compute Nodes
(450). As illustrated, the tables (411, 412, 413, 414) are stored
on the Storage Nodes (421, 422, 423, 424 and 425), and the
intermediate results TMP1 (451) generated during the query plan
execution are stored on the Compute Nodes (461, 462).
Creation and destruction of tables in the EPDBMS
[0079] Each table in the system has one associated nodegroup. Many
tables may be associated with a single nodegroup, but each table is
associated with only one nodegroup.
[0080] An association between a table and its nodegroup is
established at the time when the table is created. Data for a table
is stored on the nodes that are members of the associated nodegroup
in accordance with the Allocation Strategy for the nodegroup, and
the Distribution Method associated with the table.
[0081] FIG. 5 further illustrates this by depicting the storage
nodegroups (571, 572), each of which consists of three storage nodes.
"Storage Nodegroup 1" consists of the storage nodes 521, 522 and
523, and "Storage Nodegroup 2" consists of the storage nodes 523,
524 and 525. The "Compute Nodegroup 1" (573) consists of two nodes
(561, 562). The tables PRODUCT and RATES (511, 513) are associated
with the "Storage Nodegroup 1" (571) and the tables CUSTOMER and
ACCOUNT (512, 514) are associated with "Storage Nodegroup 2"
(572).
[0082] Each nodegroup has an associated counter called the
tablecount. The value of tablecount is the number of tables that
are associated with the nodegroup. When a user creates a table and
associates that table with a nodegroup, the tablecount associated
with the nodegroup is incremented. The tablecount on a nodegroup is
decremented when a table on the nodegroup is disassociated from the
nodegroup.
[0083] In one embodiment, the SQL/DDL (Data Definition Language)
command issued to create a table shall include an explicit
specification of the nodegroup with which to associate the
table.
[0084] In one embodiment, each EPDBMS shall contain a nodegroup
referred to as the default-storage nodegroup, and if the DDL to
create the table does not explicitly specify a nodegroup with which
to associate the table, the default-storage nodegroup is used.
Data Distribution in EPDBMS
[0085] EPDBMS systems partition data across multiple storage nodes
in the system. Data Distribution Methods determine where each tuple
in a table must be stored on the system.
[0086] Distribution methods can be broadly divided into two types,
(a) those that determine where to store a row of data based on the
contents of that row of data (called data dependent distribution),
and (b) those that determine where to store a row of data
independent of the contents of the row of data (called data
independent distribution).
[0087] Applying data dependent distribution methods to a table
typically requires the specification of a set of columns (called
distribution columns), that will be used to determine where to
store any given row of data for that table. The ordered set of
distribution columns is called the Distribution Vector (DV). Assume
a table T with columns A, B, C, D and E, represented as T (A, B, C,
D, E) is to be distributed based on the values of columns C and D.
Then we say that "C and D are the distribution columns for the
table T", and "the ordered set (C, D) is the DV for the table T".
The DV is represented as <C, D>. A row of data in table T
with values a, b, c, d, and e is represented as (a, b, c, d, e) and
the DV for this row of data is represented as <c, d>.
Representation and Comparison of DVs
[0088] The DVs for a table are always in a form where each column in
the DV is represented in the most general canonical form applicable
for the datatype of that column.
[0089] The DV of one row is identical to the DV of another row if
and only if the following four conditions are met:
[0090] Condition 1: The number of distribution columns in both distribution vectors is the same, and
[0091] Condition 2: The corresponding distribution columns in both distribution vectors share a canonical representation, and
[0092] Condition 3: The size of the binary representation of both the DVs is the same, and
[0093] Condition 4: The binary representations of the DVs are identical.
[0094] The first three conditions above may be determined
without examining a single row of data, as they represent
attributes of the table(s) to which the rows belong. The fourth
condition need only be evaluated if the first three conditions
are met.
[0095] Two DVs are said to be "comparable DVs" if the first three
conditions above are met.
[0096] Two DVs that are comparable and meet the fourth condition
above are said to be "equal DVs".
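The four conditions can be expressed as two small predicates. In this sketch the `DV` type, the `int_dv` helper, and the choice of a fixed-width 64-bit big-endian integer encoding as the canonical form are all assumptions made for illustration; the document does not specify a binary format.

```python
import struct
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class DV:
    canonical_types: Tuple[str, ...]  # canonical type of each distribution column
    encoded: bytes                    # binary form of the row's DV values, in order


def int_dv(*values: int) -> DV:
    """Hypothetical canonical form: every column is a signed 64-bit big-endian integer."""
    return DV(("BIGINT",) * len(values), b"".join(struct.pack(">q", v) for v in values))


def comparable(a: DV, b: DV) -> bool:
    return (len(a.canonical_types) == len(b.canonical_types)   # condition 1
            and a.canonical_types == b.canonical_types          # condition 2
            and len(a.encoded) == len(b.encoded))               # condition 3


def equal(a: DV, b: DV) -> bool:
    # Condition 4 is only checked once conditions 1-3 hold.
    return comparable(a, b) and a.encoded == b.encoded


row = (1, 2, 3, 4, 5)            # (a, b, c, d, e) in T(A, B, C, D, E)
dv = int_dv(row[2], row[3])      # DV <c, d> for distribution columns <C, D>
print(equal(dv, int_dv(3, 4)), comparable(dv, int_dv(3)))  # True False
```

Conditions 1 to 3 only inspect types and sizes, so for a fixed-width encoding like this one they depend on table metadata alone, which matches the observation in [0094].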
Co-Location
[0097] Co-Location is a property of Data Dependent Distributions
that ensures that:
[0098] Two rows from a table that have equal DVs will be stored on the same node.
[0099] If two tables are associated with the same nodegroup and use the same distribution method, then a row from one table and a row from the other that have equal DVs will be stored on the same node.
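One way to obtain co-location, sketched here under stated assumptions, is to derive the storage node purely from the DV's canonical binary form. The sketch uses a simple hash modulo the nodegroup size, similar in spirit to the "simple hash distribution" named in the query plans later in this document; it is not the EDD algorithm, which the text does not detail. `zlib.crc32` is used because, unlike Python's built-in `hash` for bytes, it returns the same value in every process. `node_for_dv` and the nodegroup membership are hypothetical.

```python
import zlib
from typing import Sequence


def node_for_dv(dv_bytes: bytes, nodegroup: Sequence[str]) -> str:
    """Pick the nodegroup member that stores rows whose DV encodes to `dv_bytes`.

    Equal DVs have identical bytes, hence identical hashes, hence the same node."""
    return nodegroup[zlib.crc32(dv_bytes) % len(nodegroup)]


ng1 = ["node0", "node1", "node2"]                    # hypothetical membership
orderkey = (12345).to_bytes(8, "big", signed=True)   # assumed canonical form of <ORDERKEY>
# Two rows whose DV is <12345> always land on the same node.
print(node_for_dv(orderkey, ng1) == node_for_dv(bytes(orderkey), ng1))  # True
```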
Data Distribution Methods
[0100] Data Distribution Methods determine where each tuple in a
table must be stored on the system. The EPDBMS allows users to
distribute data in a table according to the following distribution
methods:
Data Independent Distribution Methods:
[0101] Broadcast Distribution.
[0102] Round-Robin Distribution.
[0103] Random Distribution.
Data Dependent Distribution Methods:
[0104] InElastic Data Distribution (IEDD) Methods.
[0105] Elastic Data Distribution (EDD) Methods.
[0106] In broadcast distribution, all rows in a given table are to
be stored on all nodes that are members of the nodegroup associated
with the table.
[0107] In random distribution, a row of data for the table can be
stored on any node in the nodegroup associated with the table. A
variation of the random distribution method is the round-robin
distribution method. Here, new data is sent to nodes in the
nodegroup in a "round-robin" fashion, first to the first node, then
to the second, and then finally after some data has been sent to
each node, back to the first node in the nodegroup associated with
the table.
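The three data independent methods can be sketched as placement functions that take a set of rows and the ordered members of a nodegroup. The function names are hypothetical, and rows are kept in in-memory lists purely for illustration.

```python
import itertools
import random


def broadcast(rows, nodegroup):
    """Every node in the nodegroup stores every row."""
    rows = list(rows)
    return {node: list(rows) for node in nodegroup}


def round_robin(rows, nodegroup):
    """Rows go to the first node, then the second, ..., then back to the first."""
    placement = {node: [] for node in nodegroup}
    for row, node in zip(rows, itertools.cycle(nodegroup)):
        placement[node].append(row)
    return placement


def random_placement(rows, nodegroup, rng=None):
    """Each row may be stored on any node of the nodegroup."""
    rng = rng or random.Random()
    placement = {node: [] for node in nodegroup}
    for row in rows:
        placement[rng.choice(nodegroup)].append(row)
    return placement


print(round_robin(range(5), ["n0", "n1"]))  # {'n0': [0, 2, 4], 'n1': [1, 3]}
```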
[0108] Elastic Data Distribution (EDD) methods are a class of data
dependent distribution methods that ensure co-location. EDD methods
determine where to store a row using algorithms that allow nodes to
be added to the nodegroup when required, without wholesale data
redistribution. EDD methods are used for storage of permanent (user)
tables.
[0109] InElastic Data Distribution (IEDD) Methods are a class of
data dependent distribution methods that ensure co-location but do
not allow nodes to be added to the nodegroup without wholesale data
redistribution. IEDD methods are used for the storage of
intermediate results during query processing.
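Placement by a hash modulo the nodegroup size illustrates how a method can ensure co-location and still be inelastic. This sketch treats integer keys as their own hash for readability (an assumption) and counts how many rows would change nodes if a four-node nodegroup grew to five. `fraction_moved` is a hypothetical name.

```python
def fraction_moved(keys, old_size: int, new_size: int) -> float:
    """Share of rows whose node changes when placement is `key % nodegroup size`."""
    keys = list(keys)
    moved = sum(1 for k in keys if k % old_size != k % new_size)
    return moved / len(keys)


print(fraction_moved(range(10_000), 4, 5))  # 0.8
```

Four out of every five rows would have to move, which is the kind of wholesale redistribution that EDD methods are described as avoiding. For intermediate tables, which the query plans later in this section drop once the query completes, that cost does not arise.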
[0110] In one embodiment, the distribution method for a table shall
be established at the time when the table is created. The
distribution method for a table may be changed so long as there is
no data stored in the table at the time of the change.
[0111] In one embodiment, the DDL command used to create the table
must explicitly specify the distribution method for the table.
[0112] In one embodiment, if the user fails to specify a
distribution method when creating a table, the EPDBMS will choose a
distribution method based on the other information provided in the
SQL/DDL used to create the table.
[0113] The Catalog Metadata Manager (FIG. 2, 206) is responsible
for storing all information about a table such as its name, the
columns and the distribution method. It stores this information in
the Metadata Repository (208). Information relating to the
distribution of data amongst the nodes in the system is managed by
the Controller Node component of the Data Distribution Manager
(207) and the Node components of the Data Distribution Manager
(252, 262, 272, 282, 292).
Rule for Co-Location and feasibility of local joins
[0114] For two tables to be co-located for the purposes of a join,
the following condition must be met:
{Both tables are associated with the same nodegroup}
AND {
    {One of them is broadcast distributed}
    OR {
        { {Both use the same EDD with comparable DVs}
          OR {Both use the same IEDD with comparable DVs} }
        AND {An equijoin exists between the two tables}
        AND {The equijoin is between corresponding columns in the two DVs}
    }
}
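The co-location rule can be read as a predicate over two tables' distribution metadata and the query's equijoins. In this sketch the `TableDist` record, the method names (`"edd"`, `"broadcast"`), and the `BIGINT` type are hypothetical, and comparability is reduced to matching canonical column types. The example uses the distributions given for the FIG. 7 schema discussed below (LINEITEM on ORDERKEY, PART on PARTKEY, NATION broadcast, all on NG1).

```python
from dataclasses import dataclass
from typing import Sequence, Tuple


@dataclass(frozen=True)
class TableDist:
    """Hypothetical summary of how one table is distributed."""
    nodegroup: str
    method: str                     # e.g. "broadcast", "round-robin", or an EDD/IEDD name
    data_dependent: bool = False    # True for EDD and IEDD methods
    dv: Tuple[str, ...] = ()        # distribution columns, in DV order
    dv_types: Tuple[str, ...] = ()  # canonical type of each distribution column


def co_located_for_join(t1: TableDist, t2: TableDist,
                        equijoins: Sequence[Tuple[str, str]]) -> bool:
    """equijoins: (column of t1, column of t2) pairs joined with '=' in the query."""
    if t1.nodegroup != t2.nodegroup:
        return False
    if t1.method == "broadcast" or t2.method == "broadcast":
        return True
    same_method = t1.data_dependent and t1.method == t2.method
    comparable = bool(t1.dv) and t1.dv_types == t2.dv_types  # simplified conditions 1-3
    if not (same_method and comparable):
        return False
    # The equijoin must pair corresponding DV columns of the two tables.
    joined = set(equijoins)
    return all((c1, c2) in joined for c1, c2 in zip(t1.dv, t2.dv))


lineitem = TableDist("NG1", "edd", True, ("ORDERKEY",), ("BIGINT",))
part = TableDist("NG1", "edd", True, ("PARTKEY",), ("BIGINT",))
nation = TableDist("NG1", "broadcast")
print(co_located_for_join(lineitem, part, [("PARTKEY", "PARTKEY")]))  # False
print(co_located_for_join(lineitem, nation, []))                      # True
```

The first call fails because the join on PARTKEY does not pair LINEITEM's distribution column (ORDERKEY) with PART's, which is the situation analyzed in [0146].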
[0115] These rules can be extended to an arbitrary number of
tables. If two tables are co-located for the purposes of a join,
then the result of materializing the join between those two tables
would naturally be distributed according to the DV of the
controlling table in the join. If a third table was co-located for
the purpose of joining with the materialization of the join of the
first two, then the three tables are collectively co-located for
the purposes of a join, and the DV for the result of that
materialization could be determined.
The Dynamic Query Planning Process (DQP)
[0116] FIG. 6 shows a detailed block diagram of one embodiment of
the DQP process. According to one aspect of the embodiments,
queries submitted to the EPDBMS are decomposed into a set of steps.
This process is called query planning, and is performed by the
Dynamic Query Planner (DQP). These steps (the output of DQP) are
then executed in parallel by a plurality of nodes in the system.
[0117] A validated query is received by the Dynamic Query Planner
(DQP) in the step labeled 601. The DQP deconstructs the query and
identifies all tables that are referenced in the query (602). The
DQP then identifies all restrictions on a single table and
associates those with the table (603). The system retrieves
information from a metadata catalog that is depicted in FIG. 2 as
the Catalog Metadata Manager (206) and the Metadata repository
(208).
[0118] The DQP then identifies groupings of tables referenced in
the query called "partitions", and constructs the grouping that
contains the fewest partitions (604). A partition contains tables
referenced in the query that are joined, and are co-located for the
purposes of the join that is to be performed. Each table being
referenced in a query is a member of exactly one partition. A
partition may consist of one or more tables. If a table is
referenced multiple times in a query, each reference is treated as
an independent table.
[0119] If the DQP determines that multiple different groupings of
tables can be created with the same number of partitions, it
chooses the one where the tables are most evenly distributed among
the partitions. This can occur if one or more tables referenced in
the query are each joined with two or more other tables. If multiple
such groupings exist, one is chosen at random.
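The grouping in step 604 and the tie-breaking rules in [0118] and [0119] can be sketched as an exhaustive search over set partitions of the referenced tables. The `co_located` predicate (whether a group of two or more tables could be joined locally) is assumed to be supplied by the caller, and "most evenly distributed" is interpreted, as an assumption, as the smallest difference between the largest and smallest partition. Enumeration grows with the Bell numbers, so this illustrates the selection criteria rather than a practical planner; all names are hypothetical.

```python
import random
from typing import Callable, FrozenSet, Iterator, List, Optional, Sequence


def set_partitions(items: Sequence[str]) -> Iterator[List[List[str]]]:
    """Yield every way of splitting `items` into non-empty groups."""
    if not items:
        yield []
        return
    first, rest = items[0], items[1:]
    for smaller in set_partitions(rest):
        for i in range(len(smaller)):                 # join an existing group ...
            yield smaller[:i] + [[first] + smaller[i]] + smaller[i + 1:]
        yield [[first]] + smaller                     # ... or form a group alone


def choose_partitions(tables: Sequence[str],
                      co_located: Callable[[FrozenSet[str]], bool],
                      rng: Optional[random.Random] = None) -> List[List[str]]:
    rng = rng or random.Random()
    # A single table is always a valid partition; larger groups must be co-located.
    valid = [p for p in set_partitions(list(tables))
             if all(len(g) == 1 or co_located(frozenset(g)) for g in p)]
    fewest = min(len(p) for p in valid)
    best = [p for p in valid if len(p) == fewest]

    def spread(p: List[List[str]]) -> int:           # 0 means perfectly even
        sizes = [len(g) for g in p]
        return max(sizes) - min(sizes)

    flattest = min(spread(p) for p in best)
    best = [p for p in best if spread(p) == flattest]
    return rng.choice(best)                          # remaining ties: random


# Hypothetical example: T2 may share a partition with T1 or with T3, not both.
allowed = {frozenset({"T1", "T2"}), frozenset({"T2", "T3"})}
print(choose_partitions(["T1", "T2", "T3"], lambda g: g in allowed))
```

In the example both two-partition groupings are equally even, so one of them is returned at random, mirroring the last sentence of [0119].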
[0120] The clauses in the query reflecting the join between tables
in a single partition are now extracted from the query and
associated with the partition (605).
[0121] If the DQP now determines that only one partition remains
(606), query planning can be completed by identifying and recording
intra-step dependencies (607) and consolidating all the steps
performed to reach this point (620). If, on the other hand, the DQP
determines that there is more than one partition, planning
continues (608). The subject of intra-query dependencies is covered
in more detail below in paragraph [257].
[0122] Each partition is a group of tables as described in
paragraph [99]. If a partition were materialized, the partition
would itself be distributed according to the distribution of the
controlling table, or tables in the partition.
[0123] To illustrate the preceding statement, assume that tables T1
(A, B, C, D, E), T2 (P, Q, R, S, T) and T3 (X, Y, Z) are members
of a partition, that T1 is distributed on <A> and T2 on <P>, and
that T3 is broadcast distributed. Assume that the join conditions
T1.A=T2.P and T2.Q=T3.X were determined to exist in step 605. Assume
that step 603 identified the restrictions T1.B=17 and
T2.T='abc'. Assume that the complete execution of the query
required columns T1.A, T2.R, T2.S, and T3.Z. Finally, assume that a
table was created on each node in the nodegroup where T1, T2 and T3
reside, containing the results of executing the query:
SELECT T1.A, T2.R, T2.S, T3.Z
FROM T1, T2, T3
WHERE T1.A = T2.P AND T2.Q = T3.X AND T1.B = 17 AND T2.T = 'abc'
[0124] That table would be considered to be the materialization of
the partition containing T1, T2, and T3.
[0125] Observe that while T1.B is used in materializing the
partition, the DQP determines that the value is not needed at any
point in the further execution of this query and therefore T1.B is
not part of the materialization of the partition.
[0126] As T1<A> and T2<P> are on the same nodegroup,
and since a join condition T1.A=T2.P was found, T1 and T2 were
placed in the same partition. A partition containing T1 and T2
alone would be considered to have a DV of <A> or <P>.
Since T3 is broadcast distributed, it can be joined to any table on
the same nodegroup. As T1, T2 and T3 are on the same nodegroup, the
partition containing T1, T2 and T3 has a distribution determined by
the controlling tables, which in this case are T1 and T2.
[0127] It can be proved that if step 606 determines that there is
more than one partition, and there are join conditions between the
direct materializations of the partitions, then these partitions
are not distributed in a manner consistent with direct joining. The
most straightforward proof is by contradiction.
[0128] Assume that step 606 determines that there are N partitions,
and N>1. This means that no grouping of tables is possible with
fewer than N partitions. Assume that two of these partitions
are in fact distributed in a manner consistent with direct joining,
and a join condition existed between these two partitions. In that
event, a partition could be constructed that would consist of all
the tables that are members of those two partitions, and a
partitioning scheme with N-1 partitions could have been created in
step 604. As step 604 constructed the grouping with the minimum
number of possible partitions, this is a contradiction. Therefore
we conclude that no two partitions generated in step 604, if
materialized, would be distributed in a manner consistent with
direct joining, if a join condition existed between these two
partitions.
[0129] The DQP estimates the number of rows (608) that would exist
in each partition using statistics and historical information that
may be maintained by the DQP for this purpose (614). The DQP then
determines whether there exist joins between the remaining
partitions. If no joins are found (609), the partition predicted to
be the largest is materialized using round-robin distribution (610)
on the compute-nodegroup for the query. All the remaining
partitions are materialized using broadcast distribution (611)
on the compute-nodegroup for the query. These two preceding steps
are recorded as planning steps (612), and the query is then
reformulated (613) using the newly created intermediate tables and
the process begins at step 602.
[0130] It can be shown that upon executing steps 610 and 611, the
query reformulated in step 613 can be reduced to a single partition
when steps 603, 604 and 605 are performed. As the table
materialized in step 610 is distributed using round-robin
distribution on the compute-nodegroup for the query, and since the
remaining tables are distributed using broadcast distribution on
the compute-nodegroup for the query, these tables can be joined to
form a Cartesian product on the compute-nodegroup for the
query.
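The argument in [0130] can be checked with a small simulation: deal the largest partition's rows round-robin across the compute nodes, give every node a full copy of the other partition, and let each node form a local Cartesian product. Every pair then appears exactly once in the union of the local results. The function name and the toy rows are hypothetical.

```python
import itertools


def cross_join_on_compute_nodes(big_rows, small_rows, nodegroup):
    """Round-robin `big_rows`, broadcast `small_rows`, then join locally per node."""
    local_big = {node: [] for node in nodegroup}
    for row, node in zip(big_rows, itertools.cycle(nodegroup)):
        local_big[node].append(row)
    local_small = {node: list(small_rows) for node in nodegroup}   # broadcast copies
    results = []
    for node in nodegroup:   # each node only sees its own local tables
        results.extend(itertools.product(local_big[node], local_small[node]))
    return results


big, small = range(7), ["x", "y"]
out = cross_join_on_compute_nodes(big, small, ["c0", "c1", "c2"])
print(len(out), sorted(out) == sorted(itertools.product(big, small)))  # 14 True
```

Round-robin guarantees each row of the large partition lives on exactly one node, and broadcast guarantees every node can pair it with all rows of the small partition, so no pair is lost or duplicated.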
[0131] On the other hand, if step 609 determines that joins do
exist between at least two of the partitions, the DQP chooses two
of the partitions that are being joined based on statistics and
historical information (614) and materializes them into
intermediate tables that are distributed in a manner consistent
with the join between them (615 and 616), where both intermediate
tables are located on the compute-nodegroup for the query. The
actions taken in step 616 are recorded for use in constructing the
query plan (617). The DQP then reformulates the query using the
remaining tables. It can be shown that the actions taken in steps
615 and 616 would allow steps 603, 604 and 605 to identify at least
one partition that contained more than a single table and the
planning process continues till we arrive at a query that includes
only a single partition (in step 606).
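Steps 615 and 616 amount to a repartitioned join: both inputs are redistributed on the join column so that matching rows meet on the same compute node, after which each node joins only its local rows. In this sketch `node_of` stands in for an IEDD using a simple CRC-32 hash (an assumption), and the rows, shaped like `_PARTITION_1` and `_PARTITION_2` in the LINEITEM/PART example below, are made up.

```python
import zlib
from collections import defaultdict


def node_of(key, nodegroup):
    """Stand-in for an IEDD: a simple hash of the join-column value."""
    return nodegroup[zlib.crc32(repr(key).encode()) % len(nodegroup)]


def repartitioned_join(left, right, column, nodegroup):
    """Redistribute both inputs on `column`, then hash-join locally on each node."""
    left_parts, right_parts = defaultdict(list), defaultdict(list)
    for row in left:
        left_parts[node_of(row[column], nodegroup)].append(row)
    for row in right:
        right_parts[node_of(row[column], nodegroup)].append(row)
    results = []
    for node in nodegroup:                   # no rows cross nodes after this point
        index = defaultdict(list)
        for r in right_parts[node]:
            index[r[column]].append(r)
        for l_row in left_parts[node]:
            for r in index[l_row[column]]:
                results.append({**l_row, **r})
    return results


partition_1 = [{"PARTKEY": 1}, {"PARTKEY": 3}]                    # made-up rows
partition_2 = [{"PARTKEY": 1, "NAME": "bolt"}, {"PARTKEY": 2, "NAME": "nut"},
               {"PARTKEY": 3, "NAME": "gear"}]
joined = repartitioned_join(partition_1, partition_2, "PARTKEY", ["c0", "c1"])
print(sorted((r["PARTKEY"], r["NAME"]) for r in joined))  # [(1, 'bolt'), (3, 'gear')]
```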
[0132] In one embodiment, each node in the system executes a fully
functional DBMS that is able to completely plan, optimize and
execute queries for the data resident on that node.
[0133] All intermediate tables that are materialized during the
process of query execution are materialized on the
compute-nodegroup for the query, if no alternate behavior has been
specified by the administrator. The administrator can specify where
these results are to be materialized.
[0134] The process of query planning and execution is best
understood with practical examples. For the purposes of these
examples, the sample schema illustrated in FIG. 7 is used. The
schema is based on the TPC-H Benchmark (www.tpc.org). The figure
also specifies how the tables in the system are to be distributed
in an EPDBMS. Tables REGION and NATION are broadcast on a nodegroup
called NG1. The table Supplier is defined to be distributed using
some EDD method with a single distribution column, and on the
nodegroup NG2. All other tables are distributed using the same EDD
on nodegroup NG1.
[0135] To illustrate the Parallel Query Planning process, consider
the following query: SELECT PARTKEY FROM LINEORDER WHERE
ORDERKEY=12345;
[0136] This query is syntactically valid, but no table called
LINEORDER exists in the system so a suitable error message is
generated by the system.
[0137] Consider the following query that is submitted to the
EPDBMS: SELECT PARTKEY FROM LINEITEM WHERE ORDERKEY=12345;
[0138] The query references a single table (LINEITEM) and
therefore, the query consists of a single partition and can be
executed directly on all nodes where the table LINEITEM
resides.
[0139] In one embodiment, DQP is complete and the query can be
executed on all nodes where the table LINEITEM resides and the
results can be returned to the user.
[0140] In one embodiment, DQP determines that the query includes a
restriction "ORDERKEY=12345" and also recognizes that the table
LINEITEM is distributed using the EDD method on a single column,
ORDERKEY. The system therefore knows that all rows of the table
LINEITEM with an ORDERKEY of 12345 must exist on a single node in
the system, or not exist on any node in the system. The system can
therefore perform further optimization and execute the query only
on the one node where it is determined that rows with an ORDERKEY
of 12345 exist on the system. Step (608) in the query planning
process would therefore determine which node (if any) the query
needs to be executed on, and may even entirely eliminate any
execution if it can determine that no row with an ORDERKEY of 12345
exists on the system. The EDD method can provide the DQP with this
information, and the DQP can perform this optimization.
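The single-node optimization above relies on the placement function being deterministic: if every distribution column is fixed by an equality restriction, the planner can compute the one node that could hold matching rows. This sketch assumes a simple CRC-32 hash placement (not the EDD algorithm, which is not described here); `owner`, `target_nodes`, and the four-node membership of NG1 are hypothetical. It does not model the further step of skipping execution entirely when no matching row exists.

```python
import zlib


def owner(dv_values, nodegroup):
    """Illustrative simple-hash placement: the node that stores rows with this DV."""
    return nodegroup[zlib.crc32(repr(tuple(dv_values)).encode()) % len(nodegroup)]


def target_nodes(nodegroup, dv_columns, equality_restrictions):
    """Nodes on which a single-table query must run.

    If every distribution column is pinned by an equality restriction, rows that
    satisfy the query can only live on the node that owns that DV."""
    if dv_columns and all(c in equality_restrictions for c in dv_columns):
        return [owner([equality_restrictions[c] for c in dv_columns], nodegroup)]
    return list(nodegroup)   # otherwise run on every node in the nodegroup


ng1 = ["node0", "node1", "node2", "node3"]     # hypothetical membership of NG1
print(target_nodes(ng1, ["ORDERKEY"], {"ORDERKEY": 12345}))   # a single node
print(len(target_nodes(ng1, ["ORDERKEY"], {})))                # 4
```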
[0141] Consider the following query that is submitted to the
EPDBMS:
[0142] SELECT LINEITEM.PARTKEY, PART.NAME
[0143] FROM LINEITEM, PART
[0144] WHERE LINEITEM.ORDERKEY=12345
[0145] AND LINEITEM.PARTKEY=PART.PARTKEY;
[0146] As the query is syntactically valid, the DQP begins planning
and determines that the query references two tables, LINEITEM and
PART (step 602).
The restriction "LINEITEM.ORDERKEY=12345" is extracted and
associated with the table LINEITEM (step 603). Step 604 determines
that the query consists of two partitions because the tables
LINEITEM and PART are being joined on the column PARTKEY but the
table LINEITEM is distributed on the column ORDERKEY, and the table
PART is distributed on PARTKEY. As each partition is a single
table, step 605 finds no join conditions to extract.
[0147] At this stage, we have the following two partitions:
[0148] PARTITION1: LINEITEM, WHERE ORDERKEY=12345
[0149] PARTITION2: PART
[0150] We also have the remnant of the query that includes the join
condition,
[0151] SELECT LINEITEM.PARTKEY, PART.NAME
[0152] FROM LINEITEM, PART
[0153] WHERE LINEITEM.PARTKEY=PART.PARTKEY;
[0154] Note that this is not intended to illustrate a complete and
valid SQL statement that is identical to the SQL submitted to the
system, but rather, the contents of some data structures for the
purposes of illustrating the DQP process.
[0155] Step 606 determines that there are two partitions as shown
above, and step 608 uses historical data to estimate the number of
rows that will be found in each of the partitions.
[0156] Step 609 determines that the two partitions are in fact
joined and determines that the join clause would be
"PARTITION1.PARTKEY=PARTITION2.PARTKEY". Step 615 will therefore
cause the system to generate two intermediate tables (call
them _PARTITION_1 and _PARTITION_2) and distribute them on the
compute-nodegroup for the query using an IEDD on the PARTKEY.
[0157] _PARTITION_1 is the result of executing the query
"SELECT LINEITEM.PARTKEY FROM LINEITEM WHERE ORDERKEY=12345" and
distributing that result on the value of the PARTKEY.
[0158] _PARTITION_2 is the result of executing the
query "SELECT PART.PARTKEY, PART.NAME FROM PART" and distributing
that result on the value of the PARTKEY.
[0159] Both _PARTITION_1 and _PARTITION_2 are created
on the compute-nodegroup for the query.
[0160] These steps are recorded (617) for constructing the query
plan, and the query is reformulated using the remaining tables,
_PARTITION_1 and _PARTITION_2, as follows.
[0161] SELECT _PARTITION_1.PARTKEY, _PARTITION_2.NAME
[0162] FROM _PARTITION_1, _PARTITION_2
[0163] WHERE _PARTITION_1.PARTKEY=_PARTITION_2.PARTKEY;
[0164] DQP resumes at step 602, which identifies two tables,
_PARTITION_1 and _PARTITION_2. There are no single
table restrictions on either table. Step 604 determines that the
two tables are joined and the joins can be executed locally. Step
605 extracts the join condition and attaches it to the newly
created partition, _PARTITION_3.
[0165] Step 606 determines that there is only a single partition
and therefore query planning now completes by constructing the list
of steps recorded along the way. Those steps were:
ON NODEGROUP COMPUTE-NODEGROUP
    CREATE TABLE _PARTITION_1 (PARTKEY)
    TO HOLD THE RESULTS OF EXECUTING THE QUERY
        "SELECT PARTKEY FROM LINEITEM WHERE ORDERKEY = 12345"
    ON ALL NODES IN NODEGROUP NG1 WHERE A ROW WITH ORDERKEY 12345 MAY RESIDE
    AND DISTRIBUTE _PARTITION_1 ON <PARTKEY> USING SIMPLE HASH DISTRIBUTION.
ON NODEGROUP COMPUTE-NODEGROUP
    CREATE TABLE _PARTITION_2 (PARTKEY, NAME)
    TO HOLD THE RESULTS OF EXECUTING THE QUERY
        "SELECT PARTKEY, NAME FROM PART"
    ON ALL NODES IN NODEGROUP NG1
    AND DISTRIBUTE _PARTITION_2 ON <PARTKEY> USING SIMPLE HASH DISTRIBUTION.
ON NODEGROUP COMPUTE-NODEGROUP
    EXECUTE THE QUERY
        "SELECT _PARTITION_1.PARTKEY, _PARTITION_2.NAME
         FROM _PARTITION_1, _PARTITION_2
         WHERE _PARTITION_1.PARTKEY = _PARTITION_2.PARTKEY"
    AND RETURN THOSE RESULTS TO THE USER WHO SUBMITTED THE QUERY.
ON NODEGROUP COMPUTE-NODEGROUP
    DROP TABLE _PARTITION_1
    DROP TABLE _PARTITION_2
[0166] In the case of queries that include more complex SQL
constructs, such as aggregations, more complex query plans are
constructed by the DQP.
[0167] Consider the following query that is received by the
EPDBMS:
[0168] SELECT CUSTKEY, COUNT(*)
[0169] FROM LINEITEM
[0170] GROUP BY CUSTKEY;
[0171] The query consists of a single table and therefore query
execution can be performed in parallel on all nodes where the table
LINEITEM resides.
[0172] However, each node would only be able to determine the
results of executing this query on the data on that node.
[0173] In order to completely execute this query, the DQP needs to
generate a plan that includes additional steps. The query plan for
this query would include these steps:
ON NODEGROUP COMPUTE-NODEGROUP
    CREATE TABLE _TEMP_TABLE_1 (CUSTKEY, COUNTER)
    TO HOLD THE RESULTS OF EXECUTING THE QUERY
        "SELECT CUSTKEY, COUNT(*) FROM LINEITEM GROUP BY CUSTKEY"
    ON ALL NODES IN NODEGROUP NG1
    AND DISTRIBUTE THE TABLE _TEMP_TABLE_1 ON <CUSTKEY> USING SIMPLE HASH DISTRIBUTION.
ON NODEGROUP COMPUTE-NODEGROUP
    EXECUTE THE QUERY
        "SELECT CUSTKEY, SUM(COUNTER) AS "COUNT(*)" FROM _TEMP_TABLE_1 GROUP BY CUSTKEY"
    AND RETURN THOSE RESULTS TO THE USER WHO SUBMITTED THE QUERY.
[0174] All nodes in the system where the table LINEITEM exists
execute the first query and the EPDBMS materializes those results
in a temporary table on the compute-nodegroup. The second step of
the plan then issues a second aggregation query and returns the
results to the user.
[0175] Since _TEMP_TABLE_1 was created with a data dependent
distribution on <CUSTKEY>, all rows in _TEMP_TABLE_1
with a given CUSTKEY will exist on the same node of the
compute-nodegroup for the query, so the second aggregation
produces exactly one result row for each CUSTKEY.
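The two-step plan for the GROUP BY query can be simulated as follows. Storage nodes compute partial counts, the partial rows are redistributed on CUSTKEY with a simple CRC-32 hash (standing in for the SIMPLE HASH DISTRIBUTION in the plan), and each compute node sums the partial counts it receives. The function name, node names, and sample rows are hypothetical.

```python
import zlib
from collections import Counter, defaultdict


def two_phase_count(storage_nodes, compute_nodes, key):
    """SELECT key, COUNT(*) ... GROUP BY key, executed in two steps."""
    # Step 1: each storage node computes a partial COUNT(*) over its own rows and
    # ships each (key, counter) pair to the compute node chosen by hashing the key.
    temp_table = defaultdict(list)
    for rows in storage_nodes.values():
        for k, counter in Counter(r[key] for r in rows).items():
            node = compute_nodes[zlib.crc32(repr(k).encode()) % len(compute_nodes)]
            temp_table[node].append((k, counter))
    # Step 2: all partial rows for a key are now on one compute node, so
    # SUM(COUNTER) per key on each node yields final, non-overlapping results.
    result = {}
    for node in compute_nodes:
        totals = Counter()
        for k, counter in temp_table[node]:
            totals[k] += counter
        result.update(totals)
    return result


storage = {"n0": [{"CUSTKEY": 1}, {"CUSTKEY": 2}], "n1": [{"CUSTKEY": 1}, {"CUSTKEY": 1}]}
print(sorted(two_phase_count(storage, ["c0", "c1"], "CUSTKEY").items()))  # [(1, 3), (2, 1)]
```

Without the redistribution in step 1, CUSTKEY 1 would be reported twice (once per storage node), which is why the single-table query cannot simply be run on each node in isolation.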
Intermediate Tables and Intermediate Processing
[0176] The EPDBMS software can determine where intermediate tables
created during the execution of a query are to be stored, and where
intermediate computations are to be performed. This may either be the system
default (default-compute-nodegroup) or some other nodegroup
specified by the administrator. The administrator may specify what
nodegroup each query, group of queries, session, group of sessions,
user or group of users will consider to be the
compute-nodegroup.
[0177] The DQP captures information about the compute nodegroup
to be used at the beginning of query planning. It records the
name of the nodegroup, and the members (nodes) of that nodegroup.
It indicates that it may be placing intermediate tables on these
nodes by incrementing a reference counter on those nodes. It
indicates that it is referencing the nodegroup by incrementing a
reference counter on the nodegroup. These reference counters are
decremented upon completion of execution of the query. In this way,
the EPDBMS can record its "affiliation" to specific nodes that are
members of the compute nodegroup, and the nodegroup that is being
used as the compute nodegroup.
[0178] Upon commencing execution of a query, a reference counter
associated with each table being referenced in the query is
incremented. If a table is referenced multiple times in a query,
the reference counter is incremented only once. This reference
counter is decremented upon completion of execution of the query,
for each table that was referenced in the query. In this way, the
EPDBMS can record that a query that uses a specific table in the
system is being executed.
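The reference counting in [0177] and [0178] can be sketched as a context manager, so that counters are released even if a query fails. As a simplification, this sketch takes the compute-nodegroup references and the table references at the same moment, whereas the text takes the former at the start of planning and the latter at the start of execution. `RefCounts` and `QueryRefs` are hypothetical names; the nodegroup memberships follow the FIG. 8 example.

```python
from collections import Counter


class RefCounts:
    """Hypothetical counters kept by the EPDBMS."""

    def __init__(self):
        self.node = Counter()
        self.nodegroup = Counter()
        self.table = Counter()


class QueryRefs:
    """Holds references for the lifetime of one query."""

    def __init__(self, refs, compute_nodegroup, members, tables):
        self.refs = refs
        self.compute_nodegroup = compute_nodegroup  # captured once, when the query starts
        self.members = list(members)
        self.tables = set(tables)                   # repeated references count once

    def _adjust(self, delta):
        self.refs.nodegroup[self.compute_nodegroup] += delta
        for node in self.members:
            self.refs.node[node] += delta
        for table in self.tables:
            self.refs.table[table] += delta

    def __enter__(self):
        self._adjust(+1)
        return self

    def __exit__(self, exc_type, exc, tb):
        self._adjust(-1)   # released even if the query fails
        return False


refs = RefCounts()
default_compute = ("nodegroup-1", [f"node{i}" for i in range(40, 50)])
with QueryRefs(refs, *default_compute, tables=["LINEITEM", "PART", "LINEITEM"]) as q:
    default_compute = ("newcompute", [f"node{i}" for i in range(40, 55)])  # admin switch
    print(q.compute_nodegroup, refs.table["LINEITEM"])  # nodegroup-1 1
print(refs.node["node40"])  # 0
```

Because the compute nodegroup is captured when the query starts, switching the default afterwards does not affect the running query, consistent with the treatment of in-flight queries in [0190].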
Elastic Compute Capacity (ECC)
[0179] According to the foregoing explanation of query planning and
the placement of intermediate tables and the location where
intermediate computations are performed, one can immediately
appreciate that the administrator may seamlessly change the
available compute capacity in the system.
[0180] The available compute capacity in the system could be
changed in one of two ways: (a) by adding additional
compute-nodegroups, and associating different users with different
compute-nodegroups, and (b) by adding nodes to existing compute
nodegroup(s).
[0181] This is called Elastic Compute Capacity (ECC) and enables
the EPDBMS to scale up and down in response to high or low workload
situations. Such "resizing" is simple, quick and can be performed
while the system is continuing to process database operations.
[0182] FIG. 8 illustrates compute capacity expansion in one
embodiment. Assume an EPDBMS with fifty nodes named node0, node1,
node2, . . . node49.
[0183] The following nodegroups are defined on this system:
[0184] nodegroup-1: node40, node41, node42, . . . node49
[0185] default-storage: node0, node1, node2, . . . node39
[0186] For the purposes of this illustration assume that the
default-compute-nodegroup on the system is the nodegroup called
nodegroup-1. All queries on the system use nodegroup-1 as the
compute nodegroup.
[0187] First, the administrator prepares additional nodes to bring
into the EPDBMS (Step 801). In the illustration in FIG. 8, the
administrator provisions five additional nodes to bring into the
EPDBMS. The activities involved in performing step 801 are shown on
the right of FIG. 8, alongside the step 801. First the nodes are
provisioned. Then, the EPDBMS is instructed that the new nodes are
to be called node50, node51, . . . node54. These nodes are then
brought online.
[0188] At the end of step 801, node50 through node54 are
un-enrolled nodes in the system. Step 802 enrolls these nodes and
creates a new nodegroup. The command issued on the right by the
administrator creates this nodegroup (called `newcompute`), and
includes on it the fifteen nodes whose names are enumerated as part
of the command. The system informs all nodes that a new nodegroup
has been created, and informs them of the membership of that
nodegroup.
[0189] At the end of step 802, the newly created nodegroup
`newcompute` is ready to become the new default-compute-nodegroup
for the system. The administrator performs step (803) to inform the
EPDBMS system that the default compute nodegroup to use is the new
nodegroup, `newcompute`.
[0190] All queries launched after this command completes will use
the nodegroup newcompute as the compute-nodegroup. Queries that are
already in flight will complete their operations using whatever
compute nodegroup was in effect when those queries started
execution.
[0191] The final state of the system is illustrated (804) and shows
that the default-compute-nodegroup is `newcompute`.
[0192] FIG. 9 illustrates how an administrator could reduce
available compute capacity on the system in one embodiment. After
the increased workload situation that led to the capacity increase
illustrated in FIG. 8, assume that the workload on the system
decreases, and the administrator decides to reduce the available
compute capacity in the system.
[0193] At the beginning of the illustration in FIG. 9, the
default-compute-nodegroup is newcompute and consists of fifteen
nodes. The administrator could immediately reduce the available
compute capacity to ten nodes by setting the
default-compute-nodegroup to nodegroup-1. However, the
administrator determines that only five nodes need to be part of
the compute nodegroup, and the administrator decides to use nodes
node50 . . . node54 for this purpose. Having created a new
nodegroup `compute` (step 902), the administrator activates that
nodegroup (step 903) and all queries launched after this command
completes will use the nodegroup `compute` for storage of
intermediate tables. Queries that are in flight at the time when
this change is made will complete, and should a new query be
launched in those same sessions, those new queries would use the
nodegroup `compute`.
[0194] By associating the location of intermediate tables and
intermediate processing with a compute-nodegroup, and providing a
mechanism by which the compute-nodegroup can be changed without
requiring system outages, or complex operations, the EPDBMS enables
the system to be truly elastic in adapting to variable workloads
that may be placed on the system.
[0195] By maintaining reference counts on the nodes that a query is
using, the EPDBMS knows whether a given node is in use at
any given time.
[0196] Once all queries that were running during the changes
illustrated in FIG. 9 have completed, the reference count on the
nodegroup `newcompute` will drop to zero.
Deletion of Nodegroups and Nodes
[0197] When the reference count and the table count associated with
a nodegroup are zero, the administrator can delete a nodegroup.
[0198] When a nodegroup is deleted, the group counter value for all
members of the nodegroup is decremented.
[0199] When a node has a zero reference count and a zero group
count, the administrator can delete the node.
[0200] When a node is deleted, the EPDBMS informs the software
executing on that node that its services are no longer needed by
the system, and the software on that node can perform an orderly
shutdown. Once a node has been logically deleted from the system,
the system administrator can de-provision the node.
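The reference-count and deletion rules of paragraphs [0195] through [0200] can be sketched in a few lines. This is a minimal in-memory sketch under stated assumptions, not the EPDBMS implementation: the `Topology`, `Node` and `Nodegroup` classes and their method names are hypothetical, and the sketch assumes that a query pins the default compute nodegroup in effect when it starts (as described for FIGS. 8 and 9) and that holding the compute role is itself a reference (paragraph [0201]).

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Node:
    name: str
    ref_count: int = 0    # roles and in-flight queries currently using the node
    group_count: int = 0  # number of nodegroups the node is a member of


@dataclass
class Nodegroup:
    name: str
    members: List[Node]
    ref_count: int = 0
    table_count: int = 0


class Topology:
    """Hypothetical bookkeeping for nodes, nodegroups and reference counts."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Node] = {}
        self.groups: Dict[str, Nodegroup] = {}
        self.default_compute: Optional[Nodegroup] = None

    def add_node(self, name: str) -> None:
        self.nodes[name] = Node(name)

    def create_nodegroup(self, name: str, member_names: List[str]) -> None:
        members = [self.nodes[n] for n in member_names]
        for node in members:
            node.group_count += 1
        self.groups[name] = Nodegroup(name, members)

    def _acquire(self, group: Nodegroup) -> None:
        group.ref_count += 1
        for node in group.members:
            node.ref_count += 1

    def _release(self, group: Nodegroup) -> None:
        group.ref_count -= 1
        for node in group.members:
            node.ref_count -= 1

    def set_default_compute(self, name: str) -> None:
        # Holding the compute role counts as a reference on group and nodes.
        new_group = self.groups[name]
        self._acquire(new_group)
        if self.default_compute is not None:
            self._release(self.default_compute)
        self.default_compute = new_group

    def start_query(self) -> Nodegroup:
        # A query pins the compute nodegroup in effect when it starts.
        group = self.default_compute
        if group is None:
            raise RuntimeError("no default compute nodegroup")
        self._acquire(group)
        return group

    def finish_query(self, group: Nodegroup) -> None:
        self._release(group)

    def delete_nodegroup(self, name: str) -> None:
        group = self.groups[name]
        if group.ref_count or group.table_count:
            raise RuntimeError(f"{name} is still in use")
        for node in group.members:
            node.group_count -= 1
        del self.groups[name]

    def delete_node(self, name: str) -> None:
        node = self.nodes[name]
        if node.ref_count or node.group_count:
            raise RuntimeError(f"{name} is still in use")
        del self.nodes[name]  # the node may now shut down and be de-provisioned
```

In this sketch, switching the default compute nodegroup never disturbs a running query: the old nodegroup keeps a non-zero reference count until `finish_query` releases it, and only then does `delete_nodegroup` succeed.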
Reference Counts for Compute-nodegroups and Storage-nodegroups
[0201] When a nodegroup is assigned a role, either as a
compute-nodegroup or a storage-nodegroup, it is important that this
be recorded as a reference count.
The fact that a nodegroup is assigned the role of
"compute-nodegroup" or "storage-nodegroup" itself causes the
reference counters on the nodegroup and on all participating nodes
to be incremented. This ensures that those nodegroups and nodes are
not accidentally deleted or de-provisioned even while there are no
running queries on the system.
Dynamic Capacity Expansion (DCE) and Elastic Data Distribution (EDD)
[0202] One aspect of the EPDBMS is a new class of data dependent
distribution method called "Elastic Data Distribution" (EDD).
Tables distributed with EDD can be configured to utilize additional
storage capacity that is managed through Dynamic Capacity Expansion
(DCE), while ensuring co-location and without requiring wholesale
redistribution of data.
Dynamic Capacity Expansion (DCE)
[0203] An administrator may add nodes to an existing storage
nodegroup. This process is called Dynamic Capacity Expansion (DCE).
FIG. 10 illustrates the process of DCE in a high level block
diagram according to one embodiment. The administrator first
determines that five nodes are to be added to the nodegroup,
`default-storage`. The nodes to be added to the system may be
existing nodes already in the system or new nodes that are to be
added to the system. The illustration assumes that new nodes are to
be used and so, new nodes are first provisioned and prepared for
induction into the system (1001).
[0204] At the end of this step (1001), the five nodes are
un-enrolled and ready for use. Step 1002 adds these nodes to the
nodegroup, `default-storage`. When the command completes, the
nodegroup default-storage now includes the additional five nodes,
and tables using the nodegroup default-storage can now store data
on those nodes as well as the previous fifty nodes.
[0205] When nodes are added to an existing nodegroup, a series of
steps are performed by the system. FIG. 11 illustrates these steps
in an embodiment, and extends on the example provided in FIG.
10.
[0206] Five nodes (node50, . . . node54) are to be added to the
nodegroup `storage-nodegroup` (1101). First, all random and
round-robin distributed tables associated with this nodegroup are
identified and empty tables are created on the new nodes. These
empty tables will be used in the future to store data on the new
nodes (1102). Similarly, all broadcast distributed tables
associated with this nodegroup are identified and empty tables are
created on the new nodes. These empty tables are then populated
with data to match the other nodes in the nodegroup (1103).
Similarly, all EDD tables associated with this nodegroup are
identified and empty tables are created on the new nodes (1104).
All data structures maintained by the system that relate to the EDD
tables are updated to reflect the new membership of the nodegroup
(1105). All nodes in the system are informed of the new membership
of the nodegroup (1106), and this ends the process of DCE (1107)
with the new nodes being part of the nodegroup.
Elastic Data Distribution (EDD)
[0207] When a table is created, it is associated with a nodegroup.
This association is established at the time when the table is
created. Data in the table is stored on the nodes that are members
of the nodegroup.
[0208] FIG. 12 is a high level block diagram illustrating where
tuples are stored on EDD tables to ensure co-location, in one
embodiment. For the purposes of this illustration, a table T1 (A,
B, C, D, E) is distributed according to EDD <A, B> and
associated with the nodegroup storage-nodegroup.
[0209] In step (1201), we construct the DV <a, b> for the row
(a, b, c, d, e). We then determine whether a row with the DV <a, b> was
ever stored in a table on the nodegroup `storage-nodegroup` (1202).
We choose `storage-nodegroup` as it is the nodegroup associated
with the table T1, which is the table to which the present row is
destined.
[0210] If we determine that a row with this DV was previously
encountered in connection with the nodegroup, `storage-nodegroup`,
the system determines where that row was stored (1203) and stores
this present row (a, b, c, d, e) on that same node.
[0211] If, on the other hand, step 1202 determines that no row with
this DV was ever stored in a table on nodegroup
`storage-nodegroup`, the EPDBMS decides where to store the row
(1204).
Distribution Maps
[0212] The distribution map is a data structure that tracks
information about DVs seen in connection with all tables
associated with a nodegroup.
[0213] The distribution map may be implemented as a simple linked
list or array of DVs seen in association with the associated
nodegroup.
[0214] The distribution map may be a tree data structure, where
each leaf is a DV that was seen in association with the associated
nodegroup.
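Either representation only has to answer two questions: "was this DV seen?" and "record this DV". The sketch below uses a sorted Python list searched with the standard `bisect` module as a stand-in for the array form; binary search gives the logarithmic lookups a tree would provide. The class name is hypothetical, and DVs are assumed to be tuples of mutually comparable values (integers here).

```python
import bisect
from typing import List, Tuple

DV = Tuple[int, ...]


class SortedArrayDistributionMap:
    """Distribution map kept as a sorted array of the DVs seen so far."""

    def __init__(self) -> None:
        self._dvs: List[DV] = []

    def contains(self, dv: DV) -> bool:
        # Binary search for the DV in the sorted array.
        i = bisect.bisect_left(self._dvs, dv)
        return i < len(self._dvs) and self._dvs[i] == dv

    def record(self, dv: DV) -> None:
        i = bisect.bisect_left(self._dvs, dv)
        if i == len(self._dvs) or self._dvs[i] != dv:
            self._dvs.insert(i, dv)  # keep the array sorted and free of duplicates

    def __len__(self) -> int:
        return len(self._dvs)
```

An unsorted list would also be correct but needs a linear scan per lookup, which is why the sorted or tree form is the more practical choice as the number of distinct DVs grows.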
Allocation Strategies
[0215] An allocation strategy is a set of rules that specify where
a given row in a table distributed with EDD would be stored, if it
has been determined that the DV of that row was never encountered
before in connection with the nodegroup.
[0216] Further, an allocation strategy is a deterministic set of
rules that can be evaluated based on the candidate row of data and
the state of the nodegroup.
[0217] FIG. 13 is another high level block diagram illustrating
where tuples are stored on EDD tables to ensure co-location.
[0218] In step (1301), we construct the DV <a, b> for the row
(a, b, c, d, e). We then determine whether a row with the DV <a, b> was
ever stored in a table on the nodegroup `storage-nodegroup` (1302).
We choose `storage-nodegroup` as it is the nodegroup associated
with the table T1, which is the table to which the present row is
destined. This determination is made by inspecting the Distribution
Map associated with the nodegroup as that data structure tells us
about all DVs that were ever seen in connection with the
nodegroup.
[0219] If we determine that a row with this DV was previously
encountered in connection with the nodegroup, `storage-nodegroup`,
the system determines where that row was stored (1305) and stores
this present row (a, b, c, d, e) on that same node, and this
concludes the process (1306).
[0220] If, on the other hand, step 1302 determines that no row with
this DV was ever stored in a table on nodegroup
`storage-nodegroup`, the EPDBMS decides where to store the row
(1303). This determination is made based on the Allocation Strategy
for the nodegroup. The location where this DV is being stored is
recorded for later use (1304), and this concludes the process
(1306).
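The single-generation flow of FIG. 13 can be sketched as below, assuming the distribution map is a Python dict from DV to node, which is one way to satisfy step 1304's "recorded for later use". The function `place_row` and the round-robin strategy are hypothetical. Round-robin is chosen deliberately: it would hand a repeated DV a different node, so the example shows that the distribution map, not the strategy, is what preserves co-location.

```python
import itertools
from typing import Callable, Dict, Sequence, Tuple

DV = Tuple[int, ...]


def place_row(
    row: Sequence[int],
    dv_columns: Sequence[int],
    dist_map: Dict[DV, str],
    allocate: Callable[[DV], str],
) -> str:
    """Return the node for `row`, following the steps of FIG. 13."""
    dv = tuple(row[i] for i in dv_columns)  # step 1301: build the DV
    node = dist_map.get(dv)                  # step 1302: seen before on this nodegroup?
    if node is not None:
        return node                          # step 1305: co-locate with the earlier row
    node = allocate(dv)                      # step 1303: ask the allocation strategy
    dist_map[dv] = node                      # step 1304: remember where this DV went
    return node


# Hypothetical stateful strategy: hand out nodes in round-robin order.
_ring = itertools.cycle(["node0", "node1", "node2", "node3"])


def round_robin(dv: DV) -> str:
    return next(_ring)
```

Because the map belongs to the nodegroup rather than to a table, a row for T2 (P, Q, R) with a comparable DV <2, 1> lands on the same node as the T1 row with DV <2, 1>.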
Generations
[0221] Associated with each nodegroup, the EPDBMS maintains a
series of "generation" data structures. When a nodegroup is first
created, it is in its first generation. Associated with a
generation of a nodegroup are: (a) a list of participating nodes,
(b) a distribution map, and (c) an allocation strategy.
[0222] As a nodegroup is a non-empty collection of nodes, every
generation of a nodegroup includes a non-empty list of
participating nodes.
[0223] When nodes are added to a nodegroup through the process of
DCE, a new generation is created. The new generation records the
new list of participating nodes, has a new (empty) distribution map
and an allocation strategy. If the administrator does not specify
an allocation strategy, the default strategy is used.
[0224] An administrator may choose to modify the allocation
strategy on a nodegroup at any time without adding nodes. When the
allocation strategy on a nodegroup is changed, a new generation is
created. The new generation records the list of participating
nodes, has a new (empty) distribution map and a new allocation
strategy.
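The generation bookkeeping of paragraphs [0221] through [0224] can be sketched as a list of records, oldest first. This is a sketch under assumptions: `Generation`, `EddNodegroup` and `default_strategy` are hypothetical names, and the default rule (first DV column modulo the number of nodes) is borrowed from the example of FIGS. 19 through 21 rather than stated as the system default.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Set, Tuple

DV = Tuple[int, ...]
AllocationStrategy = Callable[[DV, List[str]], str]


def default_strategy(dv: DV, nodes: List[str]) -> str:
    # Hypothetical default: every node is eligible; pick by the first DV column.
    return nodes[dv[0] % len(nodes)]


@dataclass
class Generation:
    nodes: List[str]                                # (a) participating nodes
    strategy: AllocationStrategy                    # (c) allocation strategy
    dist_map: Set[DV] = field(default_factory=set)  # (b) starts out empty


class EddNodegroup:
    def __init__(self, name: str, nodes: List[str],
                 strategy: AllocationStrategy = default_strategy) -> None:
        if not nodes:
            raise ValueError("a nodegroup is a non-empty collection of nodes")
        self.name = name
        self.generations: List[Generation] = [Generation(list(nodes), strategy)]

    @property
    def current(self) -> Generation:
        return self.generations[-1]

    def add_nodes(self, new_nodes: List[str],
                  strategy: Optional[AllocationStrategy] = None) -> None:
        # DCE: new member list, empty map, given strategy or the default.
        self.generations.append(Generation(self.current.nodes + list(new_nodes),
                                           strategy or default_strategy))

    def set_strategy(self, strategy: AllocationStrategy) -> None:
        # Strategy change without new nodes: same members, empty map.
        self.generations.append(Generation(list(self.current.nodes), strategy))
```

Past generations are never discarded here, because a DV first seen in an old generation must still be routed by that generation's members and strategy.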
[0225] Information about nodegroups and the current and past
generations of the nodegroup are stored in a System Topology
Manager (203). The management of the distribution maps and the
allocation strategy is handled by the Data Distribution Manager
(207, 252, 262, 272, 282 and 292) that resides on the Controller
Node and the other Nodes in the system.
[0226] FIG. 14 extends on the preceding description, and is a
detailed block diagram illustrating where tuples are stored on EDD
tables to ensure co-location.
[0227] When presented with a row of data (a, b, c, d, e) destined
for the EDD table T1 (A, B, C, D, E) where the DV is <A, B>,
the first step (1401) is to compute the DV <a, b>. A counter
of generations is initialized to 1 (1402) and the distribution map
(DM) associated with the first generation of the nodegroup is
retrieved (1403). The DM retrieved in step 1403 is inspected (1404) for
the presence of an entry for the DV <a, b>. If one exists,
then step 1405 will determine where that row would have been stored
based on the allocation strategy associated with that generation,
and the new row is sent to that same place, which ends the process
(1406). If on the other hand, no entry for the DV <a, b> is
found in the generation associated with the counter `gencounter` in
step 1404, step 1407 checks to see whether another generation
exists. If there is one, step 1408 increments the counter and the
process resumes at step 1403. If on the other hand, step 1407
determines that no further generations exist, we can conclude that
this row (a, b, c, d, e) is the first occurrence of the DV <a,
b> and step 1409 retrieves the allocation strategy for the
current generation and stores the row in the location determined by
that allocation strategy. Step 1410 updates the DM associated with
the current generation reflecting the first occurrence of the DV
<a, b> and this completes the process (1406).
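The generation search of FIG. 14 can be sketched as below, assuming each generation's distribution map is an exact Python set of DVs. The names `Generation`, `store_row` and `modulo_first_column` are hypothetical. The map does not need to store a location. Because an allocation strategy is deterministic, step 1405 can recompute where the earlier row went from the generation in which its DV was first recorded.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Sequence, Set, Tuple

DV = Tuple[int, ...]


@dataclass
class Generation:
    nodes: List[str]
    strategy: Callable[[DV, List[str]], str]
    dist_map: Set[DV] = field(default_factory=set)


def store_row(row: Sequence[int], dv_columns: Sequence[int],
              generations: List[Generation]) -> str:
    """Return the target node for `row`; generations are ordered oldest first."""
    dv = tuple(row[i] for i in dv_columns)          # step 1401
    for gen in generations:                         # steps 1402, 1403, 1407, 1408
        if dv in gen.dist_map:                      # step 1404
            # Step 1405: where that generation's strategy placed the DV.
            return gen.strategy(dv, gen.nodes)
    current = generations[-1]
    node = current.strategy(dv, current.nodes)      # step 1409
    current.dist_map.add(dv)                        # step 1410
    return node


def modulo_first_column(dv: DV, nodes: List[str]) -> str:
    # Hypothetical strategy, borrowed from the example of FIGS. 19-21.
    return nodes[dv[0] % len(nodes)]
```

Only the current generation's map is ever written (step 1410); older maps are read-only once a newer generation exists.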
[0228] FIG. 15 extends on the preceding description and FIG. 11,
and provides more details of the steps performed during capacity
expansion. For the purpose of this illustration, five nodes are
being added to the nodegroup called `default-storage`, as shown in
FIG. 10.
[0229] The process begins (1501) after the user has provisioned the
additional five nodes and executed the command to add five nodes to
the nodegroup `storage-nodegroup`.
[0230] The system identifies (1502) all random and round-robin
distributed tables on the `storage-nodegroup` and creates empty
tables on the newly added nodes. These tables have no data in them
at this stage.
[0231] The system then identifies (1503) all broadcast distributed
tables on the storage-nodegroup and creates and populates replicas
on the newly added nodes. At the end of this step, all nodes in the
storage-nodegroup have identical (in structure and content)
broadcast distributed tables.
[0232] The system then identifies (1504) all EDD tables on
storage-nodegroup and creates empty tables on the newly added
nodes.
[0233] The tables created in steps 1502, 1503 and 1504 have the
same table structure as they have on the existing nodes in the
storage-nodegroup.
[0234] The system then creates (1505) a new generation for the
storage-nodegroup and populates an empty distribution map for that
generation.
[0235] The new nodes are then added (1506) to the node member list
for the nodegroup. If the administrator specified an allocation
strategy, step 1507 uses that strategy, and if not, a system
default strategy is used.
[0236] In preparation for switching to a new distribution map, the
system (1508) updates and flushes the distribution map for the
current generation. Finally, all nodes are informed of the new
generation of the storage-nodegroup, and when all nodes acknowledge
the change, the change is recorded as complete by the EPDBMS
(1510). This completes the process for dynamic capacity expansion
(1511).
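The table-level work of FIG. 15 can be sketched against an in-memory model. This is a hypothetical sketch: `Table`, `StorageNodegroup` and `expand_capacity` are illustrative names, each table's per-node data is modeled as a Python list of rows, the strategy is reduced to a label, and the flush of the current map (1508) and the notify-and-acknowledge round (1510) are left out because they have no in-memory equivalent.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class Table:
    name: str
    distribution: str  # "random", "round-robin", "broadcast" or "edd"
    fragments: Dict[str, List[tuple]] = field(default_factory=dict)  # node -> rows


@dataclass
class Generation:
    nodes: List[str]
    strategy: str  # label only, e.g. "default"
    dist_map: Set[Tuple] = field(default_factory=set)


@dataclass
class StorageNodegroup:
    name: str
    tables: List[Table]
    generations: List[Generation]


def expand_capacity(group: StorageNodegroup, new_nodes: List[str],
                    strategy: Optional[str] = None) -> None:
    """Hypothetical in-memory version of the DCE steps of FIG. 15."""
    old_nodes = group.generations[-1].nodes
    for table in group.tables:
        for node in new_nodes:
            if table.distribution == "broadcast":
                # Step 1503: the new replica gets a full copy of the data.
                table.fragments[node] = list(table.fragments[old_nodes[0]])
            else:
                # Steps 1502 and 1504: random, round-robin and EDD tables
                # get an empty table with the same structure.
                table.fragments[node] = []
    # Steps 1505-1507: a new generation with an empty distribution map, the
    # enlarged member list, and the administrator's or the default strategy.
    group.generations.append(
        Generation(old_nodes + list(new_nodes), strategy or "default"))
```

No existing EDD, random or round-robin row moves: the new nodes start empty and only receive rows whose DVs are first seen in the new generation.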
Determining Where to Store a Row in an EDD Table
[0237] The algorithm depicted in FIGS. 12, 13, and 14 is best
understood as a dialog between two parts of the EPDBMS. The
following dialog between the EPDBMS and EDD logic is illustrative
of the placement process.
OPTION A:
[0238] EPDBMS: EDD, a row of data for T1(A, B, C, D, E) has been
received. The table T1 is EDD distributed on storage-nodegroup and
the DV is <A, B>. Please tell me where this row of data (a,
b, c, d, e) should be placed?
[0239] EDD: EPDBMS, I have never encountered a row with the DV
<a, b> associated with any table associated with
storage-nodegroup. This I can say for certain. You should place
this row on the node indicated by the Allocation Strategy for the
current generation of the nodegroup.
OPTION B:
[0240] EPDBMS: EDD, a row of data for T1(A, B, C, D, E) has been
received. The table T1 is EDD distributed on storage-nodegroup and
the DV is <A, B>. Please tell me where this row of data (a,
b, c, d, e) should be placed?
[0241] EDD: EPDBMS, I have determined that a row with an identical
DV <a, b> has been seen before and the row of data was placed
on node "nodeX".
[0242] EPDBMS: EDD, Thank you. I will place the row of data (a, b,
c, d, e) on nodeX, so it is co-located with the previous row with
the DV <a, b>.
[0243] The above illustrative dialog between the EPDBMS and the EDD
would ensure that co-location was guaranteed for all data on table
T1, and for all data in any other table associated with the same
nodegroup as T1 (storage-nodegroup) with a comparable DV. As the
number of rows in the database increases, and as the number of
distinct DVs increases, it is extremely expensive to maintain a
complete record of each DV that was ever encountered by the
system.
[0244] According to one embodiment, co-location can be guaranteed
without knowing all the DVs that were actually seen by the system.
We demonstrate below that it is sufficient to know positively that
a DV was never seen, or when (in which generation) it may have been
first seen, in order to guarantee co-location.
[0245] In other words, we demonstrate that in order to guarantee
co-location of data with identical DVs, it is sufficient if the EDD
logic could respond to the question (in the hypothetical dialog
described above) in the following fashion. We present below,
another hypothetical dialog between the EPDBMS code and the EDD
code.
OPTION A:
[0246] EPDBMS: EDD, a row of data for T1(A, B, C, D, E) has been
received. The table T1 is EDD distributed on storage-nodegroup and
the DV is <A, B>. Please tell me where this row of data (a,
b, c, d, e) should be placed?
[0247] EDD: EPDBMS, I have never encountered a row with the DV
<a, b> associated with any table associated with
storage-nodegroup. This I can say for certain. You should place
this row on the node indicated by the Allocation Strategy for the
current generation of the nodegroup.
OPTION B:
[0248] EPDBMS: EDD, a row of data for T1(A, B, C, D, E) has been
received. The table T1 is EDD distributed on storage-nodegroup and
the DV is <A, B>. Please tell me where this row of data (a,
b, c, d, e) should be placed?
[0249] EDD: EPDBMS, I may have encountered a row with DV <a,
b> in connection with storage-nodegroup. If I did, the first
time that I encountered it was in generation X, and in that
generation, the row would have been stored on node "nodeY". I can
definitely tell you that before generation X, I never encountered
the DV <a, b> in connection with storage-nodegroup. Further,
I can guarantee that if, at any time in the past, you asked me
about the DV<a, b> in connection with this nodegroup, I would
have either told you that I never encountered it before, or I would
have told you that a previous row with that DV may have been stored
on nodeY, and in generation X. I can also guarantee that if I ever
told you that I had encountered this DV<a, b>, I would never
have told you that this row was encountered in any generation other
than generation X. I hope this helps.
[0250] If Option A in the above hypothetical dialog were to occur,
then some algorithm or algorithms have determined that the current
row (a, b, c, d, e) is the very first instance of the DV <a,
b>. EPDBMS is therefore free to store (a, b, c, d, e) on any
node in the system in accordance with the allocation strategy in
effect in the current generation.
[0251] On the other hand, if Option B in the above hypothetical
dialog were to occur, the EPDBMS would necessarily have to store
the row on "nodeY".
[0252] When determining where to store a row in an EDD distributed
table, there is therefore a tolerance for false positives in the
determination whether a prior row with an identical DV was ever
encountered. But, there is absolutely no tolerance for a false
negative in that determination.
[0253] A new row being inserted into an EDD table is stored on the
node specified by the allocation strategy of the earliest
generation in which it cannot be determined with absolute certainty
that the DV has never been seen, or according to the allocation
strategy of the current generation if the DV has never been seen in
any generation. For the purposes of this statement, it is
implicitly assumed that all references to allocation strategy and
generation are in relation to the nodegroup associated with the
table to which the row is being inserted.
[0254] We now walk through the algorithm described above using the
illustration in FIG. 16.
[0255] In this illustration, a new row of data (a, b, c, d, e) is
received by the system for table T1(A, B, C, D, E) associated with
storage-nodegroup; T1 is EDD distributed with the DV <A, B>. First,
the system computes the DV of the row <a, b> (1601), and then
determines whether this DV was ever seen before (1602). If the DV
was never seen before, we can store the row on any node and still
guarantee co-location (1604). If, on the other hand, step 1602
determines that the DV may have been seen before, then we would
store the new row on the same node where the previous row would have
been stored (1603).
[0256] FIG. 17 is a detailed block diagram that illustrates the
steps involved in determining where to place a row of data in an
EDD table. It extends on the illustration in FIG. 16.
[0257] When presented with the row (a, b, c, d, e) the first step
(1701) is to compute the DV <a, b>. A counter is initialized
to 1 and used for an iterative search through the generations of
the nodegroup. Step 1703 determines whether the DV <a, b> has
been encountered in connection with the generation indicated by the
counter `gencounter`. If the DV has NOT been encountered in that
generation, then the process continues to step 1704, where we
determine whether there are more generations to search. If YES,
processing continues to 1705 where the gencounter is incremented
and processing resumes at step 1703. If on the other hand, step
1704 determines that no further generations exist, then we have
determined that this DV was never seen before and step 1706
determines where to send the row in accordance with the AS for the
current generation of the nodegroup. Step 1707 records the first
occurrence of the DV <a, b> in the current generation, and
the process ends (1709). On the other hand, if step 1703 determines
that the DV may have been seen, then the row will be dispatched
according to the Allocation Strategy for the generation indicated
by `gencounter` (1708) and this completes the process.
[0258] FIG. 18 extends the illustration of FIG. 17 and provides an
embodiment of the algorithm(s) used in determining where to store a
row in connection with an EDD table. A row of data with a DV <a,
b> is received (1801). In step 1802, five hashes are computed,
and these hashes are (h1, h2, h3, h4 and h5). For the purposes of
illustration, assume that each of these hashes is a sixty-four bit
value and is represented as an unsigned sixty-four bit integer.
Assume further that the distribution map is a bitmap containing one
million bytes (1,000,000 bytes) or eight million bits (total-bits).
Step 1803 translates each of the five hash values into a specific
bit in this bitmap. In this illustration, the hash value $h_i$ is
translated into a specific bit $b_i$ in the bitmap as:
$$b_i = h_i \bmod \textit{total-bits}$$
[0259] where MOD ($\bmod$) is the remainder on integer division (MODULUS)
operator.
[0260] Having computed the values of the five bits (step 1803), the
process prepares to perform an iterative search of the distribution
maps associated with each generation of the nodegroup starting with
generation 1. Step 1804 initializes the generation counter to 1 and
step 1807 checks to see whether all the bits (b1, b2, b3, b4, and
b5) are set in the distribution map associated with the generation
specified by gencounter. If all the bits are set, it is possible
that this present DV <a, b> may have been seen in the
generation specified by gencounter. Step 1808 proceeds to dispatch
the row to the same location where the previous row would have been
dispatched, if it had been seen when that generation of the
distribution map was current, and the process ends (1811).
[0261] In step 1807, if even one of the five bits (b1, b2, b3, b4
and b5) is not set, the DV <a, b> was certainly not seen in
that generation of the distribution map, so step 1806 checks to see
whether there are more generations. If yes, the process increments
gencounter (1805) and starts again at step 1807.
[0262] If on the other hand, step 1806 determines that there are no
more generations, then this DV <a, b> has never been seen
before and step 1809 sets the five bits in the current generation
of the bitmap. Step 1810 determines where to dispatch this row
according to the allocation strategy for the current generation of
the bitmap, and this concludes the process (1811).
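The bitmap scheme of FIG. 18 is what is commonly called a Bloom filter: it can report false positives but never false negatives, which is exactly the tolerance stated in paragraph [0252]. The sketch below follows the figure's parameters (five 64-bit hashes, an 8,000,000-bit map, $b_i = h_i \bmod \textit{total-bits}$). The hash functions themselves are not specified in the text; deriving them from SHA-256 with a one-byte prefix is an assumption, and `BitmapDistributionMap` and `place` are hypothetical names.

```python
import hashlib
from typing import Callable, List, Tuple

DV = Tuple[int, ...]
TOTAL_BITS = 8_000_000  # a 1,000,000-byte bitmap, as in the illustration


def five_hashes(dv: DV) -> List[int]:
    """Step 1802: five 64-bit hashes h1..h5 (assumed SHA-256 based)."""
    data = repr(dv).encode()
    return [int.from_bytes(hashlib.sha256(bytes([i]) + data).digest()[:8], "big")
            for i in range(5)]


class BitmapDistributionMap:
    def __init__(self, total_bits: int = TOTAL_BITS) -> None:
        self.total_bits = total_bits
        self.bits = bytearray((total_bits + 7) // 8)

    def positions(self, dv: DV) -> List[int]:
        # Step 1803: b_i = h_i MOD total-bits
        return [h % self.total_bits for h in five_hashes(dv)]

    def may_contain(self, dv: DV) -> bool:
        # All five bits set: the DV may have been seen. Any bit clear: never seen.
        return all(self.bits[b // 8] & (1 << (b % 8)) for b in self.positions(dv))

    def add(self, dv: DV) -> None:
        for b in self.positions(dv):
            self.bits[b // 8] |= 1 << (b % 8)


Generation = Tuple[BitmapDistributionMap, List[str], Callable[[DV, List[str]], str]]


def place(dv: DV, generations: List[Generation]) -> str:
    for dist_map, nodes, strategy in generations:  # steps 1804-1807, oldest first
        if dist_map.may_contain(dv):
            return strategy(dv, nodes)             # step 1808
    dist_map, nodes, strategy = generations[-1]
    dist_map.add(dv)                               # step 1809
    return strategy(dv, nodes)                     # step 1810
```

A false positive only sends a new DV to the node an older generation would have chosen; every later row with that DV takes the same path, so co-location still holds.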
[0263] Without loss of generality, the five hash functions chosen
above in this embodiment may be replaced by any deterministic
function that operates on a DV and produces some value, and where
each DV is associated with a single value but many DVs may produce
the same value.
Allocation Strategies in Greater Detail
[0264] An Allocation Strategy (as defined in [0215]) is a set of
rules that specify where a given row in a table distributed with
EDD would be stored, if it has been determined that the DV of that
row was never encountered before in connection with the
nodegroup.
[0265] We refine that definition here by stating that an Allocation
Strategy is a deterministic algorithm that may be evaluated solely
based upon the value of the DV, and any information related to the
current generation of the nodegroup (such as the number of nodes in
the nodegroup, etc.).
[0266] The administrator may specify that all nodes in the
nodegroup are available to receive rows related to DVs that have
never been seen before. This is the default allocation
strategy.
[0267] The administrator may specify which nodes in the nodegroup
are available to receive rows related to DVs that have never been
seen before.
[0268] The administrator may specify which nodes in the nodegroup
are not available to receive rows related to DVs that have never
been seen before.
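The three administrator options can be sketched as one factory that returns a deterministic function of the DV and the generation's node list. The name `make_strategy` and its `include`/`exclude` parameters are hypothetical, and the placement rule (first DV column modulo the number of eligible nodes) is borrowed from the example of FIGS. 19 through 21.

```python
from typing import Callable, Collection, List, Optional, Tuple

DV = Tuple[int, ...]


def make_strategy(nodes: List[str],
                  include: Optional[Collection[str]] = None,
                  exclude: Optional[Collection[str]] = None) -> Callable[[DV], str]:
    """Build a deterministic allocation strategy for one generation.

    With neither list this is the default strategy (all nodes eligible);
    `include` names the eligible nodes; `exclude` names the ineligible ones.
    """
    eligible = [n for n in nodes
                if (include is None or n in include)
                and (exclude is None or n not in exclude)]
    if not eligible:
        raise ValueError("an allocation strategy needs at least one eligible node")

    def strategy(dv: DV) -> str:
        # Hypothetical rule: index the eligible nodes by the first DV column.
        return eligible[dv[0] % len(eligible)]

    return strategy
```

Because the eligible list is fixed when the generation is created, the same DV always maps to the same node within that generation, which is the determinism that paragraph [0265] requires.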
DCE and EDD Working Together
[0269] Having now described all the components of DCE and EDD,
FIGS. 19, 20 and 21 provide a single detailed example illustrating
the insertion of the first eighteen rows into two tables on a
nodegroup. Both of the tables are EDD distributed using the same
EDD method.
[0270] In the illustration of an embodiment that is provided here,
T1(A, B, C, D, E) is EDD <A, B> and T2 (P, Q, R) is EDD
<P, Q>. The DVs of T1 and T2 are comparable and T1 and T2
are associated with the same nodegroup.
[0271] The illustration begins on FIG. 19 in step 1901. A new
nodegroup, `storage` has been created and is in its first
generation. The tables T1 and T2 have been created.
[0272] The allocation strategy in the implementation is to send
data to the node in the nodegroup identified by computing the
remainder after division of the value of the first distribution
column by the number of nodes. As there are four nodes in the storage
nodegroup at this time, the row (a, b, c, d, e) would be sent to
the node
[0273] a % 4
[0274] where "%" is the integer modulo operator (remainder after
integer division).
[0275] If (a % 4) is 0, the row is sent to node0,
[0276] If (a % 4) is 1, the row is sent to node1 and so on.
[0277] The first row of data (1, 1, 2, 3, 6) is received for T1. As
the DV <1, 1> has never been seen before, the row is sent to
node (1% 4) or node1 (steps 1902 and 1903).
[0278] The next row of data (2, 1, 2, 3, 6) is received for T1. As
the DV <2, 1> has never been seen before, the row is sent to
node (2% 4) or node2 (steps 1904 and 1905).
[0279] The next row of data (4, 3, 2, 3, 6) is received for T1. As
the DV <4, 3> has never been seen before, the row is sent to
node (4% 4) or node0 (steps 1906 and 1907).
[0280] The next row of data (4, 3, 7, 5, 2) is received for T1. The
DV <4, 3> was seen before in generation 1. The allocation
strategy for generation 1 would have sent the row to node (4% 4) or
node0. This row is sent to the same place. (steps 1908 and
1909).
[0281] The next row of data (3, 3, 6, 3, 2) is received for T1. The
DV <3, 3> has never been seen before, so the row is sent to node
(3% 4) or node3 (steps 1910, 1911).
[0282] Finally, the sixth row of data (3, 3, 3, 3, 2) is received
for T1. The DV <3, 3> was seen before in generation 1. The
allocation strategy for generation 1 would have sent the row to
node (3% 4) or node3. This row is sent to the same place (steps
1912 and 1913).
[0283] This concludes the illustration on FIG. 19.
[0284] FIG. 20 continues the illustration from FIG. 19 on an
embodiment. The administrator adds a node (node4) to the nodegroup
according to the methods provided by DCE. The administrator uses
the default allocation strategy for the second generation of the
nodegroup `storage`. In the default allocation strategy, data is
sent to the node identified by the value of the first distribution
column % 5, as there are now five nodes in the nodegroup (step
2001).
[0285] A row of data (2, 1, 6) is received for T2. The DV <2,
1> was seen in generation 1. The allocation strategy for
generation 1 would have sent the row to node (2% 4) or node2. This
row is sent to the same place (steps 2002, 2003).
[0286] A row of data (2, 1, 2, 3, 6) is received for T1. The DV
<2, 1> was seen in generation 1. The allocation strategy for
generation 1 would have sent the row to node (2% 4) or node2. This
row is sent to the same place (steps 2004, 2005).
[0287] A row of data (7, 3, 2, 3, 6) is received for T1. The DV
<7, 3> has never been seen before, so the row is sent to node
(7% 5) or node2 (steps 2006, 2007).
[0288] A row of data (7, 6, 7, 5, 2) is received for T1. The DV
<7, 6> has never been seen before, so the row is sent to node
(7% 5) or node2 (steps 2008, 2009).
[0289] A row of data (9, 3, 6, 3, 2) is received for T1. The DV
<9, 3> has never been seen before, so the row is sent to node
(9% 5) or node4 (steps 2010, 2011).
[0290] A row of data (3, 3, 2) is received for T2. The DV <3,
3> was seen in generation 1. The allocation strategy for
generation 1 would have sent the row to node (3% 4) or node3. This
row is sent to the same place (steps 2012, 2013).
[0291] This concludes the illustration on FIG. 20.
[0292] FIG. 21 continues the illustration from FIG. 19 and FIG. 20
on an embodiment. The administrator determines that node 2 needs no
more new data as it appears to have a higher fraction of the data
already. The administrator modifies the allocation strategy (step
2101) for the nodegroup by specifying that data should be sent to
nodes other than node 2. This leaves four nodes in the nodegroup
that can receive data and therefore the allocation strategy
computes the value of <first distribution column> % 4 and
depending on the value (0, 1, 2 or 3) sends the row to node (0, 1,
3 or 4). This leads to the creation of generation 3.
[0293] A row of data (2, 1, 6) is received for T2. The DV <2,
1> was seen in generation 1. The allocation strategy for
generation 1 would have sent the row to node (2% 4) or node2. This
row is sent to the same place (steps 2102, 2103).
[0294] Steps 2104 through 2113 are self-explanatory, and follow
directly from the actions in FIGS. 19 and 20.
[0295] Steps 2102 and 2103 illustrate an important aspect of
EDD. While the administrator has specified that new data is not to
be sent to node2, that restriction applies only to new (never
seen before) DVs. The DV <2, 1> has been seen before, and
must therefore be stored only on node2.
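The rows of FIGS. 19 and 20, and the first row of FIG. 21, can be replayed with a short script as a consistency check on the walkthrough. It assumes exact per-generation sets as distribution maps; the class and function names are hypothetical. Since steps 2104 through 2113 are not listed in the text, the final row (6, 2, 9) is a hypothetical T2 row with a never-seen DV, added only to show the generation-3 exclusion of node2 at work.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple

DV = Tuple[int, int]


@dataclass
class Generation:
    eligible: List[str]  # nodes that may receive never-seen DVs
    dist_map: Set[DV] = field(default_factory=set)

    def allocate(self, dv: DV) -> str:
        # <first distribution column> % (number of eligible nodes)
        return self.eligible[dv[0] % len(self.eligible)]


def store(row: tuple, generations: List[Generation]) -> str:
    # T1 is EDD <A, B> and T2 is EDD <P, Q>: both DVs are the first two columns.
    dv = (row[0], row[1])
    for gen in generations:          # oldest generation first
        if dv in gen.dist_map:
            return gen.allocate(dv)  # co-locate with the earlier row
    gen = generations[-1]
    gen.dist_map.add(dv)             # first occurrence of this DV
    return gen.allocate(dv)


nodes = [f"node{i}" for i in range(5)]
gens = [Generation(nodes[:4])]                               # FIG. 19: node0..node3
fig19 = [(1, 1, 2, 3, 6), (2, 1, 2, 3, 6), (4, 3, 2, 3, 6),
         (4, 3, 7, 5, 2), (3, 3, 6, 3, 2), (3, 3, 3, 3, 2)]
placed19 = [store(r, gens) for r in fig19]

gens.append(Generation(nodes[:5]))                           # FIG. 20: node4 added
fig20 = [(2, 1, 6), (2, 1, 2, 3, 6), (7, 3, 2, 3, 6),
         (7, 6, 7, 5, 2), (9, 3, 6, 3, 2), (3, 3, 2)]
placed20 = [store(r, gens) for r in fig20]

gens.append(Generation([n for n in nodes if n != "node2"]))  # FIG. 21: skip node2
placed21 = store((2, 1, 6), gens)     # steps 2102, 2103: DV seen in generation 1
hypothetical = store((6, 2, 9), gens)  # not from the figures: a never-seen DV
```

Running it yields `placed19 == ['node1', 'node2', 'node0', 'node0', 'node3', 'node3']` and `placed20 == ['node2', 'node2', 'node2', 'node2', 'node4', 'node3']`, matching the text, while `placed21` is `'node2'` despite the exclusion and the hypothetical new DV goes to `'node3'`.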
Reinitializing a Nodegroup
[0296] When there are no tables stored on a nodegroup, all
information that is retained about prior generations of the
nodegroup, and previous data that was stored on the nodegroup is
obsolete.
[0297] When the tablecount on a nodegroup returns to zero (is
decremented from 1 to zero) when the last table on it is deleted,
the nodegroup is reinitialized.
[0298] When a nodegroup is reinitialized, all generations other
than generation 1 are discarded, and all participating nodes in the
nodegroup may be included in the first generation of the newly
reinitialized nodegroup. The default allocation strategy is applied
to the newly reinitialized nodegroup.
Storing Distribution Maps
[0299] From the preceding sections, the detailed description and
the illustration in FIGS. 17, 18, 19, 20, and 21, it will be
apparent that the only distribution map that is ever updated is the
distribution map associated with the current generation.
[0300] Different generations of the distribution map may be stored
in different formats. In one embodiment, extremely efficient
compressed bitmaps may be used for all generations of the
distribution map, except the current generation. The current
generation may be an uncompressed bitmap or a collection of
uncompressed bitmaps.
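The split between a mutable current map and frozen past maps can be sketched with the standard `zlib` module standing in for a compressed-bitmap format; that substitution, and the names `FrozenBitmap` and `GenerationBitmaps`, are assumptions for illustration. A sparsely populated 1,000,000-byte bitmap compresses to a small fraction of its size, which is why freezing old generations is cheap.

```python
import zlib
from typing import List


class FrozenBitmap:
    """Read-only bitmap of a past generation, kept zlib-compressed."""

    def __init__(self, bits: bytearray) -> None:
        self._blob = zlib.compress(bytes(bits), 9)

    def test(self, bit: int) -> bool:
        # Decompressing on every lookup keeps the sketch short; a real system
        # would use a compressed format that supports random access.
        bits = zlib.decompress(self._blob)
        return bool(bits[bit // 8] & (1 << (bit % 8)))

    def compressed_size(self) -> int:
        return len(self._blob)


class GenerationBitmaps:
    def __init__(self, total_bits: int) -> None:
        self.total_bits = total_bits
        self.past: List[FrozenBitmap] = []
        self.current = bytearray((total_bits + 7) // 8)  # uncompressed, mutable

    def set_bit(self, bit: int) -> None:
        # Only the current generation is ever updated.
        self.current[bit // 8] |= 1 << (bit % 8)

    def start_new_generation(self) -> None:
        self.past.append(FrozenBitmap(self.current))  # freeze and compress
        self.current = bytearray((self.total_bits + 7) // 8)
```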
ACID Implications for EDD
[0301] The EPDBMS provides complete ACID (Atomicity,
Consistency, Isolation and Durability) semantics.
[0302] All changes that are made to a distribution map as part of a
database transaction are committed to the database as part of that
database transaction. The Transaction Coordinator (1605) is
responsible for ensuring that transactions are either completely
committed or not committed at all. The Transaction Coordinator
works with the Query Execution Engines on each node and the Client
Connection Manager on the Controller Node to ensure that data
integrity is preserved.
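A single-process sketch of the rule in [0302] that distribution-map changes are committed or discarded together with the transaction that made them. The `DistributionMap` and `Transaction` classes are hypothetical; a Transaction Coordinator spanning several nodes would additionally need a distributed commit protocol, which is not shown.

```python
class DistributionMap:
    """Committed DV -> node placements (simplified; no bitmaps)."""

    def __init__(self) -> None:
        self.committed: dict[int, int] = {}


class Transaction:
    """Buffers distribution-map changes until the transaction ends."""

    def __init__(self, dmap: DistributionMap) -> None:
        self.dmap = dmap
        self.pending: dict[int, int] = {}
        self.active = True

    def _check_active(self) -> None:
        if not self.active:
            raise RuntimeError("transaction already finished")

    def place(self, dv: int, node: int) -> None:
        self._check_active()
        self.pending[dv] = node

    def lookup(self, dv: int):
        # The transaction sees its own uncommitted placements first.
        return self.pending.get(dv, self.dmap.committed.get(dv))

    def commit(self) -> None:
        self._check_active()
        self.dmap.committed.update(self.pending)  # all changes applied together
        self.active = False

    def rollback(self) -> None:
        self._check_active()
        self.pending.clear()  # none of the changes survive
        self.active = False
```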
Heterogeneous Persistent Data Stores
[0303] FIG. 22 provides an illustration of a parallel elastic
database management system illustrating heterogeneous persistent
data stores, according to one embodiment. In this figure, an EPDBMS
is shown where one storage node, 2201, is part of a nodegroup called
email-nodegroup that is associated with a single table called
EMAIL. This table is a relational representation of data that is
stored in some external email server that speaks the well-known
IMAP protocol.
[0304] In practice, IMAP mail servers, such as the mail services
provided by companies like Google, have extremely efficient search
algorithms that are tailored to email messages.
[0305] A client 2260 submits a query that contains a "join" between
two "tables", CUSTOMER and EMAIL. CUSTOMER is associated with the
storage-nodegroup, and is stored on storage nodes 2202, 2203 and
2204.
[0306] The submission of this database query (2261) causes the DQP
(2251) to construct a query plan (2252). Observe that Step 1 of the
query plan is expressed in a language, understood by the node
software on node 2201, that closely resembles the native language
of the Email Server (2220) that actually stores the data for the
table EMAIL. Steps 2 and 3 are expressed in a language understood
by the node software on nodes 2202, 2203, 2204, 2231 and 2232.
[0307] The IMAP server performs the search in the manner best
suited to email messages, and the persistent data stores on nodes
2202, 2203 and 2204 perform the query in Step 2 using the
algorithms best suited to them.
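The heterogeneous plan of FIG. 22 can be pictured as data, with each step tagged by the nodes that run it and the dialect those nodes understand. In this hypothetical rendering, the query texts, the join key (email sender equals customer email), and the placement of Step 3 on nodes 2231 and 2232 are assumptions. Step 3 is shown as a hash join, a common choice; the source does not specify the join algorithm.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PlanStep:
    step: int
    nodes: tuple[int, ...]            # nodes that execute the step
    language: str                     # dialect understood by those nodes
    text: str                         # the operation, in that dialect
    depends_on: tuple[int, ...] = ()


# Hypothetical rendering of query plan 2252.
PLAN = (
    PlanStep(1, (2201,), "imap-like", 'SEARCH SUBJECT "refund"'),
    PlanStep(2, (2202, 2203, 2204), "sql", "SELECT id, name, email FROM customer"),
    PlanStep(3, (2231, 2232), "sql", "join results of steps 1 and 2", (1, 2)),
)


def hash_join(emails: list[dict], customers: list[dict]) -> list[dict]:
    """Step 3: match email rows (from node 2201) to customer rows (from
    nodes 2202-2204) on sender address = customer email."""
    by_address: dict[str, list[dict]] = {}
    for c in customers:  # build side
        by_address.setdefault(c["email"], []).append(c)
    return [  # probe side
        {**c, "subject": e["subject"]}
        for e in emails
        for c in by_address.get(e["sender"], [])
    ]
```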
Intra-query Parallelism
[0308] In constructing the query plan depicted in FIG. 4 (404), the
DQP process (403) has determined that Step 2 depends on Step 1.
[0309] In constructing the query plan depicted in FIG. 22 (2252),
the DQP process (2251) has determined that Steps 1 and 2 have no
dependencies on each other, but that Step 3 depends on both Steps 1
and 2.
[0310] This aspect of the DQP process allows independent sets of
operations to be executed in parallel, subject to the resources
available on the system.
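The dependency analysis in [0308] to [0310] can be illustrated with a small scheduler that groups plan steps into waves and runs each wave on a thread pool. This is a hypothetical sketch; the function names are not from the source, and `max_workers` merely stands in for the resources available on the system.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def execution_waves(deps: dict[int, set[int]]) -> list[list[int]]:
    """Group plan steps into waves. Every step in a wave depends only on
    steps in earlier waves, so the steps of one wave may run concurrently."""
    done: set[int] = set()
    remaining = dict(deps)
    waves = []
    while remaining:
        ready = sorted(s for s, d in remaining.items() if d <= done)
        if not ready:
            raise ValueError("query plan has cyclic dependencies")
        waves.append(ready)
        done.update(ready)
        for s in ready:
            del remaining[s]
    return waves


def run_plan(deps: dict[int, set[int]],
             run_step: Callable[[int, dict], object],
             max_workers: int = 4) -> dict[int, object]:
    """Run each wave on a thread pool; run_step receives the results of
    earlier waves only."""
    results: dict[int, object] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for wave in execution_waves(deps):
            inputs = dict(results)  # snapshot of earlier waves' results
            futures = {s: pool.submit(run_step, s, inputs) for s in wave}
            for s, fut in futures.items():
                results[s] = fut.result()
    return results
```

For the plan of FIG. 22 the waves are [[1, 2], [3]]; for the plan of FIG. 4 they are [[1], [2]].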
[0311] Execution location of software components
[0312] In the preceding description we have defined a Node to be,
"a logical collection of software components that are executing on
some computing resources, such as CPU, Memory, Network Interface,
and Storage". See paragraph [10].
[0313] We now proceed to specify this architecture in more
detail.
[0314] We have several kinds of software programs that each play a
part in the operation of the EPDBMS. We have software components on
the Controller Node that provide services such as Dynamic Query
Planning, System Topology, Catalog Metadata, and Data Distribution
(among others). On the Storage/Compute Nodes, we have software that
provides Query Execution and Data Distribution services in addition
to the persistent data repositories.
[0315] A "Machine" in the EPDBMS is a unit of computing
infrastructure that provides computing resources such as CPU,
Memory, Network Interface and Storage. This Machine may be either a
physical machine (real hardware) or a Virtual Machine. Without loss
of generality, we state that the Machine provides computing
resources including storage, while recognizing that in practice the
storage may be only virtual storage where data are physically
resident on physical storage media in some other location. Each
Machine in the EPDBMS has a component of management software called
the Machine Manager which is responsible for all management
functions related to activities on that Machine.
[0316] The Nodes in the EPDBMS are therefore (as described in
paragraph [10]) a logical collection of software components that
are executing on the resources provided by the Machines.
[0317] FIG. 23 shows an EPDBMS with 5 Nodes and 5 Machines.
As can be seen here, the Nodes and the Machines are paired up, with
each Node having a dedicated Machine. Controller Node 2310 has
access to Machine 2301, and the Storage/Compute Nodes 2320,
2330, 2340 and 2350 have access to the Machines 2302, 2303, 2304
and 2305 respectively. As described above, each Machine has a
dedicated Machine Manager (2371, 2372, 2373, 2374, and 2375).
[0318] FIG. 24 shows another EPDBMS with 5 Nodes and 4 Machines. In
this system, the Nodes 2430 and 2440 share the same Machine, 2403.
As described above, each Machine has a dedicated Machine Manager
(2471, 2472, 2473, and 2474).
[0319] FIG. 25 shows another EPDBMS with 5 Nodes and 3 Machines. In
this EPDBMS, the Controller Node has dedicated access to Machine 1,
while the other four Nodes (Storage/Compute Nodes) have their Query
Execution Engines on Machine 2 and their Data Distribution Managers
and Persistent Data on Machine 3. This figure also shows that, as
described in paragraph [61], a node may include multiple Query
Execution Engines: in FIG. 25, Node 2520 has two Query Execution
Engines and Node 2530 has three. As described above, each Machine
has a dedicated Machine Manager (2571, 2572, and 2573).
[0320] As illustrated in FIGS. 23, 24 and 25, Machines provide the
computing infrastructure, and the software components of the Nodes
are associated with specific Machines.
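The association of Node components with Machines can be modelled as a mapping from (node, component) to machine; inverting it gives, per Machine, what its Machine Manager is responsible for. In this hypothetical sketch the node labels S1 to S4 are invented, and a node's multiple Query Execution Engines are collapsed into one entry.

```python
from collections import defaultdict


def components_by_machine(
        placement: dict[tuple[str, str], str]) -> dict[str, list[tuple[str, str]]]:
    """Invert a (node, component) -> machine placement so that each machine
    lists the components its Machine Manager is responsible for."""
    hosted: dict[str, list[tuple[str, str]]] = defaultdict(list)
    for (node, component), machine in placement.items():
        hosted[machine].append((node, component))
    return dict(hosted)


# Layout of FIG. 25; the Storage/Compute node labels S1-S4 are hypothetical.
FIG_25 = {("controller", "controller services"): "Machine 1"}
for node in ("S1", "S2", "S3", "S4"):
    FIG_25[(node, "query execution")] = "Machine 2"
    FIG_25[(node, "data distribution manager")] = "Machine 3"
    FIG_25[(node, "persistent data")] = "Machine 3"
```

The result has three keys, matching the three Machine Managers (2571, 2572, and 2573) of FIG. 25.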
Machine Managers
[0321] As described above, each Machine has a dedicated Machine
Manager. The Machine Manager is responsible for all management
functions related to activities on that Machine. A Machine Manager
software program is launched on a Machine at the time when the
Machine is provisioned to be part of the EPDBMS, and this program
continues to operate until the Machine is released from the
EPDBMS.
[0322] FIG. 26 is an illustration of some aspects of a Machine
Manager on a single Machine. A Machine Manager program maintains a
reference counter to track each software resource that the EPDBMS
has chosen to associate with that Machine (2601).
[0323] When a new Machine is launched, these reference counters are
all initialized to zero; they are incremented each time a software
resource is associated with the Machine, and decremented each time a
software resource is disassociated from it.
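A minimal sketch of the reference counters in [0322] and [0323], assuming one counter per kind of software resource. The class name and the `idle` helper (true when nothing is associated with the Machine) are hypothetical additions for illustration.

```python
from collections import Counter


class ResourceCounters:
    """Reference counters kept by one Machine Manager (2601).

    Counting per kind of resource is an assumption made for this sketch.
    """

    def __init__(self) -> None:
        self.counts: Counter = Counter()  # every count starts at zero

    def associate(self, kind: str) -> None:
        self.counts[kind] += 1

    def disassociate(self, kind: str) -> None:
        if self.counts[kind] == 0:
            raise ValueError(f"no {kind} is associated with this Machine")
        self.counts[kind] -= 1

    def idle(self) -> bool:
        # Hypothetical helper: True when nothing is associated with the Machine.
        return all(n == 0 for n in self.counts.values())
```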
[0324] When a new Machine is launched, the Machine Manager launches
a certain number of "Job Threads". A Job Thread is a software
program that can be instructed to take on the role of a software
resource that is part of the EPDBMS.
[0325] The Machine Manager maintains a list of available "Job
Threads" (2603). In addition, it maintains lists of software
resources that are associated with the Machine (2602, 2604,
2605).
[0326] When the EPDBMS chooses to associate a software resource
with a Machine, the Machine Manager on the Machine receives the
request. The process of provisioning an Execution Engine is
depicted in FIG. 27 and starts at step 2720. The system is depicted
(on the left) with a Machine Manager (2701) with 4 Execution
Engines (2702), 4 Distribution Managers (2704), 2 Persistent Stores
(2705) and 4 Job Threads (2703).
[0327] When the request for a new Execution Engine is received
(2720), the Machine Manager identifies whether a Job Thread is
available (2721), and finding an available Job Thread, it assigns
the Job Thread (which removes it from the list of Job Threads)
(2722), creates a new Execution Engine (Execution Engine 5)
(2723), and updates counters reflecting the numbers of Job Threads
and Execution Engines (2724), which completes the process
(2725).
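The provisioning flow of FIG. 27 (steps 2720 to 2725) can be sketched as follows. The class, its fields, and returning `None` when no Job Thread is free are assumptions; the source does not say what happens in that case. In this sketch the counters of step 2724 are simply the lengths of the two lists.

```python
class MachineManager:
    """Sketch of the parts of a Machine Manager used in FIG. 27."""

    def __init__(self, job_threads, execution_engines=()):
        self.job_threads = list(job_threads)              # available Job Threads (2703)
        self.execution_engines = list(execution_engines)  # (engine name, Job Thread) pairs

    def counters(self) -> dict:
        return {"job_threads": len(self.job_threads),
                "execution_engines": len(self.execution_engines)}

    def provision_execution_engine(self):
        if not self.job_threads:                       # 2721: no Job Thread available
            return None
        thread = self.job_threads.pop(0)               # 2722: assign it, removing it from the list
        name = f"Execution Engine {len(self.execution_engines) + 1}"
        self.execution_engines.append((name, thread))  # 2723: the thread takes on the role
        return name                                    # 2724, 2725: counters follow the lists
```

Starting from the state on the left of FIG. 27 (4 Execution Engines, 4 Job Threads), one call yields "Execution Engine 5" and leaves 3 Job Threads.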
Process Architecture View of the Query Execution Process
[0328] When a Client Application connects to the EPDBMS, a
connection is established. Each such connection is associated with
a certain "state". Common examples of such connection state are the
collation sequence, language, database localization settings, and
transaction state.
[0329] FIG. 28 illustrates the query execution process and
highlights the steps involved, as they relate to the process
architecture.
[0330] The process begins with the receipt of a client connection
request (2801). The Client Connection Manager (on the Controller
Node) receives this connection (2802) and proceeds to validate the
request (2803). If the request is invalid, an error is returned
(2804). If the request is valid, we wait for a valid query/command.
Parts of this processing (2805) are depicted in detail as part of
FIG. 3. When a valid query/command is received, the system first determines
whether the connection state is to be altered (2806). Some commands
alter connection state as part of their execution, others alter
connection state (and do nothing else), and others do not alter the
connection state. If the connection state is to be altered, update
the state (2807). A query execution plan is constructed (2808) and
forwarded to the Query Execution Engine on the Controller Node
(2809). The Query Execution Engine determines how many additional
engines are required in the plan based on a variety of factors
(2810). For example, if the plan is such that multiple operations
in the plan can be performed at the same time, additional engines
may be required. An example of these kinds of dependencies was
provided as part of FIG. 22 (see description in paragraph [257] in
the section on "Intra-Query Parallelism"). The Query Execution
Engine determines what engines are available and currently not
provisioned for some other query plan (2811) and determines whether
there are sufficient free engines available (2812). If there are
not sufficient engines then a determination is made whether
additional engines can be launched on the machines currently
available to the system (2813). If additional machines are needed,
they are provisioned (2814), and additional engines are launched on
those machines (2815). Next, in step 2816, the additional
engines required for the query plan are assigned to the plan, and
state information about the connection is forwarded to those
engines (2817). The engines execute the query plan and involve
other resources such as the redistribution managers where
appropriate (2818). Results from the query or an appropriate
message are returned to the client (2819) and the final connection
state is saved. If at the completion of query execution, engines
can be released, they are released (2821 and 2822). If the system
determines that an entire machine may be released (2823), then the
machine is released (2824). This concludes the query execution
process (2825), and execution resumes at 2805 when the next valid
command/query is received.
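Steps 2811 through 2815, together with the maximum-machine policy of [0332], can be sketched as a pure function. Everything here is hypothetical: the parameter names, the assumption that each new machine hosts a fixed number of engines, and raising an error when policy forbids enough machines (the source does not say what happens then).

```python
def plan_engine_acquisition(*, needed: int, free_engines: int,
                            spare_capacity: int, new_machine_capacity: int,
                            machines: int, max_machines: int) -> tuple[int, int]:
    """Return (engines to launch on existing machines, machines to provision)
    so that a query plan obtains `needed` engines."""
    shortfall = max(0, needed - free_engines)            # 2811, 2812
    launch_on_existing = min(shortfall, spare_capacity)  # 2813
    shortfall -= launch_on_existing
    # Ceiling division: machines required for the remaining engines (2814).
    new_machines = -(-shortfall // new_machine_capacity)
    if machines + new_machines > max_machines:           # administrator policy
        raise RuntimeError("policy does not allow enough machines")
    return launch_on_existing, new_machines
```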
[0331] The decisions in steps 2813, 2814, and 2824 are policy
decisions that may be defined by the administrator based on a
variety of factors including the cost and capabilities of machines
available to the system.
[0332] In one embodiment, the administrator may define policies
that define the maximum number of machines that may be provided at
any time.
[0333] In one embodiment, the administrator may define the
mechanism(s) to be used to determine whether existing machines can
support additional engines.
[0334] In one embodiment, the system may record when a machine was
provisioned and decide whether to release it based on the cost(s)
involved in retaining the machine for additional time.
[0335] In one embodiment, mechanisms are implemented that will
periodically scan for machines that can be released, and if
appropriate release those machines.
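One possible release policy for [0334] and [0335], sketched under an assumed pay-per-period billing model: an idle machine is released only when its current paid period is about to end, since keeping it until then costs nothing extra. The `MachineRecord` fields, the billing model, and the `margin` parameter are all assumptions, not part of the source.

```python
from dataclasses import dataclass


@dataclass
class MachineRecord:
    name: str
    provisioned_at: float   # when the machine was provisioned (seconds)
    billing_period: float   # assumed length of one paid period (seconds)
    engines_in_use: int     # engines currently assigned to query plans


def machines_to_release(machines: list[MachineRecord], now: float,
                        margin: float) -> list[str]:
    """Periodic scan: return idle machines whose paid period ends within
    `margin` seconds (hypothetical cost rule)."""
    release = []
    for m in machines:
        if m.engines_in_use:  # busy machines are never released
            continue
        elapsed = now - m.provisioned_at
        left_in_period = m.billing_period - (elapsed % m.billing_period)
        if left_in_period <= margin:
            release.append(m.name)
    return release
```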
[0336] As depicted in FIG. 28, execution engines involved in the
execution of a query plan at the request of a client are made aware
of all state related information, making it possible for engines to
be released at the end of one query execution (2822) and
provisioned again when required (2812 through 2816).
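The point of [0336], that engines keep no connection state of their own, can be sketched as follows. The state is held per connection on the Controller Node and travels with each request. The field names, `execute_on_engine`, and the treatment of `BEGIN` as a state-altering command are hypothetical.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ConnectionState:
    # Examples of connection state from [0328]; field names are hypothetical.
    collation: str = "binary"
    language: str = "english"
    in_transaction: bool = False


def execute_on_engine(engine_id: str, plan: str, state: ConnectionState):
    """A stateless engine: all connection state arrives with the request and
    the (possibly changed) state is returned with the result."""
    if plan == "BEGIN":  # a command whose effect is to alter connection state
        return f"{engine_id}: ok", replace(state, in_transaction=True)
    return f"{engine_id}: ran {plan!r} with collation {state.collation}", state


class Connection:
    """Per-connection state kept on the Controller Node between queries."""

    def __init__(self) -> None:
        self.state = ConnectionState()

    def run(self, plan: str, engine_id: str) -> str:
        # Forward the state to whichever engine is assigned (2817), then
        # save the final state once the result is back.
        result, self.state = execute_on_engine(engine_id, plan, self.state)
        return result
```

Because the engine keeps nothing between calls, a second query can run on a different engine and still see the open transaction, which is what allows engines to be released after one query and provisioned again later.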
[0337] It should be understood that the example embodiments
described above may be implemented in many different ways. In some
instances, the various "nodes" described herein may each be
implemented by a physical or virtual general purpose computer
having a central processor, memory, disk or other mass storage,
communication interface(s), input/output (I/O) device(s), and other
peripherals. The general purpose computer is transformed into the
nodes described above, for example, by loading software
instructions into the processor, and then causing execution of the
instructions to carry out the functions described for each
node.
[0338] As is known in the art, such a computer may contain a system
bus, where a bus is a set of hardware lines used for data transfer
among the components of a computer or processing system. The bus or
busses are essentially shared conduit(s) that connect different
elements of the computer system (e.g., processor, disk storage,
memory, input/output ports, network ports, etc.) and that enable the
transfer of information between the elements. One or more central
processor units are attached to the system bus and provide for the
execution of computer instructions. Also attached to the system bus are
typically I/O device interfaces for connecting various input and
output devices (e.g., keyboard, mouse, displays, printers,
speakers, etc.) to the computer. Network interface(s) allow the
computer to connect to various other devices attached to a network.
Memory provides volatile storage for computer software instructions
and data used to implement an embodiment. Disk or other mass
storage provides non-volatile storage for computer software
instructions and data used to implement, for example, the various
procedures described herein.
[0339] Embodiments may therefore typically be implemented in
hardware, firmware, software, or any combination thereof.
[0340] The computers that execute the node functions described
above may be deployed in a cloud computing arrangement that makes
available one or more physical and/or virtual data processing
machines via a convenient, on-demand network access model to a
shared pool of configurable computing resources (e.g., networks,
servers, storage, applications, and services) that can be rapidly
provisioned and released with minimal management effort or service
provider interaction. Such cloud computing deployments are relevant
and typically preferred as they allow multiple users to access
computing resources as part of a shared marketplace. By aggregating
demand from multiple users in central locations, cloud computing
environments can be built in data centers that use the best and
newest technology, located in sustainable and/or centralized
locations and designed to achieve the greatest per-unit efficiency
possible.
[0341] In certain embodiments, the procedures, devices, and
processes described herein that constitute a node are a computer
program product, including a computer readable medium (e.g., a
removable storage medium such as one or more DVD-ROMs, CD-ROMs,
diskettes, tapes, etc.) that provides at least a portion of the
software instructions for the system. Such a computer program
product can be installed by any suitable software installation
procedure, as is well known in the art. In another embodiment, at
least a portion of the software instructions may also be downloaded
over a cable, communication and/or wireless connection.
[0342] Embodiments may also be implemented as instructions stored
on a non-transient machine-readable medium, which may be read and
executed by one or more processors. A non-transient
machine-readable medium may include any mechanism for storing or
transmitting information in a form readable by a machine (e.g., a
computing device). For example, a non-transient machine-readable
medium may include read only memory (ROM); random access memory
(RAM); magnetic disk storage media; optical storage media; flash
memory devices; and others.
[0343] Further, firmware, software, routines, or instructions may
be described herein as performing certain actions and/or functions
of the nodes. However, it should be appreciated that such
descriptions contained herein are merely for convenience and that
such actions in fact result from computing devices, processors,
controllers, or other devices executing the firmware, software,
routines, instructions, etc.
[0344] It also should be understood that the block and network
diagrams may include more or fewer elements, be arranged
differently, or be represented differently. But it further should
be understood that certain implementations may dictate the block
and network diagrams and the number of block and network diagrams
illustrating the execution of the embodiments be implemented in a
particular way.
[0345] Accordingly, further embodiments may also be implemented in
a variety of computer architectures, physical, virtual, cloud
computers, and/or some combination thereof, and thus the computer
systems described herein are intended for purposes of illustration
only and not as a limitation of the embodiments.
[0346] While this invention has been particularly shown and
described with references to example embodiments thereof, it will
be understood by those skilled in the art that various changes in
form and details may be made therein without departing from the
scope of the invention encompassed by the appended claims.
* * * * *