U.S. patent application number 12/246509 for streaming queries was filed with the patent office on 2008-10-07 and published on 2010-04-08.
This patent application is currently assigned to Microsoft Corporation. Invention is credited to Jonathan D. Goldstein, David E. Maier.
Application Number | 12/246509 |
Publication Number | 20100088325 |
Family ID | 42076613 |
Publication Date | 2010-04-08 |
United States Patent Application | 20100088325 |
Kind Code | A1 |
Goldstein; Jonathan D.; et al. |
April 8, 2010 |
Streaming Queries
Abstract
The described implementations relate to recursive streaming
queries. One technique processes a recursive streaming query
through a query graph. The technique also detects when output
produced by executing the query graph advances to a specific
point.
Inventors: | Goldstein; Jonathan D. (Kirkland, WA); Maier; David E. (Portland, OR) |
Correspondence Address: | MICROSOFT CORPORATION, ONE MICROSOFT WAY, REDMOND, WA 98052, US |
Assignee: | Microsoft Corporation, Redmond, WA |
Filed: | October 7, 2008 |
Current U.S. Class: | 707/756; 707/E17.017 |
Current CPC Class: | G06F 16/24568 20190101 |
Class at Publication: | 707/756; 707/E17.017 |
International Class: | G06F 17/30 20060101; G06F017/30 |
Claims
1. A system, comprising: first and second networked stream
operators for operating on a recursive streaming query, wherein the
first operator is configured to receive at least two input streams
and to generate an output stream based upon a first set of
conditions and wherein the second operator is positioned downstream
from the first operator and is configured to generate a multicast
output, and wherein a portion of the multicast output is
recursively directed back as an input to the first operator thereby
forming a recursive loop.
2. The system of claim 1, wherein the first operator comprises a
union operator and the second operator comprises a flying
fixed-point (FFP) operator and wherein at least one additional
operator is interposed between the union operator and the FFP
operator.
3. The system of claim 2, wherein the at least one additional
operator comprises a join operator that is configured to receive
output from the union operator and at least one other stream as
inputs.
4. The system of claim 1, wherein the second operator comprises a
flying fixed-point (FFP) operator that is configured to probe the
recursive loop with speculative times to determine a specific point
in time to which the recursive loop has completed processing.
5. The system of claim 4, wherein the speculative times are based
on timestamps of input events.
6. The system of claim 1, wherein the first operator comprises a
union operator and the second operator comprises a flying
fixed-point (FFP) operator and wherein at least one additional
operator is interposed between the union operator and the FFP
operator and wherein the FFP operator and the at least one
additional operator are configured to correctly handle stream
events that arrive out of time order.
7. The system of claim 6, wherein the FFP operator and the at least
one additional operator are configured to correctly handle stream
events that retract or reduce a validity of previously seen stream
events.
8. The system of claim 1, wherein the second operator is configured
to probe the recursive loop with speculative times to determine a
point in time to which the recursive loop has completed
processing.
9. A method, comprising: processing at least one input stream
associated with a recursive streaming query; and, advancing time
for the recursive streaming query to a specific point when the at
least one input stream has advanced to the specific point and
recursive computations on the input stream are complete to the
specific point.
10. The method of claim 9, wherein the processing comprises
processing two input streams.
11. The method of claim 9, wherein the processing comprises
introducing speculative time events to determine the specific
point.
12. The method of claim 9, further comprising generating an output
from the processing and wherein the output includes information
about the advancing.
13. The method of claim 12, wherein the generating comprises
generating the output as an event that contains time information
associated with the advancing.
14. The method of claim 9, wherein the advancing occurs while
recursive computations continue for events subsequent to the
point.
15. The method of claim 9, wherein the advancing is repeated for
additional times that are subsequent to the specific point.
16. A computer-readable storage media having instructions stored
thereon that when executed by a computing device cause the
computing device to perform acts, comprising: processing a
recursive streaming query through a query graph; and, detecting
when output produced by executing the query graph advances to a
specific point.
17. The computer-readable storage media of claim 16, wherein the
processing comprises a recursive step on the query graph.
18. The computer-readable storage media of claim 17, wherein the
detecting comprises detecting a passage of time on the recursive
step.
19. The computer-readable storage media of claim 16, wherein the
detecting comprises probing the query graph with the specific point
in time.
20. The computer-readable storage media of claim 19, wherein the
probing is accomplished with a speculative event.
Description
BACKGROUND
[0001] Computers are very effective at storing large amounts of
data, such as in a database. Over the last half century or so,
techniques have been refined for performing computational
operations on the stored data, such as accessing or querying the
data, viewing the data, modifying the data, etc. In these scenarios,
the data can be thought of as relatively static, and so these
techniques, such as database querying techniques, tend not to be
very applicable to time-sensitive scenarios, such as those
involving real-time or near real-time processing. For instance, a
database query technique designed to retrieve the definition of a
word from a dictionary database need not be time sensitive, since
the data is statically stored in the database.
[0002] In contrast, other scenarios tend to involve streaming
data in real-time or near real-time. For instance, a temperature
sensor may be configured to periodically output a time-stamped
signal corresponding to a sensed temperature. When viewed
collectively, this output can be thought of as a stream of data or a
data stream. The above-mentioned database querying techniques are
not generally applicable in data stream scenarios. Instead,
stream processing techniques have been developed for use with data
streams.
[0003] Stream processing techniques offer much more limited
computational options than those available in traditional database
scenarios. Stated another way, a very small set of computations can
presently be performed with stream processing. The present concepts
introduce new stream processing techniques that greatly increase
the set of computations that can be accomplished with stream
processing.
SUMMARY
[0004] The described implementations relate to recursive streaming
queries. One method or technique processes a recursive streaming
query through a query graph. The technique also detects when output
produced by executing the query graph advances to a specific
point.
[0005] Another implementation is manifested as a method that
processes at least one input stream associated with a recursive
streaming query. The technique also advances time for the recursive
streaming query to a specific point when at least one input stream
has advanced to the specific point and recursive computations on
the input stream are complete to the point.
[0006] The above listed examples are intended to provide a quick
reference to aid the reader and are not intended to define the
scope of the concepts described herein.
BRIEF DESCRIPTION OF THE DRAWINGS
[0007] The accompanying drawings illustrate implementations of the
concepts conveyed in the present application. Features of the
illustrated implementations can be more readily understood by
reference to the following description taken in conjunction with
the accompanying drawings. Like reference numbers in the various
drawings are used wherever feasible to indicate like elements.
Further, the left-most numeral of each reference number conveys the
Figure and associated discussion where the reference number is
first introduced.
[0008] FIGS. 1-5 show exemplary graphs for processing recursive
streaming queries in accordance with some implementations of the
present concepts.
[0009] FIG. 6 is a flowchart of exemplary recursive streaming query
processing techniques in accordance with some implementations of
the present concepts.
DETAILED DESCRIPTION
Overview
[0010] This patent application pertains to stream processing and
more specifically to recursive streaming queries. A data stream or
streaming data can be thought of as events or notifications that
are generated in real-time or near real-time. For introductory
discussion purposes, an event can be thought of as including event
data or payload and a timestamp.
[0011] Processing recursive streaming queries can entail the use of
one or more recursions. A recursion can be thought of as a function
that is defined in terms of itself so that it can involve
potentially infinite or unbounded computation. In a streaming data
scenario, computation resources are reserved for specific events
until the resources are no longer needed. The present
implementations offer solutions for detecting when recursive
processing is completed up to a specific point in time. Thus, the
recursion may remain infinite, but the present techniques can
identify specific time periods for which the recursive processing
of streaming queries is complete. Computation resources can then be
freed up to the specific point in time.
[0012] For instance, consider introductory FIG. 1, which illustrates
an exemplary recursive streaming query processing method generally
at 100. Accompanying streaming data upon which the method can be
implemented is evidenced at 102. Generally, the method processes at
least one input stream (i.e., streaming data 102) associated with a
recursive streaming query at 104. At 106, the method also advances
time for the recursive streaming query to a specific point when two
conditions are met. First, the input stream has advanced to the
specific point, and second, recursive computations on the input
stream are complete to that point.
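The two conditions at 106 can be folded into a single rule. The Python below is a minimal sketch under stated assumptions, not the mechanism of the described implementations: `completed_through` is a hypothetical helper, `input_frontier` is assumed to mean that no further input event will carry a timestamp below it, and a recursive step is assumed never to derive an event with an earlier timestamp than the event that caused it.

```python
import math


def completed_through(input_frontier, in_flight):
    """Hypothetical helper: the point to which query time may safely advance.

    input_frontier: no later input event will carry a timestamp below this value
    in_flight: timestamps of events still circulating in the recursive loop
    """
    # Condition 1: time cannot pass the point the input stream has reached.
    # Condition 2: an in-flight event at time s may still derive results at
    # times >= s, so time cannot pass the earliest such event either.
    return min(input_frontier, min(in_flight, default=math.inf))


print(completed_through(5, []))      # -> 5: input has reached 5, loop is idle
print(completed_through(5, [3, 4]))  # -> 3: recursion at time 3 is unfinished
```

Taking the minimum means the loop can keep working on later events while time still advances up to the oldest unfinished recursive work.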
[0013] Assume for purposes of explanation, that streaming data 102
is emitted from a temperature sensor 108 and processed on a query
graph 110. The temperature sensor is offered as a simple example of
a source of streaming data and the skilled artisan should recognize
many other sources, some examples of which are described below in
relation to FIGS. 2-4. Further, in this example only a single data
stream 102 is input into query graph 110. Other examples where
multiple data streams are input into a query graph are described
below in relation to FIGS. 2-5.
[0014] A recursive streaming query relating to streaming data 102
can be performed on query graph 110 such as by performing a
recursive step 112 via a recursive loop 114.
[0015] A recursive streaming query based on streaming data 102 can,
in some instances, be characterized as infinite or running forever.
However, portions of the recursive streaming query can be executed
on recursive loop 114 to generate an output 116. The present
implementations can detect when output 116 has advanced to a
specific point in time.
[0016] In summary, even though the recursive streaming query may
run indefinitely, the present implementations can detect when the
query graph 110 has advanced to a specific point in time as
portions of the recursive query are completed. This can also be
thought of as detecting forward time progress. Stated another way,
the technique can detect when a region of the query graph upstream
of a certain point, such as point 118, has completed processing,
including recursive processing, relative to a specific point in
time. The technique can then cause the query graph to issue a
notice from point 118 that computations upstream from that point
have advanced to the specific point in time.
[0017] FIG. 1 introduces the concept that query graph 110 can
process a recursive streaming query. FIG. 2 introduces examples of
components that can accomplish the computations associated with
processing a recursive streaming query.
[0018] FIG. 2 shows a query graph 210 that includes a plurality of
operators 212 for processing a recursive search query from two
input streams 214(1), 214(2).
[0019] In this case, query graph 210 includes six operators 212(1),
212(2), 212(3), 212(4), 212(5), and 212(6). The term "operator" 212
is used in that the operators "operate", or perform computations,
upon the streaming data responsive to the recursive search query to
generate an output from the graph at 216. Briefly, an operator can
receive one or more inputs and process the inputs according to a
set of conditions. If the conditions are satisfied, then the
operator can generate an output that can be delivered to one or
more other operators.
[0020] In the present case, operator 212(1) can be termed a
"project" operator; operator 212(2) can be termed a "union"
operator; operator 212(3) can be termed a "join" operator; operator
212(4) can be termed a "select" operator; operator 212(5) is
another project operator; and operator 212(6) can be termed a
flying fixed-point (FFP) operator. The function of these
operators is described in more detail below.
[0021] Considered from one perspective, query graph 210 can be
viewed as being defined by its operators since a number, type,
and/or arrangement of operators can be adapted to specific
recursive search queries. So, a query is achieved by operating on
one or more input streams with the selected operators to generate
an output.
[0022] Input data streams 214(1) and 214(2) describe a changing
graph composed of nodes and edges. Input stream 214(2) describes the
(possibly changing) nodes in the changing graph, while input stream 214(1) describes the
changing set of edges between nodes. In other words, 214(1) and
214(2) can be thought of as defining a dynamic input graph that is
operated on by query graph 210. The graph is dynamic in that the
input streams can change over time.
[0023] For instance, consider an input graph where each edge is
labeled with a number and the user wants to know the shortest path
from one node to another node. Until the actual graph is generated,
the number of steps in that shortest path cannot be bounded. Thus,
this query also has an unbounded nature, in that the graph is
unknown at the time of query creation.
[0024] The present concepts can be applied to many interesting
streaming graph-search problems, such as finding a minimum path to
a destination on a road network from a changing location and given
changing traffic conditions. Another potential application can be
regular expression matching over streams. Another application can
be any form of looping where the process cannot bound the number of
iterations at the time the recursive streaming query is
created.
FIRST EXAMPLE
Reachability
[0025] Query graph 210 offers an example of how streaming query
results can be computed recursively. Formally, the present example
can rely upon the following graph reachability query:
[0026] Given a directed graph $G = (N, E)$ with nodes
$N = \{n_i \mid i = 1, \ldots, j\}$ and edges
$E = \{(n_{1i}, n_{2i}) \mid i = 1, \ldots, k\}$, compute all pairs
$(n_1, n_2)$ with $n_1 \in N$ and $n_2 \in N$, such that $n_2$ is
reachable from $n_1$ through one or more edges in $E$.
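To make the formal problem concrete, the following Python sketch computes its answer for a single, static snapshot of the graph by iterating to a fixed point (a semi-naive transitive closure). It is not the streaming plan of FIG. 2, and `reachable_pairs` is a hypothetical name; the sample edges are those of the FIG. 3 example described below.

```python
def reachable_pairs(edges):
    """All pairs (n1, n2) such that n2 is reachable from n1 via one or more edges."""
    closure = set(edges)   # paths of exactly one edge
    frontier = set(edges)  # pairs discovered in the previous round
    while frontier:
        # extend every newly discovered path by one more edge
        extended = {(a, d) for (a, b) in frontier for (c, d) in edges if b == c}
        frontier = extended - closure  # keep only pairs not seen before
        closure |= frontier
    return closure


edges = [("n3", "n1"), ("n1", "n2"), ("n2", "n3"), ("n3", "n4")]
pairs = reachable_pairs(edges)
print(sorted(pairs))
```

Read literally, the definition also admits pairs such as (n1, n1) for nodes on a cycle, which is why this closure has 12 pairs for the four sample edges; the bit-vector plan discussed below does not emit such self-pairs.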
[0027] Note that the present techniques solve the formal problem
stated above under the assumption that the graph is not known at
compile time. Furthermore, the graph may change over time. The
description of the graph is, therefore, in and of itself streaming.
While this example might seem contrived, it is, in fact, a good
starting point for discussing streaming queries over networks and
roads, where both edge properties (e.g., traffic conditions) and
graph structure (e.g., links failing and recovering in a network)
are volatile.
[0028] This discussion introduces techniques for calculating
results and lays the foundation for examining recursive streaming
queries. For ease of explanation, assume that this recursive
streaming query has a single window of infinite size, there are no
retractions (for example, to revise erroneous or speculative items)
in the input stream, and that there are no punctuations to deal
with. All of these assumptions will be removed in later
sections.
[0029] As mentioned above, query graph 210 receives two input data
streams 214(1) and 214(2). Data stream 214(1) relates to edges and
data stream 214(2) relates to source nodes. Also note that the plan
is a directed graph of streaming versions of relational operators,
where each arrow in the diagram is a data stream and is labeled
with the schema of the events traveling along the data stream.
Assume for discussion purposes that all stream events are tagged
with the application time Vs at which the event becomes valid.
[0030] The data streams can be interpreted as describing a
changing relation. Since the present discussion assumes a single
window of infinite duration, the contents of the relation at any
time t can be all of the events with $V_s \le t$. Operators
212(1)-212(6) then output event streams that describe the changing
view computed over the changing input according to the relational
semantics of individual operators.
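Under these simplifying assumptions, the changing relation at time t can be sketched as a filter on valid time. This is a hypothetical Python illustration (`relation_at` is not a name from the source), and it treats the relation as a set, so duplicate payloads collapse.

```python
def relation_at(events, t):
    """Contents at time t of the relation described by an insert-only
    stream of (Vs, payload) events under a single infinite window."""
    return {payload for vs, payload in events if vs <= t}


# Edges of the FIG. 3 example as they arrive on stream 214(1): (Vs, (src, dst)).
edge_events = [(2, ("n3", "n1")), (3, ("n1", "n2")),
               (3, ("n2", "n3")), (4, ("n3", "n4"))]
for t in range(1, 5):
    print(t, sorted(relation_at(edge_events, t)))
```

At t = 3 this yields the three edges that the walkthrough below places in the join operator's left synopsis.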
[0031] As introduced above, the present configuration utilizes an
FFP operator 212(6). The FFP operator offers a means to achieve
recursion. The FFP generates a multicast output 220 that is
forwarded to a conventional, non-recursive output indicated at
220(1), as well as to one of its descendants in the operator graph.
In this case, output 220(2) recursively loops back to union
operator 212(2) thereby forming a recursive loop 222. The result
can be thought of as a form of recursion which terminates when a
fixed point is reached.
[0032] Another interesting feature of the illustrated configuration
is the schema elements labeled "bv". These are, in fact, bit
vectors, each with one bit per node of the input graph. The present
techniques can use these bit vectors to track the nodes already
visited along each path and thereby avoid infinite looping through
cycles.
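One way to represent bv is as an integer bit mask. In this hypothetical Python sketch (all names are illustrative), node n_i maps to the i-th bit from the left, matching the notation 1000, 0100, and so on used in the walkthrough below; the select operator's cycle test corresponds to `visited` and the projection's update to `mark`.

```python
def node_mask(i, width):
    """Bit for node n_i (1-based); n1 is the leftmost bit, as in '1000'."""
    return 1 << (width - i)


def visited(bv, i, width):
    """True if node n_i is already on the path recorded in bv."""
    return (bv & node_mask(i, width)) != 0


def mark(bv, i, width):
    """Record node n_i as visited."""
    return bv | node_mask(i, width)


def show(bv, width):
    """Render bv with one character per node, n1 first."""
    return format(bv, f"0{width}b")


# A path from n3 (bv = 0010) extended to n1, as at time 2 of the walkthrough.
bv = 0b0010
print(visited(bv, 1, 4), show(mark(bv, 1, 4), 4))  # -> False 1010
```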
[0033] FIG. 3 shows a graph 310 that can be used as input for input
streams 214(1) and 214(2) of FIG. 2. FIG. 3 illustrates nodes
302(1), 302(2), 302(3), and 302(4). Individual nodes 302(1)-302(4)
are labeled with both the node name and the valid time for the node
insertion event. Similarly, the graph also illustrates edges 304(1),
304(2), 304(3), and 304(4) with accompanying valid times of their
edge insertion events. Viewed in light of FIG. 2, nodes
302(1)-302(4) are what would flow in on the nodes input 214(2).
Similarly, edges 304(1)-304(4) are what flow in on the edges input
214(1).
[0034] For the sake of concreteness and clarity, the present
discussion will follow the execution of the query plan to
completion for each distinct moment in time. The discussion will
also rely upon the assumption that each operator processes input
events in batches such that all input events with the same valid
time are processed at once. The discussion is directed to the
behavior of this plan at the four distinct points in time from time
1 to time 4. Since the present example includes 4 distinct nodes
302(1)-302(4), bv is 4 bits long.
[0035] Time 1: the technique receives four input events on the
nodes data stream 214(2), which correspond to nodes n1, n2, n3, and
n4 (i.e., 302(1)-302(4)). Recall that an event can be thought of as
a payload and a timestamp. So for instance, node 302(1) with a
timestamp of 1 is an event. Note that the projection above the
nodes stream produces the following 4 events:
[0036] (1, n1, n1, 1000), (1, n2, n2, 0100),
[0037] (1, n3, n3, 0010), (1, n4, n4, 0001)
[0038] In FIG. 2, these events then travel through the union
operator 212(2) and lodge in the join operator's right join
synopsis as indicated at 224. Since there is no input on the left
side of the join operator 212(3), the process has reached a fixed
point. (If it were desired to limit the set of nodes considered as
source nodes for reachability, the technique could limit the nodes
stream 214(2) to only those nodes.)
[0039] Time 2: the technique can receive one event in the edges
data stream 214(1). This edge travels up to the join operator
212(3), which then lodges it in its left synopsis at 226. The event
is:
[0040] (2, n3, n1)
[0041] This event means that starting at time two, the input
relation on the left side of the join operator 212(3) contains an
edge going from n3 to n1. Given the join condition, this edge joins
to one row on the right side: (1, n3, n3, 0010). The join operator
212(3) then outputs:
[0042] (2, n3, n1, n3, n3, 0010)
[0043] The select operator 212(4) then checks for a cycle by
determining whether the path described above already includes the
destination of the new, derived path. This determination is made by
checking the 1st bit, since the technique is following the path to
n1. Since this bit is not set, the event reaches the project
operator 212(5), which removes unneeded columns and sets the
appropriate bit in bv. The result is:
[0044] (2, n3, n1, 1010)
[0045] This result concludes that there exists a path from n3 to
n1, and that this path first appeared at valid time 2. The
technique now reaches the FFP operator 212(6), which both outputs
the result from the query graph at 220(1), and inserts it into the
union operator 212(2) below the join operator 212(3) via output
220(2). The join operator 212(3) then lodges the event in the right
synopsis at 224, but is unable to join it to anything in its left
synopsis at 226. The technique has now reached a fixed point.
[0046] Time 3: The technique receives two events in the edges data
stream 214(1). These events travel up to the join operator 212(3)
and lodge in its left synopsis 226. The events are:
[0047] (3, n1, n2), (3, n2, n3)
[0048] Note that at this point, the left join synopsis 226 contains
the following entries:
[0049] (3, n1, n2), (3, n2, n3), (2, n3, n1)
[0050] By joining the two new events to entries in the right
synopsis 224, the join operator 212(3) produces:
[0051] (3, n1, n2, n1, n1, 1000), (3, n1, n2, n3, n1, 1010),
[0052] (3, n2, n3, n2, n2, 0100)
[0053] All three events get past the select operator 212(4) since
all the checked bits are 0, and therefore the process has not
encountered a cycle yet. After projection by projection operator
212(5), these three events become:
[0054] (3, n1, n2, 1100), (3, n3, n2, 1110),
[0055] (3, n2, n3, 0110)
[0056] These entries are now output by the FFP operator 212(6) and
loop around again to lodge in the join operator's right synopsis at
224. This time, however, the technique has not yet reached a fixed
point. By joining the three new events to the join operator's left
synopsis 226, the technique produces the following events:
[0057] (3, n2, n3, n1, n2, 1100), (3, n2, n3, n3, n2, 1110),
[0058] (3, n3, n1, n2, n3, 0110)
[0059] Continuing the query, the technique checks for cycles using
select operator 212(4). Unlike previous times, this time, the
technique finds a cycle. The second event has already visited n3.
The technique therefore does not pass this event through to the
next round of recursion and only continues with the first and third
events. After projection, these become:
[0060] (3, n1, n3, 1110), (3, n2, n1, 1110)
[0061] These are now output and passed back to the union operator
212(2) for another round of recursion. These entries lodge in the
join operator's right synopsis 224, and produce two new events. It
is not hard to see that these new events cannot get past the select
operator 212(4) since the first three bits are set for both events.
The technique has again reached a fixed point. Note that the
following output has been produced so far:
[0062] (2, n3, n1, 1010), (3, n1, n2, 1100), (3, n3, n2, 1110),
[0063] (3, n2, n3, 0110), (3, n1, n3, 1110), (3, n2, n1, 1110)
[0064] This output succinctly says that each of the first three
nodes is reachable from all the other first three nodes.
[0065] Time 4: The technique receives an event in the edges data
stream 214(1). This edge lodges in the join operator's left
synopsis 226, and is:
[0066] (4, n3, n4)
[0067] The join operator 212(3) then produces:
[0068] (4, n3, n4, n3, n3, 0010), (4, n3, n4, n2, n3, 0110),
[0069] (4, n3, n4, n1, n3, 1110)
[0070] All of these events get through the select operator 212(4)
since none have their 4th bits set, and become:
[0071] (4, n3, n4, 0011), (4, n2, n4, 0111),
[0072] (4, n1, n4, 1111)
[0073] The events are then output by the FFP operator 212(6), loop
around, and lodge in the join operator's right synopsis 224 without
joining to anything. The technique has again reached a fixed point.
Note that the output at time 4 says that n4 may be reached from any
other node.
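The walkthrough above can be replayed mechanically. The following Python sketch is a hypothetical simulation, not the described implementation: it assumes the infinite window, no retractions or punctuation, batch-at-a-time processing, and a nested-loop symmetric join in which each arriving event probes the other side's synopsis, with a joined event taking the later of its two valid times. Names such as `reachability` and `select_project` are illustrative, and bit vectors are kept as strings so they match the notation in the text.

```python
from collections import deque


def reachability(nodes, batches):
    """Replay the FIG. 2 plan one valid-time batch at a time.

    nodes:   node names in bit order, e.g. ["n1", "n2", "n3", "n4"]
    batches: (vs, node_names, edges) triples in increasing vs order
    Returns the events emitted by the FFP operator at output 220(1).
    """
    pos = {n: i for i, n in enumerate(nodes)}
    left = []    # join synopsis 226: (vs, src, dst) edge events
    right = []   # join synopsis 224: (vs, source, current, bv) path events
    output = []  # FFP output 220(1)

    def select_project(vs, source, dst, bv):
        i = pos[dst]
        if bv[i] == "1":  # select 212(4): dst already visited, so drop (cycle)
            return []
        # project 212(5): keep (Vs, source, dst) and mark dst as visited
        return [(vs, source, dst, bv[:i] + "1" + bv[i + 1:])]

    def on_right(ev):
        # an event from union 212(2) lodges in synopsis 224 and probes 226
        right.append(ev)
        vs, source, current, bv = ev
        derived = []
        for lvs, src, dst in left:
            if src == current:
                derived += select_project(max(vs, lvs), source, dst, bv)
        return derived

    def on_left(ev):
        # an edge event lodges in synopsis 226 and probes 224
        left.append(ev)
        vs, src, dst = ev
        derived = []
        for rvs, source, current, bv in right:
            if current == src:
                derived += select_project(max(vs, rvs), source, dst, bv)
        return derived

    for vs, node_names, edges in batches:
        pending = deque()
        for n in node_names:
            # project 212(1): node n becomes the path (n, n) with only n's bit set
            bv = "".join("1" if m == n else "0" for m in nodes)
            pending.extend(on_right((vs, n, n, bv)))
        for src, dst in edges:
            pending.extend(on_left((vs, src, dst)))
        # FFP 212(6): emit each new path and feed it back through the union
        # until no new events remain (a fixed point for this batch)
        while pending:
            ev = pending.popleft()
            output.append(ev)
            pending.extend(on_right(ev))
    return output


nodes = ["n1", "n2", "n3", "n4"]
batches = [
    (1, nodes, []),
    (2, [], [("n3", "n1")]),
    (3, [], [("n1", "n2"), ("n2", "n3")]),
    (4, [], [("n3", "n4")]),
]
result = reachability(nodes, batches)
for event in result:
    print(event)
```

Running it reproduces, in the same order, the nine FFP output events listed for times 2 through 4. Each event probes the opposite synopsis once on arrival, so every (edge, path) pair is joined once; termination comes from the select operator's bit-vector test, not from the join.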
[0074] There are a few interesting observations that can be derived
from this example.
[0075] First, for clarity, the above discussion presented the
example in a way that quiesced the query between time increments.
The same result, although possibly with a different output order,
would have been achieved if new input were allowed into the
recursive loop 222 before a fixed point had been reached. This
outcome is possible because of the order insensitivity of the
operators used in this recursive query plan. Operators, such as
aggregation and difference, do not have this property, and can
require either quiescence of the recursive loop between increasing
valid time increments or implementations capable of speculative
execution, when used in recursive queries. There will be further
discussion of this point in later sections.
[0076] Second, the query avoided infinite loops by maintaining a
careful notion of progress in the form of the visited bit vector.
This notion of progress can be a key to proving that a particular
recursive query terminates with the correct answer, and is
discussed formally in the formalism section below.
[0077] Traditional notions of punctuations would likely fail if
used in the context of this query, since operators in the recursive
loop wait on themselves for a punctuation. The punctuations would
therefore become blocked at the union and join operators 212(2),
212(3), respectively, which would receive punctuations from their
non-recursive inputs, but never the recursive one. This issue is
addressed fully in the formalism section below.
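The blocking behavior can be illustrated with a common rule in which a union forwards punctuation only up to the minimum punctuation received on all of its inputs. The Python class below is a hypothetical sketch of that rule, assumed for illustration rather than taken from the described implementations.

```python
class UnionPunctuation:
    """Hypothetical sketch of a conventional union: it can punctuate its
    output only up to the smallest punctuation seen on every input."""

    def __init__(self, inputs):
        # latest punctuation index received on each input (None = none yet)
        self.latest = {name: None for name in inputs}

    def on_punctuation(self, name, i):
        """Record punctuation i on input `name`; return the index at which
        the union may now punctuate its output, or None if it must wait."""
        self.latest[name] = i
        if None in self.latest.values():
            return None
        return min(self.latest.values())


plain = UnionPunctuation(["a", "b"])
print(plain.on_punctuation("a", 3), plain.on_punctuation("b", 5))  # -> None 3

# Union 212(2): its recursive input 220(2) carries only what the loop emits,
# and the loop cannot punctuate until the union does, so that input never
# punctuates and the union stays blocked.
loop = UnionPunctuation(["nodes", "recursive"])
print(loop.on_punctuation("nodes", 4))  # -> None
```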
Formalism
[0078] The following discussion formally defines concepts related
to streams, punctuations, and queries. The discussion also
describes what is required for an operator implementation to be
speculation friendly, and proves that the FFP operator 212(6)
functions correctly with appropriate inputs, streams, and
operators.
[0079] The present concepts can utilize a formal model of streams
that tends to encompass most previous stream models. Formally, a
stream R is a potentially unbounded sequence $e_1, e_2, \ldots$ of
events. An event e consists of one or more control parameters
$c_1, c_2, \ldots, c_n$, plus an optional payload p, which is written
as $e = \langle c_1, c_2, \ldots, c_n; p \rangle$. A payload will
typically be a relational tuple (i.e., an ordered sequence of data
values), but might be something else, such as a punctuation
pattern. The technique utilizes a notion of conformance of a
payload p to a schema RR, and a stream R conforms to schema RR if
the payload of every event in R conforms to RR.
[0080] The exact nature of control parameters varies from system to
system. Some of the alternative implementations can include a
single control parameter that contains a sequence number assigned
at the inputs to a query. Another example can include a control
parameter that indicates what the event represents (regular tuple,
punctuation, end of stream), and a second control parameter giving
a timestamp supplied by the stream source. Another example can
include a control parameter indicating whether the event represents
a positive tuple (insertion) or negative tuple (deletion). Still a
further example can include a pair of control parameters defining a
time interval over which the payload is valid.
[0081] The present implementations do not constrain the details of
the control parameters. Instead, some implementations require that
for stream R(RR), any prefix P of R can be reconstituted into a
linear sequence $r_1, r_2, \ldots, r_m$ of snapshots over RR. Each
snapshot is just a finite relation over RR. It is useful to think
of how each additional event modifies the reconstitution. For
example, with the first alternative described above, the technique
can treat an event $\langle sn, p \rangle$ as adding a new snapshot
to the list that adds p to the previous snapshot. That is, it
extends $r_1, r_2, \ldots, r_{sn-1}$ to
$r_1, r_2, \ldots, r_{sn-1}, r_{sn}$, where
$r_{sn} = r_{sn-1} \cup \{p\}$. For the final alternative offered
above, the technique can view snapshots as being indexed by
timestamps, and an event $\langle s, e; p \rangle$ as inserting p
into any snapshot $r_{t_k}$ in $r_{t_1}, r_{t_2}, \ldots, r_{t_m}$
where $s \le t_k < e$, plus possibly adding a snapshot $r_e$ to the
end of the list if $e > t_m$.
[0082] Some implementations can treat a stream R as representing a
potentially infinite list $r_1, r_2, \ldots$ that is the limit of
the reconstitution as the technique takes longer and longer
prefixes of R. This sequence can be thought of as the canonical
history of R, and the intent of applying a function f to R can be
considered to be a stream S whose canonical history is
$f(r_1), f(r_2), \ldots$. However, there is no guarantee that R
converges to a well-defined canonical history in the limit. New
events might continue to update a particular snapshot indefinitely.
Thus, some implementations can require that a stream make progress,
meaning that for each snapshot $r_i$, there comes a point in the
stream where $r_i$ no longer changes.
[0083] For an event e in stream R, let P be the prefix of R up to
e, and P:e be P with the addition of e. Let the reconstitution of P
be $r_1, r_2, \ldots, r_m$, and the reconstitution of P:e be
$s_1, s_2, \ldots, s_n$. Then define the stabilization point of e
relative to R, stable(e), as the maximum i such that
$r_1 = s_1, r_2 = s_2, \ldots, r_i = s_i$.
[0084] That is, e does not modify any of $r_1, r_2, \ldots, r_i$.
Stream R can be said to progress if, for any index j, there is a
point after which every event e has $\mathrm{stable}(e) \ge j$. At
that point, snapshot $r_j$ is stabilized: it will no longer change.
If R progresses, then every snapshot eventually stabilizes, and the
canonical history is well defined. In this case, the technique can
use R@i to denote snapshot $r_i$ in the canonical history of R.
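For a concrete case, consider the insert-only, infinite-window model of the reachability example, with snapshots indexed by valid times 1, 2, ..., m. The following Python sketch computes stable(e) directly from the definition above; `reconstitute` and `stable` are hypothetical helpers that rebuild both reconstitutions from scratch rather than incrementally.

```python
def reconstitute(prefix):
    """Snapshots r_1..r_m of an insert-only stream of (Vs, payload) events,
    where r_t holds every payload with Vs <= t and m is the largest Vs seen."""
    m = max((vs for vs, _ in prefix), default=0)
    return [frozenset(p for vs, p in prefix if vs <= t) for t in range(1, m + 1)]


def stable(prefix, e):
    """Stabilization point of event e relative to the events before it."""
    before = reconstitute(prefix)
    after = reconstitute(prefix + [e])
    i = 0
    # largest i such that r_1 = s_1, ..., r_i = s_i
    while i < len(before) and before[i] == after[i]:
        i += 1
    return i


prefix = [(1, "a"), (3, "b")]
print(stable(prefix, (2, "c")))  # -> 1: r_2 and r_3 both gain "c"
print(stable(prefix, (4, "d")))  # -> 3: only a new snapshot r_4 is added
```

An out-of-order event (valid time 2 after an event at time 3) disturbs existing snapshots, whereas an event with a later valid time only appends one, which is why ordered streams make progress easy to detect.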
[0085] Note that snapshots in a reconstitution or canonical history
need not be indexed by sequential integers. Any strictly increasing
sequence can work, and some implementations described in the
remainder of this discussion use timestamps as indices.
[0086] The above discussion considers only progressing streams, so
that the canonical history is always defined. However, to make use
of progress, at least some of the implementations detect it. For
some streams this task is easy: for example, in the first
alternative offered above, if events are assumed to arrive in order
of increasing sequence number, progress is evident from the events
themselves. To handle disordered streams (at least in the recursive
part of the query), one approach utilizes a form of punctuation to
explicitly mark progress. An event e in stream R constitutes a
punctuation at i if every event d after e in R has stable(d) > i.
Stream R then explicitly progresses if, for any index j, there is
some event e in R that is a punctuation at i, where i > j. In some
cases, such as ordered streams, "normal" events can serve as
punctuations. However, to handle disordered streams, these
implementations can utilize specific punctuation events (flagged as
such with a control parameter). It is assumed that stream operators
produce explicitly progressing output given explicitly progressing
inputs; that is, each stream operator propagates punctuation
appropriately.
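A finite trace can be checked against the punctuation definition above. The sketch assumes a hypothetical encoding in which each event is either ("data", stable_point) or ("dp", i). It only validates an observed prefix, since explicit progress is a property of the whole, unbounded stream.

```python
from typing import List, Tuple

# Hypothetical encoding: ("data", stable_point) or ("dp", i).
Event = Tuple[str, int]


def punctuations_honest(trace: List[Event]) -> bool:
    """Check that every definite punctuation dp(i) in a finite trace is
    followed only by data events d with stable(d) > i."""
    for pos, (kind, value) in enumerate(trace):
        if kind != "dp":
            continue
        for later_kind, later_value in trace[pos + 1:]:
            if later_kind == "data" and later_value <= value:
                return False
    return True


# A disordered but correctly punctuated trace, and one that violates dp(2).
ok = [("data", 4), ("data", 2), ("dp", 2), ("data", 5), ("data", 3)]
bad = [("data", 2), ("dp", 2), ("data", 2)]
print(punctuations_honest(ok), punctuations_honest(bad))  # True False
```

Disorder before a punctuation is harmless here (stable point 2 arrives after 4); what matters is that nothing at or before index i follows dp(i).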
[0087] The FFP operator 212(6) also uses speculative punctuation,
which is similar to the regular punctuation defined above but does
not actually guarantee stream progress. To distinguish the two, the
following discussion refers to non-speculative punctuation as
definite punctuation. The discussion below uses dp(i) to denote a
definite punctuation event at index i, and sp(i) to denote a
speculative punctuation event at index i.
CEDR Implementation
[0088] The following discussion relates to an implementation that
leverages complex event detection and response (CEDR) technologies.
A brief introduction to CEDR technologies follows.
[0089] Conventional stream systems separate the notion of
application time and system time, where application time is the
clock that event providers use to timestamp tuples created by the
providers, and system time is the clock of the receiving stream
processor. The disclosed architecture, referred to throughout this
description as the CEDR system, further refines application time
into occurrence time and valid time, thereby providing a
tri-temporal model of occurrence time, valid time, and system
time.
[0090] A temporal stream model is used to characterize streams,
engine operator semantics, and consistency levels for handling
out-of-order or invalidated data. In one implementation, the
tri-temporal model is employed. The temporal model employed herein,
however, is simplified in the sense of modeling valid time and
system time (occurrence time is omitted). For the purposes of this
description, this is sufficient, since only these two notions of
time are necessary to understand the disclosed speculative output
and consistency levels.
[0091] A CEDR data stream is modeled as a time varying relation.
For most operators, an interpretation is used that a data stream
models a series of updates on the history of a table, in contrast
to conventional work which models the physical table updates
themselves. In CEDR, a stream is modeled as an append-only
relation. Each tuple in the relation is an event, and has a logical
ID and a payload. Each tuple also has a validity interval, which
indicates the range of time when the payload is in the underlying
table. Similar to the convention in temporal databases, the
interval is closed at the beginning, and open at the end. Valid
start and end times are denoted as Vs and Ve, respectively. When an
event arrives at a CEDR stream processing system, its CEDR (or
system) time, denoted as C, is assigned by the system clock. Since,
in general, CEDR systems use different clocks from event providers,
valid time and CEDR time are not assumed to be comparable.
[0092] CEDR has the ability to introduce the history of new
payloads with insert events. Since these insert events model the
history of the associated payload, both valid start and valid end
times are provided. In addition, CEDR streams can also shrink the
lifetime of payloads using retraction events. These retractions can
reduce the associated valid end times, but are not permitted to
change the associated valid start times. Retraction events provide
new valid end times, and are uniquely associated with the payloads
whose lifetimes are being reduced. A full retraction is a
retraction where the new valid end time is equal to the valid start
time. Further details about CEDR technologies can be obtained from
U.S. patent application Ser. No. 11/937,118, filed on Nov. 8, 2007,
the contents of which are hereby incorporated by reference in their
entirety.
[0093] In some CEDR implementations, there are four control
parameters, EventType, VStart, VEnd, and VNewEnd. Snapshots in CEDR
are indexed by timestamps. The EventType can be Insert, Retract,
CTI or EOS. For Insert, VStart and VEnd indicate the range of
snapshot indices over which the payload is valid. That is, the
payload belongs to all snapshots in that range. Note that the
interval is closed at the beginning and open at the end, so the
payload is not in the snapshot associated with VEnd. For Retract,
all of VStart, VEnd and the payload should match a previously seen
event e, and VNewEnd, where VStart ≤ VNewEnd < VEnd,
effectively specifies a new VEnd for e. A Retract removes its
payload from snapshots with indices equal to or later than VNewEnd. In
terms of progress, if e is an Insert event, stable(e)=VStart. If e
is a Retract, then stable(e)=VNewEnd. A CTI (current time
increment) event is a (definite) punctuation at index VStart. EOS
stands for "End of Stream", and is only issued if a stream is
ceasing output.
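The control parameters above can be modeled directly. In the following sketch, the `CedrEvent` class and the `stable` and `snapshot` helpers are hypothetical illustrations, not the CEDR engine's API. Validity intervals are treated as closed-open [VStart, VEnd), and a Retract is matched to the record with the same VStart, current VEnd, and payload.

```python
from dataclasses import dataclass
from typing import Hashable, List, Optional, Set


@dataclass(frozen=True)
class CedrEvent:
    event_type: str                 # "Insert", "Retract", "CTI" or "EOS"
    vstart: int = 0                 # for a CTI, the punctuation index
    vend: Optional[int] = None      # exclusive end of the validity interval
    vnewend: Optional[int] = None   # only meaningful for Retract
    payload: Hashable = None


def stable(e: CedrEvent) -> Optional[int]:
    """Stabilization point as stated in the text; None for CTI and EOS."""
    if e.event_type == "Insert":
        return e.vstart
    if e.event_type == "Retract":
        return e.vnewend
    return None


def snapshot(events: List[CedrEvent], t: int) -> Set[Hashable]:
    """Payloads in snapshot t, i.e. with VStart <= t < current VEnd."""
    live: List[list] = []           # mutable [vstart, vend, payload] records
    for e in events:
        if e.event_type == "Insert":
            live.append([e.vstart, e.vend, e.payload])
        elif e.event_type == "Retract":
            for rec in live:        # match VStart, VEnd and payload
                if rec == [e.vstart, e.vend, e.payload]:
                    rec[1] = e.vnewend
                    break
    return {p for vs, ve, p in live if vs <= t < ve}


events = [
    CedrEvent("Insert", vstart=1, vend=10, payload="x"),
    CedrEvent("Insert", vstart=2, vend=5, payload="y"),
    CedrEvent("Retract", vstart=1, vend=10, vnewend=4, payload="x"),
]
print(sorted(snapshot(events, 3)), sorted(snapshot(events, 4)))  # ['x', 'y'] ['y']
```

A full retraction (VNewEnd equal to VStart) leaves an empty interval, so the payload drops out of every snapshot.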
Queries and Fixed Points
[0094] To accommodate the algebraic representation of queries with
FFP operators, the present techniques can view a relational query Q
over which a fixed point can be computed as having two relational
parameters, r and s, designated as Q(r, s). Parameter r can name an
external input (and can be generalized to a set of relations).
Parameter s can name the recursion parameter, which represents data
headed around the recursive loop. Some implementations can require
that schema(Q) = schema(s), and that Q is monotone in its second
argument, that is,
$$Q(r, s) \subseteq Q(r, s \cup s_1) \quad \text{for any } s_1.$$
[0095] The technique can now define the fixed point of Q on r.
Let
$$Q^0(r) = Q(r, \emptyset), \qquad Q^i(r) = Q\bigl(r, Q^{i-1}(r)\bigr) \quad \text{for } i > 0.$$
[0096] The technique can specify that tuple t has level i if it
appears in $Q^i(r)$. The fixed point of Q on r is
$$Q^*(r) = \bigcup_{i \ge 0} Q^i(r).$$
[0097] One potential goal for recursive queries over a stream R is
to compute the fixed point of each snapshot in the canonical
history of R. That is, given progressing stream R and Query Q, it
can be desirable to produce a progressing stream S such that, for
every index i,
$$S@i = Q^*(R@i).$$
[0098] An S that satisfies these conditions is called a fixed-point
stream for R under Q, written $S \in Q^*(R)$. (Membership notation is
used because there could be many streams with this property.)
[0099] As noted in the introduction, it can be desirable to avoid
certain kinds of divergent behavior in computing fixed points. The
need for finite answers and finite derivations is captured in the
following two definitions.
[0100] Definition: Query Q(r, s) is convergent if for each value of
r, there exists a k such that $Q^k(r) = Q^{k+1}(r)$.
[0101] If Q(r, s) converges at k, then
$$Q^*(r) = \bigcup_{0 \le i \le k} Q^i(r).$$
[0102] This shows that the result of $Q^*$ on any value of r is
finite.
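For a convergent query, the level-by-level definition of $Q^*(r)$ translates directly into an iteration. The sketch below uses a hypothetical monotone reachability query `q_reach`, in which r is a set of edges and s carries pairs around the loop. It stops at the first k with $Q^k(r) = Q^{k+1}(r)$ and then applies the result per snapshot to build a fixed-point stream from a small, made-up canonical history.

```python
from typing import Callable, FrozenSet, List, Tuple

Pair = Tuple[str, str]
Rel = FrozenSet[Pair]


def q_reach(r: Rel, s: Rel) -> Rel:
    """Hypothetical monotone Q(r, s): the edges in r plus one-step
    extensions (a, c) of pairs (a, b) in s along edges (b, c) in r."""
    return r | frozenset((a, c) for (a, b) in s for (b2, c) in r if b == b2)


def fixed_point(q: Callable[[Rel, Rel], Rel], r: Rel, max_levels: int = 10_000) -> Rel:
    """Union of Q^0(r), Q^1(r), ..., stopping at the first k with Q^k(r) == Q^(k+1)(r)."""
    level = q(r, frozenset())          # Q^0(r) = Q(r, empty)
    result = set(level)
    for _ in range(max_levels):
        nxt = q(r, level)              # Q^i(r) = Q(r, Q^(i-1)(r))
        if nxt == level:               # converged at k
            return frozenset(result)
        result |= nxt
        level = nxt
    raise RuntimeError("no convergence within max_levels")


# Fixed-point stream: S@i = Q*(R@i) for each snapshot of a canonical history.
history: List[Rel] = [frozenset({("a", "b")}), frozenset({("a", "b"), ("b", "c")})]
S = [fixed_point(q_reach, snap) for snap in history]
print(sorted(S[1]))  # [('a', 'b'), ('a', 'c'), ('b', 'c')]
```

Because `q_reach` is monotone in s, the levels only grow, so the equality test is a sound stopping rule; the `max_levels` guard only protects against non-convergent inputs.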
[0103] Definition: Query Q(r, s) is strongly convergent if for each
value of r, there exists a k such that $Q^k(r) = \emptyset$.
[0104] Note that strongly convergent implies convergent, and that
for a strongly convergent query Q, there is a maximum level (k)
that any tuple t in Q*(r) has, hence the number of derivations is
finite.
Operations
[0105] FFP operators can be used with a target query Q(r, s) when Q
is expressed with algebraic operators that behave appropriately with
regard to speculative punctuation. A streaming operator G is
considered speculation-friendly if the following three conditions
hold.
[0106] First, G speculates correctly.
[0107] Second, G does not block on definite punctuation.
[0108] Third, G is forward moving.
[0109] These conditions are discussed below.
[0110] In relation to the first condition, G speculates correctly
if, given a speculative punctuation sp(i) on one input stream and
given that every other input stream is explicitly progressing, G
eventually emits a speculative punctuation sp(j) where j ≤ i.
Moreover, if it turns out that sp(i) actually holds (that is, G
receives no later event e with stable(e) ≤ i), then sp(j) actually
holds (G will emit no event d with stable(d) ≤ j). Also, if G has
previously emitted a definite punctuation dp(k), then
j ≥ min(i, k). This last condition says that G does not "back up"
from previously emitted definite punctuation. In practice, it will
always turn out that i > k, so j ≥ k.
[0111] In one instance, to speculate correctly, G will typically
track definite punctuation on its other inputs. In this
implementation i=j. However, an alternative implementation (termed
the probing approach) is described below where j is sometimes less
than i.
[0112] In relation to the second condition, that G does not block
on definite punctuation, G is presumed to produce explicitly
progressing output on explicitly progressing input. This condition
further requires G to emit output without waiting for any particular
definite punctuation: G must output the same collection of
non-punctuation events on any two input streams with the same
non-punctuation events. Any monotonic operator has a non-blocking
implementation. (Handling non-monotonic operators by being able to
revise previous outputs is discussed below under the heading "FFP in
CEDR".)
[0113] The third condition, that G is forward moving, constrains how
input events relate to output events: if input event e for G
contributes to output event d, then stable(e) ≤ stable(d). In
practice, it is unlikely that an operator G could arbitrarily shift
events backward in time without violating the first condition.
The FFP Operator
[0114] Computing fixed points relative to a query Q(r, s) with the
FFP operator utilizes an algebraic query tree T(O, Ir, Is) for Q.
O, Ir and Is are essentially "ports" of this query tree, where O
connects to an output stream, Ir connects to an external input
stream R, and Is will be for recursive input. The FFP operator can
also be viewed as having ports: FFP(I, OE, OR). Here I connects to
an input stream, OE connects to the external output stream, and OR
connects to the recursive output stream. The technique can apply
FFP to T and R to make the following connections:
R → Ir
O → I
OR → Is
[0115] OE will connect either directly to a client, or to the input
of a downstream operator. This discussion denotes this arrangement
of operators by FFP(R, T). When FFP, T and R are connected in this
manner, a recursive loop is created that passes from OR to Is to O
to I.
[0116] FIG. 4 shows the recursive loop in this reachability query
as a dashed line 402. FIG. 4 retains the operators 212(1)-212(6)
and data streams 214(1)-214(2) introduced in relation to FIG. 2 and
these components will not be reintroduced here for sake of brevity.
Note that for this example, Q, and hence T, has two external input
streams, one for nodes and one for edges. Q can be thought of as a
conceptual entity. For computation purposes, Q will be represented
as a tree of operators, T. In FIG. 4, T can consist of the
operators 212(1)-212(5), but not 212(6), which is the FFP
operator.
[0117] In defining the FFP operator 212(6), this technique views
the FFP operator as operating in phases, iterating over segments of
its input separated by speculative punctuations. (These phases in
general will be different from the levels of recursion defined
earlier.) The discussion assumes that at startup, the FFP operator
212(6) emits a speculative punctuation sp(tmin) on OR at 404, where
tmin is known to be before the stable points of all events on all
external input streams.
[0118] A segment of input for FFP operator 212(6) is a maximal
sequence of events e1, e2, . . . , em, sp(t) received on I, where
none of the ei's is a speculative punctuation. In this
implementation, by maximality, e1 must either be the first event on
I, or be preceded immediately by a speculative punctuation. The
list e1, e2, . . . , em within a segment is allowed to be empty.
[0119] For each segment e1, e2, . . . , em, sp(t) that the FFP
operator 212(6) receives on I, it performs the following steps.
[0120] F1. Emit e1, e2, . . . , em, on output OE.
[0121] F2. Emit those events in e1, e2, . . . , em that are not
definite punctuations on output OR.
[0122] F3.a. If stable(ei) > t for 1 ≤ i ≤ m, then emit dp(t) on
output OR, followed by sp(u) for some u ≥ t + c (for a fixed
constant c).
[0123] F3.b. Otherwise, emit sp(t) on output OR.
[0124] The constant c can be chosen as the minimal possible time
interval, sometimes called a chronon. Note that FFP operator 212(6)
only ever has one speculative punctuation circulating on the
recursive loop at a time. Its strategy is to keep circulating a
speculative punctuation sp(t) until it determines that the
punctuation is valid, then it converts it to a definite punctuation
and starts speculating at a later point. The next section will
present conditions under which such speculation must always
eventually succeed.
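Steps F1-F3 can be sketched one segment at a time. The event encoding and the class name `FlyingFixedPoint` are hypothetical. The sketch applies the F3.a test to non-punctuation events only (an assumption, since the text does not define a stabilization point for a definite punctuation) and picks u = t + c, the smallest value the rule permits.

```python
from typing import List, Tuple

# Hypothetical encoding: ("data", stable_point, payload), ("dp", t) or ("sp", t).
Event = Tuple


class FlyingFixedPoint:
    """Segment-at-a-time sketch of FFP steps F1-F3."""

    def __init__(self, t_min: int, c: int = 1) -> None:
        self.t_min = t_min   # known to precede the stable points of all external events
        self.c = c           # fixed constant, e.g. one chronon

    def start(self) -> List[Event]:
        """At startup, FFP emits sp(t_min) on O_R."""
        return [("sp", self.t_min)]

    def on_segment(self, events: List[Event], t: int) -> Tuple[List[Event], List[Event]]:
        """Handle segment e1..em, sp(t) received on I; return (O_E output, O_R output)."""
        out_e = list(events)                                # F1: everything goes to O_E
        out_r = [e for e in events if e[0] != "dp"]         # F2: no definite punctuation on O_R
        data_points = [e[1] for e in events if e[0] == "data"]
        if all(s > t for s in data_points):                 # F3.a: sp(t) turned out to hold
            out_r += [("dp", t), ("sp", t + self.c)]        # u = t + c
        else:                                               # F3.b: recirculate sp(t)
            out_r.append(("sp", t))
        return out_e, out_r


ffp = FlyingFixedPoint(t_min=0)
print(ffp.on_segment([("data", 1, "y")], t=2)[1])  # [('data', 1, 'y'), ('sp', 2)]
```

A streaming engine would run these steps continuously rather than per batch, as the following paragraph notes; the batch form is used here only to make each step visible.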
[0125] While this definition of the FFP operator might seem to
indicate that it operates in a batch-oriented fashion, in fact, as
seen in the reachability example and the implementation in the "FFP
in CEDR" section, steps F1-F3 can be pipelined and run in a
continuous fashion. Hence the "Flying" in "Flying fixed-point"
operator.
Correctness of FFP
[0126] This section establishes the correctness of the FFP operator
on the foundation introduced above.
[0127] Theorem: Let T(O, Ir, Is) be a query tree for a strongly
convergent query Q(r, s). If T uses speculation-friendly operators
and R is an explicitly progressing stream, then FFP(R, T) outputs
an explicitly progressing stream $S \in Q^*(R)$.
[0128] A proof is provided below in two main parts. The first part
establishes that S is a fixed-point stream for R under Q. The
second part shows that S is explicitly progressing. This proof is
provided for discussion purposes in relation to specific
implementations. Other implementations can achieve recursive data
stream processing without relying on the absolute assertion
expressed in this proof.
[0129] That S is a fixed-point stream for R under Q does not rely
on the handling of speculative punctuations at all. Rather, it
follows from the fact that FFP sends all input back around the
recursive loop, that operators on that loop do not block on
definite punctuations, and that R is progressing. The proof of this
part is an induction on the level of recursion. Consider a specific
snapshot r=R@t in the canonical history of R. The general statement
is that FFP eventually receives (hence outputs to OE) all events
needed for $Q^m(r)$ for every m.
Basis Case
[0130] The basis case is that FFP receives $Q^0(r) = Q(r, \emptyset)$ on I. This
case holds since R will eventually progress past t and stabilize r.
Since T will have received all of $\emptyset$ at this point, it will output
all of $Q(r, \emptyset)$ to I. (There is no problem if T receives more data,
because Q is assumed monotone on its second input.)
Inductive Step
[0131] This case follows from the observation that if the FFP
operator has received all of $Q^{k-1}(r)$ on its input I, it will emit
it on recursive output OR. Thus, T will eventually produce all
tuples in
$$Q\bigl(r, Q^{k-1}(r)\bigr) = Q^k(r).$$
[0132] Since Q is strongly convergent, there is some j such that
$Q^j(r) = \emptyset$. Thus once FFP has received all input up through $Q^j(r)$,
there will be no more output events for $Q^*(r)$, and the output of
FFP will progress past time t.
[0133] Demonstrating the explicit progress of S requires two
things. (1) Any dp(t) that FFP emits on OE must be correctly
placed. That is, no later event e will be emitted with
stable(e) ≤ t. (2) For any index u, FFP will eventually emit a
definite punctuation dp(t) for some t ≥ u.
[0134] For (1), it is noted that FFP will always see the end of a
segment (that is, the next speculative punctuation). After FFP
emits any events on OR in step F2, it will necessarily emit a
speculative punctuation on OR in step F3.a or F3.b. Because every
operator on the recursive loop is speculation-friendly, each must
eventually pass on the speculative punctuation until it gets back
to I. Now consider segment e1, e2, . . . , em, sp(t) that satisfies
the if-statement in step F3.a. When e1, e2, . . . , em are sent out
again on OR, any event d they will produce in the next segment will
have stable(d)>t, since all operators on the recursive loop are
forward moving. This situation will be true for all subsequent
segments, by similar reasoning. Thus the speculative punctuation
sp(t) was actually valid, and FFP can convert it safely to dp(t).
Since R is explicitly progressing, T will eventually produce a
definite punctuation dp(u) where u ≥ t. That punctuation will
be correctly placed in the output of T by the properties of its
operators, and hence will be correctly placed in the output of
FFP.
[0135] For (2), it is noted that a speculative punctuation sp(t)
can only be recirculated a finite number of times by step F3.b
before step F3.a applies. Since the input of FFP progresses, as
shown in the first part of the proof, there must eventually be a
segment where e1, e2, . . . , em all have stable points after t.
Further, each time the technique uses step F3.a, it increases the
index for the speculative punctuation by at least c. Thus, the
technique must eventually speculate at some index v ≥ u.
End of Proof.
[0136] The hypotheses in the above theorem are actually stronger
than they need be. Any operators in T that are not on the recursive
loop do not need to be speculation-friendly. They only need to
satisfy the condition that they emit explicitly progressing output
on explicitly progressing input.
FFP in CEDR
[0137] Until this section, the discussion of FFP operators has been
framed in a way that may be applied to most streaming systems.
This section discusses how recursion can work in the CEDR
stream-processing system. Among the discussed topics are how
speculative CTIs fit into the CEDR event model, and how specific
operators respond to these new events. Also discussed is the
handling of speculative output, which is a native capability of the
CEDR event processing system, in recursive queries. Further, the
interaction between the CEDR style of windowing and recursion is
discussed. Finally, consequences in terms of the sharing of
computation between windows with shared events are discussed.
Speculative CTI Handling
[0138] In the CEDR event processing system, physical streams may
already contain definite punctuations called CTIs (current time
increment). These punctuations come with a timestamp t. When one of
these events is received by the listener, there is a guarantee that
all events which affect snapshots earlier than t have been
received. Operators use this guarantee to garbage collect (i.e.,
reclaim) state that will not affect future output. CEDR operators,
except "align", do not need these events to unblock output, since
they do not have to block in the first place. Rather, they can
produce speculative results incorporating all the received events,
and correct these results later if necessary using retractions.
[0139] Like a definite CTI, a speculative CTI (specCTI) comes with
a timestamp t. Note that in order to handle specCTIs correctly,
each operator should guarantee that it is handling these events in
a speculation-friendly way. Recall the definition from the
"Operations" section.
[0140] G is speculation-friendly if the following three conditions
hold:
[0141] S1. G speculates correctly.
[0142] S2. G does not block on definite punctuation.
[0143] S3. G is forward moving.
[0144] Some implementations guarantee these requirements in CEDR by
treating the SpecCTI similarly to a definite CTI, except for two
things:
[0145] First, do not garbage collect based on speculative CTIs, as
the recursion might not be finished.
[0146] Second, the described strategy can require that SpecCTIs
loop through the recursion unchanged. This restriction may force
the SpecCTI to become lodged until another operator input catches
up or until a SpecCTI may be safely emitted for the requested
time.
[0147] S1 is easily upheld for unary operators. After
unblocking any necessary output, thus possibly producing
speculative output, the technique simply allows the SpecCTI
through. Binary operators are a bit trickier. Assuming that one
branch is in the recursive loop, the technique lodges the SpecCTI
in the binary operator until it receives, from the non-recursive
child, a definite CTI with timestamp greater than or equal to the
SpecCTI timestamp. This delay ensures that all input from the
non-recursive side that could influence the output states prior to
the SpecCTI has been absorbed and emitted by the operator before
emitting the SpecCTI. Since the technique assumes that all
non-recursive inputs are explicitly progressing, a time must exist
where the specCTI becomes dislodged and passes through. At this
time, since specCTIs are treated, for the purpose of producing
output, like definite CTIs, all speculative output up to that point
in time must have been emitted.
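The lodging rule for a binary operator can be sketched as follows, assuming (hypothetically) that the right input lies on the recursive loop and the left input is non-recursive. Only punctuation handling is modeled; the class name is illustrative and is not a CEDR operator, and forwarding of definite CTIs and speculative output is omitted.

```python
from typing import List, Optional, Tuple


class LodgingBinaryOperator:
    """Punctuation handling for a binary operator whose right input lies on
    the recursive loop; data processing is omitted from this sketch."""

    def __init__(self) -> None:
        self.lodged: Optional[int] = None    # timestamp of a lodged SpecCTI
        self.left_cti: Optional[int] = None  # latest definite CTI on the non-recursive input

    def on_left_cti(self, t: int) -> List[Tuple[str, int]]:
        """Definite CTI from the non-recursive child."""
        self.left_cti = t if self.left_cti is None else max(self.left_cti, t)
        return self._try_release()

    def on_recursive_spec_cti(self, t: int) -> List[Tuple[str, int]]:
        """SpecCTI arriving around the recursive loop; lodge it."""
        self.lodged = t
        return self._try_release()

    def _try_release(self) -> List[Tuple[str, int]]:
        # Release only once the non-recursive side has a definite CTI >= the
        # SpecCTI timestamp; the SpecCTI then passes through unchanged.
        if (self.lodged is not None and self.left_cti is not None
                and self.left_cti >= self.lodged):
            t, self.lodged = self.lodged, None
            return [("specCTI", t)]
        return []


op = LodgingBinaryOperator()
print(op.on_recursive_spec_cti(5), op.on_left_cti(3), op.on_left_cti(7))
# [] [] [('specCTI', 5)]
```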
[0148] S2 is trivially upheld by the CEDR operators, none of which
block on definite punctuation except align. Rather, CEDR operators
speculate as an alternative to blocking.
[0149] S3 is also trivially upheld by all operators in the CEDR
algebra except AlterLifetime, which is the only operator in the
algebra which can emit an event that includes valid times outside
the range of the input event which generated it. Since
AlterLifetime is used for windowing in the employed algebra, the
technique could require that all windowing be done on inputs
outside the recursive loop.
[0150] One implementation of the FFP operator augments the CEDR
multicast operator to handle specCTIs. This implementation tracks
the "high water mark" of the stabilization point for all Insert and
Retract events, and uses this value to speculate with. Its handling
of specCTIs and other events follows the algorithm given in Section
3.4, except it performs steps F1-F3 on the fly. To test the
if-condition in step F3.a, it remembers the timestamp in the
currently circulating specCTI, and sets a flag if it sees an
earlier event before the specCTI returns. Thus, this implementation
uses a fixed amount of state.
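The fixed-state bookkeeping described above can be sketched as follows. The class and method names are hypothetical, event payloads and the multicast itself are not modeled, and speculating at max(t + c, high water mark) is one reading of "uses this value to speculate with", not a confirmed detail of the implementation.

```python
from typing import List, Tuple


class HighWaterMarkFFP:
    """Fixed-state specCTI bookkeeping for an FFP built on a multicast (sketch)."""

    def __init__(self, t_min: int, c: int = 1) -> None:
        self.c = c
        self.high_water = t_min    # highest stabilization point seen on Insert/Retract
        self.circulating = t_min   # timestamp of the specCTI currently on the loop
        self.saw_earlier = False   # set when an event with stable <= circulating arrives

    def on_event(self, stable_point: int) -> None:
        """Record an Insert/Retract arriving on I (multicasting it is not modeled)."""
        self.high_water = max(self.high_water, stable_point)
        if stable_point <= self.circulating:
            self.saw_earlier = True

    def on_spec_cti_returned(self, t: int) -> List[Tuple[str, int]]:
        """The specCTI came back on I; return what is emitted on O_R."""
        if t != self.circulating:
            raise ValueError("unexpected specCTI timestamp")
        if self.saw_earlier:
            out = [("specCTI", t)]                       # F3.b: recirculate unchanged
        else:
            nxt = max(t + self.c, self.high_water)       # F3.a: speculate further ahead
            out = [("CTI", t), ("specCTI", nxt)]
            self.circulating = nxt
        self.saw_earlier = False                         # start a new round
        return out


ffp = HighWaterMarkFFP(t_min=0)
ffp.on_event(3)
print(ffp.on_spec_cti_returned(0))  # [('CTI', 0), ('specCTI', 3)]
```

The state is three integers and a flag regardless of how many events circulate, which matches the fixed-state property claimed for this implementation.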
[0151] An observant reader will note that rather than allowing
SpecCTIs to become lodged in an operator, some implementations can
immediately emit them with the latest timestamp that their other
inputs allow, which, in these cases, would be guaranteed to be less
than the original SpecCTI. Rather than using a scheme with a high
water mark, these techniques could instead initially emit a SpecCTI
with a timestamp of infinity, and then retry the timestamp that
comes back until the technique can emit a definite CTI. After
emitting the definite CTI, the technique could then emit another
SpecCTI at infinity, etc. This alternative approach is termed
"specCTI probing".
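Probing can be sketched under a simplified, hypothetical model in which every binary operator on the loop forwards a SpecCTI immediately, lowered to the latest definite CTI on its non-recursive input. The helper names are illustrative, and `saw_earlier_event` stands in for the FFP's check for earlier events during a round.

```python
import math
from typing import Callable, List


def pass_through_loop(spec_t: float, non_recursive_ctis: List[float]) -> float:
    """Each binary operator forwards the SpecCTI at once, lowered to the latest
    definite CTI it has seen on its non-recursive input."""
    for cti in non_recursive_ctis:
        spec_t = min(spec_t, cti)
    return spec_t


def probe(non_recursive_ctis: List[float],
          saw_earlier_event: Callable[[float], bool] = lambda t: False,
          max_rounds: int = 1000) -> float:
    """One probing cycle: start at infinity and retry the returned timestamp
    until it comes back unchanged with no earlier event; that timestamp can
    then be emitted as a definite CTI."""
    t = math.inf
    for _ in range(max_rounds):
        returned = pass_through_loop(t, non_recursive_ctis)
        if returned == t and not saw_earlier_event(t):
            return t
        t = returned
    raise RuntimeError("probe did not settle within max_rounds")


print(probe([7, 5, 9]))  # 5
```

After a definite CTI is emitted, the next cycle would start again at infinity, as described above.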
CEDR Speculative Output
[0152] The CEDR stream processing system uses operators that
inherently speculate very aggressively by issuing full or partial
retractions for previous events in the input stream. Using this
mechanism, operators are free to speculate aggressively, at any
given time producing all output under the assumption that the
input received so far is all the input. Speculation may then be
throttled back using the Align operator, and permanence of output
may be forced by the finalize operator for the purpose of managing
state in the absence of frequent-enough CTIs. Some of the described
implementations of FFP can handle the Retract events that are
sometimes issued by speculative operators, by virtue of starting
with operator implementations that handle Retracts.
[0153] Note that this form of speculation also significantly
increases the expressiveness of recursive queries, since it allows
the recursive use of operators such as aggregation and
difference.
Windows and Incremental Evaluation
[0154] In CEDR, rather than associating windows with operators,
some techniques associate windows with data. More specifically,
some of these techniques associate an interval with every event
(as opposed to other systems, which use a single timestamp). This
interval is the time during which a
particular payload, associated with the event, is in the snapshots
being modeled by the stream. This treatment has the effect of
assigning payloads to windows, such that the valid time interval of
the event determines the output times during which a windowed
version of any operator includes the payload. The AlterLifetime
operator can be used to explicitly set these windows.
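Associating windows with data can be illustrated with a sliding window. The helper `alter_lifetime_window` below is a hypothetical stand-in inspired by AlterLifetime, not its actual signature: it gives each point event at time t the closed-open validity interval [t, t + width), so a windowed operator at output time t sees exactly the payloads whose intervals contain t.

```python
from typing import Hashable, List, Set, Tuple

Interval = Tuple[int, int, Hashable]  # (Vs, Ve, payload), closed at Vs, open at Ve


def alter_lifetime_window(events: List[Tuple[int, Hashable]], width: int) -> List[Interval]:
    """Hypothetical helper: a point event at time t gets lifetime [t, t + width)."""
    return [(t, t + width, payload) for t, payload in events]


def window_contents(intervals: List[Interval], t: int) -> Set[Hashable]:
    """Payloads a windowed operator includes at output time t."""
    return {payload for vs, ve, payload in intervals if vs <= t < ve}


intervals = alter_lifetime_window([(1, "a"), (2, "b"), (4, "c")], width=3)
print(sorted(window_contents(intervals, 3)))  # ['a', 'b']
print(sorted(window_contents(intervals, 4)))  # ['b', 'c']
```

Any unwindowed operator applied to these intervals then behaves as its windowed version, which is the point of attaching windows to data rather than to operators.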
EXAMPLE II
Pattern Matching with NFA
[0155] This section explains how a user of FFP operators can
implement arbitrary NFAs, a common paradigm for implementing
pattern matching. As with the above examples, these techniques get
the ability to speculate, incrementally window, and handle
out-of-order inputs as a consequence of using existing
operators.
[0156] FIG. 5 shows the resulting plan in the form of query graph
510. In the present case, query graph 510 includes seven operators
512(1)-512(7). Operators 512(1) and 512(2) are join operators;
operator 512(3) is a project operator; operator 512(4) is a union
operator; operator 512(5) is a multicast operator; operator 512(6)
is another project operator, and operator 512(7) is an FFP
operator.
[0157] Two data streams serve as sample input to query graph 510;
the first input stream is in the form of state machine 514(1),
while the second input stream is in the form of symbols 514(2). An
example of input data of the state machine 514(1) is evidenced
generally at 516. An example of input data of the symbols 514(2) is
evidenced generally at 518. Output from the query graph 510 is
evidenced generally at 520.
[0158] Note that, rather than being compiled into the plan, the
state machine is given as a streaming input, and may, in theory,
change over time. Thus, the plan is actually a streaming program
for executing arbitrary, evolving automata.
[0159] For clarity, as with the above examples, the discussion
again assumes that the window is infinite, and explains the role of
the various operators 512(1)-512(7) with the given input. The
particular automaton executed here searches for the pattern
AB*A. The query can output all discovered event sequences that
constitute partial and complete patterns, and their associated
states in the automata. The starting state is called S, and the
final state is called F. (Note that there may be multiple final
states, and that one could filter the output for final states if
desired.)
[0160] The state machine input 514(1) is described using a set of
transitions such that each transition absorbs an accompanying
input. The symbols input 514(2) is a description of the sequence in
which an attempt is made to find patterns. Each event has a
sequence number, and a symbol, which may match a symbol in the
automata transition table.
[0161] A detailed query description was provided above in relation
to the reachability example. For sake of brevity only a sketch of
query behavior is provided in the present discussion. The state
machine input 514(1) is loaded into a right join synopsis 522 of
the lower join 512(1). When input comes along the symbols input
514(2) to left synopsis 524, join 512(1) finds all transitions
which can be made using this symbol, and passes these transitions
to the join 512(2) above at left synopsis 526, which looks for
partial patterns which have ended in the starting state of one of
the activated transitions, and which sequentially precede the new
symbol. For all such matches, the technique has found a new
(partial or complete) pattern, which is output and recursively
inserted back into a right synopsis 528 of the upper join
512(2).
[0162] Along the left branch of the multicast above the symbols
input 514(2), the technique creates a seed start state on each
input symbol and recursively inserts it into the right join
synopsis 528 of the upper join 512(2).
[0163] The notion of progress used to bound the computation in this
example is that transitions can be followed along increasing
sequence numbers. The number of recursive steps at any given moment
is therefore bounded by the number of symbols received so far, which
is finite.
[0164] Note that in the example given above, the input sequence is:
`ABBA`. Given that the query returns partial and complete
discovered patterns, the technique should output the following
patterns and their associated end sequence IDs:
[0165] `A`:1, `AB`:2, `ABB`:3, `ABBA`:4, `A`:4
[0166] Note that there are actually 4 extra outputs in FIG. 5.
These outputs correspond to the 4 seed patterns introduced by the
left side of the multicast, and are regarded as patterns of length
0.
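The expected output above can be reproduced with a small worklist sketch. This is not the operator plan of FIG. 5 but a sequential stand-in for it, under stated assumptions: the automaton for AB*A is written with a hypothetical intermediate state Q, a transition may only extend a pattern that ends at the immediately preceding sequence number (one reading of "sequentially precede" that matches the listed output), and length-0 seed patterns play the role of the left side of the multicast.

```python
from collections import deque
from typing import Dict, List, Tuple

Pattern = Tuple[int, str, str]  # (end sequence number, automaton state, matched text)


def match_patterns(transitions: List[Tuple[str, str, str]],
                   symbols: List[Tuple[int, str]],
                   start: str = "S") -> List[Pattern]:
    """Return all partial and complete patterns, including length-0 seeds."""
    by_symbol: Dict[str, List[Tuple[str, str]]] = {}
    for frm, sym, to in transitions:
        by_symbol.setdefault(sym, []).append((frm, to))
    symbol_at = dict(symbols)

    # Seed: a length-0 pattern in the start state just before each symbol.
    seeds = [(seq - 1, start, "") for seq, _ in symbols]
    results: List[Pattern] = list(seeds)
    frontier = deque(seeds)  # patterns still to be sent around the "loop"
    while frontier:
        end_seq, state, text = frontier.popleft()
        nxt = end_seq + 1    # sequence numbers only increase, so this terminates
        if nxt not in symbol_at:
            continue
        sym = symbol_at[nxt]
        for frm, to in by_symbol.get(sym, []):
            if frm == state:  # join: pattern ends in the transition's start state
                extended = (nxt, to, text + sym)
                results.append(extended)
                frontier.append(extended)
    return results


transitions = [("S", "A", "Q"), ("Q", "B", "Q"), ("Q", "A", "F")]
symbols = [(1, "A"), (2, "B"), (3, "B"), (4, "A")]
found = match_patterns(transitions, symbols)
print(sorted((text, end) for end, _, text in found if text))
# [('A', 1), ('A', 4), ('AB', 2), ('ABB', 3), ('ABBA', 4)]
```

Filtering `found` for state F leaves the single complete match ABBA ending at sequence number 4, and the four empty-text entries are the seed patterns of length 0.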
[0167] In summary, the above description offers systems and
techniques for processing recursive streaming queries. The
description further defines how query graphs utilized in the
processing can be updated to specific points in time even while the
recursive streaming query may remain ongoing. The above described
techniques/methods and systems can be implemented on any type of
networkable computing device(s) as should be recognized by the
skilled artisan.
EXEMPLARY METHODS
[0168] FIG. 6 illustrates a flowchart of a method or technique 600
that is consistent with at least some implementations of the
present concepts. The order in which the technique 600 is described
is not intended to be construed as a limitation, and any number of
the described blocks can be combined in any order to implement the
technique, or an alternate technique. Furthermore, the technique
can be implemented in any suitable hardware, software, firmware, or
combination thereof, such that a computing device can implement the
technique. In one case, the technique is stored on a
computer-readable storage media as a set of instructions such that
execution by a computing device causes the computing device to
perform the technique.
[0169] The technique processes a recursive streaming query through
a query graph at block 602. Query graphs consist of operators
connected to one another via streams. Non-limiting examples of
operators and potential arrangements of operators in a query graph
are detailed above in relation to FIGS. 2-5.
[0170] The technique detects when output produced by executing the
query graph advances to a specific point at block 604. One
implementation involves circulating speculative CTIs through a
recursive loop of the query graph to detect when the output has
advanced to the specific point. Examples of this and other
exemplary techniques are described above.
CONCLUSION
[0171] The above described concepts detail the surprising
conclusion that supporting recursive streaming query plans, through the
introduction of a cycle in the query graph, is simple, highly
expressive, and practical. At least some of these concepts can
immediately benefit from all the capabilities of existing operators
such as incremental window evaluation, disorder tolerance, and
speculation.
[0172] The present concepts can be employed in implementations that
are sufficiently expressive to attack both graph-walking queries
and regular-expression pattern matching. In the case of pattern
matching, the associated query plan is actually linear in the
number of transitions of the finite automata which detects the
pattern, resulting in a highly efficient algorithm. Even further
expressiveness is achieved in CEDR by speculating when necessary to
ensure disorder tolerance. This allows operators such as
aggregation and difference to be used in recursive loops, which is
useful for expressing branch and bound execution strategies.
[0173] Detecting forward time progress is relatively
straightforward with the addition of speculative CTIs, which
function similarly to regular CTIs. The above discussion includes
two implementations: a blocking speculative-CTI strategy based on
high water marks and a non-blocking version based on probing.
[0174] Although techniques, methods, devices, systems, etc.,
pertaining to recursive streaming query scenarios are described in
language specific to structural features and/or methodological
acts, it is to be understood that the subject matter defined in the
appended claims is not necessarily limited to the specific features
or acts described. Rather, the specific features and acts are
disclosed as exemplary forms of implementing the claimed methods,
devices, systems, etc.