From e15341d7e29667aa252d4af7720234c7ccb19167 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 3 Jul 2026 16:24:52 +0100 Subject: [PATCH 1/4] perf: prioritize critical packages in go enrichment scans Signed-off-by: Mouad BANI --- .../apps/packages_worker/src/go/activities.ts | 6 +- .../packages_worker/src/go/proxyClient.ts | 84 ++++++++++++------- 2 files changed, 56 insertions(+), 34 deletions(-) diff --git a/services/apps/packages_worker/src/go/activities.ts b/services/apps/packages_worker/src/go/activities.ts index 0ff111bd7e..0e9784d957 100644 --- a/services/apps/packages_worker/src/go/activities.ts +++ b/services/apps/packages_worker/src/go/activities.ts @@ -16,7 +16,9 @@ const log = getServiceChildLogger('go') const PROXY_SOURCE = 'go-proxy' const PKGGODEV_SOURCE = 'pkg-go-dev' -// TODO: filter to critical packages once computed +// Critical packages sort first so a batch is filled from them before any non-critical +// package is considered — if the upstream registry starts rate-limiting mid-run, it's the +// non-critical tail of the batch that gets skipped, not the critical packages. async function getGoBatch( qx: QueryExecutor, afterPurl: string, @@ -25,7 +27,7 @@ async function getGoBatch( return qx.select( `SELECT purl, name FROM packages WHERE ecosystem = 'go' AND purl > $(after) - ORDER BY last_synced_at ASC NULLS FIRST, purl ASC + ORDER BY is_critical DESC, last_synced_at ASC NULLS FIRST, purl ASC LIMIT $(limit)`, { after: afterPurl, limit: batchSize }, ) diff --git a/services/apps/packages_worker/src/go/proxyClient.ts b/services/apps/packages_worker/src/go/proxyClient.ts index 4df6bdce34..00ea76c6b3 100644 --- a/services/apps/packages_worker/src/go/proxyClient.ts +++ b/services/apps/packages_worker/src/go/proxyClient.ts @@ -2,6 +2,11 @@ import { FetchError, GoProxyLatest } from './types' const BASE = process.env.GO_PROXY_BASE_URL ?? 'https://proxy.golang.org' const ZERO_TIME = '0001-01-01T00:00:00Z' +const MAX_429_RETRIES = 5 + +function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)) +} // GOPROXY spec: uppercase letters in a module path are escaped as '!' + lowercase. export function escapeModulePath(module: string): string { @@ -13,40 +18,55 @@ export async function fetchLatest( timeoutMs: number, ): Promise { const url = `${BASE}/${escapeModulePath(module)}/@latest` - const controller = new AbortController() - const timer = setTimeout(() => controller.abort(), timeoutMs) - - let res: Response - try { - res = await fetch(url, { signal: controller.signal }) - } catch (e) { - return { kind: 'TRANSIENT', message: `network error: ${(e as Error).message}` } - } finally { - clearTimeout(timer) - } - if (res.status === 429) { - return { kind: 'RATE_LIMIT', statusCode: 429, message: 'rate limited' } - } - // Any other 4xx is permanent (unknown/invalid module path) — skip, don't retry. - if (res.status >= 400 && res.status < 500) { - return { kind: 'NOT_FOUND', statusCode: res.status, message: `${res.status}` } - } - if (res.status !== 200) { - return { kind: 'TRANSIENT', statusCode: res.status, message: `unexpected status ${res.status}` } - } + for (let attempt = 0; attempt <= MAX_429_RETRIES; attempt++) { + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), timeoutMs) - let body: { Version?: string; Time?: string; Origin?: { URL?: string } } - try { - body = (await res.json()) as { Version?: string; Time?: string; Origin?: { URL?: string } } - } catch { - return { kind: 'MALFORMED', message: 'invalid json' } - } - if (!body.Version) return { kind: 'MALFORMED', message: 'missing Version' } + let res: Response + try { + res = await fetch(url, { signal: controller.signal }) + } catch (e) { + return { kind: 'TRANSIENT', message: `network error: ${(e as Error).message}` } + } finally { + clearTimeout(timer) + } + + if (res.status === 429) { + if (attempt === MAX_429_RETRIES) { + return { kind: 'RATE_LIMIT', statusCode: 429, message: '429 after retries' } + } + const retryAfterSec = parseInt(res.headers.get('retry-after') ?? '', 10) + const waitMs = Number.isNaN(retryAfterSec) ? 1000 * 2 ** attempt : retryAfterSec * 1000 + await sleep(waitMs) + continue + } + // Any other 4xx is permanent (unknown/invalid module path) — skip, don't retry. + if (res.status >= 400 && res.status < 500) { + return { kind: 'NOT_FOUND', statusCode: res.status, message: `${res.status}` } + } + if (res.status !== 200) { + return { + kind: 'TRANSIENT', + statusCode: res.status, + message: `unexpected status ${res.status}`, + } + } - return { - version: body.Version, - releaseAt: body.Time && body.Time !== ZERO_TIME ? body.Time : null, - repoUrl: body.Origin?.URL || null, + let body: { Version?: string; Time?: string; Origin?: { URL?: string } } + try { + body = (await res.json()) as { Version?: string; Time?: string; Origin?: { URL?: string } } + } catch { + return { kind: 'MALFORMED', message: 'invalid json' } + } + if (!body.Version) return { kind: 'MALFORMED', message: 'missing Version' } + + return { + version: body.Version, + releaseAt: body.Time && body.Time !== ZERO_TIME ? body.Time : null, + repoUrl: body.Origin?.URL || null, + } } + + return { kind: 'RATE_LIMIT', statusCode: 429, message: '429 after retries' } } From 76e99147bb269e8d9158efad55dbcd831c2de6a0 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 3 Jul 2026 17:35:29 +0100 Subject: [PATCH 2/4] fix: pick packages based on criticality and last_sycned_at Signed-off-by: Mouad BANI --- .../apps/packages_worker/src/go/activities.ts | 50 +++++++++++++------ .../apps/packages_worker/src/go/workflows.ts | 24 ++++----- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/services/apps/packages_worker/src/go/activities.ts b/services/apps/packages_worker/src/go/activities.ts index 0e9784d957..6a0b1d24e2 100644 --- a/services/apps/packages_worker/src/go/activities.ts +++ b/services/apps/packages_worker/src/go/activities.ts @@ -16,30 +16,36 @@ const log = getServiceChildLogger('go') const PROXY_SOURCE = 'go-proxy' const PKGGODEV_SOURCE = 'pkg-go-dev' -// Critical packages sort first so a batch is filled from them before any non-critical -// package is considered — if the upstream registry starts rate-limiting mid-run, it's the -// non-critical tail of the batch that gets skipped, not the critical packages. +// No purl cursor: is_critical DESC + last_synced_at ASC means every call surfaces the +// highest-priority not-yet-synced-this-run rows first, and rows we just synced sink to the +// back (last_synced_at = NOW()) so the next call naturally picks up where this one left off. +// runStartedAt bounds the run — once every row has last_synced_at >= runStartedAt, this +// returns empty and the workflow ends for the day. async function getGoBatch( qx: QueryExecutor, - afterPurl: string, + runStartedAt: string, batchSize: number, ): Promise> { return qx.select( `SELECT purl, name FROM packages - WHERE ecosystem = 'go' AND purl > $(after) + WHERE ecosystem = 'go' AND (last_synced_at IS NULL OR last_synced_at < $(runStartedAt)) ORDER BY is_critical DESC, last_synced_at ASC NULLS FIRST, purl ASC LIMIT $(limit)`, - { after: afterPurl, limit: batchSize }, + { runStartedAt, limit: batchSize }, ) } +export async function getGoRunStartedAt(): Promise { + return new Date().toISOString() +} + export async function enrichGoVersionsBatch( - afterPurl: string, + runStartedAt: string, batchSize: number, -): Promise { +): Promise { const qx = await getPackagesDb() - const rows = await getGoBatch(qx, afterPurl, batchSize) - if (rows.length === 0) return null + const rows = await getGoBatch(qx, runStartedAt, batchSize) + if (rows.length === 0) return 0 const { fetchTimeoutMs, proxyConcurrency } = getGoConfig() @@ -51,6 +57,12 @@ export async function enrichGoVersionsBatch( { purl: row.purl, name: row.name, kind: result.kind, statusCode: result.statusCode }, 'go proxy fetch failed — skipping package', ) + // Touch last_synced_at even on failure so this row sinks behind unprocessed ones and + // isn't re-selected into every subsequent batch for the rest of this run. It'll be + // retried on tomorrow's run. + await qx.result(`UPDATE packages SET last_synced_at = NOW() WHERE purl = $(purl)`, { + purl: row.purl, + }) return } const changed = await qx.selectOne( @@ -91,16 +103,16 @@ export async function enrichGoVersionsBatch( } log.info({ count: rows.length, concurrency: proxyConcurrency }, 'Enriched go versions batch') - return rows[rows.length - 1].purl + return rows.length } export async function enrichGoStatusBatch( - afterPurl: string, + runStartedAt: string, batchSize: number, -): Promise { +): Promise { const qx = await getPackagesDb() - const rows = await getGoBatch(qx, afterPurl, batchSize) - if (rows.length === 0) return null + const rows = await getGoBatch(qx, runStartedAt, batchSize) + if (rows.length === 0) return 0 const { fetchTimeoutMs } = getGoConfig() for (const row of rows) { @@ -112,6 +124,12 @@ export async function enrichGoStatusBatch( { purl: row.purl, name: row.name, kind: result.kind, statusCode: result.statusCode }, 'pkg.go.dev fetch failed — skipping package', ) + // Touch last_synced_at even on failure so this row sinks behind unprocessed ones and + // isn't re-selected into every subsequent batch for the rest of this run. It'll be + // retried on tomorrow's run. + await qx.result(`UPDATE packages SET last_synced_at = NOW() WHERE purl = $(purl)`, { + purl: row.purl, + }) continue } const changed = await qx.selectOne( @@ -139,5 +157,5 @@ export async function enrichGoStatusBatch( } log.info({ count: rows.length }, 'Enriched go status batch') - return rows[rows.length - 1].purl + return rows.length } diff --git a/services/apps/packages_worker/src/go/workflows.ts b/services/apps/packages_worker/src/go/workflows.ts index efa93af1c5..215d9afe11 100644 --- a/services/apps/packages_worker/src/go/workflows.ts +++ b/services/apps/packages_worker/src/go/workflows.ts @@ -16,25 +16,23 @@ const BATCH = 100 const ROUNDS_PER_RUN = 200 interface ScanState { - cursor: string + runStartedAt: string } -export async function enrichGoVersions(state: ScanState = { cursor: '' }): Promise { - let { cursor } = state +export async function enrichGoVersions(state?: ScanState): Promise { + const runStartedAt = state?.runStartedAt ?? (await acts.getGoRunStartedAt()) for (let r = 0; r < ROUNDS_PER_RUN; r++) { - const next = await acts.enrichGoVersionsBatch(cursor, BATCH) - if (next === null) return - cursor = next + const count = await acts.enrichGoVersionsBatch(runStartedAt, BATCH) + if (count === 0) return } - await continueAsNew({ cursor }) + await continueAsNew({ runStartedAt }) } -export async function enrichGoStatus(state: ScanState = { cursor: '' }): Promise { - let { cursor } = state +export async function enrichGoStatus(state?: ScanState): Promise { + const runStartedAt = state?.runStartedAt ?? (await acts.getGoRunStartedAt()) for (let r = 0; r < ROUNDS_PER_RUN; r++) { - const next = await acts.enrichGoStatusBatch(cursor, BATCH) - if (next === null) return - cursor = next + const count = await acts.enrichGoStatusBatch(runStartedAt, BATCH) + if (count === 0) return } - await continueAsNew({ cursor }) + await continueAsNew({ runStartedAt }) } From 43c6eadf1230603c0f3309ca7a2e083f91180b0b Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 3 Jul 2026 17:55:01 +0100 Subject: [PATCH 3/4] fix: import Signed-off-by: Mouad BANI --- services/apps/packages_worker/src/activities.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/packages_worker/src/activities.ts b/services/apps/packages_worker/src/activities.ts index b747a3bd3d..9dd09a1022 100644 --- a/services/apps/packages_worker/src/activities.ts +++ b/services/apps/packages_worker/src/activities.ts @@ -24,7 +24,7 @@ export { cargoFlushAudit, cargoCleanup, } from './cargo/activities' -export { enrichGoVersionsBatch, enrichGoStatusBatch } from './go/activities' +export { enrichGoVersionsBatch, enrichGoStatusBatch, getGoRunStartedAt } from './go/activities' export { getUnscannedPypiBatch, ingestPypiPackageBatch, From 13a1cda13e8b492fecb9d130b7f480ac01eeadfd Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 3 Jul 2026 18:03:57 +0100 Subject: [PATCH 4/4] fix: \use per-criticality purl cursors for go enrichment scans Signed-off-by: Mouad BANI --- .../apps/packages_worker/src/activities.ts | 2 +- .../apps/packages_worker/src/go/activities.ts | 82 +++++++++++-------- .../apps/packages_worker/src/go/workflows.ts | 24 +++--- 3 files changed, 60 insertions(+), 48 deletions(-) diff --git a/services/apps/packages_worker/src/activities.ts b/services/apps/packages_worker/src/activities.ts index 9dd09a1022..b747a3bd3d 100644 --- a/services/apps/packages_worker/src/activities.ts +++ b/services/apps/packages_worker/src/activities.ts @@ -24,7 +24,7 @@ export { cargoFlushAudit, cargoCleanup, } from './cargo/activities' -export { enrichGoVersionsBatch, enrichGoStatusBatch, getGoRunStartedAt } from './go/activities' +export { enrichGoVersionsBatch, enrichGoStatusBatch } from './go/activities' export { getUnscannedPypiBatch, ingestPypiPackageBatch, diff --git a/services/apps/packages_worker/src/go/activities.ts b/services/apps/packages_worker/src/go/activities.ts index 6a0b1d24e2..186094edb4 100644 --- a/services/apps/packages_worker/src/go/activities.ts +++ b/services/apps/packages_worker/src/go/activities.ts @@ -16,36 +16,62 @@ const log = getServiceChildLogger('go') const PROXY_SOURCE = 'go-proxy' const PKGGODEV_SOURCE = 'pkg-go-dev' -// No purl cursor: is_critical DESC + last_synced_at ASC means every call surfaces the -// highest-priority not-yet-synced-this-run rows first, and rows we just synced sink to the -// back (last_synced_at = NOW()) so the next call naturally picks up where this one left off. -// runStartedAt bounds the run — once every row has last_synced_at >= runStartedAt, this -// returns empty and the workflow ends for the day. +export interface GoScanCursor { + criticalAfter: string + after: string +} + +type GoRow = { purl: string; name: string } + +// Two independent purl-keyset cursors — one for critical packages, one for everything else — +// each ordered/paginated purely by purl so WHERE and ORDER BY always match (no gaps, no +// duplicates). A single query sorted by is_critical DESC with one shared purl cursor was tried +// and rejected: the cursor advances to the last row's purl, and when a batch is critical-heavy +// that purl can be far ahead of unprocessed non-critical rows, permanently excluding them for +// the rest of the run. async function getGoBatch( qx: QueryExecutor, - runStartedAt: string, + isCritical: boolean, + afterPurl: string, batchSize: number, -): Promise> { +): Promise { + if (batchSize <= 0) return [] return qx.select( `SELECT purl, name FROM packages - WHERE ecosystem = 'go' AND (last_synced_at IS NULL OR last_synced_at < $(runStartedAt)) - ORDER BY is_critical DESC, last_synced_at ASC NULLS FIRST, purl ASC + WHERE ecosystem = 'go' AND is_critical = $(isCritical) AND purl > $(after) + ORDER BY purl ASC LIMIT $(limit)`, - { runStartedAt, limit: batchSize }, + { isCritical, after: afterPurl, limit: batchSize }, ) } -export async function getGoRunStartedAt(): Promise { - return new Date().toISOString() +// Drains not-yet-processed critical packages first, then tops up the rest of the batch with +// non-critical ones — so a rate-limit run only ever starves the non-critical tail. +async function getGoPriorityBatch( + qx: QueryExecutor, + cursor: GoScanCursor, + batchSize: number, +): Promise<{ rows: GoRow[]; nextCursor: GoScanCursor }> { + const critical = await getGoBatch(qx, true, cursor.criticalAfter, batchSize) + const nonCritical = await getGoBatch(qx, false, cursor.after, batchSize - critical.length) + + return { + rows: [...critical, ...nonCritical], + nextCursor: { + criticalAfter: + critical.length > 0 ? critical[critical.length - 1].purl : cursor.criticalAfter, + after: nonCritical.length > 0 ? nonCritical[nonCritical.length - 1].purl : cursor.after, + }, + } } export async function enrichGoVersionsBatch( - runStartedAt: string, + cursor: GoScanCursor, batchSize: number, -): Promise { +): Promise { const qx = await getPackagesDb() - const rows = await getGoBatch(qx, runStartedAt, batchSize) - if (rows.length === 0) return 0 + const { rows, nextCursor } = await getGoPriorityBatch(qx, cursor, batchSize) + if (rows.length === 0) return null const { fetchTimeoutMs, proxyConcurrency } = getGoConfig() @@ -57,12 +83,6 @@ export async function enrichGoVersionsBatch( { purl: row.purl, name: row.name, kind: result.kind, statusCode: result.statusCode }, 'go proxy fetch failed — skipping package', ) - // Touch last_synced_at even on failure so this row sinks behind unprocessed ones and - // isn't re-selected into every subsequent batch for the rest of this run. It'll be - // retried on tomorrow's run. - await qx.result(`UPDATE packages SET last_synced_at = NOW() WHERE purl = $(purl)`, { - purl: row.purl, - }) return } const changed = await qx.selectOne( @@ -103,16 +123,16 @@ export async function enrichGoVersionsBatch( } log.info({ count: rows.length, concurrency: proxyConcurrency }, 'Enriched go versions batch') - return rows.length + return nextCursor } export async function enrichGoStatusBatch( - runStartedAt: string, + cursor: GoScanCursor, batchSize: number, -): Promise { +): Promise { const qx = await getPackagesDb() - const rows = await getGoBatch(qx, runStartedAt, batchSize) - if (rows.length === 0) return 0 + const { rows, nextCursor } = await getGoPriorityBatch(qx, cursor, batchSize) + if (rows.length === 0) return null const { fetchTimeoutMs } = getGoConfig() for (const row of rows) { @@ -124,12 +144,6 @@ export async function enrichGoStatusBatch( { purl: row.purl, name: row.name, kind: result.kind, statusCode: result.statusCode }, 'pkg.go.dev fetch failed — skipping package', ) - // Touch last_synced_at even on failure so this row sinks behind unprocessed ones and - // isn't re-selected into every subsequent batch for the rest of this run. It'll be - // retried on tomorrow's run. - await qx.result(`UPDATE packages SET last_synced_at = NOW() WHERE purl = $(purl)`, { - purl: row.purl, - }) continue } const changed = await qx.selectOne( @@ -157,5 +171,5 @@ export async function enrichGoStatusBatch( } log.info({ count: rows.length }, 'Enriched go status batch') - return rows.length + return nextCursor } diff --git a/services/apps/packages_worker/src/go/workflows.ts b/services/apps/packages_worker/src/go/workflows.ts index 215d9afe11..738c112457 100644 --- a/services/apps/packages_worker/src/go/workflows.ts +++ b/services/apps/packages_worker/src/go/workflows.ts @@ -15,24 +15,22 @@ const acts = proxyActivities({ const BATCH = 100 const ROUNDS_PER_RUN = 200 -interface ScanState { - runStartedAt: string -} +const START_CURSOR = { criticalAfter: '', after: '' } -export async function enrichGoVersions(state?: ScanState): Promise { - const runStartedAt = state?.runStartedAt ?? (await acts.getGoRunStartedAt()) +export async function enrichGoVersions(cursor = START_CURSOR): Promise { for (let r = 0; r < ROUNDS_PER_RUN; r++) { - const count = await acts.enrichGoVersionsBatch(runStartedAt, BATCH) - if (count === 0) return + const next = await acts.enrichGoVersionsBatch(cursor, BATCH) + if (next === null) return + cursor = next } - await continueAsNew({ runStartedAt }) + await continueAsNew(cursor) } -export async function enrichGoStatus(state?: ScanState): Promise { - const runStartedAt = state?.runStartedAt ?? (await acts.getGoRunStartedAt()) +export async function enrichGoStatus(cursor = START_CURSOR): Promise { for (let r = 0; r < ROUNDS_PER_RUN; r++) { - const count = await acts.enrichGoStatusBatch(runStartedAt, BATCH) - if (count === 0) return + const next = await acts.enrichGoStatusBatch(cursor, BATCH) + if (next === null) return + cursor = next } - await continueAsNew({ runStartedAt }) + await continueAsNew(cursor) }