Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ export interface BqExportToGcsInput {
// is replaced by a server-side maximumBytesBilled cap, since a dry-run only validates the first
// statement and cannot predict the WHILE loop's total scan. See ADR-0004.
isScript?: boolean
// Fill-constraints run (package_dependencies only): the export is a full Option-A scan but the
// downstream merge upserts version_constraint (ON CONFLICT DO UPDATE) instead of DO NOTHING. Full
// and fill produce identical parquet, so sync_mode alone can't tell them apart — persist it in the
// job meta so a --resume-job run reprocesses the export with the correct merge instead of silently
// reverting to DO NOTHING (which skips the backfill).
isFill?: boolean
}

export interface BqExportToGcsOutput {
Expand All @@ -53,6 +59,7 @@ export async function bqExportToGcs(input: BqExportToGcsInput): Promise<BqExport
exportName,
ecosystems,
isScript,
isFill,
} = input

// Named exports use a stable GCS path independent of runId so they survive across bootstrap runs.
Expand Down Expand Up @@ -115,6 +122,7 @@ export async function bqExportToGcs(input: BqExportToGcsInput): Promise<BqExport
tableRowCounts: {
'bq:export': 0,
...(ecosystems ? { 'meta:ecosystems': ecosystems } : {}),
...(isFill ? { 'meta:fill': 1 } : {}),
},
})
return { gcsPrefix: namedPrefix, rowCount: 0, bqBytesBilled: 0, jobId }
Expand Down Expand Up @@ -212,9 +220,13 @@ export async function bqExportToGcs(input: BqExportToGcsInput): Promise<BqExport
const provisionalDate = snapshotAt ? new Date(snapshotAt) : null
const jobId = await createIngestJob(qx, jobKind, syncMode, provisionalDate, exportName)

// H7: mark exporting before we start the BQ job; store ecosystems filter in table_row_counts JSONB.
// H7: mark exporting before we start the BQ job; store ecosystems filter + fill flag in the
// table_row_counts JSONB so --resume-job can restore the original export's settings.
const exportMeta: Record<string, string | number | string[]> = {}
if (ecosystems) exportMeta['meta:ecosystems'] = ecosystems
if (isFill) exportMeta['meta:fill'] = 1
await markJobStatus(qx, jobId, 'exporting', {
...(ecosystems ? { tableRowCounts: { 'meta:ecosystems': ecosystems } } : {}),
...(Object.keys(exportMeta).length > 0 ? { tableRowCounts: exportMeta } : {}),
})

// From here the row is 'exporting'; any BQ failure (incl. script-mode maximumBytesBilled aborts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { getPackagesDb } from '../../db'

const log = getServiceChildLogger('createVersionsLookup')

const VALID_ECOSYSTEMS = new Set(['npm', 'go', 'maven', 'pypi', 'nuget', 'cargo'])
const VALID_ECOSYSTEMS = new Set(['npm', 'go', 'maven', 'pypi', 'nuget', 'cargo', 'rubygems'])

// Builds a persistent UNLOGGED lookup table in the staging schema for the root-version JOIN in
// ingestDependencies. Using a temp table per chunk would rebuild 4GB for every chunk on npm
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { getIngestJobForResume } from '@crowd/data-access-layer'

import { getPackagesDb } from '../../db'

export interface GetResumeExportInput {
jobId: number
}

export interface GetResumeExportOutput {
jobId: number
jobKind: string
status: string
syncMode: string
gcsPrefix: string | null
progressDone: number
progressTotal: number
rowCountPg: number
ecosystems: string[] | null
fill: boolean
}

// Pure fetch of a prior job's resume-relevant fields (export path, status, file-load progress,
// rows merged). Returns null if the job id does not exist. All validation — kind, status, presence
// of an export, and that the parquet files still exist — is done by the caller (ingestDependencies)
// so it can fail fast with non-retryable errors instead of retrying a bad-input activity.
export async function getResumeExport(
input: GetResumeExportInput,
): Promise<GetResumeExportOutput | null> {
const qx = await getPackagesDb()
const job = await getIngestJobForResume(qx, input.jobId)
if (!job) {
return null
}
return {
jobId: job.id,
jobKind: job.jobKind,
status: job.status,
syncMode: job.syncMode,
gcsPrefix: job.gcsPrefix,
progressDone: job.progressDone,
progressTotal: job.progressTotal,
rowCountPg: job.rowCountPg,
ecosystems: job.ecosystems,
fill: job.fill,
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
export * from './bqExportToGcs'
export * from './setJobStep'
export * from './createVersionsLookup'
export * from './managePackageDepsConstraints'
export * from './managePackageDepsIndexes'
export * from './manageVersionsConstraints'
export * from './manageVersionsIndexes'
export * from './listParquetFiles'
export * from './gcsParquetToStaging'
export * from './mergeStagingToTable'
export * from './getLastSnapshot'
export * from './getResumeExport'
export * from './checkDependentCountsGuard'
export * from './checkEdgeSnapshotQuality'
export * from './probePartitionExists'
Expand Down

This file was deleted.

This file was deleted.

Loading
Loading