-
Notifications
You must be signed in to change notification settings - Fork 350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add mysql election logic #5694
base: main
Are you sure you want to change the base?
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
0de153b
to
85b541c
Compare
src/meta-srv/src/election/mysql.rs
Outdated
fn create_table_sql(&self) -> String { | ||
format!( | ||
r#" | ||
CREATE TABLE IF NOT EXISTS {} ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better quote the table name in all places:
CREATE TABLE IF NOT EXISTS {} ( | |
CREATE TABLE IF NOT EXISTS `{}` ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all the SQLs below should be quoted, too
} | ||
|
||
impl MySqlElection { | ||
pub async fn with_mysql_client( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the mysql sqls and features in use compatible across all mysql versions? If not, or for future compatible consideration, I suggest check mysql version and some of the critical options first. We can target for the most widely used mysql version, 5.7, or the current stable 8.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran sqlness test locally for mysql 8.0, and the sqlness test for ci uses a mysql 5.7, so it should be alright for now. Do we need a compatible check or just leave some comments for future changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a check beforehand is needed, to not cause any runtime surprise to users after metasrv is started. WDYT @fengjiachun
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds good. I think we can check it before the metasrv starts.
sqlx::query(&sql_factory.create_table_sql()) | ||
.execute(&mut client) | ||
.await | ||
.context(MySqlExecutionSnafu)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add the exeucted sql to MySqlExecutionSnafu
?
if !res { | ||
self.delete_value(&key, &mut executor).await?; | ||
self.put_value_with_lease( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if !res { | |
self.delete_value(&key, &mut executor).await?; | |
self.put_value_with_lease( | |
if !res { | |
warn!("Candidate {} already registered, delete and register again.", key); | |
self.delete_value(&key, &mut executor).await?; | |
self.put_value_with_lease( |
loop { | ||
let _ = keep_alive_interval.tick().await; | ||
let client = self.client.lock().await; | ||
let mut executor = Executor::Default(client); | ||
let (_, prev_expire_time, current_time, origin) = self | ||
.get_value_with_lease(&key, true, &mut executor) | ||
.await? | ||
.unwrap_or_default(); | ||
|
||
ensure!( | ||
prev_expire_time > current_time, | ||
UnexpectedSnafu { | ||
violated: format!( | ||
"Candidate lease expired at {:?} (current time: {:?}), key: {:?}", | ||
prev_expire_time, | ||
current_time, | ||
String::from_utf8_lossy(&key.into_bytes()) | ||
), | ||
} | ||
); | ||
|
||
// Safety: origin is Some since we are using `get_value_with_lease` with `true`. | ||
let origin = origin.unwrap(); | ||
self.update_value_with_lease(&key, &origin, &node_info, &mut executor) | ||
.await?; | ||
std::mem::drop(executor); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use scope
loop { | |
let _ = keep_alive_interval.tick().await; | |
let client = self.client.lock().await; | |
let mut executor = Executor::Default(client); | |
let (_, prev_expire_time, current_time, origin) = self | |
.get_value_with_lease(&key, true, &mut executor) | |
.await? | |
.unwrap_or_default(); | |
ensure!( | |
prev_expire_time > current_time, | |
UnexpectedSnafu { | |
violated: format!( | |
"Candidate lease expired at {:?} (current time: {:?}), key: {:?}", | |
prev_expire_time, | |
current_time, | |
String::from_utf8_lossy(&key.into_bytes()) | |
), | |
} | |
); | |
// Safety: origin is Some since we are using `get_value_with_lease` with `true`. | |
let origin = origin.unwrap(); | |
self.update_value_with_lease(&key, &origin, &node_info, &mut executor) | |
.await?; | |
std::mem::drop(executor); | |
} | |
loop { | |
let _ = keep_alive_interval.tick().await; | |
let client = self.client.lock().await; | |
{ | |
let mut executor = Executor::Default(client); | |
let (_, prev_expire_time, current_time, origin) = self | |
.get_value_with_lease(&key, true, &mut executor) | |
.await? | |
.unwrap_or_default(); | |
ensure!( | |
prev_expire_time > current_time, | |
UnexpectedSnafu { | |
violated: format!( | |
"Candidate lease expired at {:?} (current time: {:?}), key: {:?}", | |
prev_expire_time, | |
current_time, | |
String::from_utf8_lossy(&key.into_bytes()) | |
), | |
} | |
); | |
// Safety: origin is Some since we are using `get_value_with_lease` with `true`. | |
let origin = origin.unwrap(); | |
self.update_value_with_lease(&key, &origin, &node_info, &mut executor) | |
.await?; | |
} | |
} |
.context(UnexpectedSnafu { | ||
violated: format!( | ||
"Invalid value {}, expect node info || {} || expire time", | ||
value, LEASE_SEP | ||
), | ||
})?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.context(UnexpectedSnafu { | |
violated: format!( | |
"Invalid value {}, expect node info || {} || expire time", | |
value, LEASE_SEP | |
), | |
})?; | |
.with_context(|| UnexpectedSnafu { | |
violated: format!( | |
"Invalid value {}, expect node info || {} || expire time", | |
value, LEASE_SEP | |
), | |
})?; |
} | ||
|
||
enum Executor<'a> { | ||
Default(MutexGuard<'a, MySqlConnection>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not always use the txn? The election is racing against other metasrv process.
@@ -579,6 +579,9 @@ jobs: | |||
- name: "Pg Kvbackend" | |||
opts: "--setup-pg" | |||
kafka: false | |||
- name: "Mysql Kvbackend" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- name: "Mysql Kvbackend" | |
- name: "MySQL KvBackend" |
@@ -579,6 +579,9 @@ jobs: | |||
- name: "Pg Kvbackend" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- name: "Pg Kvbackend" | |
- name: "PostgreSQL KvBackend" |
// Currently the session timeout is longer than the leader lease time. | ||
// So the leader will renew the lease twice before the session timeout if everything goes well. | ||
fn set_idle_session_timeout_sql(&self) -> String { | ||
format!("SET idle_session_timeout = '{}s';", META_LEASE_SECS + 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏻
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
#5528
What's changed and what's your intention?
As title.
PR Checklist
Please convert it to a draft if some of the following conditions are not met.