Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 47 additions & 13 deletions services/apps/packages_worker/src/go/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,61 @@ const log = getServiceChildLogger('go')
const PROXY_SOURCE = 'go-proxy'
const PKGGODEV_SOURCE = 'pkg-go-dev'

// TODO: filter to critical packages once computed
// Resume position for a Go package scan, tracked separately for critical and non-critical packages.
export interface GoScanCursor {
// purl of the last critical package already returned; the next critical query selects purl > this.
criticalAfter: string
// purl of the last non-critical package already returned; the next non-critical query selects purl > this.
after: string
}

// Columns selected from the packages table for each Go package in a batch.
type GoRow = { purl: string; name: string }

// Two independent purl-keyset cursors — one for critical packages, one for everything else —
// each ordered/paginated purely by purl so WHERE and ORDER BY always match (no gaps, no
// duplicates). A single query sorted by is_critical DESC with one shared purl cursor was tried
// and rejected: the cursor advances to the last row's purl, and when a batch is critical-heavy
// that purl can be far ahead of unprocessed non-critical rows, permanently excluding them for
// the rest of the run.
async function getGoBatch(
qx: QueryExecutor,
isCritical: boolean,
afterPurl: string,
batchSize: number,
): Promise<Array<{ purl: string; name: string }>> {
): Promise<GoRow[]> {
if (batchSize <= 0) return []
return qx.select(
`SELECT purl, name FROM packages
WHERE ecosystem = 'go' AND purl > $(after)
ORDER BY last_synced_at ASC NULLS FIRST, purl ASC
WHERE ecosystem = 'go' AND is_critical = $(isCritical) AND purl > $(after)
ORDER BY purl ASC
LIMIT $(limit)`,
{ after: afterPurl, limit: batchSize },
{ isCritical, after: afterPurl, limit: batchSize },
)
}

// Builds one batch critical-first: asks for up to batchSize critical packages past the critical
// cursor, then spends whatever room is left on non-critical packages past the non-critical
// cursor. Each cursor only moves when its own query returned rows; otherwise it keeps its
// previous position. Returned rows list the critical packages before the non-critical ones.
async function getGoPriorityBatch(
  qx: QueryExecutor,
  cursor: GoScanCursor,
  batchSize: number,
): Promise<{ rows: GoRow[]; nextCursor: GoScanCursor }> {
  const criticalRows = await getGoBatch(qx, true, cursor.criticalAfter, batchSize)

  // The non-critical query must run second: its limit depends on how many critical rows came back.
  const remainingSlots = batchSize - criticalRows.length
  const otherRows = await getGoBatch(qx, false, cursor.after, remainingSlots)

  let criticalAfter = cursor.criticalAfter
  if (criticalRows.length > 0) {
    criticalAfter = criticalRows[criticalRows.length - 1].purl
  }

  let after = cursor.after
  if (otherRows.length > 0) {
    after = otherRows[otherRows.length - 1].purl
  }

  const rows = [...criticalRows, ...otherRows]
  return { rows, nextCursor: { criticalAfter, after } }
}

export async function enrichGoVersionsBatch(
afterPurl: string,
cursor: GoScanCursor,
batchSize: number,
): Promise<string | null> {
): Promise<GoScanCursor | null> {
const qx = await getPackagesDb()
const rows = await getGoBatch(qx, afterPurl, batchSize)
const { rows, nextCursor } = await getGoPriorityBatch(qx, cursor, batchSize)
if (rows.length === 0) return null

const { fetchTimeoutMs, proxyConcurrency } = getGoConfig()
Expand Down Expand Up @@ -89,15 +123,15 @@ export async function enrichGoVersionsBatch(
}

log.info({ count: rows.length, concurrency: proxyConcurrency }, 'Enriched go versions batch')
return rows[rows.length - 1].purl
return nextCursor
}

export async function enrichGoStatusBatch(
afterPurl: string,
cursor: GoScanCursor,
batchSize: number,
): Promise<string | null> {
): Promise<GoScanCursor | null> {
const qx = await getPackagesDb()
const rows = await getGoBatch(qx, afterPurl, batchSize)
const { rows, nextCursor } = await getGoPriorityBatch(qx, cursor, batchSize)
if (rows.length === 0) return null

const { fetchTimeoutMs } = getGoConfig()
Expand Down Expand Up @@ -137,5 +171,5 @@ export async function enrichGoStatusBatch(
}

log.info({ count: rows.length }, 'Enriched go status batch')
return rows[rows.length - 1].purl
return nextCursor
}
84 changes: 52 additions & 32 deletions services/apps/packages_worker/src/go/proxyClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { FetchError, GoProxyLatest } from './types'

const BASE = process.env.GO_PROXY_BASE_URL ?? 'https://proxy.golang.org'
const ZERO_TIME = '0001-01-01T00:00:00Z'
const MAX_429_RETRIES = 5

// Returns a promise that resolves (with no value) once a setTimeout of `ms` milliseconds fires.
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms)
  })
}

// GOPROXY spec: uppercase letters in a module path are escaped as '!' + lowercase.
export function escapeModulePath(module: string): string {
Expand All @@ -13,40 +18,55 @@ export async function fetchLatest(
timeoutMs: number,
): Promise<GoProxyLatest | FetchError> {
const url = `${BASE}/${escapeModulePath(module)}/@latest`
const controller = new AbortController()
const timer = setTimeout(() => controller.abort(), timeoutMs)

let res: Response
try {
res = await fetch(url, { signal: controller.signal })
} catch (e) {
return { kind: 'TRANSIENT', message: `network error: ${(e as Error).message}` }
} finally {
clearTimeout(timer)
}

if (res.status === 429) {
return { kind: 'RATE_LIMIT', statusCode: 429, message: 'rate limited' }
}
// Any other 4xx is permanent (unknown/invalid module path) — skip, don't retry.
if (res.status >= 400 && res.status < 500) {
return { kind: 'NOT_FOUND', statusCode: res.status, message: `${res.status}` }
}
if (res.status !== 200) {
return { kind: 'TRANSIENT', statusCode: res.status, message: `unexpected status ${res.status}` }
}
for (let attempt = 0; attempt <= MAX_429_RETRIES; attempt++) {
const controller = new AbortController()
const timer = setTimeout(() => controller.abort(), timeoutMs)

let body: { Version?: string; Time?: string; Origin?: { URL?: string } }
try {
body = (await res.json()) as { Version?: string; Time?: string; Origin?: { URL?: string } }
} catch {
return { kind: 'MALFORMED', message: 'invalid json' }
}
if (!body.Version) return { kind: 'MALFORMED', message: 'missing Version' }
let res: Response
try {
res = await fetch(url, { signal: controller.signal })
} catch (e) {
return { kind: 'TRANSIENT', message: `network error: ${(e as Error).message}` }
} finally {
clearTimeout(timer)
}

if (res.status === 429) {
if (attempt === MAX_429_RETRIES) {
return { kind: 'RATE_LIMIT', statusCode: 429, message: '429 after retries' }
}
const retryAfterSec = parseInt(res.headers.get('retry-after') ?? '', 10)
const waitMs = Number.isNaN(retryAfterSec) ? 1000 * 2 ** attempt : retryAfterSec * 1000
Comment thread
mbani01 marked this conversation as resolved.
await sleep(waitMs)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long Retry-After misses heartbeats

Medium Severity

On HTTP 429, fetchLatest may sleep for the full numeric Retry-After value in seconds before retrying. The go enrichment activity only heartbeats at the start of each package; a wait longer than the configured two-minute heartbeat timeout can fail the activity mid-batch.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 76e9914. Configure here.

continue
Comment on lines +35 to +42
}
// Any other 4xx is permanent (unknown/invalid module path) — skip, don't retry.
if (res.status >= 400 && res.status < 500) {
return { kind: 'NOT_FOUND', statusCode: res.status, message: `${res.status}` }
}
if (res.status !== 200) {
return {
kind: 'TRANSIENT',
statusCode: res.status,
message: `unexpected status ${res.status}`,
}
}

return {
version: body.Version,
releaseAt: body.Time && body.Time !== ZERO_TIME ? body.Time : null,
repoUrl: body.Origin?.URL || null,
let body: { Version?: string; Time?: string; Origin?: { URL?: string } }
try {
body = (await res.json()) as { Version?: string; Time?: string; Origin?: { URL?: string } }
} catch {
return { kind: 'MALFORMED', message: 'invalid json' }
}
if (!body.Version) return { kind: 'MALFORMED', message: 'missing Version' }

return {
version: body.Version,
releaseAt: body.Time && body.Time !== ZERO_TIME ? body.Time : null,
repoUrl: body.Origin?.URL || null,
}
}

return { kind: 'RATE_LIMIT', statusCode: 429, message: '429 after retries' }
}
14 changes: 5 additions & 9 deletions services/apps/packages_worker/src/go/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,22 @@ const acts = proxyActivities<typeof activities>({
const BATCH = 100
const ROUNDS_PER_RUN = 200

interface ScanState {
cursor: string
}
const START_CURSOR = { criticalAfter: '', after: '' }

export async function enrichGoVersions(state: ScanState = { cursor: '' }): Promise<void> {
let { cursor } = state
export async function enrichGoVersions(cursor = START_CURSOR): Promise<void> {
for (let r = 0; r < ROUNDS_PER_RUN; r++) {
const next = await acts.enrichGoVersionsBatch(cursor, BATCH)
if (next === null) return
cursor = next
}
await continueAsNew<typeof enrichGoVersions>({ cursor })
await continueAsNew<typeof enrichGoVersions>(cursor)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Legacy workflow cursor shape breaks

High Severity

The Go enrichment workflows now take a GoScanCursor as the first argument and pass that object to continueAsNew, but previously they used { cursor: string }. Replay or continuation from an in-flight run still supplies the old shape, so criticalAfter and after are undefined in getGoPriorityBatch, pagination breaks, and the workflow can exit early as if the scan finished.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 13a1cda. Configure here.

}

export async function enrichGoStatus(state: ScanState = { cursor: '' }): Promise<void> {
let { cursor } = state
export async function enrichGoStatus(cursor = START_CURSOR): Promise<void> {
for (let r = 0; r < ROUNDS_PER_RUN; r++) {
const next = await acts.enrichGoStatusBatch(cursor, BATCH)
if (next === null) return
cursor = next
}
await continueAsNew<typeof enrichGoStatus>({ cursor })
await continueAsNew<typeof enrichGoStatus>(cursor)
}
Loading