U.S. patent application number 16/221143, for forward-based resource delivery network management techniques, was filed with the patent office on 2018-12-14 and published on 2019-05-09.
The applicant listed for this patent is Amazon Technologies, Inc. Invention is credited to Joseph L. Ellsworth, Christopher Richard Newcombe, Brandon William Porter, Christopher Allen Suver.
United States Patent Application 20190140922, Kind Code A1
Application Number: 16/221143
Family ID: 48146163
Ellsworth; Joseph L.; et al.
Publication Date: May 9, 2019
FORWARD-BASED RESOURCE DELIVERY NETWORK MANAGEMENT TECHNIQUES
Abstract
A resource delivery network and method for distributing content
in the network is disclosed herein. The network comprises a
plurality of servers arranged in tiers and partitions. Each server
includes a resource store with a set of resources for distribution
to a successive tier. Updates to each successive tier are provided
by a pull-forward client on servers in the tier. This forward
propagation mechanism maximizes resource availability at edge
servers in the network. Resources transmitted to the edge tier
servers may be transformed, combined, and rendered without taxing
lower tier servers. Transformation and pre-rendering of data can be
performed by low priority CPU tasks at each layer of the
system.
Inventors: Ellsworth; Joseph L. (Bainbridge Island, WA); Porter; Brandon
William (Auburn, WA); Suver; Christopher Allen (Seattle, WA); Newcombe;
Christopher Richard (Kirkland, WA)
Applicant: Amazon Technologies, Inc. (Seattle, WA, US)
Filed: December 14, 2018
Related U.S. Patent Documents
Application Number | Filing Date | Patent Number | Child Application
15844371 | Dec 15, 2017 | 10218584 | 16221143
14994974 | Jan 13, 2016 | 9893957 | 15844371
14644031 | Mar 10, 2015 | 9246776 | 14994974
13842970 | Mar 15, 2013 | 9240954 | 14644031
12652541 | Jan 5, 2010 | 8433771 | 13842970
61248291 | Oct 2, 2009 | |
Current U.S. Class: 1/1
Current CPC Class: G06F 16/13 20190101; H04L 47/70 20130101; H04L 41/5025
20130101; H04N 21/2225 20130101; H04L 67/125 20130101; H04N 21/23103
20130101; H04L 41/0823 20130101; H04L 47/782 20130101
International Class: H04L 12/24 20060101 H04L012/24; G06F 16/13 20060101
G06F016/13; H04L 12/911 20060101 H04L012/911; H04L 29/08 20060101
H04L029/08; H04N 21/231 20060101 H04N021/231; H04N 21/2225 20060101
H04N021/2225
Claims
1. A computer-implemented method for managing a resource delivery
network including a plurality of partitions, the
computer-implemented method comprising: determining a connection
failure between a first partition and a second partition of the
resource delivery network; electing one server of a plurality of
servers of the first partition as a master for the first partition;
reconfiguring one or more other servers of the plurality of servers
to route write requests to the master, wherein the master uses a
different machine identifier to identify a log of the write
requests after election of the master; tracking the write requests
to a data store associated with the master; and propagating the
write requests to the one or more other servers.
2. The computer-implemented method of claim 1 further comprising:
determining that one or more resources written to the data store
since the connection failure are older than corresponding resources
in the second partition; and retrieving the corresponding resources
for the first partition.
3. The computer-implemented method of claim 1 further comprising:
reconfiguring the plurality of servers to an original configuration
of the first partition.
4. The computer-implemented method of claim 1 further comprising:
determining that the master has failed; and electing a new master
for the first partition from the plurality of servers in the first
partition.
5. The computer-implemented method of claim 1, wherein the write
requests are routed to the master using a proxy mechanism.
6. The computer-implemented method of claim 1, wherein the master
propagates the one or more resources to the second partition if the
total number of changes in the log with the different machine
identifier is greater than a threshold.
7. The computer-implemented method of claim 1 further comprising:
determining that a connection between the first partition and the
second partition has been restored; and processing the log to
identify updates.
8. The computer-implemented method of claim 7, wherein processing
the log comprises identifying a number of changes in the log with
the new machine ID.
9. The computer-implemented method of claim 8, wherein processing
the log further comprises initiating updates based on the number of
changes identified in the log.
10. One or more processor readable storage devices having processor
readable code stored thereon, the processor readable code programs
one or more processors to perform a method comprising: determining
a connection failure between a first partition and a second
partition of the resource delivery network; electing one server of
a plurality of servers of the first partition as a master for the
first partition; reconfiguring one or more other servers of the
plurality of servers to route write requests to the master, wherein
the master uses a different machine identifier to identify a log of
the write requests after election of the master; tracking the write
requests to a data store associated with the master; and
propagating the write requests to the one or more other
servers.
11. The one or more processor readable storage devices of claim 10,
wherein the processor readable code stored thereon further programs
one or more processors to: determine that one or more resources
written to the data store since the connection failure are older
than corresponding resources in the second partition; and retrieve
the corresponding resources for the first partition.
12. The one or more processor readable storage devices of claim 10,
wherein the processor readable code stored thereon further programs
one or more processors to perform a method comprising:
reconfiguring the plurality of servers to an original configuration
of the first partition.
13. The one or more processor readable storage devices of claim 10,
wherein the processor readable code stored thereon further programs
one or more processors to perform a method comprising: determining
that the master has failed; and electing a new master for the first
partition from the plurality of servers in the first partition.
14. The one or more processor readable storage devices of claim 10,
wherein the write requests are routed to the master using a proxy
mechanism.
15. The one or more processor readable storage devices of claim 10,
wherein the master propagates the one or more resources to the
second partition if the total number of changes in the log with the
different machine identifier is greater than a threshold.
16. The one or more processor readable storage devices of claim 10,
wherein the processor readable code stored thereon further programs
one or more processors to perform a method comprising: determining
that a connection between the first partition and the second
partition has been restored; and processing the log to identify
updates.
17. A resource delivery system comprising: at least one processor;
a memory including instructions that, upon being executed by the at
least one processor, cause the at least one processor to: determine
a connection failure between a first partition and a second
partition of the resource delivery system; select one server of a
plurality of servers of the first partition as a master for the
first partition; reconfigure one or more other servers of the
plurality of servers to route write requests to the master; monitor
the write requests sent to the master; propagate the write requests
to the one or more other servers; determine that one or more
resources written to the data store since the connection failure are
older than corresponding resources in the second partition; and
retrieve the corresponding resources for the first partition.
18. The resource delivery system of claim 17, wherein the memory
further includes instructions that, upon being executed by the at
least one processor, cause the at least one processor to:
reconfigure the plurality of servers to an original configuration
of the first partition.
19. The resource delivery system of claim 17, wherein the memory
further includes instructions that, upon being executed by the at
least one processor, cause the at least one processor to: determine
that the master has failed; and elect a new master for the first
partition from the plurality of servers in the first partition.
20. The resource delivery system of claim 17, wherein the write
requests are routed to the master using a proxy mechanism.
Description
CROSS-REFERENCE TO RELATED APPLICATIONS
[0001] This application is a continuation of U.S. application Ser.
No. 15/844,371, entitled "FORWARD-BASED RESOURCE DELIVERY NETWORK
MANAGEMENT TECHNIQUES" and filed on Dec. 15, 2017, which in turn is
a continuation of U.S. application Ser. No. 14/994,974, now U.S.
Pat. No. 9,893,957, entitled "FORWARD-BASED RESOURCE DELIVERY
NETWORK MANAGEMENT TECHNIQUES" and filed on Jan. 13, 2016, which is
a continuation of U.S. patent application Ser. No. 14/644,031, now
U.S. Pat. No. 9,246,776, entitled "FORWARD-BASED RESOURCE DELIVERY
NETWORK MANAGEMENT TECHNIQUES" and filed on Mar. 10, 2015, which is
a continuation of U.S. application Ser. No. 13/842,970, now U.S.
Pat. No. 9,240,954, entitled "FORWARD-BASED RESOURCE DELIVERY
NETWORK" and filed on Mar. 15, 2013, which is a continuation of
U.S. application Ser. No. 12/652,541, now U.S. Pat. No. 8,433,771,
entitled "DISTRIBUTION NETWORK WITH FORWARD RESOURCE PROPAGATION"
and filed on Jan. 5, 2010, which in turn claims the benefit of U.S.
Provisional Application No. 61/248,291, entitled "DISTRIBUTION
NETWORK WITH FORWARD RESOURCE PROPAGATION" and filed on Oct. 2,
2009, the disclosures of which are incorporated herein by
reference.
INCORPORATION BY REFERENCE TO ANY PRIORITY APPLICATIONS
[0002] Any and all applications for which a foreign or domestic
priority claim is identified in the Application Data Sheet as filed
with the present application are hereby incorporated by reference
under 37 CFR 1.57.
BACKGROUND OF THE INVENTION
Field of the Invention
[0003] Large-scale service providers that deliver data, content and
applications via the Internet look to maximize the availability and
responsiveness of clustered server systems while minimizing the total
cost of ownership of those systems. As more users access such
information, demand for faster delivery and responsiveness increases.
Description of the Related Art
[0004] Delivery systems have been developed whereby geographically
dispersed networks of edge locations can each store copies of
content. Each edge location can include one or multiple servers.
Clients requesting the content are routed to the nearest edge
location so the content is delivered with the best possible
performance. To achieve the best possible performance, the edge
locations are typically high-performance data centers able to handle
peak request loads.
[0005] The primary issue with this strategy is that the edge
locations or "caches" need to manage freshness or validity of their
content. Edge locations expire the content and refresh it on a
relatively frequent basis. The requirement for freshness creates
cache misses which may end up invoking back end services at a
higher cost. In some cases the content has expired based on a
time-to-live (TTL) value, but the content may not actually have
changed. In many systems, there is no mechanism to refresh the edge
cache without executing the full heavyweight retrieval from the
back end service. This results in a large amount of network traffic
and back end service calls which yield no benefit to the service
provider or end user. In current multi-tier cache systems, it may
be difficult to ensure content freshness without forcing every
caching layer to have shorter than desired TTL values. The cost of a
refresh includes the cost of a proxy cache miss at every cache
layer and back end server processing.
BRIEF DESCRIPTION OF THE DRAWINGS
[0006] FIG. 1 is a depiction of an exemplary processing environment
for implementing the technology.
[0007] FIG. 2 illustrates a series of environments illustrated in
FIG. 1 arranged as vertical partitions in accordance with the
technology.
[0008] FIG. 3 illustrates components of each server in each of the
environments illustrated in FIGS. 1 and 2.
[0009] FIG. 4 illustrates a method of updating content at various
tiers of servers within the system of FIGS. 1-3.
[0010] FIG. 5 illustrates a process running on a client tier in
accordance with the present technology.
[0011] FIG. 6 illustrates a process running on a server tier in
accordance with the present technology.
[0012] FIG. 7 illustrates a process for adding a new server to the
system of FIGS. 1-3.
[0013] FIG. 8 illustrates an exemplary portion of a change log used
in the present technology.
[0014] FIG. 9 illustrates a cleanup process used in accordance with
the present technology.
[0015] FIG. 10 illustrates a log summary process used in accordance
with the present technology.
[0016] FIG. 11 illustrates a proxy process used in accordance with
the present technology.
[0017] FIG. 12 illustrates a process for allowing network write
operations in the event of a connection failure between
partitions.
[0018] FIG. 13 illustrates an exemplary hardware computing device
for the present technology.
DETAILED DESCRIPTION OF THE PREFERRED EMBODIMENT
[0019] Technology is presented which uses a forward propagation
mechanism in a resource delivery network to maximize resource
availability at edge locations in the network. The technology
retains cache semantics for look-asides on read misses at the edge
locations. Multiple layers of cache are kept fresh using pull-based
forward propagation. The technology provides transparent support
for multiple tiers of systems, allowing the technology to scale to
support very large read loads while presenting fresh data at a
lower total cost. Additional background CPU tasks at each cache
layer may be used for partial pre-rendering of data, which
increases effective machine utilization. The technology also
eliminates cache miss storms that can brown out back end
services.
[0020] The technology will be described herein using a
document-centric discussion. In this context, a resource may be
considered as a file. However, a resource may represent an element
not stored in a file, such as a database or other service. A
resource may also be interpreted as an arbitrary blob of data which
can be stored in an arbitrarily extensible hierarchical storage
mechanism, and the term subdirectory is interpreted as a group which
may contain resources or other groups. The technology does not
require an underlying file system. Resources are uniquely
identified with a uniform resource identifier (URI) semantic and
change history is recorded for each resource.
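The resource model described above can be illustrated with a small in-memory sketch. It is an illustration under stated assumptions, not the patented implementation: the `ResourceStore` class and its methods are hypothetical, groups are modeled simply as URI path prefixes, and each write appends a new version to that resource's change history.

```python
from dataclasses import dataclass, field


@dataclass
class ResourceStore:
    """Hypothetical store: resources keyed by URI, with per-resource history."""
    resources: dict = field(default_factory=dict)  # uri -> latest data
    history: dict = field(default_factory=dict)    # uri -> list of all versions

    def put(self, uri: str, data: bytes) -> int:
        """Record a new version of `uri` and return its 1-based version number."""
        self.resources[uri] = data
        versions = self.history.setdefault(uri, [])
        versions.append(data)
        return len(versions)

    def get(self, uri: str) -> bytes:
        return self.resources[uri]

    def list_group(self, group: str) -> list:
        """URIs of all resources in a group, including nested groups."""
        prefix = group.rstrip("/") + "/"
        return sorted(u for u in self.resources if u.startswith(prefix))


store = ResourceStore()
store.put("/catalog/books/1", b"v1")
store.put("/catalog/books/1", b"v2")
store.put("/catalog/music/7", b"a")
print(store.get("/catalog/books/1"))            # b'v2'
print(len(store.history["/catalog/books/1"]))   # 2
print(store.list_group("/catalog"))  # ['/catalog/books/1', '/catalog/music/7']
```

No underlying file system is needed: a group is just a shared URI prefix, which matches the text's point that "subdirectory" is interpreted as a group.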
[0021] In addition, while resources are often described herein as
written or copied to different systems, it should be recognized
that resources may be stored in the volatile memory of those systems
or in alternative forms of non-volatile memory such as hard disks,
disk arrays, solid state systems or other non-volatile storage.
The technology allows misses against memory to reference disk while
retaining overall high system performance.
[0022] FIG. 1 illustrates an exemplary processing environment for
implementing the technology. FIG. 1 illustrates a clustered
delivery network system 50 comprising a set of vertically partitioned
servers or server clusters 100, 110, 120, 130, 140. At the highest
tier of the vertical partition is an authoritative store 100, also
referred to herein as an authoritative server. The authoritative
store 100 may comprise a set of one or more servers in a cluster. The
primary characteristic of an authoritative store 100 is that it
acts as the authoritative entity for a group of data. In this
capacity, all data stored on the authoritative store is replicated
to the servers in each of the tiers 110, 120, 130, 140 below the
store 100. The amount and scope of data will vary, and it
may be partitioned in various ways as described below. In one
embodiment, each tier 110, 120, 130, 140 away from the
authoritative store 100 comprises an increasing number of
servers.
[0023] Each tier 110, 120, 130, 140 includes one or more caching
servers 102, 104, 106, 108 which are used to scale the system.
Servers in, for example, tier 110 pull resources written to
authoritative store 100. Servers in tiers 110, 120, 130, 140 pull
resources from the next higher tier of servers. Cache servers in
tiers 110, 120, 130 and additional tiers all use the same resource
synchronization mechanism.
[0024] Within the architecture, routers (illustrated in FIG. 3) may
be used to direct traffic between the servers and server layers.
Each set of VIP addresses 105, 115, 125, 135 in the architecture
and each partition may be assigned different domain entries and
sub-domains to allow traffic routing using standard DNS and TCP/IP
protocol techniques.
[0025] A set of all resources is stored on the authoritative store
100. Resources are written to the authoritative server by means of
a PUT or POST command using the standard HTTP protocol. While HTTP
may be used in one embodiment, protocols other than HTTP may be
utilized; any protocol which allows long-lived connections with a
bi-directional conversation could be used. In still another
alternative, a mix of protocols may be used in the same system. In
accordance with the
technology, each resource written to the authoritative store is
copied down each tier until it reaches the edge tier servers (tier
140 in FIG. 1). Read requests from, for example, users operating
web browsers 160 are addressed by a web-site rendering server 150,
and resource requests from the web-site rendering server 150 are
served by the edge tier of servers 140. In the example shown in
FIG. 1, all web servers address the domain www.example.domain 145.
Requests to the domain are then routed to the edge tier servers 140
using conventional techniques. As noted above, an authoritative
store may not be a simple HTTP resource server but may be a system
such as a relational database or another service providing system.
Any lower level misses that make it through all the layers to this
system will incur the full load of hitting the underlying service
or device.
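As a hedged illustration of a write to the authoritative store, the sketch below uses Python's standard `urllib.request.Request` to build an HTTP PUT for one resource. The host name `authoritative.example.internal`, the resource path, and the `build_put` helper are hypothetical; the request is only constructed here, not sent.

```python
import urllib.request


def build_put(base_url: str, uri: str, body: bytes,
              content_type: str = "application/octet-stream") -> urllib.request.Request:
    """Build (but do not send) an HTTP PUT that writes one resource."""
    return urllib.request.Request(
        base_url.rstrip("/") + uri,
        data=body,
        headers={"Content-Type": content_type},
        method="PUT",
    )


req = build_put("http://authoritative.example.internal", "/catalog/books/1", b"hello")
print(req.get_method(), req.full_url)
# Sending it would be: urllib.request.urlopen(req, timeout=5)
```

A POST would differ only in the `method` argument; which verb a given deployment uses for creates versus updates is not specified here.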
[0026] Each tier 110, 120, 130, 140 replicates files on the
authoritative store 100 using log entries from the server in the
next higher tier that sources the data. This allows a stacked layer strategy
where each layer is slightly delayed from the prior layer but can
represent a duplicate of the resources in the prior layer.
[0027] Because each tier uses the same synchronization mechanism to
copy resources to its local store, edge resources, including special
purpose application servers, can continue operating even when the
back end services which produced the resources are down. This
increases availability during failures and provides a window of
operation during a service failure without negatively impacting the
user experience.
[0028] As discussed below, each synchronization client at each tier
may use local filtering to allow vertical partitioning of resources
to improve system efficiency. One example of such partitioning is
dedicating a subset of machines to a specific range of users. Another
example is configuring servers at a higher tier to filter for
specific data, such as configuration data. In this instance, a
service interested in only a subset of data, such as configuration
information, can implement a portion of the replication client and
use the pull techniques discussed herein to update a local database,
foregoing the ability to serve up the data it acquires. In the
configuration implementation, a centralized configuration store is
kept completely independent of downstream services while allowing
the downstream services to rapidly detect relevant changes, which
they may use to modify their internal configurations. The
replication client which updates the service's local data may be
implemented in complete isolation from both the configuration store
and the service, provided it can call the service's configuration
update API.
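The configuration-only replication client can be sketched as a function that walks log entries, keeps only URIs under a configuration prefix, and hands each matching resource to the service's configuration update API. The `(sequence_number, uri)` entry shape, the `/config/` prefix, and the `fetch` and `update_config` callables are all assumptions standing in for the real log format, transport, and service API.

```python
from typing import Callable, Iterable, Tuple

# Hypothetical log entry shape: (sequence_number, uri).
LogEntry = Tuple[int, str]


def apply_config_changes(entries: Iterable[LogEntry],
                         fetch: Callable[[str], bytes],
                         update_config: Callable[[str, bytes], None],
                         since: int = 0,
                         prefix: str = "/config/") -> int:
    """Apply only configuration resources via the service's update API.

    Returns the highest sequence number seen so the next poll can resume
    after it, whether or not that entry was a configuration change.
    """
    last_seen = since
    for seq, uri in entries:
        if seq <= since:
            continue                        # already handled on an earlier poll
        last_seen = max(last_seen, seq)
        if uri.startswith(prefix):          # filter: skip non-config data
            update_config(uri, fetch(uri))  # no local serving copy is kept
    return last_seen


remote = {"/config/timeouts": b"30", "/catalog/books/1": b"..."}
applied = {}
last = apply_config_changes(
    [(1, "/catalog/books/1"), (2, "/config/timeouts")],
    fetch=remote.__getitem__,
    update_config=applied.__setitem__,
)
print(last, applied)  # 2 {'/config/timeouts': b'30'}
```

Because the client only needs read access to the log and a callable update API, it can live outside both the configuration store and the service, as the paragraph describes.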
[0029] In one embodiment, each successive layer away from the
authoritative store 100 increases the number of servers or server
clusters by a factor of 4 to 1, e.g. there are 4 times as many
servers in tier 120 as tier 110. This allows 1 write server to
support 4 read servers in tier 110, 16 read servers in tier 120, 64
read servers in tier 130 and 256 read servers in tier 140. The
numbers of layers and servers discussed herein, and the illustrations
in FIGS. 1-3, are exemplary only. The average propagation time of a
resource to the edge tier 140 is given by the per-tier update time
multiplied by the number of tiers. For example, if
each tier is averaging 3 seconds for content propagation then a
total propagation time of 12 seconds is required for updating the
256 edge tier servers.
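The 4:1 fan-out and the propagation estimate in this paragraph reduce to simple arithmetic. The helper names below are hypothetical; the sketch just reproduces the numbers given in the text.

```python
def tier_sizes(fanout: int, tiers: int) -> list:
    """Servers per tier when each tier has `fanout` times as many servers
    as the tier above it, starting from one write server."""
    return [fanout ** k for k in range(1, tiers + 1)]


def edge_propagation_seconds(per_tier_seconds: float, tiers: int) -> float:
    """Average time for a write to reach the edge tier."""
    return per_tier_seconds * tiers


sizes = tier_sizes(4, 4)
print(sizes)                           # [4, 16, 64, 256]
print(sum(sizes))                      # 340 read servers in total
print(edge_propagation_seconds(3, 4))  # 12
```

Capacity grows geometrically with depth while propagation delay grows only linearly, which is the trade-off the tiered layout exploits.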
[0030] Each tier 110, 120, 130, 140 beyond the authoritative store
100 is composed of a group of servers, with the number of servers
in the tier (for example tier N+1) set so that when all servers are
concurrently reading files from the previous tier of servers (e.g.
tier N) at a defined throttle rate, they will not degrade the maximum
write performance of the tier N servers by more than some percentage
X%, where X may be about 30%.
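The text does not give a formula for this sizing rule. One simple way to express it, under a purely hypothetical linear cost model, is to assume that tier N+1 reads are spread evenly over the tier N servers and that each reader polling at the throttle rate costs a fixed percentage of one tier N server's write capacity. Integer percentages are used to avoid floating point rounding at the boundary.

```python
def max_readers(tier_n_servers: int, cost_pct_per_reader: int,
                max_degradation_pct: int = 30) -> int:
    """Largest tier N+1 size that keeps write-throughput loss on each
    tier N server at or below max_degradation_pct.

    Hypothetical linear model: reads are spread evenly over tier N, and
    each reader at the throttle rate costs `cost_pct_per_reader` percent
    of one tier N server's write capacity, so
        loss per tier N server = readers * cost_pct / tier_n_servers.
    """
    if cost_pct_per_reader <= 0:
        raise ValueError("cost_pct_per_reader must be positive")
    return (max_degradation_pct * tier_n_servers) // cost_pct_per_reader


print(max_readers(4, 5))   # 24 readers -> exactly 30% loss per server
```

Real read costs are unlikely to be perfectly linear, so a measured cost per reader would be needed before applying a rule like this.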
[0031] Access to each layer is provided by routers (illustrated in
FIG. 3) and virtual IP (VIP) addresses 105, 115, 125, 135 to allow
internal addressing. The separate VIP path (router name entry) for
each layer of servers is provided so that any read request from the
servers in the next tier (N+1) can be serviced by any server in the
prior tier (N).
[0032] An external data store service 250 is accessible to the
system 50. The external data store service may be a service
optimized for handling large resources. An example of one
external data store service is the Amazon S3 service, which provides
unlimited storage through a simple web services interface.
[0033] FIG. 2 illustrates a plurality of vertical partitions with
multiple authoritative stores 100a, 100b, 100c. Each vertical
partition 210, 220, 230 may be virtually or physically separated
from adjacent partitions. As discussed below, it is possible that
read requests on edge tier servers 140a, 140b, 140c may be
re-directed to servers in other vertical partitions that have the
data. Mechanisms in the technology allow for routing such read
requests to the correct partition to serve the data.
[0034] Each of the tiers may be separated by one or more network
links. The tiers may be physically proximate to each other or
geographically separated. WAN links are used to connect physically
separated servers. Tiers can be organized so WAN links are
optimally used. For example, when replicating to remote servers, a
small number of remote servers are configured to replicate from a
given cluster across the WAN links. A larger number of servers at
the same remote location are configured to replicate from the
servers which have already copied the content locally. This
minimizes the amount of redundant data transferred across WAN links,
which are generally more expensive and slower than LAN links.
Reducing the WAN traffic in this fashion can dramatically speed up
availability of data on all servers of a remote cluster.
[0035] To maximize replication rates, it is possible to use
lower-grade security and possibly forgo SSL encryption for
replication between lower tier servers. Any tier which may be
exposed to untrusted users or consumers outside a trusted network
partition can utilize full encryption and security measures to
protect the content. In such embodiments, the N-1 or write tier
should be protected by firewalling techniques or another security
mechanism, such as an approved list of servers allowed to contact
the source server.
[0036] FIG. 3 illustrates various components operating in each of
the servers in each tier of the system. FIG. 3 illustrates
components in volatile memory 1302 and non-volatile or mass storage
1304, described below with respect to FIG. 13. Other components of
the server processing device, described below with respect to FIG.
13, are not illustrated in FIG. 3 but it will be understood that
such components are present therein.
[0037] Each server 100, 102, 104, 106, 108 in FIG. 3 may include in
memory 1302 a web server component 302, a log write component 320,
a log clean-up process 304, a data transformation process 306 and a
request handler 308. The functions of each element will be
described below. Nonvolatile storage 1304 includes a write log 314,
a summary log 312 and resources 316. Each resource or group of
resources may have associated with it a TTL value managed by
handler 318, indicating when the freshness of the resource times
out.
[0038] The web server 302 in each tier responds to HTTP GET/HEAD
and HTTP PUT/POST requests to read and write information,
respectively, from and to the storage 1304 of the server. For each
write, the servers record an entry in an update log 314 using the
log write component 320. The log 314 allows each server in a given
layer N to deliver change history in small granular chunks to the
next tier (N+1) of servers.
[0039] A clean up process 304 runs to create a summary log 312 by
deleting repetitive entries for the same resource. For efficiency,
a number of recent entries 360 in the update log 314 may be
maintained in memory for rapid retrieval. The data transformation
handler 306 is a background process which can, for example,
transform or pre-render data at low priority. The request handler
308 answers update requests from next tier servers, as described
below.
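A minimal sketch of the clean-up step, assuming (hypothetically) that log entries are `(sequence_number, uri)` pairs: repetitive entries for the same resource collapse to the most recent one, which is all a downstream server needs to fetch the current version. The `summarize` name is illustrative only.

```python
def summarize(log: list) -> list:
    """Collapse (sequence_number, uri) entries to one per resource,
    keeping the most recent write and returning entries in log order."""
    latest = {}
    for seq, uri in log:
        if seq > latest.get(uri, 0):
            latest[uri] = seq          # a later write supersedes earlier ones
    return sorted((seq, uri) for uri, seq in latest.items())


log = [(1, "/a"), (2, "/b"), (3, "/a"), (4, "/c"), (5, "/b")]
print(summarize(log))  # [(3, '/a'), (4, '/c'), (5, '/b')]
```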
[0040] Shown in server 102, and present in every tier server except
the authoritative store, is a sync client 370. Sync client 370 acts
as a pull agent on the next lower tier of servers to pull data from
each successively lower tier. The sync client queries logs 314 in
the lower layer tier and uses log information to retrieve resources
listed therein using, for example, a standard HTTP GET. The files
are recorded in a local resource store 316 of the querying server
after fetch. Each resource store comprises the local storage of
each server and may comprise any number of different storage
elements, including but not limited to redundant storage arrays and
storage area networks of any known type. In some instances the
authoritative store may be a database that does not support miss
pass-through. In this instance the authoritative store is responsible
only for making writes against the next higher tier for each resource
changed.
[0041] A router 350 directs traffic between each of the servers in
each of the tiers using VIP entries for the servers and tiers. This
design may include a load balancer that distributes the total
traffic from the next tier roughly randomly across servers in a
given tier. A router or load balancer that can route requests from
the same client to the same server with session or IP affinity may
also be used. A load dispatching proxy may be used in lieu of a
traditional load balancer.
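The two routing options can be sketched as follows. The server names and helper functions are hypothetical; the affinity variant uses a stable SHA-256 hash of the client IP address so the same client maps to the same server while tier membership is unchanged.

```python
import hashlib
import random


def pick_random(servers: list, rng: random.Random) -> str:
    """Spread requests roughly evenly across the servers in a tier."""
    return rng.choice(servers)


def pick_with_affinity(servers: list, client_ip: str) -> str:
    """Send the same client to the same server (IP affinity)."""
    digest = hashlib.sha256(client_ip.encode("utf-8")).digest()
    return servers[int.from_bytes(digest[:8], "big") % len(servers)]


tier_120 = ["cache-120-a", "cache-120-b", "cache-120-c", "cache-120-d"]
rng = random.Random(7)
print(pick_random(tier_120, rng))
print(pick_with_affinity(tier_120, "10.0.0.7"))  # same result on every call
```

Plain modulo hashing remaps most clients when a server is added or removed; consistent hashing is a common alternative if that matters.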
[0042] FIG. 4 illustrates the replication process occurring within
the system to allow propagation of resources from any authoritative
store 100 throughout successive layers of the system 50. At 402 a
resource is written to a server in tier N, where N can comprise the
first tier authoritative server, or any server in any tier of
servers. Once the resource is written, an update to the local log
314 is recorded at 404. An exemplary format for the update log is
discussed below in FIG. 8. At 406, the update log is retrieved by
servers at the next higher tier N+1. The N+1 tier server
determines which entries have been updated at 408 by examining the
log entries. New resources are copied from the tier N server to the
tier N+1 server at 410.
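The FIG. 4 flow can be simulated in memory to show how a log-driven copy moves a resource from tier N to tier N+1. The `TierServer` class, the `(sequence_number, uri)` log entries, and the `replicate` helper are hypothetical stand-ins for the HTTP-based components described above; step numbers from FIG. 4 appear in the comments.

```python
class TierServer:
    """Hypothetical tier server: a resource store plus an append-only
    update log of (sequence_number, uri) entries."""

    def __init__(self, name: str):
        self.name = name
        self.resources = {}
        self.log = []

    def write(self, uri: str, data: bytes) -> None:
        self.resources[uri] = data                 # 402: write the resource
        self.log.append((len(self.log) + 1, uri))  # 404: record it in the log

    def log_since(self, seq: int) -> list:
        # 406: the log entries a downstream server has not yet seen
        return [e for e in self.log if e[0] > seq]


def replicate(source: TierServer, target: TierServer, last_seq: int) -> int:
    """One pass from tier N (source) to tier N+1 (target); returns the
    new high-water mark to use on the next pass."""
    for seq, uri in source.log_since(last_seq):   # 408: find updated entries
        target.write(uri, source.resources[uri])  # 410: copy and log locally
        last_seq = seq
    return last_seq


tier_n, tier_n1 = TierServer("tier-110"), TierServer("tier-120")
tier_n.write("/a", b"1")
tier_n.write("/b", b"2")
mark = replicate(tier_n, tier_n1, 0)
tier_n.write("/a", b"3")
mark = replicate(tier_n, tier_n1, mark)
print(mark, tier_n1.resources)  # 3 {'/a': b'3', '/b': b'2'}
```

Because the target also logs each copied resource, the same `replicate` call could feed a tier N+2 server from tier N+1, mirroring the stacked layer strategy.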
[0043] FIG. 5 describes the replication process occurring on a tier
N+1 or "client" server. In this context, a client server is one
which is present in a tier N+1 (e.g. tier 120) and reads from a
previous tier N (e.g. 110); any tier may comprise the Nth tier. At
502, each server in tier N+1 reads its own local log (e.g. log 314
on tier N+1) to find the last known write for each resource.
Depending on the configuration of the log, this may comprise
examining a timestamp indicating a last time of a write for a
server source in tier N, or a last log count for each source server
in tier N that the tier N+1 server is replicating from. At 504, a
polling request is repeatedly issued to the tier N server by sync
client 370. The polling request may comprise a GET request for the
tier N server's write log 314 or in-memory recent entries 360. The
polling request may be an in-memory request in one embodiment. At
504, the client repeatedly issues the request for changes to the
tier N server unless the client is busy responding to read requests
from downstream tiers. As discussed below, the tier N server blocks
such requests until new resources are available.
[0044] FIG. 6 illustrates a process occurring on the tier N server
when such in-memory requests are received from the tier N+1 servers.
As illustrated in FIG. 6, the tier N server does not respond to the
request unless an update has occurred on the tier N server and has
been written to the tier N server local log. At 602, update
requests are blocked by default until, at 604, a log write occurs.
At 606, the tier N server responds to the request and at 608, upon
completion of the response, returns to the blocking state until
another update occurs.
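The blocking behavior of FIG. 6 resembles an HTTP long poll. The sketch below models it with Python's `threading.Condition`; the `BlockingLog` class and its methods are hypothetical. The timeout is an added assumption, not something the text describes, so that an idle request eventually returns an empty result and can be reissued.

```python
import threading


class BlockingLog:
    """Tier N side of FIG. 6: update requests block until a log write
    occurs (or a timeout expires), then return the new entries."""

    def __init__(self):
        self._entries = []
        self._cond = threading.Condition()

    def append(self, uri: str) -> None:
        with self._cond:
            self._entries.append((len(self._entries) + 1, uri))
            self._cond.notify_all()           # 604: a log write wakes waiters

    def wait_for_changes(self, since: int, timeout: float = 30.0) -> list:
        with self._cond:
            # 602: block until there is an entry newer than `since`
            self._cond.wait_for(lambda: len(self._entries) > since, timeout)
            # 606: respond with the new entries (empty list on timeout)
            return self._entries[since:]


log = BlockingLog()
threading.Timer(0.1, log.append, args=("/a",)).start()
print(log.wait_for_changes(since=0, timeout=5))  # [(1, '/a')]
```

After responding, the caller simply issues the next request with the updated `since` value, which corresponds to returning to the blocking state at 608.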
[0045] Returning to FIG. 5, if an update has occurred on the tier N
server at 504, the log is returned and the tier N+1 server uses the
last read information to review its own log and find the last
successful replication based on a count of the last write. This
returns the log changes from the tier N server which have occurred
after that write. The tier N server can alternatively determine
changes which have occurred since a particular time. At 508, the
client parses each line from the log file or files to find each
resource updated on the tier N server. At 510, log filtering may be
utilized. In one embodiment, the tier N+1 servers may be partitioned
and may only replicate certain content from the tier N server. The
URI or URI prefix can be utilized to filter out data the tier N+1
server is not interested in receiving. Filtering may be performed
by sync client 370.
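Steps 508 and 510 of FIG. 5 can be sketched as parsing log lines and keeping only entries that are newer than the last successful replication and that fall inside this server's partition. The tab-separated `sequence<TAB>uri` line format is hypothetical (the actual log format is described with FIG. 8), as are the helper names and the example prefixes.

```python
def parse_log(text: str) -> list:
    """Parse hypothetical 'sequence<TAB>uri' lines into (int, str) tuples."""
    entries = []
    for line in text.splitlines():
        if not line.strip():
            continue                      # ignore blank lines
        seq, uri = line.split("\t", 1)
        entries.append((int(seq), uri))
    return entries


def entries_to_fetch(entries: list, last_seq: int, prefixes: tuple) -> list:
    """Entries newer than the last successful replication that fall inside
    this server's partition (URI-prefix filter)."""
    return [(seq, uri) for seq, uri in entries
            if seq > last_seq and uri.startswith(prefixes)]


text = "1\t/users/a/profile\n2\t/users/m/profile\n3\t/users/b/cart\n"
print(entries_to_fetch(parse_log(text), 1, ("/users/a/", "/users/b/")))
# [(3, '/users/b/cart')]
```

Ending each prefix with `/` keeps `/users/a/` from also matching a sibling such as `/users/alice/`.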
[0046] Once new or updated resources have been determined, at 510
the client executes an HTTP GET for each resource it processes from
the log and at 512 writes a copy of each resource in its local file
store at the same relative location in the local store that the
resource exists on the tier N server. Standard HTTP semantics require
one GET request for each resource, which imposes at least one
network round trip and the associated latency per resource fetched.
HTTP persistent connections can be used by all clients 370 when
requesting multiple resources. Allowing multiple items to be
requested in a single request reduces the replication delay across
WAN partitions, where latency can be much higher.
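A back-of-the-envelope calculation shows why multi-item requests matter on WAN links. The 80 ms round trip and batch size of 50 are illustrative assumptions, and transfer time is ignored so that only per-request latency is counted; `fetch_latency_ms` is a hypothetical helper.

```python
import math


def fetch_latency_ms(resources: int, rtt_ms: int, per_request: int = 1) -> int:
    """Round-trip latency to fetch `resources` items when each request can
    carry up to `per_request` items (transfer time ignored)."""
    return math.ceil(resources / per_request) * rtt_ms


print(fetch_latency_ms(500, 80))      # one GET per resource: 40000 ms
print(fetch_latency_ms(500, 80, 50))  # 50 items per request: 800 ms
```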
[0047] At 514, the client updates the tier N+1 server local log for
each item copied from the tier N server.
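Steps 506 through 514 can be sketched as one replication pass. In this illustrative Python version, `fetch` is a hypothetical stand-in for the HTTP GET against the tier N server. A dictionary stands in for the local file store, and log entries are simplified to `(write_count, path)` pairs. None of these names come from the source.

```python
from typing import Callable, Dict, List, Tuple

Entry = Tuple[int, str]   # (write_count, server-relative path)


def replicate_batch(entries: List[Entry],
                    last_count: int,
                    prefixes: Tuple[str, ...],
                    fetch: Callable[[str], bytes],
                    store: Dict[str, bytes],
                    local_log: List[Entry]) -> int:
    """Copy every resource written after last_count whose path matches one
    of this server's prefixes (an empty tuple means no filter) and return
    the new last_count."""
    for count, path in entries:
        if count <= last_count:
            continue                          # already replicated
        last_count = count                    # processed, even if filtered out
        if prefixes and not path.startswith(prefixes):
            continue                          # 510: not of interest here
        store[path] = fetch(path)             # GET, then 512: same relative path
        local_log.append((len(local_log) + 1, path))   # 514: local log entry
    return last_count
```

Advancing `last_count` past filtered entries means the next poll resumes after them instead of re-reading them.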
[0048] Many servers may have the capability to maintain in-memory
caches of resources. When the replication process writes a new local
version of a given resource, it can invalidate that item in memory.
This is supported using cache invalidation, so the next reference to
the resource will trigger a reload. At 516, the client invalidates
the in-memory cache for each file processed and, at 518, updates the
TTL age for each resource or group of resources on the tier N+1
server. In another embodiment, solid state or other forms of memory
caches within each server may be used for performance
optimization.
[0049] In some instances files can contain mutual dependencies so
that a change in a file would require information from one or more
other files. In this instance, it may be desirable to defer
processing of the file until the complete set of changes is
written to all interdependent files. In this instance, the
requesting tier server may be modified so that it does not start
processing based on any single file or resource change. Instead,
processing is triggered by an activation or sentinel file, which may
be written into the same directory tree as the resource file. The
replication client detects the sentinel file, which is used to trigger
the processing chain. In this way, each tier server generating file
changes is free to write new files at will, knowing that they will
not trigger the next step in the processing chain until they write
the sentinel file. If file versioning is enabled the sentinel file
may include the version number for each file which can be processed
for this group.
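The following is a minimal sketch of sentinel-triggered processing. It assumes a hypothetical sentinel file name and treats the whole change stream as a single group; real implementations would likely scope groups by directory tree.

```python
from typing import Callable, Iterable, List


def process_with_sentinel(changed_paths: Iterable[str],
                          sentinel_name: str,
                          on_group_ready: Callable[[List[str]], None]) -> List[str]:
    """Accumulate changed paths and hand the accumulated group to the next
    processing step only when the sentinel file appears. Returns the paths
    still waiting for a sentinel."""
    pending: List[str] = []
    for path in changed_paths:
        if path.rsplit("/", 1)[-1] == sentinel_name:
            on_group_ready(pending)           # sentinel: trigger the chain
            pending = []                      # start a fresh group
        else:
            pending.append(path)              # individual changes never trigger
    return pending
```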
[0050] All files are processed in sequence of change because the
log entries are processed in the order in which they occur in the logs. If
resources which are referenced by another resource are written
first then the ordered delivery can provide referential
integrity.
[0051] The client application generating file changes is
responsible for recognizing the success or failure of each write
request against the authoritative store. It should only generate the
sentinel file when all the dependent file changes have been
acknowledged. Nothing in this section should be interpreted as
support for a two-phase commit. If the client application fails to
record all necessary files, it is responsible for cleaning up
resources and then re-writing those as needed.
[0052] This may be supported by allowing each resource change to
generate a new resource URI (version), which frees the file change
producers to generate new versions of the dependent files without
concern for overwriting important changes before the downstream
processors have finished their work. In this embodiment, the URI
for each version of each resource may be unique and understood by
the server from which the resource is requested to refer to the
given resource.
[0053] When a "read storm" occurs (a condition where a large number
of reads occurs in a short period of time), any server subject to
the storm may lose execution cycles. This may delay replication,
which can result in increasing staleness of the data. This is
addressed using the TTL: the TTL age for the server may increase,
which will trigger a TTL-based cache miss, which in turn triggers a
refresh of expired files. Each read server may use a sufficient
number of threads that it can pull more than one resource at a time.
[0054] FIG. 7 illustrates the process by which new servers can be
added to a next (N+1) tier of the system 50. As noted above,
each server may create a summary log 312. As discussed below, the
summary log is a record of the last write of a particular resource
at the time of the creation of the summary log. (Subsequent changes
to the resource continue to be recorded in the write log 314, as
discussed below).
[0055] New servers obtain their initial state by first reading
the summary logs at 702. At 704, the new server identifies all
unique resources which pass a local filter, if content filtering
for the new server is implemented. At 706, the new server issues an
HTTP GET for all resources identified in the summary log of the
tier N server which pass its local filter and at 708, the resources
are written to its local store. It should be noted that where a
write occurs to the local store at 708, the write may be an
in-memory write (to volatile memory), a write to disk (or other
nonvolatile memory), or both. Because additional writes to the tier
N server may occur during the replication process while writes are
occurring at 708, the client in the N+1 layer checks the tier N
write log 314 for subsequent writes to resources in the summary
log. This occurs in a manner similar to a standard update where, at
710, the client issues a poll request (equivalent to 504) and at
712 reads the log entries for all changes that occurred after the
start of the summary log (equivalent to 506 above). At 714, a local
filter of the new resources is performed and at 716, the log file
314 is parsed to obtain each updated resource on the tier N server.
At 718 an HTTP GET is issued for the new resources and at 720, the
resources are written to the tier N+1 local store.
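The new-server bootstrap of FIG. 7 reduces to building a fetch list from the summary log plus any write-log entries at or after the summary start time. The sketch below assumes that write-log entries are simplified to hypothetical `(timestamp, path)` pairs and that a single path-prefix filter is used. It is not the patent's implementation.

```python
from typing import List, Tuple

LogEntry = Tuple[float, str]   # (timestamp, path); a simplified, assumed shape


def bootstrap_plan(summary: List[str],
                   summary_start: float,
                   write_log: List[LogEntry],
                   prefix: str) -> List[str]:
    """Return the paths a new tier N+1 server should GET: everything in the
    summary log that passes the filter, plus anything written after the
    summary was started."""
    wanted = [p for p in summary if p.startswith(prefix)]           # 702-704
    # Inclusive comparison: a write at the boundary is fetched, not missed.
    later = [p for ts, p in write_log
             if ts >= summary_start and p.startswith(prefix)]       # 710-716
    # Preserve order and drop duplicates so each resource is fetched once.
    return list(dict.fromkeys(wanted + later))
```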
[0056] For new servers, a primary build application package
including the components necessary for implementing instructions to
run the components illustrated in FIG. 3 and described herein can
reference a known URI which will retrieve a configuration file.
This configuration file maps each server, by name, to the cluster in
which it participates and provides a URI where the server can go to
retrieve the basic configuration data for the cluster.
[0057] At startup, each server references a known URI for this
configuration information to discover the clusters in which
it will participate. It then accesses the configuration information
for those clusters to determine its local behavior, such as which
layer VIP it is participating in, which data partition it is
supporting, the partition mapping keys, and other information. Due
to the number of servers which may be participating, a single
resource is created at the default URI for each server, by name,
which contains the URIs for the clusters in which it participates.
For example, a server named WN9018.internal.example.domain would have
a resource created at servermanage.example.domain/WN9018 which would
contain the needed information.
[0058] There is some risk that a read request executed by a server in
a higher layer will be routed to a different server in the previous
layer than the original server, and that such server will be behind
the original server in replication. If this occurs, the downstream
server will detect the missing log entries when it cannot find the
higher-numbered log entries in the log. The replication server can
retry or wait for the other server to catch up with its most
recent entries. This can be avoided by using session-based affinity
for fetches to route synchronization requests to the same server
whenever it has sufficient bandwidth. This is only an issue if a
resource of the same name changes. If a write-once unique URI
strategy is utilized, then this would only trigger a cache miss,
which would be handled automatically.
[0059] Detection of excess replication lag is one indication of
server failure. Upstream servers which detect that the replication
lag of one server is higher than that of the others may be able to
use this information to request that the router remove or
de-prioritize the lagging server when routing future requests.
[0060] FIG. 8 illustrates an exemplary structure for the write logs
314. As each file is written to disk a log entry is written to the
write log 314 in nonvolatile storage. A sample of 6 log entries is
listed in FIG. 8. The general structure of the log entry is as
follows: server id, write count, timestamp, action, and local URI
(path) written. A linefeed (LF) separates each line. Update logs
may be stored in volatile memory to provide more rapid responses by
each server. In the event of a server reboot with logs in volatile
memory, if the log has not also been stored to disk, resource
changes may need to be retrieved by reference to logs at lower tier
servers.
[0061] The server ID 802 uniquely identifies a server or server
cluster and remains the same for all writes on a single server or
server cluster. The write count 804 is unique and is incremented
for each write to the server or server cluster. No write should
duplicate this number when using the same server ID. The time stamp
806 is taken from the system clock at the time of the write. Some
implementations may eliminate the timestamp. The action flag 808
indicates the type of action for this resource. Options include "W"
for write and "D" for delete; other actions, including "M" for
modified, may be added. The local URI (path) written 810 is the
local path of the file on the server. It is the server relative URI
location for the file. Each entry may optionally include a file
version number (not shown) in an integer form which increases by 1
for each new version of the resource written to the authoritative
store 100. Using a file version allows additional features to be
implemented in a more reliable fashion than when the file URI path
is simply overwritten. An authoritative write store 100 enforces
version numbering if needed for a given resource type.
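A parser for the FIG. 8 entry layout might look like the following. The source lists the fields but not the separator. The comma delimiter, the integer timestamp, and the optional trailing version field are therefore assumptions, and paths containing commas would need a different encoding.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LogEntry:
    server_id: str                 # 802: same for every write on this server/cluster
    write_count: int               # 804: unique, incremented per write
    timestamp: int                 # 806: time of the write (integer form assumed)
    action: str                    # 808: "W" write, "D" delete, others may be added
    path: str                      # 810: server-relative URI of the file
    version: Optional[int] = None  # optional file version number


def parse_entry(line: str) -> LogEntry:
    """Parse one LF-terminated write-log line, assuming comma separators."""
    fields = line.rstrip("\n").split(",")
    if len(fields) not in (5, 6):
        raise ValueError(f"unexpected field count in {line!r}")
    server_id, count, ts, action, path = (f.strip() for f in fields[:5])
    version = int(fields[5]) if len(fields) == 6 else None
    return LogEntry(server_id, int(count), int(ts), action, path, version)
```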
[0062] The log entries are generally recorded as a result of a PUT or
POST against an HTTP server (or other protocol resource write
instructions). Log entries can also be captured in RPC handlers or
other handlers which update underlying data.
[0063] Where the resources are stored in a database and writes to
the database are to be logged, the same logging strategy is used.
Each write to the database resource is sequenced and recorded to
the log in the same way. All items in the database can be uniquely
accessed via a URI style semantic. When used with database
operations the recording of the log can be part of the code making
the database update or captured in a database trigger.
[0064] The technology also supports removing of resources or files
from the server's cached set of resources. This is desirable when
content which was previously valid is no longer valid or when it
can be removed for other reasons. To process deletes, the log
entry includes the action D for "delete". The change of the action
from W for write to D for delete allows the replication client to
detect the request for deletion and remove the resource from its local
store. It also invalidates any memory cache for that resource. The
authoritative store can preserve D-type actions during log
consolidation for a period of time set by policy to ensure that all
servers have had an adequate time to process the removal. The
D-type actions can be removed during log consolidation since the
write function would functionally act as a replacement of the
original resource.
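Applying a single W or D entry on the replication client could be sketched as below. Dictionaries stand in for the local store and the in-memory cache; this is a simplification, not the source's data structures.

```python
from typing import Dict


def apply_entry(action: str, path: str, content: bytes,
                store: Dict[str, bytes], memory_cache: Dict[str, bytes]) -> None:
    """Apply one replicated log entry to the local store (a sketch)."""
    if action == "W":
        store[path] = content              # write or overwrite the resource
    elif action == "D":
        store.pop(path, None)              # remove the resource if present
    else:
        raise ValueError(f"unsupported action {action!r}")
    memory_cache.pop(path, None)           # invalidate any in-memory copy either way
```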
[0065] Returning to FIG. 3, each local write log 314 is updated at
the time the resource is updated. In one embodiment, a subset
360 of the most recent updates may be kept in memory to allow for
rapid retrieval. For maximum durability, the write log 314 may be
flushed to disk after every write. In one embodiment, frequent
flushing of the write log 314 makes it desirable to store the update
logs on a separate spindle (physical storage) from the resources.
This may comprise a separate physical disk in a server or a separate
redundant array. In certain implementations, clients will fetch the
most recent updates from memory, so changes can be recognized
rapidly with minimal network overhead and no disk impact.
[0066] In one embodiment, the write logs 314 are recorded and
ordered to make incremental access fast and relatively inexpensive.
This can be accomplished by a directory structure having the
following structure: for each day, the local server creates a
subdirectory labeled by day, such as ccyy-mm-dd (Century
Year-Month-Day). Starting at midnight, it creates a new
subdirectory and starts filling it with new log files. One file per
minute, named hh-mm (hour-minute) and zero padded, is created; it
contains all the log entries for all files changed during that
minute. If no entries changed during the last minute, no file will
be created. If no files were changed during a day, then no
subdirectory for that day is created. The one-minute granularity may be adjusted upwards
or downwards depending on the write load of the server.
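The day/minute naming scheme can be expressed as a small path function. This sketch assumes a hypothetical log root directory and that any coarser granularity evenly divides 60 minutes. The caller would create the directory and file only on the first write in a given day or interval, so empty periods produce nothing.

```python
from datetime import datetime


def log_file_path(root: str, when: datetime, minutes_per_file: int = 1) -> str:
    """Return e.g. '<root>/2012-01-31/13-07' for the log file covering `when`.
    minutes_per_file > 1 coarsens the granularity (assumed to divide 60)."""
    minute = when.minute - when.minute % minutes_per_file   # start of the interval
    return f"{root}/{when:%Y-%m-%d}/{when.hour:02d}-{minute:02d}"
```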
[0067] The above structure is merely exemplary--alternative
structures, times and filenames may be used.
[0068] As an alternative to writing log files to disk, individual
log files can be stored in a database. The advantage of this
approach is that no per-day directory structure needs to be
established. It also allows queries for changes across a unit of time which
may reduce the number of discrete fetches needed for a given set of
changes.
[0069] All the log files may be made available via HTTP GET from
the tier N servers at a server-relative path such as
../updatelogs/day/minutelogfiles.txt. Each server will also return a
list of all logs in the directory which occur after a given time.
The log can be consolidated to remove repetitive updates, so a
single larger log may replace a large number of the smaller
logs.
[0070] Logs may themselves be partitioned in each server to allow
read clients to effectively search only those resources which they
are interested in. Each server can analyze path prefixes so the
logs can be partitioned into separate sub directories. This allows
read clients that are only interested in certain read prefixes to
avoid the overhead of filtering out log entries.
[0071] For example, if two files are written,
../sites/joemerchant/joesite/web/joe.detail.layout.xml.1 and
../system/items/00019919/baseitem.ion, the write handler has the
ability to detect the different prefixes ../sites and ../system/items
and record the log entries in separate subdirectories such as
../writelog/sites versus ../writelog/items. This is configured with
simple path prefix matching. This mechanism may allow
"path-prefix+file extension" to be used to determine the
replication prefix.
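Simple path prefix matching of the kind described in [0071] could be configured as a list of `(prefix, log directory)` rules. In this sketch, the longest matching prefix wins and unmatched paths go to a hypothetical `../writelog/other` directory. Both choices are assumptions.

```python
from typing import Sequence, Tuple


def log_partition(path: str, rules: Sequence[Tuple[str, str]],
                  default: str = "../writelog/other") -> str:
    """Pick the write-log subdirectory for a written path by simple prefix
    matching; the longest matching prefix wins."""
    best = None
    for prefix, log_dir in rules:
        if path.startswith(prefix) and (best is None or len(prefix) > len(best[0])):
            best = (prefix, log_dir)
    return best[1] if best else default
```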
[0072] As an example of the benefit of partitioning the logs, the
read cluster for resources in the subdirectory ../system/items may
have a read load which is 1,000 times higher than that required for
the ../sites configuration data. Servers can process only the write
log entries they are interested in without looking at the others.
This optimization reduces the costs of analyzing log entries and
applying the copy filter at the replication client. One
implementation will include configuration-based support for
partitioning write logs based on simple path prefix matching.
[0073] As also illustrated in FIG. 3, each server also provides a
virtual resource handler 308 which responds to the layer N+1 polls
for updates. The handler 308 will return immediately if it finds
any updates newer than the specified timestamp. If necessary, the
server will merge in-memory log updates and those represented in
multiple log files to obtain the data. This function supports
paging, truncating the list at 5,000 items.
[0074] The virtual handler 308 allows the tier N+1 server to call
it repeatedly and block until a new update arrives. The server will
recognize a new entry immediately and return that single line or
multiple lines which allows a change recognition for an individual
resource. After processing the updates the tier N+1 will call the
same resource but will use a new timestamp that is equal to the
timestamp of the change of the last resource processed in the last
call.
[0075] This handler 308 can also accept a request for changes after
a given write count, where the write count is a serial number
representing the last write the client processed. In
this instance, the server simply returns the first set of writes
occurring after that write count.
[0076] Handler 308 implements a simple limit of 5,000 records. A
limit may be used because the replication client will take time to
replicate that number of referenced resources. The client calls the
same handler 308 again after processing the first set of changes
and receives the next set based on the timestamp or write count of
the last item in the prior batch. It repeats this loop until it is
blocked waiting for changes (as discussed with respect to FIG.
5).
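The client side of the paging loop in [0076] can be sketched as follows. `fetch_after` is a hypothetical callable standing in for the request to handler 308. It returns up to `limit` entries after a given write count, and a short batch is taken to mean the client has caught up.

```python
from typing import Callable, List, Tuple

Entry = Tuple[int, str]   # (write_count, path)
BATCH_LIMIT = 5000        # the handler's page size from the text


def drain(fetch_after: Callable[[int, int], List[Entry]],
          process: Callable[[Entry], None], last_count: int) -> int:
    """Call the update handler repeatedly, resuming after the write count of
    the last item in each batch, until a batch comes back short."""
    while True:
        batch = fetch_after(last_count, BATCH_LIMIT)
        for entry in batch:
            process(entry)
            last_count = entry[0]
        if len(batch) < BATCH_LIMIT:
            return last_count   # caught up; the next poll would block (FIG. 5)
```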
[0077] In memory entries 360 may be flushed to storage after every
resource write. However, spooling of entries in memory and writing
to the disk in later batches also may be implemented. If used in
spooling mode, when an unclean shutdown occurs, the server can walk
the entire local storage resource directory structure to find
individual resource update timestamps and regenerate any that may
have been lost as a result of the spool not being flushed. The
immediate flush removes the need for such an extensive walk but
could degrade maximum write rates for the server.
[0078] As indicated above, resource content filtering on each of
the servers may be implemented (at for example steps 508 or 704
above). Every resource written has a predictable URI. This allows
N+1 tier servers to read URIs from the next lower tier N of servers
(or the authoritative store) and filter them. For example, where the
system 50 is used to implement a Web-based electronic commerce
system having, for example, items for sale from a catalog, a server
processing only catalog items could look only at the path prefix
such as ../system/catalog/items and ignore items such as
../system/siteconfig. This capability allows resources which have
read storm characteristics to be replicated more heavily and
through more layers to guarantee high speed access. It can be
particularly effective when the resources have low change rates
relative to the maximum read rates. This type of filtering is
particularly effective for application servers which only need a
subset of the data for their local caches. It minimizes the cost of
processing for unwanted fragments to a simple single line
evaluation of the URI without the associated GET or an extra
network round trip to fetch the unwanted resources.
[0079] Access control within each resource store can be handled at
the subdirectory level or at any child directory by allowing an
.htaccess file to be synchronized as one of the fragments. This
requires the local web server 302 to include a security handler
enhanced to reference these files and apply them to all files in a
directory and all children of a directory. An alternative is to have
larger .htaccess resources for the entire server in a location where
they can be easily replicated. This latter approach would require
triggering the reload of the .htaccess file on receipt of new
changes or using a relatively short TTL.
[0080] FIG. 9 illustrates the log cleanup process which creates a
summary log 312. When using the above-mentioned one-minute write
log creation interval, log proliferation creates up to 1,440 write
log files per day. This can slow a build when bringing new servers
on line because they may be processing changes to the same file
hundreds of times. To reduce this cost, each server runs a low
priority process which analyzes all updates to a given resource and
collapses updates to the same resource so only the last update for
a given resource is retained. This may be done on a periodic basis.
In one implementation, it runs first on an hourly basis, then on a
daily basis, and may roll up across time so that, at the highest tier,
only the most recent version of a given file is retained in the
periodic summary log 312.
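The collapse step of the cleanup can be sketched as keeping only the last entry per path. This simplified version assumes entries of the form `(write_count, action, path)`. It deliberately keeps D entries, since the text says they may be preserved for a policy-defined period.

```python
from typing import Dict, Iterable, List, Tuple

Entry = Tuple[int, str, str]   # (write_count, action, path)


def consolidate(entries: Iterable[Entry]) -> List[Entry]:
    """Collapse a detailed write log so only the last update per resource
    is kept, returned in write-count order."""
    last: Dict[str, Entry] = {}
    for entry in entries:
        last[entry[2]] = entry          # a later write replaces an earlier one
    return sorted(last.values())        # write counts are unique, so this orders by them
```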
[0081] In FIG. 9, at 902, the cleanup process determines the last
file of a given resource on the server. Each resource may be stored
with a unique path, which would prevent same-named version
consolidation; however, there are semantic relationships across
files which allow the server to find only the set of files which
are referenced from a valid release and eliminate all
version-numbered files which are not referenced. At 904, the process
identifies all files which are not referenced and which are older
than a given time frame; these can be deleted. The
most recent entry is recorded in the summary log. The process loops
through each resource at 906. It should be noted that the cleanup
process takes a lower priority and may run as a background process
so that any read and replication of resources takes higher priority
in the present technology.
[0082] Once all lines from a detailed log are recorded in the
summary log at 908, the detailed write log can be deleted at
910.
[0083] A timestamp of the beginning of the log cleanup is recorded
as part of the summary log. This timestamp is used by tier N+1
servers to determine where they should start processing detailed
log entries from tier N servers. This approach allows the summary
log to be created on a hot basis without blocking further writes or
replication from the server. As noted above, the log summary
process is a low priority so it does not affect the write or copy
rates from the system 50. In one embodiment, the log update process
is a lower priority process than the read process.
[0084] In some cases, a GET request will be made to a tier N+1
server before a given resource has been replicated to that machine.
This would normally result in the GET request failing with an HTTP
404. In one embodiment, instead of immediately returning the 404,
each server is configured so the N+1 machine acts as a proxy and
fetches the required file from the previous tier of servers.
[0085] This process is illustrated in FIG. 10. At 1002, a layer N+1
server may receive a GET request for a resource. If the resource is
available, at 1004, it is returned at 1006. If the resource is not
available, the layer N+1 server looks to fetch the file from
another, previous layer N at 1010. When the resource is fetched at
1010, it is recorded to the local store of the tier N+1 server at
1012. As an alternative to being copied locally, a 301-type redirect
to the other peer server or tier N server may be used at 1010. In an
alternative embodiment, when the read servers are implemented in
environments which can provide very high-speed peer-to-peer
communications, the read server may query one or more peers in the
same tier (e.g. tier N+1) for the content before using the GET from
the next tier at 1010. This is particularly effective when the
previous tier is on the other side of a slower WAN link.
[0086] When the resource is sought from the tier N server at 1010
if the file is present it will be returned at 1012. However, if the
resource requested from layer N is not available at layer N, the
tier N server will also perform the method of FIG. 10 looking to
the next previous tier (e.g. N-1) for the resource. If the file is
available, it is returned at 1012.
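The recursive miss handling of FIG. 10 and [0086] can be modeled with a chain of tiers, each asking its predecessor on a local miss. The `Tier` class is a hypothetical in-memory model. Real servers would issue an HTTP GET to the previous tier's VIP, and peer queries and redirects are omitted here.

```python
from typing import Dict, Optional


class Tier:
    """One read tier with a local store and an optional previous tier."""

    def __init__(self, name: str, store: Dict[str, bytes],
                 previous: Optional["Tier"] = None):
        self.name, self.store, self.previous = name, store, previous

    def get(self, path: str) -> Optional[bytes]:
        if path in self.store:                 # 1004: available locally
            return self.store[path]            # 1006: return it
        if self.previous is None:
            return None                        # nothing further back: 404
        body = self.previous.get(path)         # 1010: same logic, one tier down
        if body is not None:
            self.store[path] = body            # record locally before returning
        return body
```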
[0087] This proxy system can be augmented with TTL semantics which
invalidate local resources and trigger a cache miss. Each resource
or set of resources can be assigned a TTL value, preventing the
resource from being returned in response to a GET request after
expiration of the TTL value. The time for replication across all
layers should generally be shorter than the shortest average TTL
for any object in the cache tier. In a "worst case miss," where the
TTL expired at every layer, the delay time is only the sum of the
latencies of the read layers and in most cases, this would only
propagate through a fraction of the layers.
[0088] As noted above, a TTL value may be assigned to each resource
or set of resources. In some cases, TTL based request storms can
occur where a large number of resources expire at the same time.
This can be eliminated by providing a rule which states that no
file can be less fresh than the current replication lag on a given
server. Since the server knows that no resource has changed that it
does not know about, it can completely eliminate the traditional
HTTP GET or HEAD needed to ensure its content is sufficiently
fresh.
[0089] In one embodiment, an intelligent TTL handler 318 is
provided which first compares the last time stamp of update from
the log system and uses it to override the age calculations for
individual resources. This allows an update of 0 or more files from
the log stream to reset all TTL ages. This can completely eliminate
all TTL-related GET or HEAD requests, even for very large file
sets. It is reasonable to expect a single system to store 10 million
or more resources. If each resource had a 10-minute TTL, and if they
were managed through a typical cache system and the server was
heavily visited by robots, it could result in 1.4 billion cache
misses per day. In contrast, the intelligent TTL approach would have
very close to zero misses, which effectively reduces the read load
against the resource by 1.4 billion requests per day.
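The following is a minimal sketch of the intelligent TTL idea, using a hypothetical `IntelligentTTL` class. Freshness is measured from the last successful log poll (which may have returned zero changes) rather than from each resource's own fetch time. As a result, no per-resource GET or HEAD revalidation is needed while replication is current.

```python
import time
from typing import Optional


class IntelligentTTL:
    """Treat all locally replicated resources as fresh for `ttl` seconds
    after the last confirmed log poll (an illustrative sketch)."""

    def __init__(self, ttl: float):
        self.ttl = ttl
        self.last_log_sync: Optional[float] = None

    def on_log_poll(self, now: Optional[float] = None) -> None:
        # Called after every successful poll, even one returning 0 changes,
        # which resets the age of every resource at once.
        self.last_log_sync = time.time() if now is None else now

    def is_fresh(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        return (self.last_log_sync is not None
                and now - self.last_log_sync < self.ttl)
```

For the example in the text, a per-resource 10-minute TTL expires each of 10 million resources 86,400 / 600 = 144 times a day, or about 1.44 billion revalidations. With a log-based freshness check, that count no longer scales with the number of resources.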
[0090] In general, update rates are limited by the maximum write
rate of a given write cluster, e.g. the update rate of tier N+1
is limited by the write rate of tier N. In some
applications, limiting write rates may be unacceptable, especially
in large scale systems. In order to address this limitation,
vertical partitioning is used in the system.
[0091] Vertical partitions are illustrated in FIG. 2. Each
partition may be addressed by a different address, which in FIG. 2
is represented as P1.example.domain, P2.example.domain, . . . ,
Pn.example.domain. All clients accessing www.example.domain are
redirected or routed to the proper partition based on a computed
partition. For each request, a hash code is calculated from the URI;
the hash code identifies a hash bucket, the bucket is resolved to a
partition address, and the request is routed to that address.
[0092] Each partition can meet the same availability and durability
requirements of its authoritative store 100. This allows all writes
for a subset of content to be routed to different servers within a
given tier. The partitioning may also be driven based on total
storage requirements rather than update loads. In general,
partitions are reflected vertically to allow read rates that are
much higher than write rates for any single partition. The new
write rate limit is the sum of the maximum write rates for all
partitions, and the system can scale to a larger number of
partitions as needed.
[0093] In order to implement partitioning and efficient addressing,
addressing is based on a 16 bit hash key and an assumption of 1,000
writes per second of 0.1K to 100K files per authoritative write
store. The 16 bit hash key provides a maximum of 32,768 hash
buckets which if all writing at maximum rates would allow 32.768
million writes per second.
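The capacity figures in [0093] and the 512K estimate in [0100] follow from the arithmetic below. Note that 32,768 is $2^{15}$. A key that used the full unsigned 16-bit range would give $2^{16} = 65{,}536$ buckets and double both results.

```latex
\begin{aligned}
\text{buckets} &= 2^{15} = 32{,}768 \\
\text{max write rate} &= 32{,}768 \times 1{,}000\ \text{writes/s} = 32.768 \times 10^{6}\ \text{writes/s} \\
\text{bucket map size} &= 32{,}768 \times 16\ \text{B} = 524{,}288\ \text{B} = 512\ \text{KiB}
\end{aligned}
```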
[0094] Any single cache signature could be allocated to each
partition and a single partition may be dedicated to a given hash
key, which allocates an entire cluster of hardware to service writes
for a single cache key. It is possible that hash key overlaps could
result in traffic that exceeds the capability of a full cluster.
When this condition is detected a second tier lookup may be used.
In this second index, the hash key is calculated using an
alternative algorithm and then indexed to a specific cache
partition using the second hash signature to index into a separate
partition map. This is expected to be a rare occasion so the second
hash map is treated as a sparse matrix.
[0095] Each server may be provided with a unique name. The unique
name is mapped to a partition (p1, p2 . . . pn) and a layer (VIP0,
VIP1, . . . VIPn) using a simple configuration file. Each server
knows the partition to which it has been allocated. Each tier of
servers of each vertical partition can have a unique VIP (DNS
name). The router(s) handle routing of requests to all machines
registered under that VIP domain name. The routing may be handled
in a round-robin fashion or by other balancing techniques.
Each server at a given tier (tier N+1) can be configured to know
the VIP name of the next lower tier (tier N) of servers. All
configuration elements are represented as simple resource fragments
that are replicated to all servers as part of standard replication
process.
[0096] A standard hashing algorithm is used to produce a 16 bit
integer from the URI of the resource. This integer is used as the
key for a hash table lookup, which resolves it to one of 1 . . . N
partitions by returning a partition number. This information may
have a form similar to:
[0097] 10109,3
[0098] 10110,3
[0099] 10110,6
[0100] This file information is generated and stored as an ASCII
resource and distributed through a non-partitioned branch to all
replication client servers. Each of the possible buckets is mapped
to a partition. The simplest version of this file would contain a
single entry per possible hash code; at an estimated 16 bytes of
memory per hash code, it would consume 512K of memory. Extensions
to support hash ranges may be added, which may reduce memory
consumption. This is assumed to be an in-memory hash table which
allows rapid lookup of the bucket.
[0101] Once the system has identified the bucket number, the bucket
number is resolved to a given URI for the front-most edge of the
partition where the data for a given URI path exists. This second
lookup is used because it is unlikely that any system will actually
use N partitions. In source form this would look as follows.
[0102] 3, p3.example.domain
[0103] 4, P4.example.domain
[0104] 5, P5.example.domain
[0105] 99,P1.overflow.example.domain
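The two-step lookup in [0096] through [0105] (URI to 16-bit hash, hash to partition number, partition number to host) could look like the following. MD5 is an arbitrary stand-in for the unspecified "standard hashing algorithm". Reducing the hash modulo 32,768 to match the bucket count given above is an assumption, and the second-tier overflow index of [0094] is omitted.

```python
import hashlib
from typing import Dict


def hash16(uri: str) -> int:
    """Reduce a URI to a 16-bit integer (MD5 chosen arbitrarily here)."""
    return int.from_bytes(hashlib.md5(uri.encode("utf-8")).digest()[:2], "big")


def parse_map(text: str) -> Dict[int, str]:
    """Parse 'key,value' lines such as '10109,3' or '3, p3.example.domain'."""
    out: Dict[int, str] = {}
    for line in text.splitlines():
        if line.strip():
            key, value = line.split(",", 1)
            out[int(key)] = value.strip()
    return out


def partition_host(uri: str, bucket_map: Dict[int, str],
                   host_map: Dict[int, str]) -> str:
    bucket = hash16(uri) % 32768              # fold into 32,768 buckets (assumed)
    partition = int(bucket_map[bucket])       # first lookup: bucket -> partition
    return host_map[partition]                # second lookup: partition -> host
```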
[0106] The source files for these maps are stored at the
authoritative write store for the cluster and are replicated like
any other resource. They are replicated using the option to skip
partitioning which allows them to be replicated to all partitions
which effectively ensures they are available on all servers.
[0107] Other addressing schemes may be utilized in accordance with
the present technology. As noted above, path prefix partitioning
applied before hashing allows this partitioning to be extended to
filtering as necessary.
[0108] In certain situations, a given client may try to access
content at edge tier servers which may not contain the data needed.
In the present implementation, it is desirable that each server at
each subsequent tier (e.g. tier N+1) not be required to understand
the vertical partitioning, allowing flexibility in the
configuration or number of partitions.
[0109] The use of the HTTP redirect described in FIG. 10 allows the
edge servers to return resources without knowledge of vertical
partitioning. The HTTP redirect does require each layer of each
vertical partition to be assigned a unique virtual domain name to
allow re-routing.
[0110] In many cases, the maximum write rate for a given write
partition is very similar to the maximum write rate for downstream
cache servers, so the partitioning may need to be replicated all the
way to the edge cache. Consistent hashing is used to determine
in which partition the data for a given URI will be located.
[0111] All updates are effectively written over existing resources
of the same path. To avoid the possibility of returning a resource
which has been partially updated if a server were to request the
resource while it is being updated, all replacements of existing
resources can be written to a different key space such as "original
file path+timestamp+.tmp." Once the new copy has been fully
written, the old version can be deleted and the new version renamed
to the original key. If updating a database resource, any
individual update is assumed to be atomic, which means the local
replication client can download the entire resource and apply its
update in a single database transaction.
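The temporary-key write in [0111] is the familiar write-then-rename pattern. This sketch writes `<path>.<timestamp>.tmp` and then uses Python's `os.replace`, which renames over the destination atomically on POSIX file systems. That collapses the text's separate delete and rename into one step, which is a design choice rather than the source's procedure.

```python
import os
import time


def replace_resource(path: str, data: bytes) -> None:
    """Write data to '<path>.<timestamp>.tmp', then swap it into place so a
    concurrent reader sees either the old or the new file, never a partial one."""
    tmp = f"{path}.{time.time_ns()}.tmp"
    with open(tmp, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())          # make sure the bytes are on disk first
    os.replace(tmp, path)             # atomic rename over the original key
```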
[0112] With reference to FIG. 10, for inbound read requests where a
server at the N+1 tier determines that a given request for a URI
resides in the partition it is responsible for serving, it will
process the request from its local cache as normal. If it
determines that the data resides in a different partition, it may
return an HTTP redirect to the appropriate partition. For example, a
client may access example.domain/joe/parts and the sub URI
"/joe/parts" resolves to partition 4 which is mapped to
P4.example.domain which is returned via a HTTP redirect to the
client as P4.example.domain/joe/parts. P4.example.domain represents
a DNS resolvable name which may be a round robin queue which will
be fronting from 1 . . . N servers. The client then fetches the
appropriate content from redirected URI. The client may also choose
to remember where it was redirected for that piece of content to
avoid the 2 step process in the future. Alternatively the server
which receives the data may choose to act as a proxy and directly
access P4.example.domain/joe/parts and retrieve that data on behalf
of the client.
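The read-routing decision above can be sketched as a small function, assuming the partition lookup is passed in as a callable and that redirect targets use an `http://P{n}.{domain}` naming pattern modeled on the P4.example.domain example. The function name, the scheme, and the returned `(status, location)` pair are illustrative assumptions, not a defined interface.

```python
from typing import Callable, Tuple


def route_read(path: str, my_partition: int,
               partition_for: Callable[[str], int],
               base_domain: str = "example.domain") -> Tuple[int, str]:
    """Serve locally if this server owns the path; otherwise redirect."""
    target = partition_for(path)
    if target == my_partition:
        return 200, path  # process the request from the local cache as normal
    # Point the client at the DNS name fronting the owning partition.
    return 302, f"http://P{target}.{base_domain}{path}"
```

A server in partition 1 receiving `/joe/parts`, which hashes to partition 4, would answer `(302, "http://P4.example.domain/joe/parts")`. A proxying server would instead fetch that URI itself.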
[0113] Write request handling is largely the same as read request
handling. The primary difference is that a write or PUT request is,
by default, proxied by the receiving server to the appropriate
partition.
[0114] As the system grows, additional partitions will be needed.
The process for adding new partitions is illustrated in FIG. 11. At
1102, a temporary configuration is created to determine which hash
codes will be mapped to the new bucket. At 1104, the temporary
configuration is replicated to all servers, while the original
configuration is retained under a different name for use by writing
servers during creation of the new partition, until the resource
copy is complete. At 1106, servers in the new partition begin to
copy content from the servers where the content currently exists.
At 1108, during the copy process, all read requests for content
that is needed but not yet copied are proxied to the current
location, which hosts in the new partition determine by referencing
the old configuration information. All hosts outside the new
partition map all read and write requests to the new partition URI.
Once the write of
each piece of content has been confirmed at 1110, the resources are
deleted from the old servers at 1112. The delete may be delayed
slightly to allow propagation through all layers of the cache in
the new cluster.
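Step 1108 can be illustrated with a minimal sketch, assuming an in-memory dictionary stands in for a new-partition host's local store. It also assumes `old_partition_for` applies the retained (old) configuration and a hypothetical `fetch_from(partition, path)` callable performs the proxied GET. None of these names come from the specification.

```python
from typing import Callable, Dict, Optional


def read_during_migration(path: str,
                          local_store: Dict[str, bytes],
                          old_partition_for: Callable[[str], int],
                          fetch_from: Callable[[int, str], Optional[bytes]]
                          ) -> Optional[bytes]:
    """Read path for a host in the new partition while content is copied."""
    # Content already copied into the new partition is served locally.
    if path in local_store:
        return local_store[path]
    # Not copied yet: the retained old configuration says where the content
    # still lives, and the request is proxied there.
    return fetch_from(old_partition_for(path), path)
```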
[0115] Partition resolution data can be changed and replicated when
all network partitions are available. Because not all replication
clients receive the data simultaneously, data moved from one
partition to another can be reached at either location. Update write
requests may be routed to the old partition for a period of time.
[0116] In some cases, content for the new partition is scattered
randomly across existing partitions. New partition write servers
retrieve the full list of all content from each partition by
reading their summary logs. They apply the partitioning lookup on
each resource listed in the summary log and determine if it belongs
in their partition by applying the hash semantics. If so, a GET is
used to fetch the resource, followed by a local PUT of that same
resource to their local store. After the write has been confirmed,
the original content is deleted from the old location.
[0117] This requires an additional query parameter which causes
the servers at the old location to temporarily ignore their
partitioning logic; otherwise they would issue an HTTP redirect.
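The summary-log walk performed by new-partition write servers might look like the following minimal sketch. Summary logs are modeled as lists of paths per existing partition, and stores as dictionaries. `get`, `delete`, and the function name are hypothetical stand-ins. A real `get` would also carry the redirect-bypass query parameter, whose name the specification does not give.

```python
from typing import Callable, Dict, Iterable, Optional


def pull_into_new_partition(new_partition: int,
                            summary_logs: Dict[int, Iterable[str]],
                            partition_for: Callable[[str], int],
                            get: Callable[[int, str], Optional[bytes]],
                            delete: Callable[[int, str], None],
                            local_store: Dict[str, bytes]) -> int:
    """Copy every resource the new partition now owns; return how many moved."""
    moved = 0
    for old_partition, paths in summary_logs.items():
        for path in paths:
            # Apply the partitioning lookup to each resource in the summary log.
            if partition_for(path) != new_partition:
                continue
            data = get(old_partition, path)  # GET from the current owner
            if data is None:
                continue
            local_store[path] = data  # local PUT
            # Delete the original only once the local write is confirmed
            # (here, by reading it back from the in-memory store).
            if local_store.get(path) == data:
                delete(old_partition, path)
                moved += 1
    return moved
```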
[0118] Partition removal is handled in a similar manner. The
primary partition configuration is left unchanged while a temporary
configuration of the new server mappings is written and propagated
to the authoritative write server of the partition to be removed.
At this time, all write requests are proxied by the partition to be
removed to their new location by the authoritative write servers in
the removal partition. The authoritative write store walks its
content tree and, for each resource, issues a PUT to the location
calculated from the new partition map. It then issues a delete for
the resource after receiving an acknowledgement of the PUT. Any read
requests are proxied to the newly calculated location. If the new
location does not have the resource, the local store is checked.
This order may be reversed for optimization.
[0119] During this time a portion of the content will be in the new
partition and a portion will be in the old location and the amount
will change as the copy and delete operation continues. When all
content has been copied to its new location and removed from the old
one, the primary partition configuration is updated to reflect the new data locations
and is propagated to all servers.
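The removal walk and the interim read path can be sketched together. This assumes the removal partition's content tree is a flat dictionary and that `put(partition, path, data)` returns `True` when the PUT is acknowledged. All function and parameter names are hypothetical. Reads try the newly calculated location first and fall back to the local store, matching the default order described above.

```python
from typing import Callable, Dict, Optional


def drain_partition(local_store: Dict[str, bytes],
                    new_partition_for: Callable[[str], int],
                    put: Callable[[int, str, bytes], bool]) -> None:
    """Move every resource off the partition being removed."""
    for path in list(local_store):  # snapshot keys; the dict shrinks as we go
        target = new_partition_for(path)
        # Delete locally only after the PUT to the new location is acknowledged.
        if put(target, path, local_store[path]):
            del local_store[path]


def read_during_removal(path: str,
                        local_store: Dict[str, bytes],
                        new_partition_for: Callable[[str], int],
                        get: Callable[[int, str], Optional[bytes]]
                        ) -> Optional[bytes]:
    # Proxy to the newly calculated location first; fall back to the local
    # copy for resources that have not been moved yet.
    data = get(new_partition_for(path), path)
    return data if data is not None else local_store.get(path)
```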
[0120] After the configuration information has been propagated to
all servers, the old partition can be removed. Ideally, the old
partition is left in place for a period of time and removed only
after it has received no requests for a period of time. It is also
viable to remove all but one server and remap all layers to this
server, which simply acts as a proxy until all servers have started
using the new partition map.
[0121] Partition splitting may be performed in the same manner as
the creation of a new partition. The main difference is that all
resources which need to be moved reside on a single partition which
allows the split to occur as a result of a single walk.
[0122] To minimize replication of files that may have been written
on the server but did not really change on the client, an MD5-type
hash can be computed over the contents of the file. The reading server
(tier N+1) can compare the hash code for the file's current contents
with the hash in the write log. If the hash is identical, then the
tier N+1 server can simply change the modification time of the
resource to reflect the server timestamp and skip the GET. The
unique hash code may be added to the write log immediately before
the relative path as shown by the string "82828288" in the sample
below:
[0123] ws1,27,20090131102,W,82828288/example/items/00019919/[filename]
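A minimal sketch of the hash comparison on the reading side follows. The field names assigned to the comma-separated log columns (server, sequence, timestamp, operation) are assumptions, since this section does not define them. The sketch also assumes the stored hash is an MD5 hex digest; the sample value "82828288" is only a placeholder. When `needs_get` returns `False`, the tier N+1 server would update the resource's modification time instead of issuing a GET.

```python
import hashlib
from typing import NamedTuple, Optional


class LogEntry(NamedTuple):
    server: str        # e.g. "ws1" (assumed meaning)
    sequence: str      # e.g. "27" (assumed meaning)
    timestamp: str     # e.g. "20090131102"
    op: str            # "W" for a write
    content_hash: str  # hash placed immediately before the relative path
    path: str


def parse_log_line(line: str) -> LogEntry:
    server, seq, ts, op, rest = (f.strip() for f in line.split(",", 4))
    content_hash, rel = rest.split("/", 1)  # hash, then the relative path
    return LogEntry(server, seq, ts, op, content_hash, "/" + rel)


def needs_get(entry: LogEntry, local_bytes: Optional[bytes]) -> bool:
    """True unless the local copy already matches the logged content hash."""
    if local_bytes is None:
        return True
    return hashlib.md5(local_bytes).hexdigest() != entry.content_hash
```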
[0124] Depending on data freshness requirements, it can be necessary
to limit the size of items written into any partitioned write area.
This is necessary because a number of larger resources such as
images may delay replication of smaller fragments if they are mixed
in the same log partition. By placing larger files in separate
partitions, they can be replicated at a different rate. This is
based on the assertion that some files need to be replicated
quickly, while delaying a large image or video would have less
impact. A very large file can prevent propagation of the next file.
The copy of a large multi-gigabyte file could take several minutes
over a traditional WAN link, which would reduce the freshness of any
files in the queue after the larger file. If this condition
persists for long enough, the replication lag could exceed the TTL of
some content, which would trigger a larger number of cache misses
for content with a TTL shorter than the replication lag.
[0125] To prevent large files from delaying replication of smaller
files, a special semantic is used. Instead of replicating large
files immediately, a smaller placeholder or proxy file is written.
This proxy file is replicated as normal. The replication client
recognizes these proxy files and adds the larger file named in the
proxy to a lower priority replication queue. This allows the
replication client to move on to subsequent files with no extra
delay. If the large file replaces an existing file, the replication
client may delete the original file at the time the proxy file is
detected, or mark that file for special expiration so that, if it
has not been replaced by the time its TTL expires, the original is
deleted.
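The two-queue behavior can be sketched as below. This assumes a hypothetical naming convention in which a proxy file is the large file's path plus a `.proxy` suffix; the specification only says the proxy names the larger file. The normal queue is always drained before the low-priority queue, so small fragments are never stuck behind a large download.

```python
from collections import deque
from dataclasses import dataclass, field

PROXY_SUFFIX = ".proxy"  # hypothetical convention for placeholder files


@dataclass
class ReplicationClient:
    high: deque = field(default_factory=deque)      # normal log order
    low: deque = field(default_factory=deque)       # deferred large files
    replicated: list = field(default_factory=list)  # completion order

    def enqueue(self, path: str) -> None:
        self.high.append(path)

    def step(self) -> bool:
        """Process one item, always draining the normal queue first."""
        if self.high:
            path = self.high.popleft()
            if path.endswith(PROXY_SUFFIX):
                # The placeholder itself replicates normally; the large file it
                # names goes to the low-priority queue.
                self.low.append(path[: -len(PROXY_SUFFIX)])
            self.replicated.append(path)
            return True
        if self.low:
            self.replicated.append(self.low.popleft())
            return True
        return False
```

With `/a.xml`, `/video.mp4.proxy`, and `/b.xml` queued in that order, `/b.xml` is replicated before `/video.mp4` is fetched.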
[0126] If a client attempts to access the larger file before it is
replicated, the last version present is served unless it has
exceeded its TTL. If the TTL has expired or the resource has not
arrived, the request is treated as a standard cache miss.
[0127] In some instances an excess number of large files may exceed
the disk space available in the servers. In this instance, an
alternative is used which allows the remote replication server to
defer fetching the file until first accessed and to clean these
files using standard least recently used (LRU) cache
algorithms.
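Deferred fetching with LRU cleanup might look like the following minimal sketch. It assumes a byte budget stands in for available disk space and a hypothetical `fetch(path)` callable retrieves the file from the remote replication source on first access. `collections.OrderedDict` supplies the recency order.

```python
from collections import OrderedDict
from typing import Callable


class LazyLargeFileCache:
    """Fetch large files on first access; evict least recently used ones."""

    def __init__(self, capacity_bytes: int, fetch: Callable[[str], bytes]):
        self.capacity = capacity_bytes
        self.fetch = fetch
        self.used = 0
        self.files: "OrderedDict[str, bytes]" = OrderedDict()

    def get(self, path: str) -> bytes:
        if path in self.files:
            self.files.move_to_end(path)  # mark as most recently used
            return self.files[path]
        data = self.fetch(path)  # fetching is deferred until first access
        self.files[path] = data
        self.used += len(data)
        # Evict from the least recently used end until back under budget,
        # never evicting the file that was just fetched.
        while self.used > self.capacity and len(self.files) > 1:
            _, old = self.files.popitem(last=False)
            self.used -= len(old)
        return data
```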
[0128] When many large files are stored, they may be saved in an
external storage array or other system 250 optimized for large
scale resource management. When using the external array, the
client 370 or higher tier is responsible for writing a copy of the
file to the array or media service 250 and for generating a URI where
the resource can be accessed. This URI is included in the proxy
file, which is replicated using the standard mechanism. When the
replication client receives the proxy, it can choose either to
retrieve the resource or to wait until the first request for the
resource is made and then manage the total disk space usage using an
LRU mechanism to clean out the least used resources.
[0129] When using large file optimized services 250, the reading
server may retrieve large resources directly from those stores to
minimize extra network overhead. In that instance, the server may
return an HTTP redirect to the reading server with the URI where the
larger resource is available. This presumes the large resource
storage can be accessed by the reading server.
[0130] When writing larger files, extra attention can be paid to
ensuring the resource is written completely before the reading
server is allowed to access the local copy. To ensure this occurs,
the resource is written under a different name such as "requested
path+timestamp+.tmp" and renamed when the write is complete.
[0131] Returning again to FIG. 3, any replication server may
implement handlers 306 which transform the data pulled from a
higher tiered server. The transformed data may be used by the
client 370 or a replication server in another layer to trigger
updates to back-end services and databases. By convention, the
original resources should not be changed. If the data is
transformed or changed so that it is no longer byte-for-byte
equivalent, it should be stored under a new resource name.
[0132] Client 370 includes handlers 306 that allow transformation
and writing to alternative resource names; these handlers can be
implemented in the same language as the client and dynamically
loaded based on path matching and content type semantics. The
pluggable transformation agents may also be used to call APIs of
other services, allowing local services which have their own
repositories or databases to be updated based on changes in the
content.
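Selecting handlers by path matching and content type can be sketched with a simple registry. This assumes shell-style path patterns via `fnmatch.fnmatchcase` and exact content-type equality. Dynamic loading (for example through `importlib`) is omitted, and `HandlerRegistry` is a hypothetical name. The registry also enforces the convention that a transform never overwrites its source resource.

```python
import fnmatch
from typing import Callable, List, Tuple

# A handler takes (path, data) and returns (new_resource_name, new_data).
Handler = Callable[[str, bytes], Tuple[str, bytes]]


class HandlerRegistry:
    def __init__(self) -> None:
        self._handlers: List[Tuple[str, str, Handler]] = []

    def register(self, path_glob: str, content_type: str, handler: Handler) -> None:
        self._handlers.append((path_glob, content_type, handler))

    def apply(self, path: str, content_type: str,
              data: bytes) -> List[Tuple[str, bytes]]:
        """Run every matching handler and collect the derived resources."""
        outputs = []
        for glob, ctype, handler in self._handlers:
            if fnmatch.fnmatchcase(path, glob) and ctype == content_type:
                new_path, new_data = handler(path, data)
                # Convention: original resources are never changed in place.
                if new_path == path:
                    raise ValueError("transformed output needs a new resource name")
                outputs.append((new_path, new_data))
        return outputs
```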
[0133] Data transformation may be used in a number of contexts. One
of these is summarizing the first page of reviews shown in a detail
page. This data only changes as the reviews are approved so the
data summary view changes relatively infrequently.
[0134] Transformed data may include elements of web pages which
require updating even though the rest of the page does not
require updating. Consider, for example, an electronic commerce
system where a number of items is offered for sale. It may be
desirable to determine the "best" offer from amongst a series of
sales offers. In one embodiment, the calculation may be made and
written to the authoritative store. In another embodiment, this
calculation and accompanying data may be made at one or more of the
tiers in system 50.
[0135] In some instances, a request may be received for a resource
that would normally be generated during transformation and which
has not yet been generated. This would generate a cache miss that
in many instances would propagate all the way back to the
authoritative store.
[0136] In one embodiment, the authoritative store handler 306 can
dynamically generate the resource on demand. This eliminates any
need for the lower tiers to have custom handlers for data
transformation on the fly. In another embodiment, edge tier servers
can detect the cache miss and compose the resource needed by
accessing the other pre-transform resources.
[0137] It may be desirable to use the CPU resources present in one
of the servers to dynamically generate the transformed resources
when needed. In this instance, a handler which represents a virtual
resource is used. The server first detects a cache miss and, before
attempting to access the next lower tier, checks its list of handlers
(which may be local or remote) and uses the matching handler to
dynamically generate the missing resource using other resources or
external data sources. Once this is done the server returns the
resource as needed. It saves a local copy and writes the generated
resource back to the authoritative store using a PUT. The write
back is based on the presumption that if accessed once the same
resource has a higher probability of being needed again and there
is no guarantee that the next access for the same resource will
land on the same server where content was dynamically
generated.
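The virtual-resource miss path can be sketched as follows. This assumes handlers are looked up by exact path for simplicity, and that hypothetical `fetch_lower_tier` and `put_authoritative` callables stand in for the lower-tier GET and the write-back PUT. On a miss the server tries a handler first, keeps a local copy of anything generated, and writes it back so that other servers can benefit.

```python
from typing import Callable, Dict, Optional


def serve_with_virtual_handlers(
        path: str,
        local_cache: Dict[str, bytes],
        handlers: Dict[str, Callable[[str], Optional[bytes]]],
        fetch_lower_tier: Callable[[str], Optional[bytes]],
        put_authoritative: Callable[[str, bytes], None]) -> Optional[bytes]:
    if path in local_cache:
        return local_cache[path]
    # Cache miss: consult the handler list before going to the lower tier.
    handler = handlers.get(path)
    if handler is not None:
        generated = handler(path)
        if generated is not None:
            local_cache[path] = generated       # keep a local copy
            put_authoritative(path, generated)  # write back via PUT
            return generated
    return fetch_lower_tier(path)  # ordinary cache-miss path
```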
[0138] When a file containing data used in a transform changes, the
transformed view may need to be invalidated. For example, using the
electronic commerce example, a
product summary record may have been generated using data from many
sources including 1 . . . N offers. When one of the offers changes,
the generated view needs to be deleted so it is not used and is
forced to be regenerated. Generation of this type of transform
should occur in the authoritative write store where the generation
can be triggered at the time of change. However, this approach
consumes resources in the authoritative store. Another approach is
to scale the transforms such that this type of entry is regenerated
on a sufficiently frequent basis that the new transform is
available before the TTL in traditional caches expires. To support
this in the servers, a list of dependent transforms is maintained
for each atomic asset. This list is referenced whenever a given
asset is changed, and the dependent transforms are then scheduled
for deletion. This analysis or detection can be assigned to a small
number of servers at the lowest tier set of servers possible and
the deletes can be written against the authoritative store using
the standard process. This can be implemented so the priority
deletion servers are present in each vertical partition. The
servers responsible for deletion processing may be configured to
handle fewer or no inbound cache requests so they can allocate a
majority of their CPU capacity to change detection. Priority deletion
servers may be allocated a subset of the write logs using the
standard filtering or log partitioning to guarantee rapid
response.
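The per-asset dependency list can be sketched as a reverse index from each atomic asset to the transforms built from it. The class name `TransformDependencies` and the `delete` callable are hypothetical. In the arrangement described above, `delete` would issue deletes against the authoritative store. Entries for the changed asset are dropped once acted on; a regenerated transform re-registers its sources.

```python
from collections import defaultdict
from typing import Callable, Iterable, List


class TransformDependencies:
    def __init__(self) -> None:
        # atomic asset path -> set of transform paths built from it
        self._dependents = defaultdict(set)

    def register(self, transform_path: str, sources: Iterable[str]) -> None:
        for src in sources:
            self._dependents[src].add(transform_path)

    def on_change(self, asset_path: str,
                  delete: Callable[[str], None]) -> List[str]:
        """Schedule every transform built from `asset_path` for deletion."""
        doomed = sorted(self._dependents.pop(asset_path, set()))
        for transform in doomed:
            delete(transform)
        return doomed
```

For instance, a product summary registered against offers `/offers/1` and `/offers/2` is deleted as soon as either offer changes.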
[0139] Additional data transformations can be enabled at the server
tiers closest to the edge (e.g. servers 140) which transform the
basic file fragments into those optimized for rapid rendering of
common pages. This transformation is done by low priority
processes. Any new fragments generated can be replicated to other
servers, which may receive a request for the same content. A
specialized handler (not shown) may be implemented so that a cache
miss of this content can cause the content to be rebuilt from the
lower tier fragments present on the server. An alternative cache
miss strategy is to allow the servers to rebuild the transformed
representation of the data based on the lower tier fragments directly.
Leveraging background processes in this fashion allows higher
effective machine utilization during idle times while minimizing
work done to yield final rendering forms of the data during peak
times.
[0140] Due to the number of hosts participating in the edge cache,
there is a substantial amount of unused CPU power during non-peak
moments on these machines. To maximize the benefit derived from
these servers, the data they traverse when building transformed
data types can be partitioned. The actual partitioning information
can be replicated as file fragments and treated as a queue so that
each summarization process is assigned small units of work from the
queue.
[0141] When summarization work is done by the edge cache, it may be
replicated to all other servers serving the same type of data at
the same tier of the system 50. Data may be written into all
machines of a lower layer. By writing such data to a lower tier,
the transformed data is automatically replicated towards all
servers in the edge cache which deal with the same set of data.
[0142] Ideally, the server layer supports registration of
dependencies for transformed or summary views so that, if any of the
file fragments referenced to build the transformed view change, any
transform-generated views that were built based on the content of
those files are automatically invalidated. This is ideally
extended to allow registration of that summary view for rebuild on
a priority but less than real-time basis.
[0143] Some use cases mandate that the freshest data be used. A
good example of this is an electronic commerce system where a
customer has recently changed a shipping address. In this case, the
most recent shipping address should be provided on any page
rendered by the web server 150, even though the page rendering may
be from a different rendering server than the one responsible for
the update request.
[0144] One solution is to identify such cases using a standard HTTP
header cache-request-directive "no-cache". If this is received by
an edge server, then all system tiers may treat this as a cache
miss and will proxy the request to the next lower layer until the
first layer or authoritative store is reached. This technique can
create request storms on relatively constrained hardware. Due to
the ability of this directive to create request storms at lower
tiers, one solution is to not honor the request and issue an
appropriate error message.
[0145] Another solution is to allow the standard HTTP
cache-request-directive "max-age" to specify that data can be fresh
within a given time frame. This can be used in conjunction with
"max-stale" which allows the server to return data that may be
stale but the server can attach a warning 110 (Response is stale)
if the content age exceeds the max-age. For example, if a max-age
of 1 second is used for customer-shipping-address.xml, the server
will check its recorded server age. If the replication age is
older than 1 second, the server will check the document age, and if
it is older than 1 second, the request is passed to the next lower
server tier, which will either return the content or refer the
request to the next lower tier, until it reaches the
authoritative write store. In most instances the replication will
be complete before the client request arrives at the authoritative
write store. However, if the data has only replicated through a
portion of the server layers, it will be found at the highest layer
it has made it to and then pulled forward. It is desirable to use
the largest acceptable max-age to minimize cache misses.
[0146] In the event of a network partition which prevents a server
in one layer from reaching the next lower layer, that tier will
return the most recent data it has along with Warning 110
(i.e. Response is stale). Each cache layer will update its local
age for the content that is retrieved in this fashion to prevent
the next cache miss. This warning can be returned through all
layers to the reading server.
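The max-age walk of [0145], together with the stale fallback of [0146], can be modeled with a minimal sketch. Tiers are listed from the edge to the authoritative write store, and each holds `(content, age_in_seconds)` per path. A `reachable` flag models a network partition. The `Tier` type is hypothetical. The warning string follows the HTTP/1.1 `Warning: 110 - "Response is stale"` format.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

STALE_WARNING = '110 - "Response is stale"'


@dataclass
class Tier:
    data: dict              # path -> (content, age_in_seconds)
    reachable: bool = True  # False models a network partition


def fetch_with_max_age(path: str, max_age: float,
                       tiers: List[Tier]) -> Tuple[Optional[bytes], Optional[str]]:
    """Walk from the edge (tiers[0]) toward the authoritative store (tiers[-1])."""
    best_stale = None
    for tier in tiers:
        if not tier.reachable:
            break  # cannot reach lower tiers: fall back to what we already have
        entry = tier.data.get(path)
        if entry is None:
            continue  # not present at this tier; try the next lower tier
        content, age = entry
        if age <= max_age or tier is tiers[-1]:
            return content, None  # fresh enough, or authoritative
        best_stale = content  # lower tiers hold more recent copies
    if best_stale is not None:
        return best_stale, STALE_WARNING
    return None, None
```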
[0147] The tier servers may use standard HTTP HEAD or conditional
GET requests, which allow the servers to return a 304 indicating the
content has not been changed rather than copying content which has not changed. If
the client receives the warning 110, it most likely indicates a
network partition failure.
[0148] In the case of the customer shipping address, mentioned
above, a small lag may be allowed before rendering the content. For
content of this type, the use of relatively small files and small
vertical partitions allows rapid propagation. Assuming a blocking
read on the log changes (defined above) and a 4-to-1 server mix, a
sub-2-second propagation can be delivered in a 3-tier, moderately
loaded cache which provides 16 read servers. At an average read
rate of 5,000 reads per server per second, this supports a maximum
read rate of about 80,000 reads per second with a sub-2-second
propagation delay.
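The arithmetic behind these figures can be written out. This reads the 4-to-1 mix as each server feeding four servers in the next tier, starting from a single authoritative write server at tier 1. That reading is an interpretation consistent with the stated 16 read servers, not something the text states explicitly. Here $S_k$ is the number of servers at tier $k$.

```latex
\begin{aligned}
S_k &= 4^{\,k-1} \quad (k = 1, 2, 3) \;\Rightarrow\; S_3 = 4^{2} = 16 \text{ read servers} \\
R_{\max} &= S_3 \times 5{,}000 \ \text{reads/s} = 16 \times 5{,}000 = 80{,}000 \ \text{reads/s}
\end{aligned}
```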
[0149] As defined above, under normal operating conditions, all
write requests are proxied to the authoritative write store for the
vertical partition which currently owns content for a given URI
space. This creates a problem in the event of a network partition
failure where clients of the system need to update data.
[0150] An example of this would be using two geographically
distinct data centers, where one data center, for example located in
the United States, provides backup for a second data center, for
example located in Europe, during outage conditions, and the
authoritative store for a given set of data, such as customer
profiles for European users, is normally located in Europe and
replicated to the USA. Assume that, during the failure condition
while the USA data center is serving traffic, a customer wishes to
change a component of their profile such as a shipping address. In
the normal operating condition, a European cache server would receive
the request and simply proxy it to the proper local authoritative
store. During the failure condition, the USA server cannot reach the
European server, which would prevent the write from occurring; the
end user would experience this as an availability issue.
[0151] There is a need for cache servers operating in a remote data
center to optionally allow local spooling of writes during network
partition failures. For data which can tolerate some inconsistency,
the process of FIG. 12 is utilized.
[0152] As illustrated in FIG. 12, at 1202 one or more of the
servers in a layer (N) of local servers in the local network
partition detect failure to connect to servers on another
partition.
[0153] At 1204, the local servers in the layer of the partition
elect a local master. This master goes into local write mode and
acts as the authoritative server for the local partition. If the
local master fails a new local master is elected. At 1206, peer
servers at the same tier in the local partition are temporarily
reconfigured to treat the elected master as local master. All write
requests are routed to the locally elected master using a proxy
mechanism such as that discussed above for partitioning. Local
servers at the same tier temporarily reconfigure to point at the
elected local master to pull change logs.
[0154] At 1208, the local master saves the updates in its local
store and records them in its local write log using a different
machine ID to identify the log. At 1210 all changes are replicated
to lower tier servers in the same network partition using the
processes discussed above with respect to FIG. 5. At 1212, the
current local master continues to test its ability to connect to
servers in the remote network partition.
[0155] When restoration of network connectivity to the missing
partition is detected at 1212, the local master processes and sorts
its write log at 1214 to find the most recent update for each
resource. To perform this function, an index of the log may be
maintained, allowing the local master to find the last write of a
resource written while it was the local master. At 1216, a
determination is made as to whether the local resource is newer
than the remote one. If so, the local master issues a GET against
the local server and a PUT against the remote server, which updates
the normal authoritative store. In another alternative, the log can
be read sequentially, starting at the first item the local master
wrote using the new machine ID after being elected as local master.
The remote store records this as a normal update and will end up
overlaying the local version. If the remote content is newer at
1218, then an error is logged for manual reconciliation at 1220,
the local content is copied to a new numbered resource name, and
the new name is added to the error log to allow future
reconciliation. The remote content is fetched and overlays the
local content at 1222.
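The reconciliation decision at 1214 through 1222 can be sketched under simplifying assumptions. Both stores are modeled as dictionaries of `(content, timestamp)` pairs, the local log lists `(path, timestamp)` records written under the temporary machine ID, and conflicting local versions are preserved under a hypothetical `path.N` numbering scheme. Equal timestamps are treated conservatively as conflicts, which is a choice made for this sketch only.

```python
from typing import Dict, Iterable, List, Tuple

Version = Tuple[bytes, float]  # (content, last-modified timestamp)


def reconcile(local_log: Iterable[Tuple[str, float]],
              local_store: Dict[str, Version],
              remote_store: Dict[str, Version]) -> List[Tuple[str, str]]:
    """Push newer local writes; keep conflicting local copies for review."""
    # Only resources logged under the temporary machine ID need attention,
    # and each is reconciled once, however many times it was written.
    written = sorted({path for path, _ts in local_log})
    conflicts = []
    for path in written:
        local = local_store[path]
        remote = remote_store.get(path)
        if remote is None or local[1] > remote[1]:
            remote_store[path] = local  # GET locally, PUT to authoritative store
            continue
        # Remote is the same age or newer: save the local version under a new
        # numbered name, record it for manual reconciliation, then overlay it.
        n = 1
        while f"{path}.{n}" in local_store:
            n += 1
        saved_as = f"{path}.{n}"
        local_store[saved_as] = local
        conflicts.append((path, saved_as))
        local_store[path] = remote
    return conflicts
```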
[0156] Once all files are updated at 1224, then at 1226 the local
master sets a special resource file which is detected by the local
replication servers to shift all peers at the same tier in the
local partition to refer to the remote partitions. All servers in
the local partition begin processing changes from the remote
authoritative store starting from a point before the failure
occurred. Eventually they are brought fully up to date when they
have processed all the changes which occurred while network
connectivity was down.
[0157] Alternatively, at 1214, each resource on the local server
may be processed if the total number of changes in the local log
with the new machine ID is greater than some threshold (either an
absolute threshold or a percentage of total resources on the
server, for example).
[0158] Some data, for example banking transactions, does not allow
the possibility of conflicting changes. To support this, servers
can analyze the local path and not accept changes for data having
strict consistency requirements. If data in the files can be
updated at a finer granularity, such as at the atomic data element
in an XML structure, then the process may be applied at a finer
granularity.
[0159] Some consistency issues can be overcome by using the
versioned numbered files identified above. If each version of each
file is retained then it is possible to write automated or manual
processes which can be used to reconcile the content across
versions to derive a valid master version.
[0160] FIG. 13 is a high-level diagram of the computing system which
can be used to implement any of the computing devices discussed
herein. The computing system of FIG. 13 includes processor 1300,
memory 1302, mass storage device 1304, peripherals 1306, output
devices 1308, input devices 1310, portable storage 1312, and
display system 1314. For purposes of simplicity, the components
shown in FIG. 13 are depicted as being connected via a single bus
1320. However, the components may be connected through one or more
data transport means. In one alternative, processor 1300 and memory
1302 may be connected via a local microprocessor bus, and the mass
storage device 1304, peripheral device 1306, portable storage 1312
and display system 1314 may be connected via one or more
input/output buses.
[0161] Processor 1300 may contain a single microprocessor, or may
contain a plurality of microprocessors for configuring the computer
system as a multiprocessor system. Memory 1302 stores instructions
and data for execution by processor 1300. If the technology
described herein is wholly or partially implemented in software,
memory 1302 (which may include one or more memory devices) will
store the executable code for programming processor 1300 to perform
the processes described herein. In one embodiment, memory 1302 may
include banks of dynamic random access memory, high speed cache
memory, flash memory, other nonvolatile memory, and/or other
storage elements.
[0162] Mass storage device 1304, which may be implemented with a
magnetic disc drive or optical disc drive, is a nonvolatile storage
device for storing data and code. In one embodiment, mass storage
device 1304 stores the system software that programs processor 1300
to implement the technology described herein.
[0163] Portable storage device 1312 operates in conjunction with a
portable nonvolatile storage medium, such as a floppy disc, CD-RW,
flash memory card/drive, etc., to input and output data and code to
and from the computing system of FIG. 13. In one embodiment, system
software for implementing the present technology is stored on such
a portable medium, and is input to the computer system via portable
storage medium drive 1312.
[0164] Peripheral devices 1306 may include any type of computer
support device, such as an input/output interface, to add
additional functionality to the computer system. For example,
peripheral devices 1306 may include a network interface for
connecting the computer system to a network, a modem, a router, a
wireless communication device, etc. Input devices 1310 provide a
portion of a user interface, and may include a keyboard or pointing
device (e.g. mouse, track ball, etc.). In order to display textual
and graphical information, the computing system of FIG. 13 will
(optionally) have an output display system 1314, which may include
a video card and monitor. Output devices 1308 can include speakers,
printers, network interfaces, etc.
[0165] The components depicted in the computing system of FIG. 13
are those typically found in computing systems suitable for use
with the technology described herein, and are intended to represent
a broad category of such computer components that are well known in
the art. Many different bus configurations, network platforms, and
operating systems can be used. The technology described herein is
not limited to any particular computing system.
[0166] Numerous variations on the above technology are possible.
Non-file-based stores can be updated using the same replication
strategy. In this instance, the data source can be modified to
provide the update logs, and the individual data records can be made
available via HTTP GET at unique URIs.
[0167] The sync client 370 can be easily modified to update a local
database in lieu of local files. It is equally viable to store the
elements retrieved from a remote database as local files. In
general, small static files can be served quickly and inexpensively
from standard caching HTTP servers, delivering an overall cost
benefit while requiring a minimum of investment to move data
resources forward for high-speed access.
[0168] Although the subject matter has been described in language
specific to structural features and/or methodological acts, it is
to be understood that the subject matter defined in the appended
claims is not necessarily limited to the specific features or acts
described above. Rather, the specific features and acts described
above are disclosed as example forms of implementing the
claims.