Skip to content

Commit 5a18308

Browse files
committed
Add priority field to buckets
1 parent 024fda8 commit 5a18308

File tree

6 files changed

+101
-18
lines changed

6 files changed

+101
-18
lines changed

crates/core/src/bucket_priority.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use sqlite_nostd::ResultCode;
2+
3+
use crate::error::SQLiteError;
4+
5+
#[repr(transparent)]
6+
#[derive(Clone, Copy, PartialEq, Eq)]
7+
pub struct BucketPriority(i32);
8+
9+
impl BucketPriority {
10+
pub fn may_publish_with_outstanding_uploads(self) -> bool {
11+
self.0 == 0
12+
}
13+
14+
pub const HIGHEST: BucketPriority = BucketPriority(0);
15+
pub const LOWEST: BucketPriority = BucketPriority(3);
16+
}
17+
18+
impl TryFrom<i32> for BucketPriority {
19+
type Error = SQLiteError;
20+
21+
fn try_from(value: i32) -> Result<Self, Self::Error> {
22+
if value < BucketPriority::LOWEST.0 || value > BucketPriority::HIGHEST.0 {
23+
return Err(SQLiteError::from(ResultCode::MISUSE));
24+
}
25+
26+
return Ok(BucketPriority(value));
27+
}
28+
}
29+
30+
impl Default for BucketPriority {
31+
fn default() -> Self {
32+
Self(1)
33+
}
34+
}
35+
36+
impl Into<i32> for BucketPriority {
37+
fn into(self) -> i32 {
38+
self.0
39+
}
40+
}
41+
42+
impl PartialOrd<BucketPriority> for BucketPriority {
43+
fn partial_cmp(&self, other: &BucketPriority) -> Option<core::cmp::Ordering> {
44+
Some(self.0.partial_cmp(&other.0)?.reverse())
45+
}
46+
}

crates/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use core::ffi::{c_char, c_int};
1212
use sqlite::ResultCode;
1313
use sqlite_nostd as sqlite;
1414

15+
mod bucket_priority;
1516
mod checkpoint;
1617
mod crud_vtab;
1718
mod diff;

crates/core/src/migrations.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,5 +310,21 @@ json_array(
310310
.into_db_result(local_db)?;
311311
}
312312

313+
if current_version < 7 && target_version >= 7 {
314+
local_db
315+
.exec_safe(
316+
"\
317+
ALTER TABLE ps_buckets ADD COLUMN priority NOT NULL DEFAULT 1;
318+
INSERT INTO ps_migration(id, down_migrations)
319+
VALUES(6,
320+
json_array(
321+
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN priority'),
322+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7')
323+
));
324+
",
325+
)
326+
.into_db_result(local_db)?;
327+
}
328+
313329
Ok(())
314330
}

crates/core/src/operations_vtab.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,30 +76,29 @@ extern "C" fn update(
7676
} else if rowid.value_type() == sqlite::ColumnType::Null {
7777
// INSERT
7878
let op = args[2].text();
79-
let data = args[3].text();
8079

8180
let tab = unsafe { &mut *vtab.cast::<VirtualTable>() };
8281
let db = tab.db;
8382

8483
if op == "save" {
85-
let result = insert_operation(db, data);
84+
let result = insert_operation(db, args[3].text());
8685
vtab_result(vtab, result)
8786
} else if op == "sync_local" {
88-
let result = sync_local(db, data);
87+
let result = sync_local(db, &args[3]);
8988
if let Ok(result_row) = result {
9089
unsafe {
9190
*p_row_id = result_row;
9291
}
9392
}
9493
vtab_result(vtab, result)
9594
} else if op == "clear_remove_ops" {
96-
let result = clear_remove_ops(db, data);
95+
let result = clear_remove_ops(db, args[3].text());
9796
vtab_result(vtab, result)
9897
} else if op == "delete_pending_buckets" {
99-
let result = delete_pending_buckets(db, data);
98+
let result = delete_pending_buckets(db, args[3].text());
10099
vtab_result(vtab, result)
101100
} else if op == "delete_bucket" {
102-
let result = delete_bucket(db, data);
101+
let result = delete_bucket(db, args[3].text());
103102
vtab_result(vtab, result)
104103
} else {
105104
ResultCode::MISUSE as c_int

crates/core/src/sync_local.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,30 @@ use alloc::collections::BTreeSet;
22
use alloc::format;
33
use alloc::string::String;
44

5+
use crate::bucket_priority::BucketPriority;
56
use crate::error::{PSResult, SQLiteError};
6-
use sqlite_nostd as sqlite;
7+
use sqlite_nostd::{self as sqlite, Value};
78
use sqlite_nostd::{ColumnType, Connection, ResultCode};
89

910
use crate::ext::SafeManagedStmt;
1011
use crate::util::{internal_table_name, quote_internal_name};
1112

12-
pub fn can_update_local(db: *mut sqlite::sqlite3) -> Result<bool, SQLiteError> {
13+
fn can_apply_sync_changes(
14+
db: *mut sqlite::sqlite3,
15+
priority: BucketPriority,
16+
) -> Result<bool, SQLiteError> {
17+
// We can only make sync changes visible if data is consistent, meaning that we've seen the
18+
// target operation sent in the original checkpoint message. We allow weakening consistency when
19+
// buckets from different priorities are involved (buckets with higher priorities or a lower
20+
// priority number can be published before we've reached the checkpoint for other buckets).
1321
// language=SQLite
1422
let statement = db.prepare_v2(
1523
"\
1624
SELECT group_concat(name)
1725
FROM ps_buckets
18-
WHERE target_op > last_op",
26+
WHERE (target_op > last_op) AND (priority <= ?)",
1927
)?;
28+
statement.bind_int(1, priority.into())?;
2029

2130
if statement.step()? != ResultCode::ROW {
2231
return Err(SQLiteError::from(ResultCode::ABORT));
@@ -26,22 +35,34 @@ WHERE target_op > last_op",
2635
return Ok(false);
2736
}
2837

29-
// This is specifically relevant for when data is added to crud before another batch is completed.
30-
31-
// language=SQLite
32-
let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
33-
if statement.step()? != ResultCode::DONE {
34-
return Ok(false);
38+
// Don't publish downloaded data until the upload queue is empty (except for downloaded data in
39+
// priority 0, which is published earlier).
40+
if !priority.may_publish_with_outstanding_uploads() {
41+
let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
42+
if statement.step()? != ResultCode::DONE {
43+
return Ok(false);
44+
}
3545
}
3646

3747
Ok(true)
3848
}
3949

40-
pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result<i64, SQLiteError> {
41-
if !can_update_local(db)? {
50+
pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, SQLiteError> {
51+
let priority = match data.value_type() {
52+
ColumnType::Integer => BucketPriority::try_from(data.int()),
53+
ColumnType::Float => BucketPriority::try_from(data.double() as i32),
54+
// Older clients without bucket priority support typically send an empty string here.
55+
_ => Ok(BucketPriority::LOWEST),
56+
}?;
57+
58+
if !can_apply_sync_changes(db, priority)? {
4259
return Ok(0);
4360
}
4461

62+
if priority >= BucketPriority::LOWEST {
63+
todo!("Only consider changes from certain bucket priorities")
64+
}
65+
4566
// language=SQLite
4667
let statement = db
4768
.prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'")

crates/core/src/view_admin.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ fn powersync_init_impl(
120120

121121
setup_internal_views(local_db)?;
122122

123-
powersync_migrate(ctx, 6)?;
123+
powersync_migrate(ctx, 7)?;
124124

125125
Ok(String::from(""))
126126
}

0 commit comments

Comments
 (0)