diff --git a/database/src/lib.rs b/database/src/lib.rs
index 41d664776..9f7e8b6d9 100644
--- a/database/src/lib.rs
+++ b/database/src/lib.rs
@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
 use hashbrown::HashMap;
 use intern::intern;
 use serde::{Deserialize, Serialize};
-use std::fmt;
+use std::fmt::{self, Display, Formatter};
 use std::hash;
 use std::ops::{Add, Sub};
 use std::sync::Arc;
@@ -155,6 +155,15 @@ impl FromStr for CommitType {
     }
 }
 
+impl Display for CommitType {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            CommitType::Try => f.write_str("try"),
+            CommitType::Master => f.write_str("master"),
+        }
+    }
+}
+
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
 pub struct Commit {
     pub sha: String,
@@ -794,3 +803,165 @@ pub struct ArtifactCollection {
     pub duration: Duration,
     pub end_time: DateTime<Utc>,
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum CommitJobType {
+    Try { pr: u32 },
+    Master { pr: u32 },
+    Release { tag: String },
+}
+
+impl CommitJobType {
+    /// Get the name of the type as a `str`
+    pub fn name(&self) -> &'static str {
+        match self {
+            CommitJobType::Try { pr: _ } => "try",
+            CommitJobType::Master { pr: _ } => "master",
+            CommitJobType::Release { tag: _ } => "release",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommitJob {
+    pub sha: String,
+    pub parent_sha: String,
+    pub commit_time: Date,
+    pub target: Target,
+    pub include: Option<String>,
+    pub exclude: Option<String>,
+    pub runs: Option<i32>,
+    pub backends: Option<String>,
+    pub job_type: CommitJobType,
+    pub state: CommitJobState,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum CommitJobState {
+    Queued,
+    Finished(CommitJobFinished),
+    Failed(CommitJobFailed),
+    InProgress(CommitJobInProgress),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommitJobInProgress {
+    pub machine_id: String,
+    pub started_at: Date,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommitJobFinished {
+    pub machine_id: String,
+    pub started_at: Date,
+    pub finished_at: Date,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommitJobFailed {
+    pub machine_id: String,
+    pub started_at: Date,
+    pub finished_at: Date,
+}
+
+impl CommitJob {
+    /// Get the status as a string
+    pub fn status(&self) -> &'static str {
+        match self.state {
+            CommitJobState::Queued => "queued",
+            CommitJobState::InProgress(_) => "in_progress",
+            CommitJobState::Finished(_) => "finished",
+            CommitJobState::Failed(_) => "failed",
+        }
+    }
+}
+
+/// Maps a database row to a `CommitJob`
+#[allow(clippy::too_many_arguments)]
+fn commit_job_create(
+    sha: String,
+    parent_sha: String,
+    commit_type: &str,
+    pr: Option<u32>,
+    release_tag: Option<String>,
+    commit_time: Date,
+    target: Target,
+    machine_id: Option<String>,
+    started_at: Option<Date>,
+    finished_at: Option<Date>,
+    status: &str,
+    include: Option<String>,
+    exclude: Option<String>,
+    runs: Option<i32>,
+    backends: Option<String>,
+) -> CommitJob {
+    let job_type = match commit_type {
+        "try" => CommitJobType::Try {
+            pr: pr.expect("`pr` cannot be `None` for a commit of type `try`"),
+        },
+        "master" => CommitJobType::Master {
+            pr: pr.expect("`pr` cannot be `None` for a commit of type `master`"),
+        },
+        "release" => CommitJobType::Release {
+            tag: release_tag
+                .expect("`release_tag` cannot be `None` for a commit of type `release`"),
+        },
+        _ => panic!("unhandled commit_type `{commit_type}`"),
+    };
+
+    let state = match status {
+        "queued" => CommitJobState::Queued,
+
+        "in_progress" => {
+            let started_at =
+                started_at.expect("`started_at` must be Some for an `in_progress` job");
+            let machine_id =
+                machine_id.expect("`machine_id` must be Some for an `in_progress` job");
+
+            CommitJobState::InProgress(CommitJobInProgress {
+                started_at,
+                machine_id,
+            })
+        }
+
+        "finished" | "failed" => {
+            let started_at =
+                started_at.expect("`started_at` must be Some for a finished or failed job");
+            let finished_at =
+                finished_at.expect("`finished_at` must be Some for a finished or failed job");
+            let machine_id =
+                machine_id.expect("`machine_id` must be Some for a finished or failed job");
+
+            if status == "finished" {
+                CommitJobState::Finished(CommitJobFinished {
+                    started_at,
+                    finished_at,
+                    machine_id,
+                })
+            } else {
+                CommitJobState::Failed(CommitJobFailed {
+                    started_at,
+                    finished_at,
+                    machine_id,
+                })
+            }
+        }
+
+        other => {
+            panic!("unknown status `{other}` (expected `queued`, `in_progress`, `finished` or `failed`)")
+        }
+    };
+
+    CommitJob {
+        sha,
+        parent_sha,
+        commit_time,
+        target,
+        include,
+        exclude,
+        runs,
+        backends,
+        job_type,
+        state,
+    }
+}
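As a reviewer aid (not part of the patch), a minimal sketch of how the new `lib.rs` types compose; the field values are invented and `chrono` is assumed to be a dependency:

```rust
use database::{CommitJob, CommitJobState, CommitJobType, Date, Target};

fn example() {
    // A hypothetical queued master-commit job, shaped the way
    // `enqueue_commit_job` expects to receive it.
    let job = CommitJob {
        sha: "abc123".into(),
        parent_sha: "def456".into(),
        commit_time: Date(chrono::Utc::now()),
        target: Target::X86_64UnknownLinuxGnu,
        include: None,
        exclude: None,
        runs: None,
        backends: None,
        job_type: CommitJobType::Master { pr: 1234 },
        state: CommitJobState::Queued,
    };

    // `name()` and `status()` produce exactly the strings the SQL layer
    // stores in the `commit_type` and `status` columns.
    assert_eq!(job.job_type.name(), "master");
    assert_eq!(job.status(), "queued");
}
```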
diff --git a/database/src/pool.rs b/database/src/pool.rs
index 71c0855a7..e353546aa 100644
--- a/database/src/pool.rs
+++ b/database/src/pool.rs
@@ -1,5 +1,6 @@
 use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CompileBenchmark, Target,
+    ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CommitJob, CompileBenchmark,
+    Target,
 };
 use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
 use chrono::{DateTime, Utc};
@@ -178,6 +179,16 @@ pub trait Connection: Send + Sync {
 
     /// Removes all data associated with the given artifact.
     async fn purge_artifact(&self, aid: &ArtifactId);
+
+    /// Add a job to the queue
+    async fn enqueue_commit_job(&self, job: &CommitJob);
+
+    /// Dequeue a job; we pass `machine_id` and `target` in case there are
+    /// jobs the machine was previously doing that it can pick up again
+    async fn take_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob>;
+
+    /// Mark the job as finished
+    async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String);
 }
 
 #[async_trait::async_trait]
@@ -301,7 +312,7 @@ mod tests {
     use std::str::FromStr;
 
     use super::*;
-    use crate::{tests::run_db_test, Commit, CommitType, Date};
+    use crate::{tests::run_db_test, Commit, CommitJobState, CommitJobType, CommitType, Date};
 
     /// Create a Commit
     fn create_commit(commit_sha: &str, time: chrono::DateTime<Utc>, r#type: CommitType) -> Commit {
@@ -312,6 +323,29 @@ mod tests {
         }
     }
 
+    /// Create a CommitJob
+    fn create_commit_job(
+        sha: &str,
+        parent_sha: &str,
+        commit_time: chrono::DateTime<Utc>,
+        target: Target,
+        job_type: CommitJobType,
+        state: CommitJobState,
+    ) -> CommitJob {
+        CommitJob {
+            sha: sha.to_string(),
+            parent_sha: parent_sha.to_string(),
+            commit_time: Date(commit_time),
+            target,
+            include: None,
+            exclude: None,
+            runs: None,
+            backends: None,
+            job_type,
+            state,
+        }
+    }
+
     #[tokio::test]
     async fn pstat_returns_empty_vector_when_empty() {
         run_db_test(|ctx| async {
@@ -370,4 +404,122 @@ mod tests {
         })
         .await;
     }
+
+    #[tokio::test]
+    async fn take_commit_job() {
+        run_db_test(|ctx| async {
+            // ORDER:
+            // Releases first,
+            // master commits second, ordered by oldest PR ascending,
+            // try commits last, ordered by oldest PR ascending.
+
+            let db = ctx.db_client().connection().await;
+            let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
+
+            // Try commits
+            let try_job_1 = create_commit_job(
+                "sha1",
+                "p1",
+                time,
+                Target::X86_64UnknownLinuxGnu,
+                CommitJobType::Try { pr: 1 },
+                CommitJobState::Queued,
+            );
+            let try_job_2 = create_commit_job(
+                "sha2",
+                "p2",
+                time,
+                Target::X86_64UnknownLinuxGnu,
+                CommitJobType::Try { pr: 2 },
+                CommitJobState::Queued,
+            );
+
+            // Master commits
+            let master_job_1 = create_commit_job(
+                "sha3",
+                "p3",
+                time,
+                Target::X86_64UnknownLinuxGnu,
+                CommitJobType::Master { pr: 3 },
+                CommitJobState::Queued,
+            );
+            let master_job_2 = create_commit_job(
+                "sha4",
+                "p4",
+                time,
+                Target::X86_64UnknownLinuxGnu,
+                CommitJobType::Master { pr: 4 },
+                CommitJobState::Queued,
+            );
+
+            // Release commits
+            let release_job_1 = create_commit_job(
+                "sha5",
+                "p5",
+                time,
+                Target::X86_64UnknownLinuxGnu,
+                CommitJobType::Release { tag: "tag1".into() },
+                CommitJobState::Queued,
+            );
+            let release_job_2 = create_commit_job(
+                "sha6",
+                "p6",
+                time,
+                Target::X86_64UnknownLinuxGnu,
+                CommitJobType::Release { tag: "tag2".into() },
+                CommitJobState::Queued,
+            );
+
+            // Shuffle the insert order a bit
+            let all_commits = vec![
+                release_job_1,
+                master_job_2,
+                try_job_1,
+                release_job_2,
+                master_job_1,
+                try_job_2,
+            ];
+
+            // Queue all the jobs
+            for commit in all_commits {
+                db.enqueue_commit_job(&commit).await;
+            }
+
+            // Now we test the ordering: after each dequeue we immediately mark
+            // the job as finished so it cannot be returned again within the
+            // test.
+            //
+            // The priority should be:
+            //
+            // 1. Release commits (oldest tag first)
+            // 2. Master commits (oldest PR first)
+            // 3. Try commits (oldest PR first)
+            //
+            // Given the data we inserted above, the expected SHA order is:
+            // sha5, sha6, sha3, sha4, sha1, sha2.
+
+            let machine = "machine-1";
+            let target = Target::X86_64UnknownLinuxGnu;
+            let expected = ["sha5", "sha6", "sha3", "sha4", "sha1", "sha2"];
+
+            for &sha in &expected {
+                let job = db.take_commit_job(machine, target).await;
+                assert!(job.is_some(), "expected a job for sha {sha}");
+                let job = job.unwrap();
+                assert_eq!(job.sha, sha, "jobs dequeued out of priority order");
+
+                // Mark the job finished so it is not returned again.
+                db.finish_commit_job(machine, target, sha.to_string()).await;
+            }
+
+            // After all six jobs have been taken, the queue should be empty.
+            assert!(
+                db.take_commit_job(machine, target).await.is_none(),
+                "queue should be empty after draining all jobs"
+            );
+
+            Ok(ctx)
+        })
+        .await;
+    }
 }
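The intended call pattern for the three new trait methods, sketched as a collector loop; `do_benchmark` is a hypothetical placeholder for the collector's real work, and `Target` is assumed to be `Copy`:

```rust
use database::pool::Connection;
use database::{CommitJob, Target};

// Hypothetical stand-in for the real benchmarking step.
async fn do_benchmark(job: &CommitJob) {
    println!("benchmarking {}", job.sha);
}

// Take queued jobs until the queue is drained, marking each one finished so
// it is not handed out again.
async fn run_collector(conn: &dyn Connection, machine_id: &str, target: Target) {
    while let Some(job) = conn.take_commit_job(machine_id, target).await {
        do_benchmark(&job).await;
        conn.finish_commit_job(machine_id, target, job.sha.clone())
            .await;
    }
}
```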
diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index 374b4904f..26a35ed31 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -1,7 +1,8 @@
 use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
 use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, CodegenBackend, CollectionId,
-    Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target,
+    commit_job_create, ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, CodegenBackend,
+    CollectionId, Commit, CommitJob, CommitJobType, CommitType, CompileBenchmark, Date, Index,
+    Profile, QueuedCommit, Scenario, Target,
 };
 use anyhow::Context as _;
 use chrono::{DateTime, TimeZone, Utc};
@@ -285,6 +286,26 @@ static MIGRATIONS: &[&str] = &[
     alter table pstat_series drop constraint test_case;
     alter table pstat_series add constraint test_case UNIQUE(crate, profile, scenario, backend, target, metric);
     "#,
+    r#"
+    CREATE TABLE IF NOT EXISTS commit_queue (
+        sha TEXT,
+        parent_sha TEXT,
+        commit_type TEXT,
+        pr INTEGER,
+        release_tag TEXT,
+        commit_time TIMESTAMPTZ,
+        target TEXT,
+        include TEXT,
+        exclude TEXT,
+        runs INTEGER DEFAULT 0,
+        backends TEXT,
+        machine_id TEXT,
+        started_at TIMESTAMPTZ,
+        finished_at TIMESTAMPTZ,
+        status TEXT,
+        PRIMARY KEY (sha, target)
+    );
+    "#,
 ];
 
 #[async_trait::async_trait]
@@ -1365,8 +1386,268 @@ where
         .await
         .unwrap();
     }
+
+    /// Add a job to the queue
+    async fn enqueue_commit_job(&self, job: &CommitJob) {
+        match &job.job_type {
+            CommitJobType::Try { pr } | CommitJobType::Master { pr } => self
+                .conn()
+                .execute(
+                    "INSERT INTO commit_queue (
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        commit_time,
+                        status,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        pr
+                    )
+                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
+                    ON CONFLICT DO NOTHING",
+                    &[
+                        &job.sha,
+                        &job.parent_sha,
+                        &job.job_type.name(),
+                        &job.commit_time.0,
+                        &"queued",
+                        &job.target,
+                        &job.include,
+                        &job.exclude,
+                        &job.runs,
+                        &job.backends,
+                        &(*pr as i32),
+                    ],
+                )
+                .await
+                .unwrap(),
+            CommitJobType::Release { tag } => self
+                .conn()
+                .execute(
+                    "INSERT INTO commit_queue (
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        commit_time,
+                        status,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        release_tag
+                    )
+                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
+                    ON CONFLICT DO NOTHING",
+                    &[
+                        &job.sha,
+                        &job.parent_sha,
+                        &job.job_type.name(),
+                        &job.commit_time.0,
+                        &"queued",
+                        &job.target,
+                        &job.include,
+                        &job.exclude,
+                        &job.runs,
+                        &job.backends,
+                        &tag,
+                    ],
+                )
+                .await
+                .unwrap(),
+        };
+    }
+
+    async fn take_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob> {
+        /// Map a database row from the commit queue to a `CommitJob`
+        fn commit_queue_row_to_commit_job(row: &tokio_postgres::Row) -> CommitJob {
+            let sha = row.get::<_, String>(0);
+            let parent_sha = row.get::<_, String>(1);
+            let commit_type = row.get::<_, String>(2);
+            let pr = row.get::<_, Option<i32>>(3).map(|it| it as u32);
+            let release_tag = row.get::<_, Option<String>>(4);
+            let commit_time = row.get::<_, DateTime<Utc>>(5);
+            let target = Target::from_str(&row.get::<_, String>(6)).unwrap();
+            let include = row.get::<_, Option<String>>(7);
+            let exclude = row.get::<_, Option<String>>(8);
+            let runs = row.get::<_, Option<i32>>(9);
+            let backends = row.get::<_, Option<String>>(10);
+            let machine_id = row.get::<_, Option<String>>(11);
+            let started_at = row.get::<_, Option<DateTime<Utc>>>(12).map(Date);
+            let finished_at = row.get::<_, Option<DateTime<Utc>>>(13).map(Date);
+            let status = row.get::<_, String>(14);
+
+            commit_job_create(
+                sha,
+                parent_sha,
+                &commit_type,
+                pr,
+                release_tag,
+                Date(commit_time),
+                target,
+                machine_id,
+                started_at,
+                finished_at,
+                &status,
+                include,
+                exclude,
+                runs,
+                backends,
+            )
+        }
+
+        /* Check whether a collector for another target is ahead of us; if it
+         * is, take the same sha so the targets stay in sync */
+        let maybe_drift_job = self
+            .conn()
+            .query_opt(
+                "
+                WITH job_to_update AS (
+                    SELECT
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        pr,
+                        release_tag,
+                        commit_time,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        machine_id,
+                        started_at,
+                        finished_at,
+                        status
+                    FROM commit_queue
+                    WHERE target != $1 AND status IN ('finished', 'in_progress')
+                    ORDER BY started_at
+                    LIMIT 1
+                    FOR UPDATE SKIP LOCKED
+                )
+                UPDATE commit_queue
+                SET started_at = NOW(),
+                    status = 'in_progress',
+                    machine_id = $2
+                WHERE
+                    target = $1
+                    AND status = 'queued'
+                    AND sha = (SELECT sha FROM job_to_update)
+                RETURNING *;
+                ",
+                &[&target, &machine_id],
+            )
+            .await
+            .unwrap();
+
+        /* If there is such a job, take it */
+        if let Some(row) = maybe_drift_job {
+            return Some(commit_queue_row_to_commit_job(&row));
+        }
+
+        /* Otherwise, see if there are any queued jobs for this target */
+        let job = self
+            .conn()
+            .query_opt(
+                "
+                WITH job_to_update AS (
+                    SELECT
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        pr,
+                        release_tag,
+                        commit_time,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        machine_id,
+                        started_at,
+                        finished_at,
+                        status,
+                        CASE
+                            WHEN commit_type = 'release' THEN 0
+                            WHEN commit_type = 'master' THEN 1
+                            WHEN commit_type = 'try' THEN 2
+                            ELSE -1
+                        END AS type_rank
+                    FROM commit_queue
+                    WHERE target = $1
+                        AND status = 'queued'
+                    ORDER BY type_rank, pr ASC, sha
+                    LIMIT 1
+                    FOR UPDATE SKIP LOCKED
+                )
+                UPDATE commit_queue
+                SET started_at = NOW(),
+                    status = 'in_progress',
+                    machine_id = $2
+                WHERE
+                    sha = (SELECT sha FROM job_to_update)
+                    AND target = $1
+                RETURNING *;
+                ",
+                &[&target, &machine_id],
+            )
+            .await
+            .unwrap();
+
+        /* If there is one, take it; otherwise the queue is empty */
+        job.map(|row| commit_queue_row_to_commit_job(&row))
+    }
+
+    /// Mark a job in the database as done
+    async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) {
+        self.conn()
+            .execute(
+                "
+                UPDATE commit_queue
+                SET finished_at = NOW(),
+                    status = 'finished'
+                WHERE
+                    sha = $1
+                    AND machine_id = $2
+                    AND target = $3;
+                ",
+                &[&sha, &machine_id, &target],
+            )
+            .await
+            .unwrap();
+    }
 }
 
+#[macro_export]
+macro_rules!
impl_to_postgresql_via_to_string {
+    ($t:ty) => {
+        impl tokio_postgres::types::ToSql for $t {
+            fn to_sql(
+                &self,
+                ty: &tokio_postgres::types::Type,
+                out: &mut bytes::BytesMut,
+            ) -> Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>>
+            {
+                self.to_string().to_sql(ty, out)
+            }
+
+            fn accepts(ty: &tokio_postgres::types::Type) -> bool {
+                <String as tokio_postgres::types::ToSql>::accepts(ty)
+            }
+
+            // Provides the type-checked `to_sql_checked` entry point that the
+            // client actually calls
+            tokio_postgres::types::to_sql_checked!();
+        }
+    };
+}
+
+impl_to_postgresql_via_to_string!(Target);
+
 fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
     match ty {
         "master" => ArtifactId::Commit(Commit {
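For intuition, the CASE ranking plus the `pr` tie-breaker in `take_commit_job` corresponds to the ordering key below (illustrative Rust, not part of the patch; in SQL, release jobs have a NULL `pr` and fall back to the `sha` tie-breaker):

```rust
use database::CommitJobType;

/// Illustrative only: release < master < try, then oldest PR first.
fn priority_key(job_type: &CommitJobType) -> (u8, u32) {
    match job_type {
        CommitJobType::Release { .. } => (0, 0),
        CommitJobType::Master { pr } => (1, *pr),
        CommitJobType::Try { pr } => (2, *pr),
    }
}
```

Sorting queued jobs by this key reproduces the sha5, sha6, sha3, sha4, sha1, sha2 order asserted in the `pool.rs` test.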
diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs
index 27d6b46de..49b637cd4 100644
--- a/database/src/pool/sqlite.rs
+++ b/database/src/pool/sqlite.rs
@@ -1,12 +1,13 @@
 use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
 use crate::{
-    ArtifactCollection, ArtifactId, Benchmark, CodegenBackend, CollectionId, Commit, CommitType,
-    CompileBenchmark, Date, Profile, Target,
+    commit_job_create, ArtifactCollection, ArtifactId, Benchmark, CodegenBackend, CollectionId,
+    Commit, CommitJob, CommitJobType, CommitType, CompileBenchmark, Date, Profile, Target,
 };
 use crate::{ArtifactIdNumber, Index, QueuedCommit};
 use chrono::{DateTime, TimeZone, Utc};
 use hashbrown::HashMap;
 use rusqlite::params;
+use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ValueRef};
 use rusqlite::OptionalExtension;
 use std::path::PathBuf;
 use std::str::FromStr;
@@ -404,6 +405,28 @@ static MIGRATIONS: &[Migration] = &[
         alter table pstat_series_with_target rename to pstat_series;
         "#,
     ),
+    Migration::without_foreign_key_constraints(
+        r#"
+        CREATE TABLE IF NOT EXISTS commit_queue (
+            sha TEXT,
+            parent_sha TEXT,
+            commit_type TEXT,
+            pr INTEGER,
+            release_tag TEXT,
+            commit_time TIMESTAMP,
+            target TEXT,
+            include TEXT,
+            exclude TEXT,
+            runs INTEGER DEFAULT 0,
+            backends TEXT,
+            machine_id TEXT,
+            started_at TIMESTAMP,
+            finished_at TIMESTAMP,
+            status TEXT,
+            PRIMARY KEY (sha, target)
+        );
+        "#,
+    ),
 ];
 
 #[async_trait::async_trait]
@@ -1252,6 +1275,297 @@ impl Connection for SqliteConnection {
         )
         .unwrap();
     }
+
+    /// Add a job to the queue
+    async fn enqueue_commit_job(&self, job: &CommitJob) {
+        match &job.job_type {
+            CommitJobType::Try { pr } | CommitJobType::Master { pr } => self
+                .raw_ref()
+                .execute(
+                    "INSERT OR IGNORE INTO commit_queue (
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        commit_time,
+                        status,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        pr
+                    )
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                    params![
+                        &job.sha,
+                        &job.parent_sha,
+                        &job.job_type.name(),
+                        &job.commit_time,
+                        &"queued",
+                        &job.target,
+                        &job.include,
+                        &job.exclude,
+                        &job.runs,
+                        &job.backends,
+                        &pr,
+                    ],
+                )
+                .unwrap(),
+            CommitJobType::Release { tag } => self
+                .raw_ref()
+                .execute(
+                    "INSERT OR IGNORE INTO commit_queue (
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        commit_time,
+                        status,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        release_tag
+                    )
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                    params![
+                        &job.sha,
+                        &job.parent_sha,
+                        &job.job_type.name(),
+                        &job.commit_time,
+                        &"queued",
+                        &job.target,
+                        &job.include,
+                        &job.exclude,
+                        &job.runs,
+                        &job.backends,
+                        &tag,
+                    ],
+                )
+                .unwrap(),
+        };
+    }
+
+    /// For this to work we need a central database
+    async fn take_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob> {
+        /* This is the same order as the `SELECT ...` below, which is also the
+         * table creation order */
+        fn commit_queue_row_to_commit_job(row: &rusqlite::Row) -> CommitJob {
+            let sha = row.get::<_, String>(0).unwrap();
+            let parent_sha = row.get::<_, String>(1).unwrap();
+            let commit_type = row.get::<_, String>(2).unwrap();
+            let pr = row.get::<_, Option<u32>>(3).unwrap();
+            let release_tag = row.get::<_, Option<String>>(4).unwrap();
+            let commit_time = row.get::<_, String>(5).unwrap().parse::<Date>().unwrap();
+            let target = Target::from_str(&row.get::<_, String>(6).unwrap()).unwrap();
+            let include = row.get::<_, Option<String>>(7).unwrap();
+            let exclude = row.get::<_, Option<String>>(8).unwrap();
+            let runs = row.get::<_, Option<i32>>(9).unwrap();
+            let backends = row.get::<_, Option<String>>(10).unwrap();
+            let machine_id = row.get::<_, Option<String>>(11).unwrap();
+            let started_at = row
+                .get::<_, Option<String>>(12)
+                .unwrap()
+                .map(|ts| ts.parse::<Date>().unwrap());
+            let finished_at = row
+                .get::<_, Option<String>>(13)
+                .unwrap()
+                .map(|ts| ts.parse::<Date>().unwrap());
+            let status = row.get::<_, String>(14).unwrap();
+
+            commit_job_create(
+                sha,
+                parent_sha,
+                &commit_type,
+                pr,
+                release_tag,
+                commit_time,
+                target,
+                machine_id,
+                started_at,
+                finished_at,
+                &status,
+                include,
+                exclude,
+                runs,
+                backends,
+            )
+        }
+
+        /* Check whether we are out of sync with collectors for other
+         * targets; if we are, update the matching row and return its sha */
+        let maybe_drift_job = self
+            .raw_ref()
+            .prepare(
+                "
+                WITH job_to_update AS (
+                    SELECT
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        pr,
+                        release_tag,
+                        commit_time,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        machine_id,
+                        started_at,
+                        finished_at,
+                        status
+                    FROM commit_queue
+                    WHERE target != ? AND status IN ('finished', 'in_progress')
+                    ORDER BY started_at
+                    LIMIT 1
+                )
+                UPDATE commit_queue
+                SET started_at = DATETIME('now'),
+                    status = 'in_progress',
+                    machine_id = ?
+                WHERE
+                    target = ?
+                    AND status = 'queued'
+                    AND sha = (SELECT sha FROM job_to_update)
+                RETURNING *;
+                ",
+            )
+            .unwrap()
+            .query_map(params![&target, machine_id, &target], |row| {
+                Ok(commit_queue_row_to_commit_job(row))
+            })
+            .unwrap()
+            .map(|row| row.unwrap())
+            .collect::<Vec<CommitJob>>();
+
+        if let Some(drift_job) = maybe_drift_job.first() {
+            return Some(drift_job.clone());
+        }
+
+        /* See if there are any queued jobs for this target */
+        let jobs = self
+            .raw_ref()
+            .prepare(
+                "
+                WITH job_to_update AS (
+                    SELECT
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        pr,
+                        release_tag,
+                        commit_time,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        machine_id,
+                        started_at,
+                        finished_at,
+                        status,
+                        CASE
+                            WHEN commit_type = 'release' THEN 0
+                            WHEN commit_type = 'master' THEN 1
+                            WHEN commit_type = 'try' THEN 2
+                            ELSE -1
+                        END AS type_rank
+                    FROM commit_queue
+                    WHERE target = ?
+                        AND status = 'queued'
+                    ORDER BY
+                        type_rank,
+                        pr ASC,
+                        sha
+                    LIMIT 1
+                )
+                UPDATE commit_queue
+                SET started_at = DATETIME('now'),
+                    status = 'in_progress',
+                    machine_id = ?
+                WHERE
+                    sha = (SELECT sha FROM job_to_update)
+                    AND target = ?
+                RETURNING *;
+                ",
+            )
+            .unwrap()
+            .query_map(params![&target, machine_id, &target], |row| {
+                Ok(commit_queue_row_to_commit_job(row))
+            })
+            .unwrap()
+            .map(|row| row.unwrap())
+            .collect::<Vec<CommitJob>>();
+
+        /* If there is one, take it; otherwise the queue is empty */
+        jobs.first().cloned()
+    }
+
+    /// For this to work we need a central database
+    async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) {
+        self.raw_ref()
+            .execute(
+                "
+                UPDATE commit_queue
+                SET finished_at = DATETIME('now'),
+                    status = 'finished'
+                WHERE
+                    sha = ?
+                    AND machine_id = ?
+                    AND target = ?;
+                ",
+                params![&sha, machine_id, &target],
+            )
+            .unwrap();
+    }
 }
 
+#[macro_export]
+macro_rules! impl_to_sqlite_via_to_string {
+    ($t:ty) => {
+        impl ToSql for $t {
+            fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
+                Ok(self.to_string().into())
+            }
+        }
+    };
+}
+
+impl_to_sqlite_via_to_string!(Target);
+
+impl ToSql for Date {
+    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
+        Ok(self.0.to_rfc3339().into())
+    }
+}
+
+impl FromSql for Date {
+    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
+        match value {
+            ValueRef::Text(text) => {
+                let s =
+                    std::str::from_utf8(text).map_err(|e| FromSqlError::Other(Box::new(e)))?;
+                DateTime::parse_from_rfc3339(s)
+                    .map(|dt| Date(dt.with_timezone(&Utc)))
+                    .map_err(|e| FromSqlError::Other(Box::new(e)))
+            }
+            ValueRef::Integer(i) => Ok(Date(Utc.timestamp_opt(i, 0).unwrap())),
+            ValueRef::Real(f) => {
+                let secs = f.trunc() as i64;
+                let nanos = ((f - f.trunc()) * 1e9) as u32;
+                Ok(Date(Utc.timestamp_opt(secs, nanos).unwrap()))
+            }
+            _ => Err(FromSqlError::InvalidType),
+        }
+    }
+}
 
 fn parse_artifact_id(ty: &str, sha: &str, date: Option<i64>) -> ArtifactId {
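Finally, a small sanity sketch of the new `Date` round-trip through SQLite (not part of the patch; assumes an in-memory rusqlite connection): `ToSql` stores the date as an RFC 3339 string and `FromSql` parses text, integer, or real timestamps back.

```rust
use chrono::Utc;
use database::Date;
use rusqlite::{params, Connection};

fn date_roundtrip() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute("CREATE TABLE t (d TEXT)", [])?;

    let now = Date(Utc::now());
    // `ToSql for Date` serializes as an RFC 3339 string...
    conn.execute("INSERT INTO t (d) VALUES (?1)", params![now])?;
    // ...and `FromSql for Date` parses it back from the TEXT column.
    let back: Date = conn.query_row("SELECT d FROM t", [], |row| row.get(0))?;
    assert_eq!(now.0.timestamp(), back.0.timestamp());
    Ok(())
}
```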