From ee215a97fa66348d9366deeecb5c6cc2d3244291 Mon Sep 17 00:00:00 2001 From: Sven Ulland Date: Fri, 6 Oct 2023 09:33:57 +0200 Subject: [PATCH] Fix paging-related bugs by reworking the strategy Responses from the S3 ListObjectVersions API may have pages where the Versions list is not present because the DeleteMarkers list takes up the entire page. Fix that bug and rework the strategy to make it clearer and easier to reason about. --- README.md | 75 ++++++++++++++++++++++ s3-pit-restore | 165 +++++++++++++++++++++++-------------------------- 2 files changed, 153 insertions(+), 87 deletions(-) diff --git a/README.md b/README.md index 3f05d18..df58a33 100644 --- a/README.md +++ b/README.md @@ -155,3 +155,78 @@ s3-pit-restore comes with a testing suite. You can run it with: ### Run all the test cases: `$ ./s3-pit-restore -b my-bucket -B restore-bucket-s3 -d /tmp/ -P restore-path --test` + + +## Point-in-time restore strategy + +Restoring an S3 bucket to a given point in time means traversing all object +versions and delete markers to determine which ones were the latest at the given +time. + +The overall strategy is fairly simple, since it's a matter of comparing +timestamps of versions and delete markers, and restoring the one that's closest +in time to the given point-in-time. + +### S3 API details + +The source of versions and delete markers in an S3 bucket is the +ListObjectVersions API call. The response is paged with a maximum of 1000 +entries per page. Each page may have a list of up to 1000 versions and/or a +separate list of up to 1000 delete markers. + +The versions and delete markers are returned in order of key name and recency +(newest first). As S3 iterates over these, it appends versions and delete +markers to separate lists and responds with a page when it reaches the requested +page size (1000 by default). 
+
+For reference, [here's how localstack implements it](https://github.com/localstack/localstack/blob/v2.3.2/localstack/services/s3/v3/provider.py#L1491).
+
+Examples:
+
+* If there are 1001 versions of a given key, then the first page contains only a
+  list of versions, and that list has the most recent 1000 versions of the
+  object. The next page has 1 version (the oldest) of that same object.
+
+* If there are 1001 versions and 1001 delete markers for a given key, then:
+
+  * If all the delete markers were placed after the versions, then:
+
+    * Page 1: 1000 delete markers
+
+    * Page 2: 1 delete marker, 999 versions
+
+    * Page 3: 2 versions
+
+  * If the delete markers and versions are mixed in time, say if an object is
+    deleted and written over and over, then:
+
+    * Page 1: 500 versions, 500 delete markers
+
+    * Page 2: 500 versions, 500 delete markers
+
+    * Page 3: 1 version, 1 delete marker
+
+The number of versions or delete markers for a key can easily span two or more
+pages, so they must be tracked between pages. Only when a new key name is
+observed can we be sure that we have seen all versions or delete markers for a
+key.
+
+### Ordering
+
+Since S3 has 1-second granularity on the LastModified timestamp, multiple
+versions and/or delete markers may end up having the same timestamp.
+
+To determine which version or delete marker is newest when they all have the
+same LastModified timestamp:
+
+* If the IsLatest attribute is true, then this object is the newest.
+
+* For previous versions where neither is the latest, assume that S3's order is
+  correct and use the first of the two versions as the most recent one.
+
+* For previous delete markers, it doesn't really matter: they both represent a
+  deletion anyway.
+
+* For a previous version vs a delete marker, it is impossible to say which is
+  the most recent one since they are kept in different lists. In this case, the
+  version takes precedence over the delete marker.
diff --git a/s3-pit-restore b/s3-pit-restore index 601c4cf..2c3ce7e 100755 --- a/s3-pit-restore +++ b/s3-pit-restore @@ -355,10 +355,6 @@ def do_restore(): dest = args.dest last_obj = {} last_obj["Key"] = "" - # The key that was given for the latest version that was flagged with IsLatest=true. - is_latest_key = None - # The etag that was given for the latest version that was flagged with IsLatest=true. - is_latest_etag = None if args.debug: boto3.set_stream_logger('botocore') @@ -372,108 +368,103 @@ def do_restore(): os.chdir(dest) paginator = client.get_paginator('list_object_versions') - obj_needs_be_deleted = {} page_iterator = paginator.paginate(Bucket=args.bucket, Prefix=args.prefix) - # Delete markers can get desynchronized with the versions markers in the pagination system below. - # To avoid this, we will push from page to page the desynchronized markers until they fall on the - # page they should (the one with the versioning markers for the same set of files) - previous_deletemarkers = [] - for page in page_iterator: - if not "Versions" in page: - print("No versions matching criteria, exiting ...", file=sys.stderr) - sys.exit(1) - versions = page["Versions"] - # Some deletemarkers may come from the previous page: add them now - deletemarkers = previous_deletemarkers + page.get("DeleteMarkers", []) - # And since they have been added, we remove them from the overflow list - previous_deletemarkers = [] - dmarker = {"Key": "", "IsLatest": False} - for obj in versions: - if last_obj["Key"] == obj["Key"]: - # We've had a newer version or a delete of this key - continue - - if obj["IsLatest"]: - is_latest_key = obj["Key"] - is_latest_etag = obj["ETag"] - version_date = obj["LastModified"] + # A running aggregate of objects versions and delete markers, by key. 
+ aggr_objects_by_key = {} - if version_date > pit_end_date or version_date < pit_start_date: - if pit_start_date == datetime.fromtimestamp(0, timezone.utc): - obj_needs_be_deleted[obj["Key"]] = obj + for page in page_iterator: + # Add this page's versions and delete markers to the running aggregate. + for obj in page.get("Versions", []) + page.get("DeleteMarkers", []): + # Add helper attribute to discern versions and delete markers. + obj["IsVersion"] = "Size" in obj + + # Append the object to the running aggregate of objects. + aggr_objects_by_key.setdefault(obj["Key"], []).append(obj) + + # Sort the aggregated keys. + aggregated_keys = sorted(list(aggr_objects_by_key.keys())) + + # Remove the last key in the list if we haven't reached the last page, + # since that key name may have more versions or delete markers on + # following pages. This leaves us only with keys that we definitively + # have all the versions or delete markers for. + if page["IsTruncated"] and aggregated_keys: + aggregated_keys.pop() + + # Process the list of aggregated objects, now known to be complete as of + # the current page. The list may be empty if we've only seen a single + # key's versions or delete markers so far (i.e. more might follow). + for key in aggregated_keys: + # Iterate over all versions and delete markers for this key, + # reverse-ordered by LastModified, IsLatest and IsVersion. + objects_sorted = sorted( + aggr_objects_by_key[key], + key=lambda o: ( + o["LastModified"], # newest first + o["IsLatest"], # latest before non-latest + o["IsVersion"], # versions before delete markers + ), + reverse=True, + ) + + # Keep track of this key's latest version or delete marker. + obj_latest = next(o for o in objects_sorted if o["IsLatest"]) + + # Only consider objects within the restore window. 
+            objects_filtered = list(filter(
+                lambda o: pit_start_date <= o["LastModified"] <= pit_end_date,
+                objects_sorted,
+            ))
+
+            # Special case to restore the non-existence of an object within the
+            # same bucket, if the latest object is a version.
+            if len(objects_filtered) == 0:
+                if args.bucket == args.dest_bucket and obj_latest["IsVersion"]:
+                    handled_by_delete(obj_latest)
                 continue
-            # Dont go farther in the deletemarkers list than the current key, or else we risk consuming desync delete markers of the next page
-            # (both versions and deletemarkers list are sorted in alphabetical order of the key, and then in reverse time order for each key)
-            while deletemarkers and (dmarker["Key"] < obj["Key"] or (dmarker["Key"] == obj["Key"] and dmarker["LastModified"] > pit_end_date)):
-                dmarker = deletemarkers.pop(0)
-                if dmarker['IsLatest']:
-                    # The given object is already deleted and does not have to be deleted again.
-                    obj_needs_be_deleted.pop(dmarker["Key"], None)
-
-            #skip dmarker if it's latest than pit_end_date
-            if dmarker["Key"] == obj["Key"] and dmarker["LastModified"] > obj["LastModified"] and dmarker["LastModified"] <= pit_end_date:
-                # The most recent operation on this key was a delete
-                last_obj = dmarker
-                continue
+            # Restore the most recent object for this key.
+            obj = objects_filtered[0]
 
-            # This version needs to be restored..
-            last_obj = obj
+            # Handle object version.
+            if obj["IsVersion"]:
+                if handled_by_glacier(obj):
+                    continue
 
-            if handled_by_glacier(obj):
-                continue
+                # Handle all cases where a destination bucket is specified.
+                if args.dest_bucket is not None:
+                    # Copy to other bucket or prefix.
+                    if args.bucket != args.dest_bucket or args.dest_prefix:
+                        handled_by_copy(obj)
+                        continue
 
-            if args.dest_bucket is not None:
-                obj_needs_be_deleted.pop(obj["Key"], None)
-
-            # We can skip the restore if the current version is equivalent to the newest version
-            # of the object and if we want to restore it to the same bucket and path.
- # is_latest_key == obj["Key"] also ensures that the object is not currently deleted, - # because a version with IsLatest=true was observed. - if is_latest_key != obj["Key"] or \ - is_latest_etag != obj["ETag"] or \ - args.bucket != args.dest_bucket or \ - args.dest_prefix: + # Same bucket and prefix, in-place restore. + # Restore version if it's not already the latest. + if obj_latest["VersionId"] != obj["VersionId"]: handled_by_copy(obj) - continue - if not handled_by_standard(obj): - return + if not handled_by_standard(obj): + return - # The last dmarker may belong to the next version (if dmarker["Key"] != obj["Key"] ), keep it - previous_deletemarkers.append(dmarker) - # And all following may too, if any, so add them now. - while deletemarkers: - previous_deletemarkers.append(deletemarkers.pop(0)) + # Handle object delete marker. + else: + # Restore delete marker if the latest object is a version. + if obj_latest["IsVersion"]: + handled_by_delete(obj) + + # Remove the processed key from the running aggregate. + # FIXME this doesn't work as long as there are early continues and returns above! + del aggr_objects_by_key[key] - for future in concurrent.futures.as_completed(futures): - if future in futures: - try: - future.result() - print_obj(futures[future]) - except Exception as ex: - print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex), file=sys.stderr) - del(futures[future]) - # Process leftover delete markers. - while previous_deletemarkers: - dmarker = previous_deletemarkers.pop(0) - if dmarker['IsLatest']: - # The given object is already deleted and does not have to be deleted again. 
- obj_needs_be_deleted.pop(dmarker["Key"], None) - - # delete objects which came in existence after pit_end_date only if the destination bucket is same as source bucket and restoring to same object key - if args.dest_bucket == args.bucket and not args.dest_prefix: - for key in obj_needs_be_deleted: - handled_by_delete(obj_needs_be_deleted[key]) for future in concurrent.futures.as_completed(futures): if future in futures: try: future.result() print_obj(futures[future]) except Exception as ex: - print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex)) + print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex), file=sys.stderr) del(futures[future]) if __name__=='__main__':