Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions libs/opsqueue_python/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ pub struct Submission {
pub chunks_total: u64,
pub chunks_done: u64,
pub metadata: Option<submission::Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
}

impl From<opsqueue::common::submission::Submission> for Submission {
Expand All @@ -395,6 +396,7 @@ impl From<opsqueue::common::submission::Submission> for Submission {
chunks_total: value.chunks_total.into(),
chunks_done: value.chunks_done.into(),
metadata: value.metadata,
strategic_metadata: value.strategic_metadata,
}
}
}
Expand All @@ -403,11 +405,12 @@ impl From<opsqueue::common::submission::Submission> for Submission {
impl Submission {
fn __repr__(&self) -> String {
format!(
"Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?})",
"Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?})",
self.id.__repr__(),
self.chunks_total,
self.chunks_done,
self.metadata
self.metadata,
self.strategic_metadata
)
}
}
Expand Down
54 changes: 44 additions & 10 deletions opsqueue/src/common/submission.rs

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to point out that for the other queries we also assert the query plan. This allows us to spot potentially very bad execution behaviour. Could you add that for these queries?

Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub struct Submission {
pub chunks_done: ChunkCount,
pub chunk_size: ChunkSize,
pub metadata: Option<Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
pub otel_trace_carrier: String,
}

Expand Down Expand Up @@ -224,6 +225,7 @@ impl Submission {
chunks_done: ChunkCount::zero(),
chunk_size: ChunkSize::default(),
metadata: None,
strategic_metadata: None,
otel_trace_carrier,
}
}
Expand All @@ -243,6 +245,7 @@ impl Submission {
chunks_done: ChunkCount::zero(),
chunk_size,
metadata,
strategic_metadata: None,
otel_trace_carrier,
};
let chunks = chunks
Expand All @@ -268,7 +271,7 @@ pub mod db {
db::{Connection, True, WriterConnection, WriterPool},
};
use chunk::ChunkSize;
use sqlx::{query, query_as, Sqlite};
use sqlx::{query, Sqlite};

use axum_prometheus::metrics::{counter, histogram};

Expand Down Expand Up @@ -421,6 +424,7 @@ pub mod db {
chunks_done: ChunkCount::zero(),
chunk_size,
metadata,
strategic_metadata: None,
otel_trace_carrier,
};
let iter = chunks_contents
Expand Down Expand Up @@ -461,25 +465,37 @@ pub mod db {
id: SubmissionId,
mut conn: impl Connection,
) -> Result<Submission, E<DatabaseError, SubmissionNotFound>> {
let submission = query_as!(
Submission,
let submission_row = query!(
r#"
SELECT id AS "id: SubmissionId"
, prefix
, chunks_total AS "chunks_total: ChunkCount"
, chunks_done AS "chunks_done: ChunkCount"
, chunk_size AS "chunk_size: ChunkSize"
, chunk_size AS "chunk_size!: ChunkSize"
, metadata
, ( SELECT json_group_object(metadata_key, metadata_value)
FROM submissions_metadata
WHERE submission_id = submissions.id
) AS "strategic_metadata: sqlx::types::Json<StrategicMetadataMap>"
, otel_trace_carrier
FROM submissions WHERE id = $1
"#,
id
)
.fetch_optional(conn.get_inner())
.await?;
match submission {
match submission_row {
None => Err(E::R(SubmissionNotFound(id))),
Some(submission) => Ok(submission),
Some(row) => Ok(Submission {
id: row.id,
prefix: row.prefix,
chunks_total: row.chunks_total,
chunks_done: row.chunks_done,
chunk_size: row.chunk_size,
metadata: row.metadata,
strategic_metadata: row.strategic_metadata.map(|json| json.0),
otel_trace_carrier: row.otel_trace_carrier,
}),
}
}

Expand Down Expand Up @@ -535,24 +551,37 @@ pub mod db {
// NOTE: The order is important here; a concurrent writer could move a submission
// from InProgress to Completed/Failed in-between the queries.

let submission = query_as!(
Submission,
let submission_row = query!(
r#"
SELECT
id AS "id: SubmissionId"
, prefix
, chunks_total AS "chunks_total: ChunkCount"
, chunks_done AS "chunks_done: ChunkCount"
, chunk_size AS "chunk_size: ChunkSize"
, chunk_size AS "chunk_size!: ChunkSize"
, metadata
, ( SELECT json_group_object(metadata_key, metadata_value)
FROM submissions_metadata
WHERE submission_id = submissions.id
) AS "strategic_metadata: sqlx::types::Json<StrategicMetadataMap>"
, otel_trace_carrier
FROM submissions WHERE id = $1
"#,
id
)
.fetch_optional(conn.get_inner())
.await?;
if let Some(submission) = submission {
if let Some(row) = submission_row {
let submission = Submission {
id: row.id,
prefix: row.prefix,
chunks_total: row.chunks_total,
chunks_done: row.chunks_done,
chunk_size: row.chunk_size,
metadata: row.metadata,
strategic_metadata: row.strategic_metadata.map(|json| json.0),
otel_trace_carrier: row.otel_trace_carrier,
};
return Ok(Some(SubmissionStatus::InProgress(submission)));
}

Expand Down Expand Up @@ -1046,6 +1075,11 @@ pub mod test {
.expect("insertion failed");

let fetched_submission = get_submission(submission.id, &mut conn).await.unwrap();
// When fetched from DB with no metadata rows, json_group_object returns '{}'.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so there is no distinction between Option<StrategicMetadataMap> and StrategicMetadataMap on the reader side?


Reading a bit further indeed shows to me that we can also remove the Option from the type to make this clear, this is also the type contract on InsertSubmission. Only the outer function insert_submission_chunks accepts the Option<StrategicMetadataMap> but it is unwrap_or_default into an empty map there.

let submission = Submission {
strategic_metadata: Some(Default::default()),
..submission
};
assert_eq!(fetched_submission, submission);
}

Expand Down
Loading