Conversation
maybe I would only delete after writing, because what happens if your write fails?
Can you still look into this?
| Used to speed up initial sync by avoiding downloading all historical events.
| """
|
| version: int = CHECKPOINT_VERSION
I think we need to think about the structure for when we have large files and how we would handle that case. There are two things to take into account: 1) what if a file is so big it doesn't fit into a single gdrive file anymore, and 2) what if a file is too big to fit in memory. We don't have to solve these problems now, but we should structure the code in such a way that we can upgrade it later without having to change everything.
so I have a few ideas about this:
- we could fill up checkpoint files on gdrive until they become too big; once they become too big we create a second file (e.g. syftcheckpoint_<timestamp>_part2)
- entries in checkpoint files can have "parts", so you could have file: 1, part: 2; this way a single synced file can be distributed over multiple checkpoint files (see the sketch below)
- in memory, a checkpoint file's content can either be a raw string/bytes, or it can be a pointer to a file, which we could write to disk. This may require some extra thinking, because if we write them to disk anyway, maybe they can also just be written to the final destination (the actual target file) in the first place
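To make the part-naming idea concrete, here is a rough sketch; the size limit, helper name, and naming scheme are illustrative, not the actual implementation:

```python
# Illustrative sketch: split one logical checkpoint payload across multiple
# gdrive files named syftcheckpoint_<timestamp>_part<N>. The size limit is a
# placeholder, not a real constraint used by the project.
from typing import Iterator, Tuple

MAX_CHECKPOINT_BYTES = 50 * 1024 * 1024  # hypothetical per-file limit


def split_into_parts(payload: bytes, timestamp: int) -> Iterator[Tuple[str, bytes]]:
    """Yield (filename, chunk) pairs, one per checkpoint part."""
    total_parts = max(1, -(-len(payload) // MAX_CHECKPOINT_BYTES))  # ceiling division
    for part in range(1, total_parts + 1):
        start = (part - 1) * MAX_CHECKPOINT_BYTES
        yield f"syftcheckpoint_{timestamp}_part{part}", payload[start : start + MAX_CHECKPOINT_BYTES]
```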
Totally, we definitely have to solve this. I will create a new planning document about it and share it, so we can use it to discuss the different approaches.
perhaps creating a checkpoint should also entail deleting events older than the checkpoint; this would make listing files cheaper
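A hypothetical sketch of what that pruning could look like; the connection helpers used here are assumed names, not the real API:

```python
# Hypothetical sketch; list_event_files() and delete_file() are assumed helper
# names, not the project's real connection API.
def prune_events_older_than(connection, checkpoint_timestamp: float) -> None:
    """Delete event files that are already covered by the new checkpoint."""
    for event_file in connection.list_event_files():
        if event_file.timestamp < checkpoint_timestamp:
            connection.delete_file(event_file.id)
```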
maybe we can cache this and update it in memory after the first time; that would reduce the number of API calls
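Something like this minimal caching wrapper could work; the list_files helper and the metadata shape are assumptions made for the example:

```python
# Illustrative caching wrapper; list_files() is an assumed helper, and the
# metadata shape (dicts keyed by id) is made up for the example.
class CachedListing:
    def __init__(self, connection):
        self.connection = connection
        self._files: dict[str, dict] | None = None

    def files(self) -> dict[str, dict]:
        # single API call the first time, served from memory afterwards
        if self._files is None:
            self._files = {f["id"]: f for f in self.connection.list_files()}
        return self._files

    def record_upload(self, file_meta: dict) -> None:
        # keep the cache in sync after our own writes instead of re-querying
        if self._files is not None:
            self._files[file_meta["id"]] = file_meta
```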
this is a really good PR! ❤️
Force-pushed from 663bff5 to 73e6992
| class CheckpointFile(BaseModel):
|     """Represents a single file in the checkpoint."""
I would add a part attribute here so we can support large files later with backwards compatibility
for now it can always be part 1
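For illustration, the model could gain the attribute like this; only `part` (and a possible `total_parts`) is the suggestion, the other fields are placeholders and not copied from the actual class:

```python
# Sketch only: `part` (and `total_parts`) is the suggested addition; the other
# fields are placeholders and not taken from the real model.
from pydantic import BaseModel


class CheckpointFile(BaseModel):
    """Represents a single file in the checkpoint."""

    path: str
    content: bytes = b""
    part: int = 1         # always 1 for now; >1 once large files are split
    total_parts: int = 1  # lets a reader know when it has collected every part
```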
If you make this an update call I think it could be atomic?
| """
| # Get all file contents
| file_contents = {}
| for path, content in self.file_connection.get_items():
I believe we discussed a single checkpoint file vs multiple checkpoint files, and our latest conclusion was to go with multiple, which means we could implement this differently? Not 100% sure
| _any_shared_datasets: List[tuple] = PrivateAttr(default_factory=list)
|
| # In-memory rolling state for tracking events since last checkpoint
| _rolling_state: RollingState | None = PrivateAttr(default=None)
in memory is not going to scale well, can we not just store in files?
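A rough sketch of spilling the rolling state to disk as JSON lines; the file layout and helper names are assumptions, not the project's format:

```python
# Illustrative only: keep the rolling state as a JSON-lines file on disk so it
# never has to fit in memory. The file format is an assumption.
import json
from pathlib import Path


def append_rolling_event(state_path: Path, event: dict) -> None:
    """Append one event without loading the existing state."""
    with state_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(event) + "\n")


def iter_rolling_events(state_path: Path):
    """Stream events back one at a time."""
    if not state_path.exists():
        return
    with state_path.open("r", encoding="utf-8") as f:
        for line in f:
            yield json.loads(line)
```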
we could decide to do these in parallel using the threadpool
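For example, a thread-pool sketch along these lines, where work_fn stands in for whatever the real per-file operation is:

```python
# Illustrative only: fan the per-file work out over a thread pool.
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_parallel(items, work_fn, max_workers: int = 8) -> list:
    """Apply work_fn to every item using a thread pool and collect the results."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(work_fn, item): item for item in items}
        for future in as_completed(futures):
            results.append(future.result())  # re-raises if any call failed
    return results
```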
I am confused, we have rolling state and checkpoints and normal events? I thought we only have rolling state and checkpoints now? Where do the normal events come from?
| # Upload to Google Drive
| print(f"Creating checkpoint with {len(checkpoint.files)} files...")
| self.connection_router.upload_checkpoint(checkpoint)
I think if we have only one checkpoint file for the total state we should delete the old ones (but maybe we should just have multiple files)
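A sketch of the write-then-delete ordering mentioned earlier, so a failed upload never leaves us without any checkpoint; the listing and delete helpers are assumed names, while upload_checkpoint is the call shown in the diff:

```python
# Sketch of the ordering only; list_checkpoint_files() and delete_file() are
# assumed helpers, while upload_checkpoint() is the call shown in the diff.
def replace_checkpoint(connection, checkpoint) -> None:
    old_ids = [f.id for f in connection.list_checkpoint_files()]
    # upload first; if this raises, the old checkpoints are still intact
    connection.upload_checkpoint(checkpoint)
    for file_id in old_ids:
        connection.delete_file(file_id)
```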
| return False
| return self.datasite_owner_syncer.should_create_checkpoint(threshold)
|
| def try_create_checkpoint(self, threshold: int = 50):
let's rename this, something like create_checkpoint_if_state_large
…ng state implementation; added unit tests for incremental checkpoint
…kpoints

Move _pull_datasets_for_initial_sync() outside the elif block so it runs unconditionally after all restoration paths. The method's internal guards (_filter_collections_needing_download) prevent redundant downloads.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
- Add 6 new unit tests: deduplication, overwrites across incrementals, file deletions, compacting trigger, write-then-delete safety
- Add 2 new rolling state unit tests: dedup by path, clear resets timestamp
- Remove integration test_rolling_state.py (all covered by unit tests)
- Remove test_checkpoint_restore_on_fresh_login (subsumed by incremental test)
- Move function-level imports to top of test_checkpoints.py

Co-Authored-By: Claude Opus 4.6 <[email protected]>
Force-pushed from 5184831 to a267400
Description
Please include a summary of the change, the motivation, and any additional context that will help others understand your PR. If it closes one or more open issues, please tag them as described here.
Affected Dependencies
List any dependencies that are required for this change.
How has this been tested?
Checklist