U.S. patent application number 16/508,214 was published by the patent office on 2019-10-31 as publication number 20190334850 for systems and methods for storing and transferring message data. The application was filed on 2019-07-10. The applicant listed for this patent is Satori Worldwide, LLC. Invention is credited to Igor Milyakov.

Application Number | 16/508214
Publication Number | 20190334850
Kind Code | A1
Family ID | 55700150
Filed Date | 2019-07-10
Publication Date | 2019-10-31

United States Patent Application 20190334850
Milyakov; Igor
October 31, 2019
SYSTEMS AND METHODS FOR STORING AND TRANSFERRING MESSAGE DATA
Abstract
A method includes storing a plurality of blocks in a queue,
wherein each block stores one or more respective messages and is
associated with a respective time that the block was stored in the
queue. The method also includes allowing, by one or more computer
processors, messages to be read from inactive blocks having
associated storage times between a first time and a second time
that is older than the first time, and deleting from the queue one
or more inactive blocks having associated storage times older than
the second time.
Inventors: Milyakov; Igor (Sunnyvale, CA)

Applicant:
Name | City | State | Country | Type
Satori Worldwide, LLC | Palo Alto | CA | US |

Family ID: 55700150
Appl. No.: 16/508214
Filed: July 10, 2019
Related U.S. Patent Documents

Application Number | Filing Date | Patent Number | Continued By
16020455 | Jun 27, 2018 | 10389674 | 16508214
15815595 | Nov 16, 2017 | 10038661 | 16020455
15291633 | Oct 12, 2016 | 9843551 | 15815595
PCT/US2016/023164 | Mar 18, 2016 | -- | 15291633
15063390 | Mar 7, 2016 | 9407593 | PCT/US2016/023164
14879689 | Oct 9, 2015 | 9319365 | 15063390
Current U.S. Class: 1/1
Current CPC Class: H04L 51/00 20130101; H04L 51/16 20130101; H04L 69/28 20130101; H04L 51/34 20130101; H04L 69/10 20130101; H04L 29/08072 20130101; G06F 12/0866 20130101; H04L 65/403 20130101; H04L 51/28 20130101; H04L 67/26 20130101; H04L 29/06 20130101; H04L 51/26 20130101; H04L 51/10 20130101
International Class: H04L 12/58 20060101 H04L012/58; H04L 29/08 20060101 H04L029/08; H04L 29/06 20060101 H04L029/06
Claims
1. A method, comprising: storing a plurality of blocks in a queue,
wherein each block stores one or more respective messages and is
associated with a respective time that the block was stored in the
queue; allowing, by one or more computer processors, messages to be
read from inactive blocks having associated storage times between a
first time and a second time that is older than the first time; and
deleting from the queue one or more inactive blocks having
associated storage times older than the second time.
2. The method of claim 1, wherein the storage times get older from
a block designating a tail of the queue to a block designating a
head of the queue.
3. The method of claim 2, wherein new blocks are stored at the tail
of the queue.
4. The method of claim 1, wherein an active portion of the queue is
associated with storage times newer than the first time, wherein an
inactive portion of the queue is associated with storage times
between the first time and second time, and wherein an inaccessible
portion of the queue is associated with storage times older than
the second time.
5. The method of claim 1, wherein each block is assigned a
time-to-reside, and wherein a block is designated as inactive when
the time-to-reside of the block expires.
6. The method of claim 1, wherein allowing messages to be read from
inactive blocks having associated storage times between the first
time and the second time that is older than the first time
comprises: providing additional time to read messages from a block
after a time-to-reside for the block expires.
7. The method of claim 1, wherein each block is assigned a
time-to-live, and wherein a block is deleted when the time-to-live
of the block expires.
8. The method of claim 1, wherein each block is assigned a
time-to-live, wherein each block is assigned a time-to-reside, and
wherein the time-to-live of a block is greater than the
time-to-reside of the block.
9. The method of claim 1, wherein the plurality of blocks are
stored in the queue according to an order.
10. The method of claim 1, comprising: sending read messages to a
respective subscriber.
11. A system, comprising: one or more computer processors
programmed to perform operations to: store a plurality of blocks in
a queue, wherein each block stores one or more respective messages
and is associated with a respective time that the block was stored
in the queue; allow messages to be read from inactive blocks having
associated storage times between a first time and a second time
that is older than the first time; and delete from the queue one or
more inactive blocks having associated storage times older than the
second time.
12. The system of claim 11, wherein the storage times get older
from a block designating a tail of the queue to a block designating
a head of the queue.
13. The system of claim 12, wherein new blocks are stored at the
tail of the queue.
14. The system of claim 11, wherein an active portion of the queue
is associated with storage times newer than the first time, wherein
an inactive portion of the queue is associated with storage times
between the first time and second time, and wherein an inaccessible
portion of the queue is associated with storage times older than
the second time.
15. The system of claim 11, wherein each block is assigned a
time-to-reside, and wherein a block is designated as inactive when
the time-to-reside of the block expires.
16. The system of claim 11, wherein to allow messages to be read
from inactive blocks having associated storage times between the
first time and the second time that is older than the first time
the one or more computer processors are further to: provide
additional time to read messages from a block after a
time-to-reside for the block expires.
17. The system of claim 11, wherein each block is assigned a
time-to-live, and wherein a block is deleted when the time-to-live
of the block expires.
18. The system of claim 11, wherein each block is assigned a
time-to-live, wherein each block is assigned a time-to-reside, and
wherein the time-to-live of a block is greater than the
time-to-reside of the block.
19. The system of claim 11, wherein the plurality of blocks are
stored in the queue according to an order.
20. A non-transitory computer-readable medium having instructions
stored thereon that, when executed by one or more computer
processors, cause the one or more computer processors to: store a
plurality of blocks in a queue, wherein each block stores one or
more respective messages and is associated with a respective time
that the block was stored in the queue; allow, by the one or more
computer processors, messages to be read from inactive blocks
having associated storage times between a first time and a second
time that is older than the first time; and delete from the queue
one or more inactive blocks having associated storage times older
than the second time.
Description
CROSS REFERENCE TO RELATED APPLICATIONS
[0001] This application is a continuation of U.S. application Ser.
No. 16/020,455, filed Jun. 27, 2018, which is a continuation of
U.S. application Ser. No. 15/815,595, filed Nov. 16, 2017 (now U.S.
Pat. No. 10,038,661, issued Jul. 31, 2018), which is a continuation
of U.S. application Ser. No. 15/291,633, filed Oct. 12, 2016 (now
U.S. Pat. No. 9,843,551, issued Dec. 12, 2017), which is a
continuation of International Application No. PCT/US2016/023164,
filed Mar. 18, 2016, which is a continuation of U.S. application
Ser. No. 15/063,390, filed Mar. 7, 2016 (now U.S. Pat. No.
9,407,593, issued Aug. 2, 2016), which is a continuation of U.S.
application Ser. No. 14/879,689, filed Oct. 9, 2015 (now U.S. Pat.
No. 9,319,365, issued Apr. 19, 2016), the entire contents of each
of which are hereby incorporated by reference.
BACKGROUND
[0002] This specification relates to a data communication system
and, in particular, a system that implements real-time, scalable
publish-subscribe messaging.
[0003] The publish-subscribe pattern (or "PubSub") is a data
communication messaging arrangement implemented by software systems
where so-called publishers publish messages to topics and so-called
subscribers receive the messages pertaining to particular topics to
which they are subscribed. There can be one or more publishers per
topic and publishers generally have no knowledge of what
subscribers, if any, will receive the published messages. Some
PubSub systems do not cache messages, or have only small caches, meaning
that subscribers may not receive messages that were published
before the time of subscription to a particular topic. PubSub
systems can be susceptible to performance instability during surges
of message publications or as the number of subscribers to a
particular topic increases.
SUMMARY
[0004] One aspect of the subject matter described in this
specification relates to a method that includes storing a plurality
of blocks in a queue, wherein each block stores one or more
respective messages and is associated with a respective time that
the block was stored in the queue. The method also includes
allowing, by one or more computer processors, messages to be read
from inactive blocks having associated storage times between a
first time and a second time that is older than the first time, and
deleting from the queue one or more inactive blocks having
associated storage times older than the second time.
[0005] The details of one or more embodiments of the subject matter
described in this specification are set forth in the accompanying
drawings and the description below. Other features, aspects, and
advantages of the subject matter will become apparent from the
description, the drawings, and the claims.
BRIEF DESCRIPTION OF THE DRAWINGS
[0006] FIG. 1A illustrates an example system that supports the
PubSub communication pattern.
[0007] FIG. 1B illustrates functional layers of software on an
example client device.
[0008] FIG. 2 is a diagram of an example messaging system.
[0009] FIG. 3A is a data flow diagram of an example method for
writing data to a streamlet.
[0010] FIG. 3B is a data flow diagram of an example method for
reading data from a streamlet.
[0011] FIG. 4 is a schematic diagram of an example fast buffer
system for handling message data.
[0012] FIG. 5 is a schematic diagram of an example PubSub system
that incorporates the fast buffer system of FIG. 4.
[0013] FIGS. 6A, 6B, and 6C are schematic diagrams of an example
queue for storing blocks of messages.
[0014] FIG. 7 is a flowchart of an example method of storing and
accessing messages in a queue.
DETAILED DESCRIPTION
[0015] In general, there are two ways of dispatching messages in a
PubSub system. One way is to give the publisher information about
the subscribers and make the publisher responsible for delivery of
every message to every recipient. This method has serious problems
with scaling, since there is a linear (e.g., O(N)) dependency of
work to be done for a single message delivery, based on the number
of subscribers N. A second approach is to store everything in a
common storage (e.g., using ERLANG ETS) and let every consumer or
subscriber retrieve messages from the common storage independently.
In this case, there is no direct O(N) dependency for a publisher,
but a separate notification system is used to inform the consumers
about new messages. This second method also uses many mutually
exclusive locks to ensure modifications of the common storage are
safe across all of the users.
[0016] In certain examples, the systems and methods described
herein are a variation of the second approach and incorporate
multiple optimizations. The systems and methods preferably have no
mutexes (e.g., no compare-and-swap-based locks) in the data
manipulation path, so there are no locks that require a publisher or
subscriber to wait for another party. In general, each operation is
processed independently with respect to inter-core cache coherency.
The data is preferably read in blocks of about 64 kB, which
provides faster performance than the first approach described
above. In some instances, there is an additional optimization for
subscriber cooperation, which prevents frequently published
channels from dominating over infrequently published channels. The
systems and methods may also use an optimized notification
mechanism with channel-speed-specific approaches that keep
latencies low in channels of any speed.
[0017] Compared to previous approaches, the systems and methods
described herein have many benefits. For example, there is little
or no visible dependency between publisher performance and the
number of subscribers. Also, data is advantageously aggregated and
delivered in chunks, which reduces handling overhead. Chunk size
may be dynamic to keep latencies low, and there are preferably no
locks between publisher and subscriber or between subscribers. The
notification mechanism also has channel-speed-specific algorithms
that provide a better balance between latencies and CPU
utilization. Further, the systems and methods have automatic memory
utilization logic that frees memory according to requirements, with
no locks involved. In one test, a prior approach based on the
second method described above delivered up to 5 million 100-byte
messages per second on a single physical server, while the improved
systems and methods described herein delivered 830 million 100-byte
messages per second on the same single physical server.
[0018] FIG. 1A illustrates an example system 100 that supports the
PubSub communication pattern. Publishers (e.g., Publishers 1-N) can
publish messages to named channels (e.g., Channels 1-N) by way of
the system 100 (also referred to as "messaging system" hereafter).
A message can include any type of information including one or more
of the following: text, image content, sound content, multimedia
content, video content, binary data, and so on. Other types of
message data are possible. Subscribers (e.g., Subscribers 1-N) can
subscribe to a named channel using the system 100 and start
receiving messages which occur after the subscription request or
from a given position (e.g., a message number or time offset). A
client can be both a publisher and a subscriber.
[0019] Depending on the configuration, a PubSub system can be
categorized as follows:

[0020] One to One (1:1). In this configuration there is one
publisher and one subscriber per channel. An example use case is
private messaging.

[0021] One to Many (1:N). In this configuration there is one
publisher and multiple subscribers per channel. Example use cases
are broadcasting messages (e.g., stock prices).

[0022] Many to Many (M:N). In this configuration there are multiple
publishers publishing to a single channel. The messages are then
delivered to multiple subscribers. Example use cases are map
applications.
[0023] There is no separate operation needed to create a named
channel. A channel is created implicitly when the channel is
subscribed to or when a message is published to the channel. In
some implementations, channel names can be qualified by a name
space. A name space includes one or more channel names. Different
name spaces can have the same channel names without causing
ambiguity. The name space name can be a prefix of a channel name
where the name space and channel name are separated by a dot or
other suitable separator. In some implementations, name spaces can
be used when specifying channel authorization settings. For
instance, the system 100 may have app1.foo and
app1.system.notifications channels where "app1" is the name of the
name space. The system can allow clients to subscribe and publish
to the app1.foo channel. However, clients can subscribe, but not
publish, to the app1.system.notifications channel.
[0024] FIG. 1B illustrates functional layers of software on an
example client device. A client device (e.g., client 102) is a data
processing apparatus such as, for example, a personal computer, a
laptop computer, a tablet computer, a smart phone, a smart watch,
or a server computer. Other types of client devices are possible.
The application layer 104 includes the end-user application(s) that
will integrate with the system 100. The messaging layer 106 is a
programmatic interface for the application layer 104 to utilize
services of the system 100 such as channel subscription, message
publication, message retrieval, user authentication, and user
authorization. In some implementations, the messages passed to and
from the messaging layer 106 are encoded as JavaScript Object
Notation (JSON) objects. Other message encoding schemes are
possible.
[0025] The operating system layer 108 includes the operating system
software on the client 102. In various implementations, messages
can be sent and received to/from the system 100 using persistent or
non-persistent connections. Persistent connections can be created
using, for example, network sockets. A transport protocol such as
TCP/IP layer 112 implements the Transmission Control Protocol/Internet
Protocol communication with the system 100 that can be used by the
messaging layer 106 to send messages over connections to the system
100. Other communication protocols are possible including, for
example, User Datagram Protocol (UDP). In further implementations,
an optional Transport Layer Security (TLS) layer 110 can be
employed to ensure the confidentiality of the messages.
[0026] FIG. 2 is a diagram of an example messaging system 100. The
messaging system 100 provides functionality for implementing PubSub
communication patterns. The messaging system 100 includes software
components and storage that can be deployed at one or more data
centers 122 in one or more geographic locations, for example. The
messaging system 100 includes multiplexer (MX) nodes 202, 204 and
206, queue (Q) nodes 208, 210 and 212, one or more channel manager
nodes (e.g., channel managers 214, 215), and optionally one or more
cache (C) nodes 220 and 222. Each node can execute in a virtual
machine or on a physical machine (e.g., a data processing
apparatus). Each MX node serves as a termination point for one or
more publisher and/or subscriber connections through the external
network 216. The internal communication among MX nodes, Q nodes, C
nodes, and the channel managers is conducted over an internal
network 218. For example, MX node 204 can be the terminus of a
subscriber connection from client 102. Each Q node buffers channel
data for consumption by the MX nodes. An ordered sequence of
messages published to a channel is a logical channel stream. For
example, if three clients publish messages to a given channel, the
combined messages published by the clients include a channel
stream. Messages can be ordered in a channel stream. For example,
the messages may be ordered by time of publication by the client,
by time of receipt by an MX node, or by time of receipt by a Q
node. Other ways for ordering messages in a channel stream are
possible. In the case where more than one message would be assigned
to the same position in the order, one of the messages can be
chosen (e.g., randomly) to have a later sequence in the order. Each
channel manager node is responsible for managing Q node load by
splitting channel streams into streamlets, as will be discussed in
further detail below. The optional C nodes provide caching and load
removal from the Q nodes.
[0027] In the example messaging system 100, one or more client
devices (publishers and/or subscribers) establish respective
persistent connections (e.g., TCP connections) to an MX node (e.g.,
MX nodes 202, 204 and/or 206). The MX node serves as a termination
point for these connections. For instance, external messages (e.g.,
between respective client devices and the MX node) carried by these
connections can be encoded based on an external protocol (e.g.,
JSON). The MX node terminates the external protocol and translates
the external messages to internal communication, and vice versa.
The MX nodes 202, 204 and 206 publish and subscribe to streamlets
on behalf of clients. In this way, an MX node can multiplex and
merge requests of client devices subscribing for or publishing to
the same channel, thus representing multiple client devices as one,
instead of one by one.
[0028] In the example messaging system 100, a Q node (e.g., Q nodes
208, 210 and/or 212) can store one or more streamlets of one or
more channel streams. A streamlet is a data buffer for a portion of
a channel stream. A streamlet will close to writing when its
storage is full. A streamlet will close to reading and writing and
be de-allocated when its time-to-live (TTL) has expired. For
example, a streamlet can have a maximum size of 1 MB and a TTL of
three minutes. Different channels can have streamlets limited by
different sizes and/or by different TTLs. For example, streamlets
in one channel can exist for up to three minutes, while streamlets
in another channel can exist for up to 10 minutes. In various
implementations, a streamlet corresponds to a computing process
running on a Q node. The computing process can be terminated after
the streamlet's TTL has expired, thus freeing up computing
resources (for the streamlet) back to the Q node, for example.
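For illustration, the streamlet life cycle described above might be modeled as in the following minimal Python sketch. The 1 MB size limit and three-minute TTL come from the example in this paragraph; the class and method names, and the use of a monotonic clock, are assumptions for illustration only.

    import time

    class Streamlet:
        """A size-bounded, TTL-bounded message buffer (illustrative sketch)."""

        def __init__(self, max_bytes=1_000_000, ttl_seconds=180.0):
            self.max_bytes = max_bytes       # e.g., 1 MB maximum size
            self.ttl_seconds = ttl_seconds   # e.g., a three-minute TTL
            self.created_at = time.monotonic()
            self.messages = []               # messages in arrival order
            self.size = 0

        def expired(self):
            # After the TTL, the streamlet is closed to reading and writing.
            return time.monotonic() - self.created_at >= self.ttl_seconds

        def write(self, payload: bytes) -> bool:
            # A False return models the streamlet closing to writing (full or
            # expired); the writer then requests a grant for a new streamlet.
            if self.expired() or self.size + len(payload) > self.max_bytes:
                return False
            self.messages.append(payload)
            self.size += len(payload)
            return True

        def read(self, position: int):
            # Reading from a position is allowed until the TTL expires.
            if self.expired():
                raise EOFError("streamlet TTL expired")
            return self.messages[position:]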
[0029] When receiving a publish request from client 102, an MX node
(e.g., MX node 204) makes a request to a channel manager (e.g.,
channel manager 214) to grant access to a streamlet to write the
message being published. However, if the MX node has already been
granted write access to a streamlet for the channel (and the
channel has not been closed to writing), the MX node can write the
message to that streamlet without having to request a grant to
access the streamlet. Once a message is written to a streamlet for
a channel, the message can be read by MX nodes and provided to
subscribers of that channel.
[0030] Similarly, when receiving a channel subscription request
from a client device, an MX node makes a request to a channel
manager to grant access to a streamlet for the channel from which
messages are read. If the MX node has already been granted read
access to a streamlet for the channel (and the streamlet has not
been closed to reading), the MX node can read messages from the
streamlet without having to request a grant to access the
streamlet. The read messages can then be forwarded to client
devices that have subscribed to the channel. In various
implementations, messages read from streamlets are cached by MX
nodes so that MX nodes can reduce the number of times messages are
read from the streamlets.
[0031] In implementations, an MX node can request a grant from the
channel manager that allows the MX node to store a block of data
into a streamlet on a particular Q node that stores streamlets of a
particular channel. Example streamlet grant request and grant data
structures are as follows:
TABLE-US-00001
StreamletGrantRequest = {
    "channel": string()
    "mode": "read" | "write"
    "position": 0
}
StreamletGrantResponse = {
    "streamlet-id": "abcdef82734987",
    "limit-size": 2000000,  # 2 megabytes max
    "limit-msgs": 5000,     # 5 thousand messages max
    "limit-life": 4000,     # the grant is valid for 4 seconds
    "q-node": string()
    "position": 0
}
[0032] The StreamletGrantRequest data structure stores the name of
the stream channel and a mode indicating whether the MX node
intends to read from or write to the streamlet. The MX node
sends the StreamletGrantRequest to a channel manager node. The
channel manager node, in response, sends the MX node a
StreamletGrantResponse data structure. The StreamletGrantResponse
contains an identifier of the streamlet (streamlet-id), the maximum
size of the streamlet (limit-size), the maximum number of messages
that the streamlet can store (limit-msgs), the TTL (limit-life),
and an identifier of a Q node (q-node) on which the streamlet
resides. The StreamletGrantRequest and StreamletGrantResponse can
also have a position field that points to a position in a streamlet
(or a position in a channel) for reading from the streamlet.
[0033] A grant becomes invalid once the streamlet has closed. For
example, a streamlet is closed to reading and writing once the
streamlet's TTL has expired or when the streamlet's storage is
full. When a grant becomes invalid, the MX node can request a new
grant from the channel manager to read from or write to a
streamlet. The new grant will reference a different streamlet and
will refer to the same or a different Q node depending on where the
new streamlet resides.
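The grant life cycle of paragraphs [0029] through [0033] can be summarized in a short sketch: an MX node reuses a cached grant until the grant becomes invalid, and only then requests a new one from the channel manager. The field names below mirror the StreamletGrantRequest and StreamletGrantResponse structures shown earlier; the channel_manager object and its request_grant method are assumptions for illustration.

    import time

    def get_write_grant(channel_manager, cached_grants, channel):
        # Reuse the cached grant while it is still valid; only the grant
        # lifetime (limit-life) is modeled here, not streamlet fullness.
        grant = cached_grants.get(channel)
        if grant is not None and time.monotonic() < grant["expires-at"]:
            return grant
        # Otherwise ask the channel manager for a new grant.
        response = channel_manager.request_grant({
            "channel": channel,
            "mode": "write",
            "position": 0,
        })
        grant = {
            "streamlet-id": response["streamlet-id"],
            "q-node": response["q-node"],
            # limit-life is in milliseconds in the example response above.
            "expires-at": time.monotonic() + response["limit-life"] / 1000.0,
        }
        cached_grants[channel] = grant
        return grant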
[0034] FIG. 3A is a data flow diagram of an example method 300 for
writing data to a streamlet in various embodiments. In FIG. 3A,
when an MX node's (e.g., MX node 202) request to write to a
streamlet is granted by a channel manager (e.g., channel manager
214), the MX node 202 establishes a Transmission Control Protocol
(TCP) connection with the Q node (e.g., Q node 208) identified in
the grant response received from the channel manager (302). A
streamlet can be written concurrently by multiple write grants
(e.g., for messages published by multiple publishers). Other types
of connection protocols between the MX node 202 and the Q node 208
are possible.
[0035] The MX node 202 sends a prepare-publish message with an
identifier of a streamlet that the MX node 202 wants to write to
the Q node 208 (304). The streamlet identifier and Q node
identifier can be provided by the channel manager in the write
grant as described earlier. The Q node 208 provides the message to
a handler 301 (e.g., a computing process running on the Q node 208)
for the identified streamlet (306). The handler 301 can send an
acknowledgment to the MX node 202 (308). After receiving the
acknowledgement, the MX node 202 starts writing (publishing)
messages (e.g., 310, 312, 314, and 318) to the handler 301, which
stores the received data in the identified streamlet. The handler
301 can also send acknowledgements (316, 320) to the MX node 202
for the received data. In some implementations, acknowledgements
can be piggy-backed or cumulative. For example, the handler 301 can
send an acknowledgement to the MX node 202 for every predetermined
amount of data received (e.g., for every 100 messages received) or
for every predetermined time period (e.g., for every one
millisecond). Other acknowledgement scheduling algorithms, such as
Nagle's algorithm, can be used.
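A handler that acknowledges cumulatively, as described above, can batch acknowledgements by message count and by elapsed time. The sketch below assumes a send_ack callback and uses the every-100-messages and every-millisecond thresholds given as examples in the paragraph; it is an illustration, not the patent's implementation.

    import time

    class AckBatcher:
        """Send one cumulative acknowledgement per batch or time window (sketch)."""

        def __init__(self, send_ack, max_batch=100, max_delay=0.001):
            self.send_ack = send_ack     # callback delivering the ack to the MX node
            self.max_batch = max_batch   # e.g., ack every 100 messages
            self.max_delay = max_delay   # e.g., ack at least every millisecond
            self.unacked = 0
            self.last_ack = time.monotonic()

        def on_message(self, message):
            self.unacked += 1
            now = time.monotonic()
            if self.unacked >= self.max_batch or now - self.last_ack >= self.max_delay:
                self.send_ack(self.unacked)   # one ack covers all pending messages
                self.unacked = 0
                self.last_ack = now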
[0036] If the streamlet can no longer accept published data (e.g.,
the streamlet is full), the handler 301 sends a
Negative-Acknowledgement (NAK) message (330) indicating a problem,
followed by an EOF (end-of-file) message (332). In this way, the
handler 301 closes the association with the MX node 202 for the
publish grant. The MX node 202 can request a write grant for
another streamlet from a channel manager if the MX node 202 has
additional messages to store.
[0037] FIG. 3B is a data flow diagram of an example method 350 for
reading data from a streamlet in various embodiments. In FIG. 3B,
an MX node (e.g., MX node 204) sends a request to a channel manager
(e.g., channel manager 214) for reading a particular channel
starting from a particular message or time offset in the channel.
The channel manager returns a read grant to the MX node 204
including an identifier of a streamlet containing the particular
message, a position in the streamlet corresponding to the
particular message, and an identifier of a Q node (e.g., Q node 208)
containing the particular streamlet. The MX node 204 then
establishes a TCP connection with the Q node 208 (352). Other types
of connection protocols between the MX node 204 and the Q node 208
are possible.
[0038] The MX node 204 then sends a subscribe message (354) to the
Q node 208 with the identifier of the streamlet in the Q node and
the position in the streamlet from which the MX node 204 wants to
read. The Q node 208 provides the subscribe message to a
handler 351 of the streamlet (356). The handler 351 can send an
acknowledgement to the MX node 204 (358). The handler 351 sends
messages (360, 364, 366), starting at the position in the
streamlet, to the MX node 204. In some implementations, the handler
351 can send all of the messages in the streamlet to the MX node
204. After sending the last message in a particular streamlet, the
handler 351 can send a notification of the last message to the MX
node 204. The MX node 204 can send to the channel manager another
request for another streamlet containing a next message in the
particular channel.
[0039] If the particular streamlet is closed (e.g., after its TTL
has expired), the handler 351 can send an unsubscribe message
(390), followed by an EOF message (392), to close the association
with the MX node 204 for the read grant. The MX node 204 can close
the association with the handler 351 when the MX node 204 moves to
another streamlet for messages in the particular channel (e.g., as
instructed by the channel manager). The MX node 204 can also close
the association with the handler 351 if the MX node 204 receives an
unsubscribe message from a corresponding client device.
[0040] In various implementations, a streamlet can be written into
and read from at the same time instance. For example, there can be
a valid read grant and a valid write grant at the same time
instance. In various implementations, a streamlet can be read
concurrently by multiple read grants (e.g., for channels subscribed
to by multiple subscriber clients). The handler of the streamlet can
order messages from concurrent write grants based on, for example,
time-of-arrival, and store the messages based on the order. In this
way, messages published to a channel from multiple publisher
clients can be serialized and stored in a streamlet of the
channel.
[0041] In the messaging system 100, one or more C nodes (e.g., C
node 220) can offload data transfers from one or more Q nodes. For
example, if there are multiple MX nodes requesting streamlets from
Q nodes for a particular channel, the streamlets can be offloaded
and cached in one or more C nodes. The MX nodes (e.g., as
instructed by read grants from a channel manager) can read the
streamlets from the C nodes instead.
[0042] As described above, messages for a channel in the messaging
system 100 are ordered in a channel stream. A channel manager
(e.g., channel manager 214) splits the channel stream into
fixed-sized streamlets that each reside on a respective Q node. In
this way, storing a channel stream can be shared among many Q
nodes; each Q node stores a portion (one or more streamlets) of the
channel stream. More particularly, a streamlet can be stored in,
for example, registers and/or dynamic memory elements associated
with a computing process on a Q node, thus avoiding the need to
access persistent, slower storage devices such as hard disks. This
results in faster message access. The channel manager can also
balance loads among Q nodes in the messaging system 100 by
monitoring respective workloads of the Q nodes and allocating
streamlets in a way that avoids overloading any one Q node.
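One simple way to realize the load balancing described here is for the channel manager to place each new streamlet on the Q node reporting the smallest workload. In the sketch below, the workload is modeled as a count of hosted streamlets; the data shapes are illustrative assumptions, not the patent's actual structures.

    def allocate_streamlet(q_node_loads, streamlet_id):
        """Assign a new streamlet to the least-loaded Q node (sketch)."""
        # q_node_loads maps a Q node identifier to its current workload.
        q_node = min(q_node_loads, key=q_node_loads.get)
        q_node_loads[q_node] += 1   # account for the streamlet just placed
        return {"streamlet-id": streamlet_id, "q-node": q_node}

    # Example: allocate_streamlet({"q1": 3, "q2": 1, "q3": 2}, "abc123")
    # places the new streamlet on Q node "q2".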
[0043] In various implementations, a channel manager maintains a
list identifying each active streamlet, the respective Q node on
which the streamlet resides, an identification of the position of
the first message in the streamlet, and whether the streamlet is
closed for writing. In some implementations, Q nodes notify the
channel manager and any MX nodes that are publishing to a streamlet
that the streamlet is closed due to being full or when the
streamlet's TTL has expired. When a streamlet is closed, the
streamlet remains on the channel manager's list of active
streamlets until the streamlet's TTL has expired so that MX nodes
can continue to retrieve messages from the streamlet.
[0044] When an MX node requests a write grant for a given channel
and there is not a streamlet for the channel that can be written
to, the channel manager allocates a new streamlet on one of the Q
nodes and returns the identity of the streamlet and the Q node in
the StreamletGrantResponse to the MX node. Otherwise, the channel
manager returns the identity of the streamlet that is currently open
for writing and the corresponding Q node in the StreamletGrantResponse to
the MX node. MX nodes can publish messages to the streamlet until
the streamlet is full or the streamlet's TTL has expired, after
which a new streamlet can be allocated by the channel manager.
[0045] When an MX node requests a read grant for a given channel
and there is not a streamlet for the channel that can be read from,
the channel manager allocates a new streamlet on one of the Q nodes
and returns the identity of the streamlet and the Q node in the
StreamletGrantResponse to the MX node. Otherwise, the channel
manager returns the identity of the streamlet and Q node that
contains the position from which the MX node wishes to read to the
MX node. The Q node can then begin sending messages to the MX node
from the streamlet beginning at the specified position until there
are no more messages in the streamlet to send. When a new message
is published to a streamlet, MX nodes that have subscribed to that
streamlet will receive the new message. If a streamlet's TTL has
expired, the handler process 351 sends an EOF message (392) to any
MX nodes that are subscribed to the streamlet.
[0046] As described earlier in reference to FIG. 2, the messaging
system 100 can include multiple channel managers (e.g., channel
managers 214, 215). Multiple channel managers provide resiliency
and prevent a single point of failure. For instance, one channel
manager can replicate the lists of streamlets and current grants it
maintains to another "slave" channel manager. As another
example, multiple channel managers can coordinate operations
between them using distributed consensus protocols, such as, for
example, Paxos or Raft protocols.
[0047] Referring to FIG. 4, in certain implementations, the system
100 utilizes a fast buffer system 400 to receive, store, and
deliver message data. The fast buffer system 400 includes a
plurality of multiple queue publishers 402, a plurality of channel
queues 404, and a plurality of multiple queue consumers 406. In
general, the multiple queue publishers 402 receive messages (e.g.,
from one or more Q nodes or MX nodes) and provide the messages to
the channel queues 404. Each multiple queue publisher 402 may
provide messages to a single channel queue 404 or to more than one
channel queue 404, as shown in FIG. 4. Each channel queue 404 is
configured to receive messages from only one multiple queue
publisher 402. Each queue publisher 402 and queue consumer 406 is a
software component that comprises one or more threads of execution
or processes.
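The wiring constraint of this paragraph, in which a multiple queue publisher may feed several channel queues but each channel queue accepts messages from exactly one publisher, can be captured in a brief sketch. The class names follow the reference numerals above; the method names are assumptions for illustration.

    class ChannelQueue:
        """Ordered messages for one channel; bound to one publisher (sketch)."""

        def __init__(self):
            self.publisher = None
            self.messages = []

    class MultipleQueuePublisher:
        """Feeds one or more channel queues, preserving arrival order (sketch)."""

        def __init__(self):
            self.queues = []

        def attach(self, queue):
            # Each channel queue may receive messages from only one publisher.
            if queue.publisher is not None:
                raise ValueError("channel queue already has a publisher")
            queue.publisher = self
            self.queues.append(queue)

        def publish(self, queue, message):
            assert queue.publisher is self
            queue.messages.append(message)   # order of receipt is preserved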
[0048] In general, each channel queue 404 stores messages for a
given channel in the system 100. The channel queues 404 keep track
of the order in which messages are received from the multiple queue
publishers 402. Each channel queue 404 includes or utilizes a
memory device to store messages. A channel queue 404 may have its
own memory device and/or may look up stored information in one or
more other memory devices used by other system components, such as
other channel queues 404 or Q nodes. A channel queue 404 uses
memory pointers to look up stored information in a memory
device.
[0049] The multiple queue consumers 406 retrieve messages from the
channel queues 404 and deliver or publish the messages to
downstream recipients (e.g., to one or more MX nodes or subscriber
devices). For example, when a new message is received by a multiple
queue publisher 402 and delivered to a channel queue 404, a
multiple queue consumer 406 may retrieve the new message from the
channel queue 404. Messages are always delivered from a channel
queue 404 to a multiple queue consumer 406 in the order in which
the messages were received by the multiple queue publisher 402.
[0050] In some instances, the multiple queue consumers 406 may be
configured to sleep when no new messages are available to be
retrieved. The multiple queue consumers 406 may be woken up (e.g.,
by a multiple queue publisher 402) when a new message is available.
For example, a notifier component of a multiple queue publisher 402
may wake up a corresponding multiple queue consumer 406 when new
message data arrives. Allowing the multiple queue consumers 406 to
sleep when no messages are available for retrieval allows system
resources to be better utilized. For example, processor time can be
allocated to other system components and/or processes when the
multiple queue consumers 406 are not needed and sleeping. As
depicted in FIG. 4, a multiple queue consumer 406 can retrieve
messages from a single channel queue 404 or from more than one
channel queue 404. In preferred implementations, a multiple queue
consumer 406 is configured to receive messages from multiple
channel queues 404 at the same time.
[0051] Components of the fast buffer system 400 are preferably
ephemeral and/or can be created and/or destroyed, as needed,
according to a flow of message data into and out of the fast buffer
system 400. In one example, when a new subscriber to a new channel
is connected to an MX node, the MX node automatically creates a new
multiple queue publisher 402 and a new channel queue 404 for
receiving channel data from a Q node. When no additional message
data is available for the new channel, the new multiple queue
publisher 402 and/or the new channel queue 404 may be put to sleep,
to allow system resources to be used elsewhere. In another example,
a multiple queue publisher 402 and/or a channel queue 404
(e.g., in an MX node) may be deleted when no associated multiple
queue consumer 406 has existed or been active for more than a
threshold period of time. To delete the multiple queue publisher
402, a process associated with the multiple queue publisher 402 may
be terminated. Memory allocated to a deleted channel queue 404 may
be reallocated to other channel queues 404 or other system
components or freed, as needed.
[0052] In certain implementations, each multiple queue publisher
402 and/or multiple queue consumer 406 is associated with a unique
process identifier (PID). The PIDs allow the system 100 to monitor
the status of the multiple queue publishers 402, the channel queues
404, and the multiple queue consumers 406. The PIDs also allow the
system 100 to adjust the priority of (or to terminate) one or more
of the multiple queue publishers 402, the channel queues 404, and
the multiple queue consumers 406, as needed. Alternatively or
additionally, the PIDs may allow the system 100 to transfer or
adopt a channel queue 404 from one publisher to another
publisher.
[0053] In some examples, the system 100 uses a notification table
to find components that are waiting for new data. For example, a
multiple queue consumer can add itself to the notification table
when it reaches the end of available data and wants to be notified
when new data is added or becomes available for reading. When the
new data becomes available, a notifier component can use the
notification table to obtain the PIDs of multiple queue consumers
that are waiting for the new data. The notifier component may then
send notifications to inform the multiple queue consumers that the
new data is available. Likewise, when a multiple queue publisher
402 adds message data to a channel queue 404, a notifier
component of the multiple queue publisher 402 may update the
notification table to indicate that the message data has been added to
the channel queue 404. In some examples, the channel queue
404 has or utilizes an additional per channel notification table
where a multiple queue consumer 406 can add its PID before going to
sleep, so that the multiple queue consumer 406 may be woken up when
a new message is available for retrieval. In one implementation, if
the notification table indicates a multiple queue consumer 406 is
sleeping and subscribed to a channel where a new message is
available for retrieval, a notifier (e.g., an auxiliary component
of the multiple queue publisher 402) can wake up that multiple
queue consumer 406. The notification table can then be updated to
indicate the multiple queue consumer 406 is awake. When no new
messages are available for the multiple queue consumer 406 to
retrieve, the multiple queue consumer 406 can again be put to
sleep, after its PID is added to the notification table.
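The notification table might be realized as in the following sketch, in which a consumer registers its PID before sleeping and a notifier wakes every waiter for a channel when new data arrives. The table layout and the wake callback are assumptions; the description above specifies only that PIDs are recorded and used for notification.

    from collections import defaultdict

    class NotificationTable:
        """Maps a channel to PIDs of consumers waiting for new data (sketch)."""

        def __init__(self, wake):
            self.wake = wake                 # callback wake(pid), e.g., signals a process
            self.waiters = defaultdict(set)

        def register(self, channel, pid):
            # Called by a multiple queue consumer that has reached the end
            # of the available data and is about to go to sleep.
            self.waiters[channel].add(pid)

        def notify(self, channel):
            # Called by a notifier component when new message data is added.
            for pid in self.waiters.pop(channel, set()):
                self.wake(pid)               # the consumer is awake again

    # Example:
    # table = NotificationTable(wake=lambda pid: print("waking", pid))
    # table.register("app1.foo", pid=4711)
    # table.notify("app1.foo")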
[0054] FIG. 5 shows an example PubSub system 500 in which the fast
buffer system 400 is incorporated into Q nodes and/or MX nodes.
Messages from publishers 502 are received by input MX nodes 504,
which transfer the messages to a Q node 506 for storage. The
messages are then retrieved from the Q node 506 by output MX nodes
508, which transfer the messages to subscribers 510. In the depicted
example, a separate fast buffer system 400 is incorporated into the
Q node 506 and the output MX nodes 508. The Q node 506 includes or
utilizes its own fast buffer system 400 to receive messages from
the input MX nodes 504, store the messages, and transmit the
messages to the output MX nodes 508. Likewise, each output MX node
508 includes or utilizes its own fast buffer system 400 to receive
messages from the Q node 506, store the messages, and transmit the
messages to the subscribers 510. By storing the messages on the
output MX nodes 508 for a limited time, the MX nodes 508 do not
need to obtain the messages from other system components, thereby
minimizing internal network traffic.
[0055] Advantageously, use of the fast buffer system 400 allows
channel data to be copied only once. For example, when message data
is received by the system 100 from a publisher, the message data
can be stored or copied to a memory device. Preferably, the message
data is copied only for data chunks that are less than a threshold
number of bytes (e.g., 64 bytes); otherwise, the message data may
not be copied and a reference to the message data may be created
instead. All components of the fast buffer system 400 that require
access to the message data may access the message data on the
memory device, without making further copies. For example, fast
buffer system components in a Q node or in an MX node can look up
the message data on the memory device. A single copy of the message
data is preferably accessible to all components of the fast buffer
system 400 (e.g., within a single server), such that no further
copying is required for the components to access the message data.
To store and retrieve the message data, the system components may
use pointers or other indicators that show where the message data
is stored on the memory device. By copying channel data only once
for all components of the fast buffer system 400, the fast buffer
system 400 spends less time writing and removing channel data and
devotes less memory space to storing the channel data. This enables
system resources to be allocated to other activities, such as
receiving messages from publishers and delivering the messages to
subscribers efficiently and accurately.
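The copy-once policy described above, copying only small payloads and recording references to larger ones, reduces to a threshold test. The 64-byte figure comes from the paragraph; the shared store and handle scheme below are assumptions for illustration.

    COPY_THRESHOLD = 64  # bytes; smaller chunks are copied, larger ones referenced

    def store_message(shared_store, payload):
        """Store a message once and return a handle to it (sketch)."""
        handle = len(shared_store)
        if len(payload) < COPY_THRESHOLD:
            shared_store.append(bytes(payload))   # copy the small chunk
        else:
            shared_store.append(payload)          # keep a reference; no copy made
        return handle

    def read_message(shared_store, handle):
        # Every component resolves the same handle against the same single
        # copy, so no further copies are needed to access the message data.
        return shared_store[handle]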
[0056] FIG. 6A is a schematic diagram of an example queue 600 used
to store message data. The queue 600 may be the same as or similar
to the channel queue 404, described herein. In the depicted
example, the queue 600 is storing blocks B1, B2, B3, and B4, which
each include one or more messages. The blocks were created by the
queue 600 in the following order: B1, B2, B3, and B4. Accordingly,
block B1 has been stored by the queue 600 the longest and defines a
head of the queue 600, as indicated by a head boundary 602. Block
B4 was added most recently to the queue 600 and defines a tail of
the queue 600, as indicated by a tail boundary 604. The head
boundary 602 and the tail boundary 604 define an active zone 606
for the queue 600. Within the active zone 606, blocks are
accessible to system components and processes (e.g., sender
processes associated with Q nodes and/or MX nodes). In general,
these system components and processes access or read the blocks in
an order in which the blocks were created by the queue 600 (i.e.,
from the head to the tail).
[0057] Each block in the queue 600 is associated with a time that
the block arrived in the queue 600. Block B1, which arrived in the
queue 600 first, is associated with an earlier time. Likewise,
block B4, which arrived in the queue 600 last, is associated with a
later time, which may be set by, for example, a Q node. A time for
an MX node may be synchronized with a time for a Q node using, for
example, a network time protocol (NTP) mechanism. In general, times
associated with blocks in the queue 600 increase from the head of
the queue 600 to the tail of the queue 600. Each of these times may
be or may be based on, for example, a time of day or a counter that
increments over time (e.g., with each second or portion
thereof).
[0058] FIG. 6B shows the queue 600 at a later time when new blocks
B5 and B6 have been added to the queue 600. Block B6, a most recent
addition to the queue 600, now resides at the tail of the queue
600, as indicated by the tail boundary 604, which has been
relocated to a new position after block B6. At this time, blocks B1
and B2 no longer reside in the active zone 606 and instead have
been moved to an inactive zone 608 of the queue 600. The head
boundary 602 is now located after block B2 and before block B3,
which resides at the head of the queue 600. In general, each block
is assigned a time-to-reside (TTR) in the active zone 606 of the
queue 600. Once the TTR for a block has expired, the head boundary
602 is moved to indicate that the block now resides in the inactive
zone 608. In the depicted example, the TTRs of blocks B1 and B2
have expired, and blocks B1 and B2 now reside in the inactive zone
608. Blocks in the inactive zone 608 are generally inaccessible to
system components and processes. In some implementations, however,
a process that was accessing a block in the active zone 606 may be
given additional time to access the block after the TTR expires and
the block has been moved to the inactive zone 608. This gives such
processes (e.g., asynchronous processes) additional time to finish
reading from the blocks, in case the processes were unable to
complete the reading before the TTR expired.
[0059] FIG. 6C shows the queue 600 at a later time when new blocks
B7 and B8 have been added to the queue 600. Block B8, the most
recent addition to the queue 600, now resides at the tail of the
queue 600, as indicated by the tail boundary 604, which has been
relocated to a new position after block B8. At this time, the TTRs
of blocks B3 and B4 have expired, and blocks B3 and B4 have been
moved from the active zone 606 to the inactive zone 608. The head
boundary 602 is now located after block B4 and before block B5,
which resides at the head of the queue 600. The head boundary 602
and an inactive zone boundary 610 define the inactive zone 608.
Preceding the inactive zone boundary 610 is a dead zone 612, where
blocks B1 and B2 are now located. In general, each block is
assigned a time-to-live (TTL) after which the block is moved from
the inactive zone 608 to the dead zone 612. After being moved to
the dead zone 612, a block is considered to be dead and is no
longer accessible to any system components or processes. This may
free up system memory to store additional blocks and/or message
data.
[0060] Given the progression of blocks from the active zone 606, to
the inactive zone 608, and to the dead zone 612, the TTL of a block
is greater than the TTR of a block. The TTR of a block is
preferably long enough that system components and processes have
sufficient time to access the block in the active zone 606, but not
so long that memory is tied up storing blocks that no longer need
to be accessed. The TTR may be, for example, about 1 second, about
10 seconds, about 30 seconds, or about 60 seconds. In preferred
implementations, the TTR is about 30 seconds. The TTL of a block is
generally longer than the TTR, to give the block time to reside in
the inactive zone 608. This gives system components and processes
(e.g., asynchronous processes) additional time to access the blocks
before the blocks are deleted or no longer accessible. In one
example, TTL=C*TTR, where C is a constant greater than or equal to
one. For example, C may be about 1.1, about 1.2, or about 1.3. In a
specific example, when TTR=30 seconds, and C=1.2, TTL=36
seconds.
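The zone progression of FIGS. 6A through 6C reduces to comparing a block's age against its TTR and TTL. A minimal sketch, using the preferred TTR of about 30 seconds and TTL = C*TTR with C = 1.2 from the example above (the function name and clock choice are assumptions):

    import time

    TTR = 30.0      # seconds in the active zone (preferred example value)
    C = 1.2         # TTL/TTR ratio, a constant >= 1 (example value)
    TTL = C * TTR   # 36 seconds; after this the block is dead

    def zone(block_stored_at, now=None):
        """Classify a block as active, inactive, or dead by its age (sketch)."""
        now = time.monotonic() if now is None else now
        age = now - block_stored_at
        if age < TTR:
            return "active"     # readable by system components and processes
        if age < TTL:
            return "inactive"   # extra time for readers that already started
        return "dead"           # no longer accessible; memory can be reclaimed

    # Example: a block stored 33 seconds ago is "inactive" (its TTR has
    # expired but its TTL has not).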
[0061] In general, the queue 600 receives messages from a publisher
(e.g., a multiple queue publisher 402, an MX node, or a Q node) for
one or more channels. One or more sender processes (e.g.,
associated with subscribers, Q nodes, MX nodes, and/or multiple
queue consumers 406) receive messages from the queue 600 and send
the messages to other system components (e.g., to subscriber TCP/IP
connections).
[0062] In various implementations, the queue 600 receives blocks of
messages from only one publisher. The blocks are generated by the
publisher in an order, and the blocks are received by the queue 600
in the same order. This makes it easier for system components and
processes to know the order of blocks in the queue 600 and to
access messages from the blocks. The publisher may be, for example,
a client device of a user, an MX node, a Q node, and/or a multiple
queue publisher. In general, all messages are maintained in an
order and all client subscribers receive the messages in the same
order.
[0063] In some examples, the queue 600 receives and stores messages
only for a single channel. The messages may be received by the
queue 600 from a single publisher or from multiple publishers,
which may include, for example, a client device of a user, an MX
node, a Q node, and/or a multiple queue publisher.
[0064] FIG. 7 is a flowchart of a method 700 of storing messages in
a queue. The method 700 includes providing (step 702) a queue
(e.g., queue 600 or channel queue 404) that includes an ordered
plurality of storage blocks. Each storage block stores one or more
respective messages and is associated with a respective time (e.g.,
a time at which the block was received by the queue). The times for
the storage blocks increase (e.g., from earlier to later) from a
block designating a head of the queue to a block designating a tail
of the queue. Each of a plurality of first sender processes reads
(step 704) messages from one or more blocks in the queue, beginning
at the head of the queue, and sends (step 706) the read messages to
a respective recipient. One or more of the blocks having associated
times that are earlier than a first time (e.g., based on the TTR)
are designated (step 708) as old. A block associated with a time
later than or equal to the first time is designated (step 710) as a
new head of the queue. The designating step 708 and/or the
designating step 710 may occur before, during, and/or after the
reading step 704. One or more of the first sender processes that
began reading messages from a block before the block was designated
as old may be allowed to continue the reading operation (step 712)
on the block until a second time (e.g., based on the TTL) which is
later than the first time. This gives the first sender processes,
which may be asynchronous, additional time to complete the reading
operation. The reading operation is designed to complete much
earlier than the TTL timeout (e.g., a few orders of magnitude earlier).
One or more of the old blocks are deleted (step 714) at a time
later than or equal to the second time.
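Steps 708 through 714 amount to periodic maintenance of the queue boundaries. The sketch below models one maintenance pass; the list-of-dicts representation and the two wall-clock deadlines are illustrative assumptions consistent with the description of the method 700.

    def maintain_queue(blocks, first_time, second_time, now):
        """One maintenance pass over a queue ordered from head (oldest) to
        tail (newest); each block dict has the 'time' it entered the queue
        (sketch)."""
        for block in blocks:
            if block["time"] < first_time:
                block["old"] = True   # step 708: designate the block as old
        # Step 710: the new head is the first block at or after first_time.
        head = next((i for i, b in enumerate(blocks) if b["time"] >= first_time),
                    len(blocks))
        if now >= second_time:
            # Steps 712 and 714: readers that began before the blocks were
            # designated old have had until second_time to finish; old
            # blocks are deleted at or after that time.
            del blocks[:head]
            head = 0
        return head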
[0065] Embodiments of the subject matter and the operations
described in this specification can be implemented in digital
electronic circuitry, or in computer software, firmware, or
hardware, including the structures disclosed in this specification
and their structural equivalents, or in combinations of one or more
of them. Embodiments of the subject matter described in this
specification can be implemented as one or more computer programs,
i.e., one or more modules of computer program instructions, encoded
on a computer storage medium for execution by, or to control the
operation of, data processing apparatus. Alternatively or in
addition, the program instructions can be encoded on an
artificially-generated propagated signal, e.g., a machine-generated
electrical, optical, or electromagnetic signal that is generated to
encode information for transmission to suitable receiver apparatus
for execution by a data processing apparatus. A computer storage
medium can be, or be included in, a computer-readable storage
device, a computer-readable storage substrate, a random or serial
access memory array or device, or a combination of one or more of
them. Moreover, while a computer storage medium is not a propagated
signal, a computer storage medium can be a source or destination of
computer program instructions encoded in an artificially-generated
propagated signal. The computer storage medium can also be, or be
included in, one or more separate physical components or media
(e.g., multiple CDs, disks, or other storage devices).
[0066] The operations described in this specification can be
implemented as operations performed by a data processing apparatus
on data stored on one or more computer-readable storage devices or
received from other sources.
[0067] The term "data processing apparatus" encompasses all kinds
of apparatus, devices, and machines for processing data, including
by way of example a programmable processor, a computer, a system on
a chip, or multiple ones, or combinations, of the foregoing. The
apparatus can include special purpose logic circuitry, e.g., an
FPGA (field programmable gate array) or an ASIC
(application-specific integrated circuit). The apparatus can also
include, in addition to hardware, code that creates an execution
environment for the computer program in question, e.g., code that
constitutes processor firmware, a protocol stack, a database
management system, an operating system, a cross-platform runtime
environment, a virtual machine, or a combination of one or more of
them. The apparatus and execution environment can realize various
different computing model infrastructures, such as web services,
distributed computing and grid computing infrastructures.
[0068] A computer program (also known as a program, software,
software application, script, or code) can be written in any form
of programming language, including compiled or interpreted
languages, declarative, procedural, or functional languages, and it
can be deployed in any form, including as a standalone program or
as a module, component, subroutine, object, or other unit suitable
for use in a computing environment. A computer program may, but
need not, correspond to a file in a file system. A program can be
stored in a portion of a file that holds other programs or data
(e.g., one or more scripts stored in a markup language resource),
in a single file dedicated to the program in question, or in
multiple coordinated files (e.g., files that store one or more
modules, sub-programs, or portions of code). A computer program can
be deployed to be executed on one computer or on multiple computers
that are located at one site or distributed across multiple sites
and interconnected by a communication network.
[0069] The processes and logic flows described in this
specification can be performed by one or more programmable
processors executing one or more computer programs to perform
actions by operating on input data and generating output. The
processes and logic flows can also be performed by, and apparatus
can also be implemented as, special purpose logic circuitry, e.g.,
an FPGA (field programmable gate array) or an ASIC
(application-specific integrated circuit).
[0070] Processors suitable for the execution of a computer program
include, by way of example, both general and special purpose
microprocessors, and any one or more processors of any kind of
digital computer. Generally, a processor will receive instructions
and data from a read-only memory or a random access memory or both.
The essential elements of a computer are a processor for performing
actions in accordance with instructions and one or more memory
devices for storing instructions and data. Generally, a computer
will also include, or be operatively coupled to receive data from
or transfer data to, or both, one or more mass storage devices for
storing data, e.g., magnetic, magneto-optical disks, or optical
disks. However, a computer need not have such devices. Moreover, a
computer can be embedded in another device, e.g., a smart phone, a
mobile audio or video player, a game console, a Global Positioning
System (GPS) receiver, or a portable storage device (e.g., a
universal serial bus (USB) flash drive), to name just a few.
Devices suitable for storing computer program instructions and data
include all forms of non-volatile memory, media and memory devices,
including by way of example semiconductor memory devices, e.g.,
EPROM, EEPROM, and flash memory devices; magnetic disks, e.g.,
internal hard disks or removable disks; magneto-optical disks; and
CD-ROM and DVD-ROM disks. The processor and the memory can be
supplemented by, or incorporated in, special purpose logic
circuitry.
[0071] To provide for interaction with a user, embodiments of the
subject matter described in this specification can be implemented
on a computer having a display device, e.g., a CRT (cathode ray
tube) or LCD (liquid crystal display) monitor, for displaying
information to the user and a keyboard and a pointing device, e.g.,
a mouse or a trackball, by which the user can provide input to the
computer. Other kinds of devices can be used to provide for
interaction with a user as well; for example, feedback provided to
the user can be any form of sensory feedback, e.g., visual
feedback, auditory feedback, or tactile feedback; and input from
the user can be received in any form, including acoustic, speech,
or tactile input. In addition, a computer can interact with a user
by sending resources to and receiving resources from a device that
is used by the user; for example, by sending web pages to a web
browser on a user's client device in response to requests received
from the web browser.
[0072] Embodiments of the subject matter described in this
specification can be implemented in a computing system that
includes a back-end component, e.g., as a data server, or that
includes a middleware component, e.g., an application server, or
that includes a front-end component, e.g., a client computer having
a graphical user interface or a Web browser through which a user
can interact with an implementation of the subject matter described
in this specification, or any combination of one or more such
back-end, middleware, or front-end components. The components of
the system can be interconnected by any form or medium of digital
data communication, e.g., a communication network. Examples of
communication networks include a local area network ("LAN") and a
wide area network ("WAN"), an inter-network (e.g., the Internet),
and peer-to-peer networks (e.g., ad hoc peer-to-peer networks).
[0073] The computing system can include clients and servers. A
client and server are generally remote from each other and
typically interact through a communication network. The
relationship of client and server arises by virtue of computer
programs running on the respective computers and having a
client-server relationship to each other. In some embodiments, a
server transmits data (e.g., an HTML page) to a client device
(e.g., for purposes of displaying data to and receiving user input
from a user interacting with the client device). Data generated at
the client device (e.g., a result of the user interaction) can be
received from the client device at the server.
[0074] A system of one or more computers can be configured to
perform particular operations or actions by virtue of having
software, firmware, hardware, or a combination of them installed on
the system that in operation causes or cause the system to perform
the actions. One or more computer programs can be configured to
perform particular operations or actions by virtue of including
instructions that, when executed by data processing apparatus,
cause the apparatus to perform the actions.
[0075] While this specification contains many specific
implementation details, these should not be construed as
limitations on the scope of any inventions or of what may be
claimed, but rather as descriptions of features specific to
particular embodiments of particular inventions. Certain features
that are described in this specification in the context of separate
embodiments can also be implemented in combination in a single
embodiment. Conversely, various features that are described in the
context of a single embodiment can also be implemented in multiple
embodiments separately or in any suitable subcombination. Moreover,
although features may be described above as acting in certain
combinations and even initially claimed as such, one or more
features from a claimed combination can in some cases be excised
from the combination, and the claimed combination may be directed
to a subcombination or variation of a subcombination.
[0076] Similarly, while operations are depicted in the drawings in
a particular order, this should not be understood as requiring that
such operations be performed in the particular order shown or in
sequential order, or that all illustrated operations be performed,
to achieve desirable results. In certain circumstances,
multitasking and parallel processing may be advantageous. Moreover,
the separation of various system components in the embodiments
described above should not be understood as requiring such
separation in all embodiments, and it should be understood that the
described program components and systems can generally be
integrated together in a single software product or packaged into
multiple software products.
[0077] Thus, particular embodiments of the subject matter have been
described. Other embodiments are within the scope of the following
claims. In some cases, the actions recited in the claims can be
performed in a different order and still achieve desirable results.
In addition, the processes depicted in the accompanying figures do
not necessarily require the particular order shown, or sequential
order, to achieve desirable results. In certain implementations,
multitasking and parallel processing may be advantageous.
* * * * *