75 changes: 75 additions & 0 deletions README.md
@@ -155,3 +155,78 @@ s3-pit-restore comes with a testing suite. You can run it with:

### Run all the test cases:
`$ ./s3-pit-restore -b my-bucket -B restore-bucket-s3 -d /tmp/ -P restore-path --test`


## Point-in-time restore strategy

Restoring an S3 bucket to a given point in time means traversing all object
versions and delete markers to determine, for each key, which one was the
latest at that time.

The overall strategy is simple: compare the timestamps of versions and delete
markers, and restore, for each key, the newest one at or before the given
point in time.
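
A minimal sketch of that selection rule, assuming each key's versions and
delete markers are already collected newest-first (the helper function and the
`entries` dict shape are illustrative, not part of the codebase):

```python
from datetime import datetime, timezone

def select_restore_target(entries, pit):
    """Return the newest version or delete marker at or before `pit`.

    `entries` is a newest-first list of dicts shaped like the S3 API
    output, e.g. {"LastModified": datetime, "IsVersion": bool}.
    """
    for entry in entries:
        if entry["LastModified"] <= pit:
            # A version means "restore it"; a delete marker means the
            # key was already deleted at `pit` and should stay absent.
            return entry
    return None  # The key did not exist yet at `pit`.

# Example: restore the state as of 2023-10-01 12:00 UTC.
pit = datetime(2023, 10, 1, 12, 0, tzinfo=timezone.utc)
```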

### S3 API details

The source of versions and delete markers in an S3 bucket is the
ListObjectVersions API call. The response is paginated, with a maximum of 1000
entries per page. Each page carries two separate lists, one of versions and one
of delete markers; together they hold at most one page's worth of entries.

The versions and delete markers are returned in order of key name and recency
(newest first). As S3 iterates over these, it appends versions and delete
markers to separate lists and responds with a page when it reaches the requested
page size (1000 by default).

For reference, [here's how localstack implements it](https://github.com/localstack/localstack/blob/v2.3.2/localstack/services/s3/v3/provider.py#L1491).
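
For illustration, a minimal boto3 sketch of the call (the bucket name and
prefix are placeholders):

```python
import boto3

client = boto3.client("s3")
paginator = client.get_paginator("list_object_versions")

for page in paginator.paginate(Bucket="my-bucket", Prefix=""):
    # Each page carries two separate lists; together they hold at most
    # one page's worth of entries (MaxKeys, 1000 by default).
    for version in page.get("Versions", []):
        print("version:", version["Key"], version["LastModified"], version["IsLatest"])
    for marker in page.get("DeleteMarkers", []):
        print("delete marker:", marker["Key"], marker["LastModified"], marker["IsLatest"])
```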

Examples:

* If there are 1001 versions of a given key, then the first page contains only a
list of versions, and that list has the most recent 1000 versions of the
object. The next page has 1 version (the oldest) of that same object.

* If there are 1001 versions and 1001 delete markers for a given key, then:

* If all the delete markers were placed after the versions, then:

* Page 1: 1000 delete markers

* Page 2: 1 delete marker, 999 versions

* Page 3: 2 versions

* If the delete markers and versions are mixed in time, say if an object is
deleted and written over and over, then:

* Page 1: 500 versions, 500 delete markers

* Page 2: 500 versions, 500 delete markers

* Page 3: 1 version, 1 delete marker

The versions and delete markers for a single key can easily span two or more
pages, so partial results must be tracked across pages. Only when a new key
name is observed can we be sure that we have seen all the versions and delete
markers for the previous key.
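
A condensed sketch of that bookkeeping, mirroring the approach taken in this
PR (`page_iterator` is the paginator from above; `process` is a hypothetical
placeholder for the per-key restore logic):

```python
aggregate = {}  # key -> all versions and delete markers seen so far

for page in page_iterator:
    for obj in page.get("Versions", []) + page.get("DeleteMarkers", []):
        aggregate.setdefault(obj["Key"], []).append(obj)

    complete_keys = sorted(aggregate)
    if page["IsTruncated"] and complete_keys:
        # The highest key so far may continue on the next page; hold it back.
        complete_keys.pop()

    for key in complete_keys:
        process(key, aggregate.pop(key))  # hypothetical placeholder
```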

### Ordering

Since S3 has 1-second granularity on the LastModified timestamp, multiple
versions and/or delete markers may end up having the same timestamp.

To determine which version or delete marker is newest when several share the
same LastModified timestamp, the following rules apply (they collapse into a
single sort key, sketched after this list):

* If the IsLatest attribute is true, then this object is the newest.

* When neither of two versions is the latest, assume that S3's ordering is
correct and treat the first of the two as the more recent one.

* When neither of two delete markers is the latest, the choice doesn't matter:
both represent a deletion anyway.

* When comparing a non-latest version against a delete marker, it is impossible
to tell which one is more recent, since they are kept in separate lists. In
this case, the version takes precedence over the delete marker.
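
In the patch, these rules collapse into a single descending sort key
(`objects` stands for one key's versions and delete markers, each carrying the
helper attribute `IsVersion`):

```python
objects_sorted = sorted(
    objects,
    key=lambda o: (
        o["LastModified"],  # newest first
        o["IsLatest"],      # the latest entry wins a timestamp tie
        o["IsVersion"],     # versions take precedence over delete markers
    ),
    reverse=True,
)
```
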
165 changes: 78 additions & 87 deletions s3-pit-restore
@@ -355,10 +355,6 @@ def do_restore():
    dest = args.dest
    last_obj = {}
    last_obj["Key"] = ""
-    # The key that was given for the latest version that was flagged with IsLatest=true.
-    is_latest_key = None
-    # The etag that was given for the latest version that was flagged with IsLatest=true.
-    is_latest_etag = None

    if args.debug: boto3.set_stream_logger('botocore')

@@ -372,108 +368,103 @@ def do_restore():
    os.chdir(dest)

    paginator = client.get_paginator('list_object_versions')
-    obj_needs_be_deleted = {}
    page_iterator = paginator.paginate(Bucket=args.bucket, Prefix=args.prefix)
-    # Delete markers can get desynchronized with the versions markers in the pagination system below.
-    # To avoid this, we will push from page to page the desynchronized markers until they fall on the
-    # page they should (the one with the versioning markers for the same set of files)
-    previous_deletemarkers = []
-    for page in page_iterator:
-        if not "Versions" in page:
-            print("No versions matching criteria, exiting ...", file=sys.stderr)
-            sys.exit(1)
-        versions = page["Versions"]
-        # Some deletemarkers may come from the previous page: add them now
-        deletemarkers = previous_deletemarkers + page.get("DeleteMarkers", [])
-        # And since they have been added, we remove them from the overflow list
-        previous_deletemarkers = []
-        dmarker = {"Key": "", "IsLatest": False}
-        for obj in versions:
-            if last_obj["Key"] == obj["Key"]:
-                # We've had a newer version or a delete of this key
-                continue
-
-            if obj["IsLatest"]:
-                is_latest_key = obj["Key"]
-                is_latest_etag = obj["ETag"]
-
-            version_date = obj["LastModified"]
+    # A running aggregate of object versions and delete markers, by key.
+    aggr_objects_by_key = {}

-            if version_date > pit_end_date or version_date < pit_start_date:
-                if pit_start_date == datetime.fromtimestamp(0, timezone.utc):
-                    obj_needs_be_deleted[obj["Key"]] = obj
+    for page in page_iterator:
+        # Add this page's versions and delete markers to the running aggregate.
+        for obj in page.get("Versions", []) + page.get("DeleteMarkers", []):
+            # Add helper attribute to discern versions and delete markers.
+            obj["IsVersion"] = "Size" in obj
+
+            # Append the object to the running aggregate of objects.
+            aggr_objects_by_key.setdefault(obj["Key"], []).append(obj)

+        # Sort the aggregated keys.
+        aggregated_keys = sorted(list(aggr_objects_by_key.keys()))
+
+        # Remove the last key in the list if we haven't reached the last page,
+        # since that key name may have more versions or delete markers on
+        # following pages. This leaves us only with keys that we definitively
+        # have all the versions or delete markers for.
+        if page["IsTruncated"] and aggregated_keys:
+            aggregated_keys.pop()
+
+        # Process the list of aggregated objects, now known to be complete as of
+        # the current page. The list may be empty if we've only seen a single
+        # key's versions or delete markers so far (i.e. more might follow).
+        for key in aggregated_keys:
+            # Iterate over all versions and delete markers for this key,
+            # reverse-ordered by LastModified, IsLatest and IsVersion.
+            objects_sorted = sorted(
+                aggr_objects_by_key[key],
+                key=lambda o: (
+                    o["LastModified"],  # newest first
+                    o["IsLatest"],      # latest before non-latest
+                    o["IsVersion"],     # versions before delete markers
+                ),
+                reverse=True,
+            )
+
+            # Keep track of this key's latest version or delete marker.
+            obj_latest = next(o for o in objects_sorted if o["IsLatest"])
+
+            # Only consider objects within the restore window.
+            objects_filtered = list(filter(
+                lambda o: pit_start_date <= o["LastModified"] <= pit_end_date,
+                objects_sorted,
+            ))
+
+            # Special case to restore the non-existence of an object within the
+            # same bucket, if the latest object is a version.
+            if len(objects_filtered) == 0:
+                if args.bucket == args.dest_bucket and obj_latest["IsVersion"]:
+                    # Delete this key's latest object: at this point obj still
+                    # refers to the last entry aggregated from the page above,
+                    # not necessarily to this key.
+                    handled_by_delete(obj_latest)
+                continue

-            # Dont go farther in the deletemarkers list than the current key, or else we risk consuming desync delete markers of the next page
-            # (both versions and deletemarkers list are sorted in alphabetical order of the key, and then in reverse time order for each key)
-            while deletemarkers and (dmarker["Key"] < obj["Key"] or (dmarker["Key"] == obj["Key"] and dmarker["LastModified"] > pit_end_date)):
-                dmarker = deletemarkers.pop(0)
-                if dmarker['IsLatest']:
-                    # The given object is already deleted and does not have to be deleted again.
-                    obj_needs_be_deleted.pop(dmarker["Key"], None)
-
-            #skip dmarker if it's latest than pit_end_date
-            if dmarker["Key"] == obj["Key"] and dmarker["LastModified"] > obj["LastModified"] and dmarker["LastModified"] <= pit_end_date:
-                # The most recent operation on this key was a delete
-                last_obj = dmarker
-                continue
+            # Restore the most recent object for this key.
+            obj = objects_filtered[0]

-            # This version needs to be restored..
-            last_obj = obj
+            # Handle object version.
+            if obj["IsVersion"]:
+                if handled_by_glacier(obj):
+                    continue

-            if handled_by_glacier(obj):
-                continue
+                # Handle all cases where a destination bucket is specified.
+                if args.dest_bucket is not None:
+                    # Copy to other bucket or prefix.
+                    if args.bucket != args.dest_bucket or args.dest_prefix:
+                        handled_by_copy(obj)
+                        continue

-            if args.dest_bucket is not None:
-                obj_needs_be_deleted.pop(obj["Key"], None)
-
-                # We can skip the restore if the current version is equivalent to the newest version
-                # of the object and if we want to restore it to the same bucket and path.
-                # is_latest_key == obj["Key"] also ensures that the object is not currently deleted,
-                # because a version with IsLatest=true was observed.
-                if is_latest_key != obj["Key"] or \
-                        is_latest_etag != obj["ETag"] or \
-                        args.bucket != args.dest_bucket or \
-                        args.dest_prefix:
+                    # Same bucket and prefix, in-place restore.
+                    # Restore version if it's not already the latest.
+                    if obj_latest["VersionId"] != obj["VersionId"]:
+                        handled_by_copy(obj)
+                        continue

-            if not handled_by_standard(obj):
-                return
+                if not handled_by_standard(obj):
+                    return

-        # The last dmarker may belong to the next version (if dmarker["Key"] != obj["Key"] ), keep it
-        previous_deletemarkers.append(dmarker)
-        # And all following may too, if any, so add them now.
-        while deletemarkers:
-            previous_deletemarkers.append(deletemarkers.pop(0))
+            # Handle object delete marker.
+            else:
+                # Restore delete marker if the latest object is a version.
+                if obj_latest["IsVersion"]:
+                    handled_by_delete(obj)
+
+            # Remove the processed key from the running aggregate.
+            # FIXME this doesn't work as long as there are early continues and returns above!
+            del aggr_objects_by_key[key]

+        for future in concurrent.futures.as_completed(futures):
+            if future in futures:
+                try:
+                    future.result()
+                    print_obj(futures[future])
+                except Exception as ex:
+                    print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex), file=sys.stderr)
+                del(futures[future])

-    # Process leftover delete markers.
-    while previous_deletemarkers:
-        dmarker = previous_deletemarkers.pop(0)
-        if dmarker['IsLatest']:
-            # The given object is already deleted and does not have to be deleted again.
-            obj_needs_be_deleted.pop(dmarker["Key"], None)
-
-    # delete objects which came in existence after pit_end_date only if the destination bucket is same as source bucket and restoring to same object key
-    if args.dest_bucket == args.bucket and not args.dest_prefix:
-        for key in obj_needs_be_deleted:
-            handled_by_delete(obj_needs_be_deleted[key])
    for future in concurrent.futures.as_completed(futures):
        if future in futures:
            try:
                future.result()
                print_obj(futures[future])
            except Exception as ex:
-                print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex))
+                print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex), file=sys.stderr)
            del(futures[future])

if __name__=='__main__':