Skip to content

Commit e259a2e

Browse files
committed
Add last_synced_at.
1 parent 81f4dd9 commit e259a2e

File tree

4 files changed

+108
-20
lines changed

4 files changed

+108
-20
lines changed

crates/core/src/client_id.rs renamed to crates/core/src/kv.rs

+35-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use sqlite::ResultCode;
1212
use sqlite_nostd as sqlite;
1313
use sqlite_nostd::{Connection, Context, Value};
1414

15+
use crate::create_sqlite_optional_text_fn;
1516
use crate::create_sqlite_text_fn;
1617
use crate::error::SQLiteError;
1718
use crate::sync_types::Checkpoint;
@@ -39,7 +40,30 @@ fn powersync_client_id_impl(
3940
create_sqlite_text_fn!(
4041
powersync_client_id,
4142
powersync_client_id_impl,
42-
"powersync_client_id"
43+
"powersync_last_synced_at"
44+
);
45+
46+
fn powersync_last_synced_at_impl(
47+
ctx: *mut sqlite::context,
48+
args: &[*mut sqlite::value],
49+
) -> Result<Option<String>, SQLiteError> {
50+
let db = ctx.db_handle();
51+
52+
// language=SQLite
53+
let statement = db.prepare_v2("select value from ps_kv where key = 'last_synced_at'")?;
54+
55+
if statement.step()? == ResultCode::ROW {
56+
let client_id = statement.column_text(0)?;
57+
return Ok(Some(client_id.to_string()));
58+
} else {
59+
return Ok(None);
60+
}
61+
}
62+
63+
create_sqlite_optional_text_fn!(
64+
powersync_last_synced_at,
65+
powersync_last_synced_at_impl,
66+
"powersync_last_synced_at"
4367
);
4468

4569
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
@@ -53,6 +77,16 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
5377
None,
5478
None,
5579
)?;
80+
db.create_function_v2(
81+
"powersync_last_synced_at",
82+
0,
83+
sqlite::UTF8 | sqlite::DETERMINISTIC,
84+
None,
85+
Some(powersync_last_synced_at),
86+
None,
87+
None,
88+
None,
89+
)?;
5690

5791
Ok(())
5892
}

crates/core/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ use sqlite::ResultCode;
1212
use sqlite_nostd as sqlite;
1313

1414
mod checkpoint;
15-
mod client_id;
1615
mod crud_vtab;
1716
mod diff;
1817
mod error;
1918
mod ext;
19+
mod kv;
2020
mod macros;
2121
mod operations;
2222
mod operations_vtab;
@@ -54,7 +54,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
5454
crate::diff::register(db)?;
5555
crate::view_admin::register(db)?;
5656
crate::checkpoint::register(db)?;
57-
crate::client_id::register(db)?;
57+
crate::kv::register(db)?;
5858

5959
crate::schema_management::register(db)?;
6060
crate::operations_vtab::register(db)?;

crates/core/src/macros.rs

+35-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
#[macro_export]
32
macro_rules! create_sqlite_text_fn {
43
($fn_name:ident, $fn_impl_name:ident, $description:literal) => {
@@ -31,6 +30,41 @@ macro_rules! create_sqlite_text_fn {
3130
};
3231
}
3332

33+
#[macro_export]
34+
macro_rules! create_sqlite_optional_text_fn {
35+
($fn_name:ident, $fn_impl_name:ident, $description:literal) => {
36+
extern "C" fn $fn_name(
37+
ctx: *mut sqlite::context,
38+
argc: c_int,
39+
argv: *mut *mut sqlite::value,
40+
) {
41+
let args = sqlite::args!(argc, argv);
42+
43+
let result = $fn_impl_name(ctx, args);
44+
45+
if let Err(err) = result {
46+
let SQLiteError(code, message) = SQLiteError::from(err);
47+
if message.is_some() {
48+
ctx.result_error(&format!("{:} {:}", $description, message.unwrap()));
49+
} else {
50+
let error = ctx.db_handle().errmsg().unwrap();
51+
if error == "not an error" {
52+
ctx.result_error(&format!("{:}", $description));
53+
} else {
54+
ctx.result_error(&format!("{:} {:}", $description, error));
55+
}
56+
}
57+
ctx.result_error_code(code);
58+
} else if let Ok(r) = result {
59+
if let Some(s) = r {
60+
ctx.result_text_transient(&s);
61+
} else {
62+
ctx.result_null();
63+
}
64+
}
65+
}
66+
};
67+
}
3468

3569
// Wrap a function in an auto-transaction.
3670
// Gives the equivalent of SQLite's auto-commit behaviour, except that applies to all statements

crates/core/src/sync_local.rs

+36-16
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,21 @@ use alloc::collections::BTreeSet;
22
use alloc::format;
33
use alloc::string::String;
44

5+
use crate::error::{PSResult, SQLiteError};
56
use sqlite_nostd as sqlite;
67
use sqlite_nostd::{ColumnType, Connection, ResultCode};
7-
use crate::error::{SQLiteError, PSResult};
88

99
use crate::ext::SafeManagedStmt;
1010
use crate::util::{internal_table_name, quote_internal_name};
1111

12-
1312
pub fn can_update_local(db: *mut sqlite::sqlite3) -> Result<bool, SQLiteError> {
1413
// language=SQLite
15-
let statement = db.prepare_v2("\
14+
let statement = db.prepare_v2(
15+
"\
1616
SELECT group_concat(name)
1717
FROM ps_buckets
18-
WHERE target_op > last_op AND (name = '$local' OR pending_delete = 0)")?;
18+
WHERE target_op > last_op AND (name = '$local' OR pending_delete = 0)",
19+
)?;
1920

2021
if statement.step()? != ResultCode::ROW {
2122
return Err(SQLiteError::from(ResultCode::ABORT));
@@ -36,15 +37,15 @@ WHERE target_op > last_op AND (name = '$local' OR pending_delete = 0)")?;
3637
Ok(true)
3738
}
3839

39-
pub fn sync_local(
40-
db: *mut sqlite::sqlite3, _data: &str) -> Result<i64, SQLiteError> {
41-
40+
pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result<i64, SQLiteError> {
4241
if !can_update_local(db)? {
4342
return Ok(0);
4443
}
4544

4645
// language=SQLite
47-
let statement = db.prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'").into_db_result(db)?;
46+
let statement = db
47+
.prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'")
48+
.into_db_result(db)?;
4849
let mut tables: BTreeSet<String> = BTreeSet::new();
4950

5051
while statement.step()? == ResultCode::ROW {
@@ -60,7 +61,9 @@ pub fn sync_local(
6061
// |--SEARCH r USING INDEX ps_oplog_by_row (row_type=? AND row_id=?)
6162
// `--USE TEMP B-TREE FOR GROUP BY
6263
// language=SQLite
63-
let statement = db.prepare_v2("\
64+
let statement = db
65+
.prepare_v2(
66+
"\
6467
-- 3. Group the objects from different buckets together into a single one (ops).
6568
SELECT r.row_type as type,
6669
r.row_id as id,
@@ -79,7 +82,9 @@ FROM ps_buckets AS buckets
7982
WHERE r.superseded = 0
8083
AND b.superseded = 0
8184
-- Group for (3)
82-
GROUP BY r.row_type, r.row_id").into_db_result(db)?;
85+
GROUP BY r.row_type, r.row_id",
86+
)
87+
.into_db_result(db)?;
8388

8489
// TODO: cache statements
8590

@@ -96,12 +101,16 @@ GROUP BY r.row_type, r.row_id").into_db_result(db)?;
96101

97102
if buckets == "[]" {
98103
// DELETE
99-
let delete_statement = db.prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)).into_db_result(db)?;
104+
let delete_statement = db
105+
.prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
106+
.into_db_result(db)?;
100107
delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
101108
delete_statement.exec()?;
102109
} else {
103110
// INSERT/UPDATE
104-
let insert_statement = db.prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted)).into_db_result(db)?;
111+
let insert_statement = db
112+
.prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted))
113+
.into_db_result(db)?;
105114
insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
106115
insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
107116
insert_statement.exec()?;
@@ -110,14 +119,18 @@ GROUP BY r.row_type, r.row_id").into_db_result(db)?;
110119
if buckets == "[]" {
111120
// DELETE
112121
// language=SQLite
113-
let delete_statement = db.prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?").into_db_result(db)?;
122+
let delete_statement = db
123+
.prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?")
124+
.into_db_result(db)?;
114125
delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
115126
delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
116127
delete_statement.exec()?;
117128
} else {
118129
// INSERT/UPDATE
119130
// language=SQLite
120-
let insert_statement = db.prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)").into_db_result(db)?;
131+
let insert_statement = db
132+
.prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)")
133+
.into_db_result(db)?;
121134
insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
122135
insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
123136
insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?;
@@ -127,9 +140,16 @@ GROUP BY r.row_type, r.row_id").into_db_result(db)?;
127140
}
128141

129142
// language=SQLite
130-
db.exec_safe("UPDATE ps_buckets
143+
db.exec_safe(
144+
"UPDATE ps_buckets
131145
SET last_applied_op = last_op
132-
WHERE last_applied_op != last_op").into_db_result(db)?;
146+
WHERE last_applied_op != last_op",
147+
)
148+
.into_db_result(db)?;
149+
150+
// language=SQLite
151+
db.exec_safe("insert or replace into ps_kv(key, value) values('last_synced_at', datetime())")
152+
.into_db_result(db)?;
133153

134154
Ok(1)
135155
}

0 commit comments

Comments
 (0)