diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs index 10e434d..30cf11c 100644 --- a/crates/core/src/checkpoint.rs +++ b/crates/core/src/checkpoint.rs @@ -35,28 +35,20 @@ fn powersync_validate_checkpoint_impl( // language=SQLite let statement = db.prepare_v2( "WITH -bucket_list(bucket, lower_op_id, checksum) AS ( +bucket_list(bucket, checksum) AS ( SELECT json_extract(json_each.value, '$.bucket') as bucket, - 0 as lower_op_id, json_extract(json_each.value, '$.checksum') as checksum FROM json_each(json_extract(?1, '$.buckets')) ) SELECT bucket_list.bucket as bucket, IFNULL(buckets.add_checksum, 0) as add_checksum, - IFNULL(SUM(oplog.hash), 0) as oplog_checksum, - COUNT(oplog.op_id) as count, - IFNULL(MAX(oplog.op_id), 0) as last_op_id, - IFNULL(buckets.last_applied_op, 0) as last_applied_op, + IFNULL(buckets.op_checksum, 0) as oplog_checksum, bucket_list.checksum as expected_checksum FROM bucket_list LEFT OUTER JOIN ps_buckets AS buckets ON buckets.name = bucket_list.bucket - LEFT OUTER JOIN ps_oplog AS oplog ON - bucket_list.bucket = oplog.bucket AND - oplog.op_id <= CAST(json_extract(?1, '$.last_op_id') as INTEGER) AND - oplog.op_id > bucket_list.lower_op_id GROUP BY bucket_list.bucket", )?; @@ -69,10 +61,7 @@ GROUP BY bucket_list.bucket", // checksums with column_int are wrapped to i32 by SQLite let add_checksum = statement.column_int(1)?; let oplog_checksum = statement.column_int(2)?; - let _count = statement.column_int(3)?; - let _last_op_id = statement.column_int64(4)?; - let _last_applied_op = statement.column_int64(5)?; - let expected_checksum = statement.column_int(6)?; + let expected_checksum = statement.column_int(3)?; // wrapping add is like +, but safely overflows let checksum = oplog_checksum.wrapping_add(add_checksum); diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index e3802e0..b84b682 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -97,6 +97,8 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super let mut last_op: Option = None; let mut add_checksum: i32 = 0; + let mut op_checksum: i32 = 0; + let mut remove_operations: i32 = 0; while iterate_statement.step()? == ResultCode::ROW { let op_id = iterate_statement.column_int64(0)?; @@ -126,6 +128,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super let superseded_op = supersede_statement.column_int64(0)?; let supersede_checksum = supersede_statement.column_int(1)?; add_checksum = add_checksum.wrapping_add(supersede_checksum); + op_checksum = op_checksum.wrapping_sub(supersede_checksum); if superseded_op <= last_applied_op { // Superseded an operation previously applied - we cannot skip removes @@ -172,6 +175,14 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super insert_statement.bind_int(8, checksum)?; insert_statement.exec()?; + + op_checksum = op_checksum.wrapping_add(checksum); + + if opi == 4 { + // We persisted a REMOVE statement, so the bucket needs + // to be compacted at some point. + remove_operations += 1; + } } else if op == "MOVE" { add_checksum = add_checksum.wrapping_add(checksum); } else if op == "CLEAR" { @@ -185,7 +196,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super // We also replace the checksum with the checksum of the CLEAR op. // language=SQLite let clear_statement2 = db.prepare_v2( - "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2", + "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE name = ?2", )?; clear_statement2.bind_text(2, bucket, sqlite::Destructor::STATIC)?; clear_statement2.bind_int(1, checksum)?; @@ -193,6 +204,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super add_checksum = 0; last_applied_op = 0; + op_checksum = 0; } } @@ -201,12 +213,16 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super let statement = db.prepare_v2( "UPDATE ps_buckets SET last_op = ?2, - add_checksum = add_checksum + ?3 + add_checksum = (add_checksum + ?3) & 0xffffffff, + op_checksum = (op_checksum + ?4) & 0xffffffff, + remove_operations = (remove_operations + ?5) WHERE name = ?1", )?; statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; statement.bind_int64(2, *last_op)?; statement.bind_int(3, add_checksum)?; + statement.bind_int(4, op_checksum)?; + statement.bind_int(5, remove_operations)?; statement.exec()?; } @@ -216,17 +232,34 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { // language=SQLite - let statement = - db.prepare_v2("SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0")?; + let statement = db.prepare_v2( + " +SELECT + name, + last_applied_op, + (SELECT IFNULL(SUM(oplog.hash), 0) + FROM ps_oplog oplog + WHERE oplog.bucket = ps_buckets.name + AND oplog.op_id <= ps_buckets.last_applied_op + AND (oplog.superseded = 1 OR oplog.op != 3) + ) as checksum +FROM ps_buckets +WHERE ps_buckets.pending_delete = 0 AND + ps_buckets.remove_operations >= CASE + WHEN ?1 = '' THEN 1 + ELSE IFNULL(?1 ->> 'threshold', 1) + END", + )?; + // Compact bucket if there are 50 or more operations + statement.bind_text(1, _data, sqlite::Destructor::STATIC); // language=SQLite let update_statement = db.prepare_v2( - "UPDATE ps_buckets - SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0) - FROM ps_oplog AS oplog - WHERE (superseded = 1 OR op != 3) - AND oplog.bucket = ?1 - AND oplog.op_id <= ?2) + " + UPDATE ps_buckets + SET add_checksum = (add_checksum + ?2) & 0xffffffff, + op_checksum = (op_checksum - ?2) & 0xffffffff, + remove_operations = 0 WHERE ps_buckets.name = ?1", )?; @@ -243,10 +276,10 @@ pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQL // Note: Each iteration here may be run in a separate transaction. let name = statement.column_text(0)?; let last_applied_op = statement.column_int64(1)?; + let checksum = statement.column_int(2)?; update_statement.bind_text(1, name, sqlite::Destructor::STATIC)?; - update_statement.bind_int64(2, last_applied_op)?; - + update_statement.bind_int(2, checksum)?; update_statement.exec()?; // Must use the same values as above diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index 0ede232..de78756 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -131,7 +131,7 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations return Err(SQLiteError::from(ResultCode::ABORT)); } - const CODE_VERSION: i32 = 3; + const CODE_VERSION: i32 = 4; let mut current_version = current_version_stmt.column_int(0)?; @@ -250,6 +250,26 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object(' ").into_db_result(local_db)?; } + if current_version < 4 { + // language=SQLite + local_db.exec_safe("\ +ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0; +ALTER TABLE ps_buckets ADD COLUMN remove_operations INTEGER NOT NULL DEFAULT 0; + +UPDATE ps_buckets SET op_checksum = ( + SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name +); + +INSERT INTO ps_migration(id, down_migrations) + VALUES(4, + json_array( + json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'), + json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'), + json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN remove_operations') + )); + ").into_db_result(local_db)?; + } + setup_internal_views(local_db)?; Ok(String::from(""))