diff --git a/services/apps/packages_worker/src/go/activities.ts b/services/apps/packages_worker/src/go/activities.ts index 0ff111bd7e..186094edb4 100644 --- a/services/apps/packages_worker/src/go/activities.ts +++ b/services/apps/packages_worker/src/go/activities.ts @@ -16,27 +16,61 @@ const log = getServiceChildLogger('go') const PROXY_SOURCE = 'go-proxy' const PKGGODEV_SOURCE = 'pkg-go-dev' -// TODO: filter to critical packages once computed +export interface GoScanCursor { + criticalAfter: string + after: string +} + +type GoRow = { purl: string; name: string } + +// Two independent purl-keyset cursors — one for critical packages, one for everything else — +// each ordered/paginated purely by purl so WHERE and ORDER BY always match (no gaps, no +// duplicates). A single query sorted by is_critical DESC with one shared purl cursor was tried +// and rejected: the cursor advances to the last row's purl, and when a batch is critical-heavy +// that purl can be far ahead of unprocessed non-critical rows, permanently excluding them for +// the rest of the run. async function getGoBatch( qx: QueryExecutor, + isCritical: boolean, afterPurl: string, batchSize: number, -): Promise> { +): Promise { + if (batchSize <= 0) return [] return qx.select( `SELECT purl, name FROM packages - WHERE ecosystem = 'go' AND purl > $(after) - ORDER BY last_synced_at ASC NULLS FIRST, purl ASC + WHERE ecosystem = 'go' AND is_critical = $(isCritical) AND purl > $(after) + ORDER BY purl ASC LIMIT $(limit)`, - { after: afterPurl, limit: batchSize }, + { isCritical, after: afterPurl, limit: batchSize }, ) } +// Drains not-yet-processed critical packages first, then tops up the rest of the batch with +// non-critical ones — so a rate-limit run only ever starves the non-critical tail. +async function getGoPriorityBatch( + qx: QueryExecutor, + cursor: GoScanCursor, + batchSize: number, +): Promise<{ rows: GoRow[]; nextCursor: GoScanCursor }> { + const critical = await getGoBatch(qx, true, cursor.criticalAfter, batchSize) + const nonCritical = await getGoBatch(qx, false, cursor.after, batchSize - critical.length) + + return { + rows: [...critical, ...nonCritical], + nextCursor: { + criticalAfter: + critical.length > 0 ? critical[critical.length - 1].purl : cursor.criticalAfter, + after: nonCritical.length > 0 ? nonCritical[nonCritical.length - 1].purl : cursor.after, + }, + } +} + export async function enrichGoVersionsBatch( - afterPurl: string, + cursor: GoScanCursor, batchSize: number, -): Promise { +): Promise { const qx = await getPackagesDb() - const rows = await getGoBatch(qx, afterPurl, batchSize) + const { rows, nextCursor } = await getGoPriorityBatch(qx, cursor, batchSize) if (rows.length === 0) return null const { fetchTimeoutMs, proxyConcurrency } = getGoConfig() @@ -89,15 +123,15 @@ export async function enrichGoVersionsBatch( } log.info({ count: rows.length, concurrency: proxyConcurrency }, 'Enriched go versions batch') - return rows[rows.length - 1].purl + return nextCursor } export async function enrichGoStatusBatch( - afterPurl: string, + cursor: GoScanCursor, batchSize: number, -): Promise { +): Promise { const qx = await getPackagesDb() - const rows = await getGoBatch(qx, afterPurl, batchSize) + const { rows, nextCursor } = await getGoPriorityBatch(qx, cursor, batchSize) if (rows.length === 0) return null const { fetchTimeoutMs } = getGoConfig() @@ -137,5 +171,5 @@ export async function enrichGoStatusBatch( } log.info({ count: rows.length }, 'Enriched go status batch') - return rows[rows.length - 1].purl + return nextCursor } diff --git a/services/apps/packages_worker/src/go/proxyClient.ts b/services/apps/packages_worker/src/go/proxyClient.ts index 4df6bdce34..00ea76c6b3 100644 --- a/services/apps/packages_worker/src/go/proxyClient.ts +++ b/services/apps/packages_worker/src/go/proxyClient.ts @@ -2,6 +2,11 @@ import { FetchError, GoProxyLatest } from './types' const BASE = process.env.GO_PROXY_BASE_URL ?? 'https://proxy.golang.org' const ZERO_TIME = '0001-01-01T00:00:00Z' +const MAX_429_RETRIES = 5 + +function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)) +} // GOPROXY spec: uppercase letters in a module path are escaped as '!' + lowercase. export function escapeModulePath(module: string): string { @@ -13,40 +18,55 @@ export async function fetchLatest( timeoutMs: number, ): Promise { const url = `${BASE}/${escapeModulePath(module)}/@latest` - const controller = new AbortController() - const timer = setTimeout(() => controller.abort(), timeoutMs) - - let res: Response - try { - res = await fetch(url, { signal: controller.signal }) - } catch (e) { - return { kind: 'TRANSIENT', message: `network error: ${(e as Error).message}` } - } finally { - clearTimeout(timer) - } - if (res.status === 429) { - return { kind: 'RATE_LIMIT', statusCode: 429, message: 'rate limited' } - } - // Any other 4xx is permanent (unknown/invalid module path) — skip, don't retry. - if (res.status >= 400 && res.status < 500) { - return { kind: 'NOT_FOUND', statusCode: res.status, message: `${res.status}` } - } - if (res.status !== 200) { - return { kind: 'TRANSIENT', statusCode: res.status, message: `unexpected status ${res.status}` } - } + for (let attempt = 0; attempt <= MAX_429_RETRIES; attempt++) { + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), timeoutMs) - let body: { Version?: string; Time?: string; Origin?: { URL?: string } } - try { - body = (await res.json()) as { Version?: string; Time?: string; Origin?: { URL?: string } } - } catch { - return { kind: 'MALFORMED', message: 'invalid json' } - } - if (!body.Version) return { kind: 'MALFORMED', message: 'missing Version' } + let res: Response + try { + res = await fetch(url, { signal: controller.signal }) + } catch (e) { + return { kind: 'TRANSIENT', message: `network error: ${(e as Error).message}` } + } finally { + clearTimeout(timer) + } + + if (res.status === 429) { + if (attempt === MAX_429_RETRIES) { + return { kind: 'RATE_LIMIT', statusCode: 429, message: '429 after retries' } + } + const retryAfterSec = parseInt(res.headers.get('retry-after') ?? '', 10) + const waitMs = Number.isNaN(retryAfterSec) ? 1000 * 2 ** attempt : retryAfterSec * 1000 + await sleep(waitMs) + continue + } + // Any other 4xx is permanent (unknown/invalid module path) — skip, don't retry. + if (res.status >= 400 && res.status < 500) { + return { kind: 'NOT_FOUND', statusCode: res.status, message: `${res.status}` } + } + if (res.status !== 200) { + return { + kind: 'TRANSIENT', + statusCode: res.status, + message: `unexpected status ${res.status}`, + } + } - return { - version: body.Version, - releaseAt: body.Time && body.Time !== ZERO_TIME ? body.Time : null, - repoUrl: body.Origin?.URL || null, + let body: { Version?: string; Time?: string; Origin?: { URL?: string } } + try { + body = (await res.json()) as { Version?: string; Time?: string; Origin?: { URL?: string } } + } catch { + return { kind: 'MALFORMED', message: 'invalid json' } + } + if (!body.Version) return { kind: 'MALFORMED', message: 'missing Version' } + + return { + version: body.Version, + releaseAt: body.Time && body.Time !== ZERO_TIME ? body.Time : null, + repoUrl: body.Origin?.URL || null, + } } + + return { kind: 'RATE_LIMIT', statusCode: 429, message: '429 after retries' } } diff --git a/services/apps/packages_worker/src/go/workflows.ts b/services/apps/packages_worker/src/go/workflows.ts index efa93af1c5..738c112457 100644 --- a/services/apps/packages_worker/src/go/workflows.ts +++ b/services/apps/packages_worker/src/go/workflows.ts @@ -15,26 +15,22 @@ const acts = proxyActivities({ const BATCH = 100 const ROUNDS_PER_RUN = 200 -interface ScanState { - cursor: string -} +const START_CURSOR = { criticalAfter: '', after: '' } -export async function enrichGoVersions(state: ScanState = { cursor: '' }): Promise { - let { cursor } = state +export async function enrichGoVersions(cursor = START_CURSOR): Promise { for (let r = 0; r < ROUNDS_PER_RUN; r++) { const next = await acts.enrichGoVersionsBatch(cursor, BATCH) if (next === null) return cursor = next } - await continueAsNew({ cursor }) + await continueAsNew(cursor) } -export async function enrichGoStatus(state: ScanState = { cursor: '' }): Promise { - let { cursor } = state +export async function enrichGoStatus(cursor = START_CURSOR): Promise { for (let r = 0; r < ROUNDS_PER_RUN; r++) { const next = await acts.enrichGoStatusBatch(cursor, BATCH) if (next === null) return cursor = next } - await continueAsNew({ cursor }) + await continueAsNew(cursor) }