diff --git a/README.md b/README.md index 3f05d18..df58a33 100644 --- a/README.md +++ b/README.md @@ -155,3 +155,78 @@ s3-pit-restore comes with a testing suite. You can run it with: ### Run all the test cases: `$ ./s3-pit-restore -b my-bucket -B restore-bucket-s3 -d /tmp/ -P restore-path --test` + + +## Point-in-time restore strategy + +Restoring an S3 bucket to a given point in time means traversing all object +versions and delete markers to determine which ones were the latest at the given +time. + +The overall strategy is fairly simple, since it's a matter of comparing +timestamps of versions and delete markers, and restoring, for each key, the +most recent one at or before the given point-in-time. + +### S3 API details + +The source of versions and delete markers in an S3 bucket is the +ListObjectVersions API call. The response is paged with a maximum of 1000 +entries per page. Each page may have a list of up to 1000 versions and/or a +separate list of up to 1000 delete markers. + +The versions and delete markers are returned in order of key name and recency +(newest first). As S3 iterates over these, it appends versions and delete +markers to separate lists and responds with a page when it reaches the requested +page size (1000 by default). + +For reference, [here's how localstack implements it](https://github.com/localstack/localstack/blob/v2.3.2/localstack/services/s3/v3/provider.py#L1491). + +Examples: + +* If there are 1001 versions of a given key, then the first page contains only a + list of versions, and that list has the most recent 1000 versions of the + object. The next page has 1 version (the oldest) of that same object. 
+ +* If there are 1001 versions and 1001 delete markers for a given key, then: + + * If all the delete markers were placed after the versions, then: + + * Page 1: 1000 delete markers + + * Page 2: 1 delete marker, 999 versions + + * Page 3: 2 versions + + * If the delete markers and versions are mixed in time, say if an object is + deleted and written over and over, then: + + * Page 1: 500 versions, 500 delete markers + + * Page 2: 500 versions, 500 delete markers + + * Page 3: 1 version, 1 delete marker + +The number of versions or delete markers for a key can easily span two or more +pages, so they must be tracked between pages. Only when a new key name is +observed can we be sure that we have seen all versions or delete markers for a +key. + +### Ordering + +Since S3 has 1-second granularity on the LastModified timestamp, multiple +versions and/or delete markers may end up having the same timestamp. + +To determine which version or delete marker is newest when they all have the +same LastModified timestamp: + +* If the IsLatest attribute is true, then this object is the newest. + +* For previous versions where neither is the latest, assume that S3's order is + correct and use the first of the two versions as the most recent one. + +* For previous delete markers, it doesn't really matter: They both represent a + deletion anyway. + +* For a previous version vs a delete marker, it is impossible to say which is + the most recent one since they are kept in different lists. In this case, the + version takes precedence over the delete marker. diff --git a/s3-pit-restore b/s3-pit-restore index 601c4cf..2c3ce7e 100755 --- a/s3-pit-restore +++ b/s3-pit-restore @@ -355,10 +355,6 @@ def do_restore(): dest = args.dest last_obj = {} last_obj["Key"] = "" - # The key that was given for the latest version that was flagged with IsLatest=true. - is_latest_key = None - # The etag that was given for the latest version that was flagged with IsLatest=true. 
- is_latest_etag = None if args.debug: boto3.set_stream_logger('botocore') @@ -372,108 +368,103 @@ def do_restore(): os.chdir(dest) paginator = client.get_paginator('list_object_versions') - obj_needs_be_deleted = {} page_iterator = paginator.paginate(Bucket=args.bucket, Prefix=args.prefix) - # Delete markers can get desynchronized with the versions markers in the pagination system below. - # To avoid this, we will push from page to page the desynchronized markers until they fall on the - # page they should (the one with the versioning markers for the same set of files) - previous_deletemarkers = [] - for page in page_iterator: - if not "Versions" in page: - print("No versions matching criteria, exiting ...", file=sys.stderr) - sys.exit(1) - versions = page["Versions"] - # Some deletemarkers may come from the previous page: add them now - deletemarkers = previous_deletemarkers + page.get("DeleteMarkers", []) - # And since they have been added, we remove them from the overflow list - previous_deletemarkers = [] - dmarker = {"Key": "", "IsLatest": False} - for obj in versions: - if last_obj["Key"] == obj["Key"]: - # We've had a newer version or a delete of this key - continue - - if obj["IsLatest"]: - is_latest_key = obj["Key"] - is_latest_etag = obj["ETag"] - version_date = obj["LastModified"] + # A running aggregate of objects versions and delete markers, by key. + aggr_objects_by_key = {} - if version_date > pit_end_date or version_date < pit_start_date: - if pit_start_date == datetime.fromtimestamp(0, timezone.utc): - obj_needs_be_deleted[obj["Key"]] = obj + for page in page_iterator: + # Add this page's versions and delete markers to the running aggregate. + for obj in page.get("Versions", []) + page.get("DeleteMarkers", []): + # Add helper attribute to discern versions and delete markers. + obj["IsVersion"] = "Size" in obj + + # Append the object to the running aggregate of objects. 
+ aggr_objects_by_key.setdefault(obj["Key"], []).append(obj) + + # Sort the aggregated keys. + aggregated_keys = sorted(list(aggr_objects_by_key.keys())) + + # Remove the last key in the list if we haven't reached the last page, + # since that key name may have more versions or delete markers on + # following pages. This leaves us only with keys that we definitively + # have all the versions or delete markers for. + if page["IsTruncated"] and aggregated_keys: + aggregated_keys.pop() + + # Process the list of aggregated objects, now known to be complete as of + # the current page. The list may be empty if we've only seen a single + # key's versions or delete markers so far (i.e. more might follow). + for key in aggregated_keys: + # Iterate over all versions and delete markers for this key, + # reverse-ordered by LastModified, IsLatest and IsVersion. + objects_sorted = sorted( + aggr_objects_by_key[key], + key=lambda o: ( + o["LastModified"], # newest first + o["IsLatest"], # latest before non-latest + o["IsVersion"], # versions before delete markers + ), + reverse=True, + ) + + # Keep track of this key's latest version or delete marker. + obj_latest = next(o for o in objects_sorted if o["IsLatest"]) + + # Only consider objects within the restore window. + objects_filtered = list(filter( + lambda o: pit_start_date <= o["LastModified"] <= pit_end_date, + objects_sorted, + )) + + # Special case to restore the non-existence of an object within the + # same bucket, if the latest object is a version. 
+ if len(objects_filtered) == 0: + if args.bucket == args.dest_bucket and obj_latest["IsVersion"]: + handled_by_delete(obj_latest) continue - # Dont go farther in the deletemarkers list than the current key, or else we risk consuming desync delete markers of the next page - # (both versions and deletemarkers list are sorted in alphabetical order of the key, and then in reverse time order for each key) - while deletemarkers and (dmarker["Key"] < obj["Key"] or (dmarker["Key"] == obj["Key"] and dmarker["LastModified"] > pit_end_date)): - dmarker = deletemarkers.pop(0) - if dmarker['IsLatest']: - # The given object is already deleted and does not have to be deleted again. - obj_needs_be_deleted.pop(dmarker["Key"], None) - - #skip dmarker if it's latest than pit_end_date - if dmarker["Key"] == obj["Key"] and dmarker["LastModified"] > obj["LastModified"] and dmarker["LastModified"] <= pit_end_date: - # The most recent operation on this key was a delete - last_obj = dmarker - continue + # Restore the most recent object for this key. + obj = objects_filtered[0] - # This version needs to be restored.. - last_obj = obj + # Handle object version. + if obj["IsVersion"]: + if handled_by_glacier(obj): + continue - if handled_by_glacier(obj): - continue + # Handle all cases where a destination bucket is specified. + if args.dest_bucket is not None: + # Copy to other bucket or prefix. + if args.bucket != args.dest_bucket or args.dest_prefix: + handled_by_copy(obj) + continue - if args.dest_bucket is not None: - obj_needs_be_deleted.pop(obj["Key"], None) - - # We can skip the restore if the current version is equivalent to the newest version - # of the object and if we want to restore it to the same bucket and path. - # is_latest_key == obj["Key"] also ensures that the object is not currently deleted, - # because a version with IsLatest=true was observed. 
- if is_latest_key != obj["Key"] or \ - is_latest_etag != obj["ETag"] or \ - args.bucket != args.dest_bucket or \ - args.dest_prefix: + # Same bucket and prefix, in-place restore. + # Restore version if it's not already the latest. + if obj_latest["VersionId"] != obj["VersionId"]: handled_by_copy(obj) - continue - if not handled_by_standard(obj): - return + if not handled_by_standard(obj): + return - # The last dmarker may belong to the next version (if dmarker["Key"] != obj["Key"] ), keep it - previous_deletemarkers.append(dmarker) - # And all following may too, if any, so add them now. - while deletemarkers: - previous_deletemarkers.append(deletemarkers.pop(0)) + # Handle object delete marker. + else: + # Restore delete marker if the latest object is a version. + if obj_latest["IsVersion"]: + handled_by_delete(obj) + + # Remove the processed key from the running aggregate. + # FIXME this doesn't work as long as there are early continues and returns above! + del aggr_objects_by_key[key] - for future in concurrent.futures.as_completed(futures): - if future in futures: - try: - future.result() - print_obj(futures[future]) - except Exception as ex: - print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex), file=sys.stderr) - del(futures[future]) - # Process leftover delete markers. - while previous_deletemarkers: - dmarker = previous_deletemarkers.pop(0) - if dmarker['IsLatest']: - # The given object is already deleted and does not have to be deleted again. 
- obj_needs_be_deleted.pop(dmarker["Key"], None) - - # delete objects which came in existence after pit_end_date only if the destination bucket is same as source bucket and restoring to same object key - if args.dest_bucket == args.bucket and not args.dest_prefix: - for key in obj_needs_be_deleted: - handled_by_delete(obj_needs_be_deleted[key]) for future in concurrent.futures.as_completed(futures): if future in futures: try: future.result() print_obj(futures[future]) except Exception as ex: - print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex)) + print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex), file=sys.stderr) del(futures[future]) if __name__=='__main__':