Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC #23500

Closed
wants to merge 1 commit into from
Closed

POC #23500

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 75 additions & 66 deletions src/environmentd/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1820,85 +1820,94 @@ async fn test_timeline_read_holds() {

#[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn test_linearizability() {
// Set the timestamp to zero for deterministic initial timestamps.
let now = Arc::new(Mutex::new(0));
let now_fn = {
let now = Arc::clone(&now);
NowFn::from(move || *now.lock().unwrap())
};
let server = test_util::TestHarness::default()
.with_now(now_fn)
.unsafe_mode()
.start()
.await;
let mz_client = server.connect().await.unwrap();

let view_name = "v_lin";
let source_name = "source_lin";
let (pg_client, cleanup_fn) =
test_util::create_postgres_source_with_table(&mz_client, view_name, "(a INT)", source_name)
for _ in 0..100 {
// Set the timestamp to zero for deterministic initial timestamps.
let now = Arc::new(Mutex::new(0));
let now_fn = {
let now = Arc::clone(&now);
NowFn::from(move || *now.lock().unwrap())
};
let server = test_util::TestHarness::default()
.with_now(now_fn)
.unsafe_mode()
.start()
.await;
// Insert value into postgres table.
let _ = pg_client
.execute(&format!("INSERT INTO {view_name} VALUES (42);"), &[])
.await
.unwrap();
let mz_client = server.connect().await.unwrap();

let view_name = "v_lin";
let source_name = "source_lin";
let (pg_client, cleanup_fn) = test_util::create_postgres_source_with_table(
&mz_client,
view_name,
"(a INT)",
source_name,
)
.await;
// Insert value into postgres table.
let _ = pg_client
.execute(&format!("INSERT INTO {view_name} VALUES (42);"), &[])
.await
.unwrap();

test_util::wait_for_view_population(&mz_client, view_name, 1).await;
test_util::wait_for_view_population(&mz_client, view_name, 1).await;

// The user table's write frontier will be close to zero because we use a deterministic
// now function in this test. It may be slightly higher than zero because bootstrapping
// and background tasks push the global timestamp forward.
// The materialized view's write frontier will be close to the system time because it uses
// the system clock to close timestamps.
// Therefore queries that only involve the view will normally happen at a higher timestamp
// than queries that involve the user table. However, we prevent this when in strict
// serializable mode.
// The user table's write frontier will be close to zero because we use a deterministic
// now function in this test. It may be slightly higher than zero because bootstrapping
// and background tasks push the global timestamp forward.
// The materialized view's write frontier will be close to the system time because it uses
// the system clock to close timestamps.
// Therefore queries that only involve the view will normally happen at a higher timestamp
// than queries that involve the user table. However, we prevent this when in strict
// serializable mode.

mz_client
.batch_execute("SET transaction_isolation = serializable")
.await
.unwrap();
let view_ts = test_util::get_explain_timestamp(view_name, &mz_client).await;
mz_client
.batch_execute("SET transaction_isolation = serializable")
.await
.unwrap();
let view_ts = test_util::get_explain_timestamp(view_name, &mz_client).await;

// Create user table in Materialize.
mz_client
.batch_execute("DROP TABLE IF EXISTS t;")
.await
.unwrap();
mz_client
.batch_execute("CREATE TABLE t (a INT);")
.await
.unwrap();
let join_ts = test_util::get_explain_timestamp(&format!("{view_name}, t"), &mz_client).await;
// Create user table in Materialize.
mz_client
.batch_execute("DROP TABLE IF EXISTS t;")
.await
.unwrap();
mz_client
.batch_execute("CREATE TABLE t (a INT);")
.await
.unwrap();
let join_ts =
test_util::get_explain_timestamp(&format!("{view_name}, t"), &mz_client).await;

// In serializable transaction isolation, read timestamps can go backwards.
assert!(join_ts < view_ts);
// In serializable transaction isolation, read timestamps can go backwards.
assert!(join_ts < view_ts);

mz_client
.batch_execute("SET transaction_isolation = 'strict serializable'")
.await
.unwrap();
mz_client
.batch_execute("SET transaction_isolation = 'strict serializable'")
.await
.unwrap();

let view_ts = test_util::get_explain_timestamp(view_name, &mz_client).await;
let join_ts = test_util::get_explain_timestamp(&format!("{view_name}, t"), &mz_client).await;
let view_ts = test_util::get_explain_timestamp(view_name, &mz_client).await;
let join_ts =
test_util::get_explain_timestamp(&format!("{view_name}, t"), &mz_client).await;

// Since the query on the join was done after the query on the view, it should have a higher or
// equal timestamp in strict serializable mode.
assert!(join_ts >= view_ts);
// Since the query on the join was done after the query on the view, it should have a higher or
// equal timestamp in strict serializable mode.
assert!(join_ts >= view_ts);

mz_client
.batch_execute("SET transaction_isolation = serializable")
.await
.unwrap();
mz_client
.batch_execute("SET transaction_isolation = serializable")
.await
.unwrap();

let view_ts = test_util::get_explain_timestamp(view_name, &mz_client).await;
let join_ts = test_util::get_explain_timestamp(&format!("{view_name}, t"), &mz_client).await;
let view_ts = test_util::get_explain_timestamp(view_name, &mz_client).await;
let join_ts =
test_util::get_explain_timestamp(&format!("{view_name}, t"), &mz_client).await;

// If we go back to serializable, then timestamps can revert again.
assert!(join_ts < view_ts);
// If we go back to serializable, then timestamps can revert again.
assert!(join_ts < view_ts);

cleanup_fn(&mz_client, &pg_client).await;
cleanup_fn(&mz_client, &pg_client).await;
}
}

#[mz_ore::test]
Expand Down