Skip to content

WIP: Raw tables #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions crates/core/src/crud_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ extern crate alloc;

use alloc::boxed::Box;
use alloc::string::String;
use alloc::sync::Arc;
use const_format::formatcp;
use core::ffi::{c_char, c_int, c_void};
use core::sync::atomic::Ordering;

use sqlite::{Connection, ResultCode, Value};
use sqlite_nostd as sqlite;
Expand All @@ -13,6 +15,7 @@ use sqlite_nostd::ResultCode::NULL;
use crate::error::SQLiteError;
use crate::ext::SafeManagedStmt;
use crate::schema::TableInfoFlags;
use crate::state::DatabaseState;
use crate::vtab_util::*;

// Structure:
Expand All @@ -31,11 +34,12 @@ struct VirtualTable {
db: *mut sqlite::sqlite3,
current_tx: Option<i64>,
insert_statement: Option<ManagedStmt>,
state: Arc<DatabaseState>,
}

extern "C" fn connect(
db: *mut sqlite::sqlite3,
_aux: *mut c_void,
aux: *mut c_void,
_argc: c_int,
_argv: *const *const c_char,
vtab: *mut *mut sqlite::vtab,
Expand All @@ -58,6 +62,14 @@ extern "C" fn connect(
db,
current_tx: None,
insert_statement: None,
state: {
// Increase refcount - we can't use from_raw alone because we don't own the aux
// data (connect could be called multiple times).
let state = Arc::from_raw(aux as *mut DatabaseState);
let clone = state.clone();
core::mem::forget(state);
clone
},
}));
*vtab = tab.cast::<sqlite::vtab>();
let _ = sqlite::vtab_config(db, 0);
Expand Down Expand Up @@ -127,13 +139,20 @@ fn insert_operation(
flags: TableInfoFlags,
) -> Result<(), SQLiteError> {
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
if tab.current_tx.is_none() {
if tab.state.is_in_sync_local.load(Ordering::Relaxed) {
return Err(SQLiteError(
ResultCode::MISUSE,
Some(String::from("No tx_id")),
Some(String::from("Using ps_crud during sync operation")),
));
}
let current_tx = tab.current_tx.unwrap();

let Some(current_tx) = tab.current_tx else {
return Err(SQLiteError(
ResultCode::MISUSE,
Some(String::from("No tx_id")),
));
};

// language=SQLite
let statement = tab
.insert_statement
Expand Down Expand Up @@ -206,8 +225,13 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module {
xIntegrity: None,
};

pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
db.create_module_v2("powersync_crud_", &MODULE, None, None)?;
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
db.create_module_v2(
"powersync_crud_",
&MODULE,
Some(Arc::into_raw(state) as *mut c_void),
Some(DatabaseState::destroy_arc),
)?;

Ok(())
}
13 changes: 10 additions & 3 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ extern crate alloc;

use core::ffi::{c_char, c_int};

use alloc::sync::Arc;
use sqlite::ResultCode;
use sqlite_nostd as sqlite;

use crate::state::DatabaseState;

mod bson;
mod checkpoint;
mod crud_vtab;
Expand All @@ -26,6 +29,7 @@ mod migrations;
mod operations;
mod operations_vtab;
mod schema;
mod state;
mod sync;
mod sync_local;
mod util;
Expand Down Expand Up @@ -53,6 +57,8 @@ pub extern "C" fn sqlite3_powersync_init(
}

fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
let state = Arc::new(DatabaseState::new());

crate::version::register(db)?;
crate::views::register(db)?;
crate::uuid::register(db)?;
Expand All @@ -62,11 +68,12 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
crate::view_admin::register(db)?;
crate::checkpoint::register(db)?;
crate::kv::register(db)?;
sync::register(db)?;
crate::state::register(db, state.clone())?;
sync::register(db, state.clone())?;

crate::schema::register(db)?;
crate::operations_vtab::register(db)?;
crate::crud_vtab::register(db)?;
crate::operations_vtab::register(db, state.clone())?;
crate::crud_vtab::register(db, state)?;

Ok(())
}
Expand Down
24 changes: 20 additions & 4 deletions crates/core/src/operations_vtab.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate alloc;

use alloc::boxed::Box;
use alloc::sync::Arc;
use core::ffi::{c_char, c_int, c_void};

use sqlite::{Connection, ResultCode, Value};
Expand All @@ -9,21 +10,23 @@ use sqlite_nostd as sqlite;
use crate::operations::{
clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation,
};
use crate::state::DatabaseState;
use crate::sync_local::sync_local;
use crate::vtab_util::*;

#[repr(C)]
struct VirtualTable {
base: sqlite::vtab,
db: *mut sqlite::sqlite3,
state: Arc<DatabaseState>,

target_applied: bool,
target_validated: bool,
}

extern "C" fn connect(
db: *mut sqlite::sqlite3,
_aux: *mut c_void,
aux: *mut c_void,
_argc: c_int,
_argv: *const *const c_char,
vtab: *mut *mut sqlite::vtab,
Expand All @@ -43,6 +46,14 @@ extern "C" fn connect(
zErrMsg: core::ptr::null_mut(),
},
db,
state: {
// Increase refcount - we can't use from_raw alone because we don't own the aux
// data (connect could be called multiple times).
let state = Arc::from_raw(aux as *mut DatabaseState);
let clone = state.clone();
core::mem::forget(state);
clone
},
target_validated: false,
target_applied: false,
}));
Expand Down Expand Up @@ -83,7 +94,7 @@ extern "C" fn update(
let result = insert_operation(db, args[3].text());
vtab_result(vtab, result)
} else if op == "sync_local" {
let result = sync_local(db, &args[3]);
let result = sync_local(&tab.state, db, &args[3]);
if let Ok(result_row) = result {
unsafe {
*p_row_id = result_row;
Expand Down Expand Up @@ -139,8 +150,13 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module {
xIntegrity: None,
};

pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
db.create_module_v2("powersync_operations", &MODULE, None, None)?;
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
db.create_module_v2(
"powersync_operations",
&MODULE,
Some(Arc::into_raw(state) as *mut c_void),
Some(DatabaseState::destroy_arc),
)?;

Ok(())
}
10 changes: 7 additions & 3 deletions crates/core/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ use alloc::vec::Vec;
use serde::Deserialize;
use sqlite::ResultCode;
use sqlite_nostd as sqlite;
pub use table_info::{DiffIncludeOld, Table, TableInfoFlags};
pub use table_info::{
DiffIncludeOld, PendingStatement, PendingStatementValue, RawTable, Table, TableInfoFlags,
};

#[derive(Deserialize)]
#[derive(Deserialize, Default)]
pub struct Schema {
tables: Vec<table_info::Table>,
pub tables: Vec<table_info::Table>,
#[serde(default)]
pub raw_tables: Vec<table_info::RawTable>,
}

pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
Expand Down
21 changes: 21 additions & 0 deletions crates/core/src/schema/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ pub struct Table {
pub flags: TableInfoFlags,
}

#[derive(Deserialize)]
pub struct RawTable {
pub name: String,
pub put: PendingStatement,
pub delete: PendingStatement,
}

impl Table {
pub fn from_json(text: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(text)
Expand Down Expand Up @@ -225,3 +232,17 @@ impl<'de> Deserialize<'de> for TableInfoFlags {
)
}
}

#[derive(Deserialize)]
pub struct PendingStatement {
pub sql: String,
/// This vec should contain an entry for each parameter in [sql].
pub params: Vec<PendingStatementValue>,
}

#[derive(Deserialize)]
pub enum PendingStatementValue {
Id,
Column(String),
// TODO: Stuff like a raw object of put data?
}
74 changes: 74 additions & 0 deletions crates/core/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use core::{
ffi::{c_int, c_void},
sync::atomic::{AtomicBool, Ordering},
};

use alloc::sync::Arc;
use sqlite::{Connection, ResultCode};
use sqlite_nostd::{self as sqlite, Context};

/// State that is shared for a SQLite database connection after the core extension has been
/// registered on it.
///
/// `init_extension` allocates an instance of this in an `Arc` that is shared as user-data for
/// functions/vtabs that need access to it.
pub struct DatabaseState {
pub is_in_sync_local: AtomicBool,
}

impl DatabaseState {
pub fn new() -> Self {
DatabaseState {
is_in_sync_local: AtomicBool::new(false),
}
}

pub fn sync_local_guard<'a>(&'a self) -> impl Drop + use<'a> {
self.is_in_sync_local
.compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
.expect("should not be syncing already");

struct ClearOnDrop<'a>(&'a DatabaseState);

impl Drop for ClearOnDrop<'_> {
fn drop(&mut self) {
self.0.is_in_sync_local.store(false, Ordering::Release);
}
}

ClearOnDrop(self)
}

pub unsafe extern "C" fn destroy_arc(ptr: *mut c_void) {
drop(Arc::from_raw(ptr.cast::<DatabaseState>()));
}
}

pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
unsafe extern "C" fn func(
ctx: *mut sqlite::context,
_argc: c_int,
_argv: *mut *mut sqlite::value,
) {
let data = ctx.user_data().cast::<DatabaseState>();
let data = unsafe { data.as_ref() }.unwrap();

ctx.result_int(if data.is_in_sync_local.load(Ordering::Relaxed) {
1
} else {
0
});
}

db.create_function_v2(
"powersync_in_sync_operation",
0,
0,
Some(Arc::into_raw(state) as *mut c_void),
Some(func),
None,
None,
Some(DatabaseState::destroy_arc),
)?;
Ok(())
}
9 changes: 7 additions & 2 deletions crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ use alloc::borrow::Cow;
use alloc::boxed::Box;
use alloc::rc::Rc;
use alloc::string::ToString;
use alloc::sync::Arc;
use alloc::{string::String, vec::Vec};
use serde::{Deserialize, Serialize};
use sqlite::{ResultCode, Value};
use sqlite_nostd::{self as sqlite, ColumnType};
use sqlite_nostd::{Connection, Context};

use crate::error::SQLiteError;
use crate::schema::Schema;
use crate::state::DatabaseState;

use super::streaming_sync::SyncClient;
use super::sync_status::DownloadSyncStatus;
Expand All @@ -22,6 +25,8 @@ pub struct StartSyncStream {
/// Bucket parameters to include in the request when opening a sync stream.
#[serde(default)]
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
#[serde(default)]
pub schema: Schema,
}

/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation.
Expand Down Expand Up @@ -118,7 +123,7 @@ struct SqlController {
client: SyncClient,
}

pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
extern "C" fn control(
ctx: *mut sqlite::context,
argc: c_int,
Expand Down Expand Up @@ -199,7 +204,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
}

let controller = Box::new(SqlController {
client: SyncClient::new(db),
client: SyncClient::new(db, state),
});

db.create_function_v2(
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use alloc::sync::Arc;
use sqlite_nostd::{self as sqlite, ResultCode};

mod bucket_priority;
Expand All @@ -13,6 +14,8 @@ mod sync_status;
pub use bucket_priority::BucketPriority;
pub use checksum::Checksum;

pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
interface::register(db)
use crate::state::DatabaseState;

pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
interface::register(db, state)
}
Loading