From 5692b0cc178410972bc64f4ac0dde0b3b1efcd8a Mon Sep 17 00:00:00 2001 From: Alessandro Gallo Date: Wed, 17 Jun 2026 17:09:14 +0200 Subject: [PATCH 1/3] [CRE-1299] - support white listed requests --- libs/opsqueue_python/src/common.rs | 7 ++-- opsqueue/src/common/submission.rs | 54 ++++++++++++++++++++++++------ 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index f19eaa6..c3d6da8 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -386,6 +386,7 @@ pub struct Submission { pub chunks_total: u64, pub chunks_done: u64, pub metadata: Option, + pub strategic_metadata: Option, } impl From for Submission { @@ -395,6 +396,7 @@ impl From for Submission { chunks_total: value.chunks_total.into(), chunks_done: value.chunks_done.into(), metadata: value.metadata, + strategic_metadata: value.strategic_metadata, } } } @@ -403,11 +405,12 @@ impl From for Submission { impl Submission { fn __repr__(&self) -> String { format!( - "Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?})", + "Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?})", self.id.__repr__(), self.chunks_total, self.chunks_done, - self.metadata + self.metadata, + self.strategic_metadata ) } } diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 69040f1..da38fd0 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -151,6 +151,7 @@ pub struct Submission { pub chunks_done: ChunkCount, pub chunk_size: ChunkSize, pub metadata: Option, + pub strategic_metadata: Option, pub otel_trace_carrier: String, } @@ -224,6 +225,7 @@ impl Submission { chunks_done: ChunkCount::zero(), chunk_size: ChunkSize::default(), metadata: None, + strategic_metadata: None, otel_trace_carrier, } } @@ -243,6 +245,7 @@ impl Submission { chunks_done: ChunkCount::zero(), chunk_size, metadata, + strategic_metadata: None, otel_trace_carrier, }; let chunks = chunks @@ -268,7 +271,7 @@ pub mod db { db::{Connection, True, WriterConnection, WriterPool}, }; use chunk::ChunkSize; - use sqlx::{query, query_as, Sqlite}; + use sqlx::{query, Sqlite}; use axum_prometheus::metrics::{counter, histogram}; @@ -421,6 +424,7 @@ pub mod db { chunks_done: ChunkCount::zero(), chunk_size, metadata, + strategic_metadata: None, otel_trace_carrier, }; let iter = chunks_contents @@ -461,15 +465,18 @@ pub mod db { id: SubmissionId, mut conn: impl Connection, ) -> Result> { - let submission = query_as!( - Submission, + let submission_row = query!( r#" SELECT id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" , chunks_done AS "chunks_done: ChunkCount" - , chunk_size AS "chunk_size: ChunkSize" + , chunk_size AS "chunk_size!: ChunkSize" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions.id + ) AS "strategic_metadata: sqlx::types::Json" , otel_trace_carrier FROM submissions WHERE id = $1 "#, @@ -477,9 +484,18 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - match submission { + match submission_row { None => Err(E::R(SubmissionNotFound(id))), - Some(submission) => Ok(submission), + Some(row) => Ok(Submission { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + chunk_size: row.chunk_size, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + otel_trace_carrier: row.otel_trace_carrier, + }), } } @@ -535,16 +551,19 @@ pub mod db { // NOTE: The order is important here; a concurrent writer could move a submission // from InProgress to Completed/Failed in-between the queries. - let submission = query_as!( - Submission, + let submission_row = query!( r#" SELECT id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" , chunks_done AS "chunks_done: ChunkCount" - , chunk_size AS "chunk_size: ChunkSize" + , chunk_size AS "chunk_size!: ChunkSize" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions.id + ) AS "strategic_metadata: sqlx::types::Json" , otel_trace_carrier FROM submissions WHERE id = $1 "#, @@ -552,7 +571,17 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - if let Some(submission) = submission { + if let Some(row) = submission_row { + let submission = Submission { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + chunk_size: row.chunk_size, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + otel_trace_carrier: row.otel_trace_carrier, + }; return Ok(Some(SubmissionStatus::InProgress(submission))); } @@ -1046,6 +1075,11 @@ pub mod test { .expect("insertion failed"); let fetched_submission = get_submission(submission.id, &mut conn).await.unwrap(); + // When fetched from DB with no metadata rows, json_group_object returns '{}'. + let submission = Submission { + strategic_metadata: Some(Default::default()), + ..submission + }; assert_eq!(fetched_submission, submission); } From 9c24d01dc1f6802691a3a869b435c99487493c59 Mon Sep 17 00:00:00 2001 From: Alessandro Gallo Date: Fri, 26 Jun 2026 12:05:19 +0200 Subject: [PATCH 2/3] wip --- libs/opsqueue_python/src/common.rs | 8 +++--- opsqueue/src/common/submission.rs | 41 ++++++++++++++++++++---------- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index c3d6da8..e5d2d92 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -386,7 +386,7 @@ pub struct Submission { pub chunks_total: u64, pub chunks_done: u64, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, } impl From for Submission { @@ -451,7 +451,7 @@ pub struct SubmissionCompleted { pub id: SubmissionId, pub chunks_total: u64, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub completed_at: DateTime, } @@ -462,7 +462,7 @@ pub struct SubmissionFailed { pub chunks_total: u64, pub chunks_done: Option, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub failed_at: DateTime, pub failed_chunk_id: u64, } @@ -474,7 +474,7 @@ pub struct SubmissionCancelled { pub chunks_total: u64, pub chunks_done: u64, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub cancelled_at: DateTime, } diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index da38fd0..fadc9e4 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -151,7 +151,7 @@ pub struct Submission { pub chunks_done: ChunkCount, pub chunk_size: ChunkSize, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub otel_trace_carrier: String, } @@ -167,7 +167,7 @@ pub struct SubmissionCompleted { pub chunks_total: ChunkCount, pub chunk_size: ChunkSize, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub completed_at: DateTime, pub otel_trace_carrier: String, } @@ -180,7 +180,7 @@ pub struct SubmissionFailed { pub chunks_done: Option, pub chunk_size: ChunkSize, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub failed_at: DateTime, pub failed_chunk_id: ChunkIndex, pub otel_trace_carrier: String, @@ -197,7 +197,7 @@ pub struct SubmissionCancelled { pub chunks_total: ChunkCount, pub chunks_done: ChunkCount, pub metadata: Option, - pub strategic_metadata: Option, + pub strategic_metadata: StrategicMetadataMap, pub cancelled_at: DateTime, } @@ -225,7 +225,7 @@ impl Submission { chunks_done: ChunkCount::zero(), chunk_size: ChunkSize::default(), metadata: None, - strategic_metadata: None, + strategic_metadata: StrategicMetadataMap::default(), otel_trace_carrier, } } @@ -245,7 +245,7 @@ impl Submission { chunks_done: ChunkCount::zero(), chunk_size, metadata, - strategic_metadata: None, + strategic_metadata: StrategicMetadataMap::default(), otel_trace_carrier, }; let chunks = chunks @@ -424,7 +424,7 @@ pub mod db { chunks_done: ChunkCount::zero(), chunk_size, metadata, - strategic_metadata: None, + strategic_metadata: strategic_metadata.clone(), otel_trace_carrier, }; let iter = chunks_contents @@ -493,7 +493,10 @@ pub mod db { chunks_done: row.chunks_done, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: row + .strategic_metadata + .map(|json| json.0) + .unwrap_or_default(), otel_trace_carrier: row.otel_trace_carrier, }), } @@ -579,7 +582,10 @@ pub mod db { chunks_done: row.chunks_done, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: row + .strategic_metadata + .map(|json| json.0) + .unwrap_or_default(), otel_trace_carrier: row.otel_trace_carrier, }; return Ok(Some(SubmissionStatus::InProgress(submission))); @@ -612,7 +618,10 @@ pub mod db { chunks_total: row.chunks_total, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: row + .strategic_metadata + .map(|json| json.0) + .unwrap_or_default(), completed_at: row.completed_at, otel_trace_carrier: row.otel_trace_carrier, }; @@ -649,7 +658,10 @@ pub mod db { chunks_done: row.chunks_done, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: row + .strategic_metadata + .map(|json| json.0) + .unwrap_or_default(), failed_at: row.failed_at, failed_chunk_id: row.failed_chunk_id, otel_trace_carrier: row.otel_trace_carrier, @@ -688,7 +700,10 @@ pub mod db { chunks_total: row.chunks_total, chunks_done: row.chunks_done, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: row + .strategic_metadata + .map(|json| json.0) + .unwrap_or_default(), cancelled_at: row.cancelled_at, }; return Ok(Some(SubmissionStatus::Cancelled(cancelled_submission))); @@ -1077,7 +1092,7 @@ pub mod test { let fetched_submission = get_submission(submission.id, &mut conn).await.unwrap(); // When fetched from DB with no metadata rows, json_group_object returns '{}'. let submission = Submission { - strategic_metadata: Some(Default::default()), + strategic_metadata: Default::default(), ..submission }; assert_eq!(fetched_submission, submission); From 8088491e8820d8415a5662588b6dcea1c32ca38d Mon Sep 17 00:00:00 2001 From: Alessandro Gallo Date: Fri, 26 Jun 2026 12:15:31 +0200 Subject: [PATCH 3/3] new test added --- .../src/common/.submission.rs.pending-snap | 8 + opsqueue/src/common/submission.rs | 141 ++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 opsqueue/src/common/.submission.rs.pending-snap diff --git a/opsqueue/src/common/.submission.rs.pending-snap b/opsqueue/src/common/.submission.rs.pending-snap new file mode 100644 index 0000000..c59e025 --- /dev/null +++ b/opsqueue/src/common/.submission.rs.pending-snap @@ -0,0 +1,8 @@ +{"run_id":"1782468703-948887232","line":1165,"new":{"module_name":"opsqueue__common__submission__test","snapshot_name":"query_plan_submission_status_failed","metadata":{"source":"opsqueue/src/common/submission.rs","assertion_line":1165,"expression":"explained"},"snapshot":"3, 0, SEARCH submissions_failed USING INDEX sqlite_autoindex_submissions_failed_1 (id=?)\n15, 0, CORRELATED SCALAR SUBQUERY 1\n20, 15, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?)"},"old":{"module_name":"opsqueue__common__submission__test","metadata":{},"snapshot":"2, 0, SEARCH submissions_failed USING INTEGER PRIMARY KEY (rowid=?)\n14, 0, CORRELATED SCALAR SUBQUERY 1\n19, 14, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?)"}} +{"run_id":"1782468703-948887232","line":1192,"new":{"module_name":"opsqueue__common__submission__test","snapshot_name":"query_plan_submission_status_cancelled","metadata":{"source":"opsqueue/src/common/submission.rs","assertion_line":1192,"expression":"explained"},"snapshot":"3, 0, SEARCH submissions_cancelled USING INDEX sqlite_autoindex_submissions_cancelled_1 (id=?)\n14, 0, CORRELATED SCALAR SUBQUERY 1\n19, 14, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?)"},"old":{"module_name":"opsqueue__common__submission__test","metadata":{},"snapshot":"2, 0, SEARCH submissions_cancelled USING INTEGER PRIMARY KEY (rowid=?)\n11, 0, CORRELATED SCALAR SUBQUERY 1\n16, 11, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?)"}} +{"run_id":"1782468703-948887232","line":1135,"new":{"module_name":"opsqueue__common__submission__test","snapshot_name":"query_plan_submission_status_completed","metadata":{"source":"opsqueue/src/common/submission.rs","assertion_line":1135,"expression":"explained"},"snapshot":"3, 0, SEARCH submissions_completed USING INDEX sqlite_autoindex_submissions_completed_1 (id=?)\n14, 0, CORRELATED SCALAR SUBQUERY 1\n19, 14, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?)"},"old":{"module_name":"opsqueue__common__submission__test","metadata":{},"snapshot":"2, 0, SEARCH submissions_completed USING INTEGER PRIMARY KEY (rowid=?)\n12, 0, CORRELATED SCALAR SUBQUERY 1\n17, 12, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?)"}} +{"run_id":"1782468703-948887232","line":1107,"new":{"module_name":"opsqueue__common__submission__test","snapshot_name":"query_plan_submission_status_in_progress","metadata":{"source":"opsqueue/src/common/submission.rs","assertion_line":1107,"expression":"explained"},"snapshot":"3, 0, SEARCH submissions USING INDEX sqlite_autoindex_submissions_1 (id=?)\n15, 0, CORRELATED SCALAR SUBQUERY 1\n20, 15, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?)"},"old":{"module_name":"opsqueue__common__submission__test","metadata":{},"snapshot":"2, 0, SEARCH submissions USING INTEGER PRIMARY KEY (rowid=?)\n13, 0, CORRELATED SCALAR SUBQUERY 1\n18, 13, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?)"}} +{"run_id":"1782468859-730064235","line":1192,"new":null,"old":null} +{"run_id":"1782468859-730064235","line":1135,"new":null,"old":null} +{"run_id":"1782468859-730064235","line":1165,"new":null,"old":null} +{"run_id":"1782468859-730064235","line":1107,"new":null,"old":null} diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index fadc9e4..014c07e 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -1048,6 +1048,8 @@ pub mod test { use assert_matches::*; use chrono::Utc; use chunk::ChunkSize; + use itertools::Itertools; + use sqlx::{Row, SqliteConnection}; use crate::common::StrategicMetadataMap; use crate::db::{Connection as _, WriterPool}; @@ -1055,6 +1057,145 @@ pub mod test { use super::db::*; use super::*; + async fn explain_query_plan(query: &str, conn: &mut SqliteConnection) -> String { + sqlx::raw_sql(&format!("EXPLAIN QUERY PLAN {query}")) + .fetch_all(&mut *conn) + .await + .unwrap_or_else(|_| panic!("Invalid query: \n{query}\n")) + .into_iter() + .map(|row| { + let id = row.get::("id"); + let parent = row.get::("parent"); + let detail = row.get::("detail"); + format!("{id}, {parent}, {detail}") + }) + .join("\n") + } + + fn assert_non_regressing_query_plan(query: &str, explained: &str) { + assert!( + !explained.contains("MATERIALIZED"), + "Query should contain no materialization, but it did.\n\nQuery: {query}\n\nPlan:\n\n{explained}" + ); + assert!( + !explained.contains("B-TREE"), + "Query should contain no temporary B-tree construction, but it did.\n\nQuery: {query}\n\nPlan:\n\n{explained}" + ); + } + + #[sqlx::test] + pub async fn test_query_plan_submission_status_in_progress(db: sqlx::SqlitePool) { + let mut conn = db.acquire().await.unwrap(); + let query = r#" + SELECT + id + , prefix + , chunks_total + , chunks_done + , chunk_size + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions.id + ) AS strategic_metadata + , otel_trace_carrier + FROM submissions WHERE id = 1 + "#; + + let explained = explain_query_plan(query, &mut conn).await; + assert_non_regressing_query_plan(query, &explained); + insta::assert_snapshot!(explained, @r" + 3, 0, SEARCH submissions USING INDEX sqlite_autoindex_submissions_1 (id=?) + 15, 0, CORRELATED SCALAR SUBQUERY 1 + 20, 15, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?) + "); + } + + #[sqlx::test] + pub async fn test_query_plan_submission_status_completed(db: sqlx::SqlitePool) { + let mut conn = db.acquire().await.unwrap(); + let query = r#" + SELECT + id + , prefix + , chunks_total + , chunk_size + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_completed.id + ) AS strategic_metadata + , completed_at + , otel_trace_carrier + FROM submissions_completed WHERE id = 1 + "#; + + let explained = explain_query_plan(query, &mut conn).await; + assert_non_regressing_query_plan(query, &explained); + insta::assert_snapshot!(explained, @r" + 3, 0, SEARCH submissions_completed USING INDEX sqlite_autoindex_submissions_completed_1 (id=?) + 14, 0, CORRELATED SCALAR SUBQUERY 1 + 19, 14, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?) + "); + } + + #[sqlx::test] + pub async fn test_query_plan_submission_status_failed(db: sqlx::SqlitePool) { + let mut conn = db.acquire().await.unwrap(); + let query = r#" + SELECT + id + , prefix + , chunks_total + , chunks_done + , chunk_size + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_failed.id + ) AS strategic_metadata + , failed_at + , failed_chunk_id + , otel_trace_carrier + FROM submissions_failed WHERE id = 1 + "#; + + let explained = explain_query_plan(query, &mut conn).await; + assert_non_regressing_query_plan(query, &explained); + insta::assert_snapshot!(explained, @r" + 3, 0, SEARCH submissions_failed USING INDEX sqlite_autoindex_submissions_failed_1 (id=?) + 15, 0, CORRELATED SCALAR SUBQUERY 1 + 20, 15, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?) + "); + } + + #[sqlx::test] + pub async fn test_query_plan_submission_status_cancelled(db: sqlx::SqlitePool) { + let mut conn = db.acquire().await.unwrap(); + let query = r#" + SELECT + id + , prefix + , chunks_total + , chunks_done + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_cancelled.id + ) AS strategic_metadata + , cancelled_at + FROM submissions_cancelled WHERE id = 1 + "#; + + let explained = explain_query_plan(query, &mut conn).await; + assert_non_regressing_query_plan(query, &explained); + insta::assert_snapshot!(explained, @r" + 3, 0, SEARCH submissions_cancelled USING INDEX sqlite_autoindex_submissions_cancelled_1 (id=?) + 14, 0, CORRELATED SCALAR SUBQUERY 1 + 19, 14, SEARCH submissions_metadata USING PRIMARY KEY (submission_id=?) + "); + } + #[sqlx::test] pub async fn test_insert_submission(db: sqlx::SqlitePool) { let db = WriterPool::new(db);