docs: add engine-native record selection plan#792
Conversation
Document the V1 API, engine architecture, resume model, implementation phases, and validation strategy for exact accepted-row targets. Refs #790 Signed-off-by: Nabin Mulepati <nmulepati@nvidia.com>
Greptile SummaryThis PR adds a source-of-truth design plan for engine-native record selection, allowing users to declare a boolean predicate column and request an exact number of accepted output rows from a single bounded
|
| Filename | Overview |
|---|---|
| plans/790/engine-native-record-selection.md | New 985-line design plan for engine-native record selection; three design gaps identified: unresolved V1 media artifact strategy, incomplete resume state machine for the IF_POSSIBLE + fingerprint-mismatch path, and a metadata accounting invariant that breaks when rows fail before predicate evaluation. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant U as User
participant I as DataDesigner.create
participant C as AcceptanceController
participant S as AsyncTaskScheduler
participant D as Column DAG
participant St as ArtifactStorage
U->>I: "create(builder, num_records=X)"
I->>C: "initialize(target=X, cap=M)"
loop "while accepted < X and candidates < M"
C->>C: "next_candidate_batch(size=min(buffer_size, target, remaining_budget))"
C->>S: run(row_group_id, candidate_offset, batch_size)
S->>D: generate full column DAG for candidate batch
D-->>S: completed rows + predicate column
S->>C: select(dataframe)
Note over C: Validate bool/null, build accepted mask, trim to remaining target
C-->>S: SelectionDecision(accepted_indices, trimmed_count)
S->>St: write accepted rows to parquet-files/batch_N.parquet
S->>St: commit batch marker to selection-checkpoints/batch_N.json
St-->>C: record_checkpoint(batch, decision)
end
alt "accepted == X"
I-->>U: DatasetCreationResults (exact)
else cap exhausted + return_partial
I-->>U: DatasetCreationResults (partial)
else cap exhausted + raise
I-->>U: RecordSelectionExhaustedError
end
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant U as User
participant I as DataDesigner.create
participant C as AcceptanceController
participant S as AsyncTaskScheduler
participant D as Column DAG
participant St as ArtifactStorage
U->>I: "create(builder, num_records=X)"
I->>C: "initialize(target=X, cap=M)"
loop "while accepted < X and candidates < M"
C->>C: "next_candidate_batch(size=min(buffer_size, target, remaining_budget))"
C->>S: run(row_group_id, candidate_offset, batch_size)
S->>D: generate full column DAG for candidate batch
D-->>S: completed rows + predicate column
S->>C: select(dataframe)
Note over C: Validate bool/null, build accepted mask, trim to remaining target
C-->>S: SelectionDecision(accepted_indices, trimmed_count)
S->>St: write accepted rows to parquet-files/batch_N.parquet
S->>St: commit batch marker to selection-checkpoints/batch_N.json
St-->>C: record_checkpoint(batch, decision)
end
alt "accepted == X"
I-->>U: DatasetCreationResults (exact)
else cap exhausted + return_partial
I-->>U: DatasetCreationResults (partial)
else cap exhausted + raise
I-->>U: RecordSelectionExhaustedError
end
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
plans/790/engine-native-record-selection.md:694-699
**Unresolved V1 media artifact strategy**
The section states "V1 must choose and document one of these approaches," but then presents three options without selecting one. Open question 5 also defers the decision. Because the artifact layout section fully specifies every other path, leaving the media strategy unresolved means implementers could pick different approaches, which would make the artifacts, test cases, and cleanup behavior inconsistent. The three options have meaningfully different ownership implications for `MediaStorage`: option 1 requires candidate-scoped staging directories that don't exist today; option 2 requires per-row path tracking in the scheduler; option 3 adds a GC pass at cleanup time. A decision here should be made before implementation of Phase 2 begins.
### Issue 2 of 3
plans/790/engine-native-record-selection.md:634-657
**Resume state machine missing `IF_POSSIBLE` fingerprint-mismatch path**
The state machine only shows `LoadConfig --> Incompatible` for fingerprint mismatch when `resume=ALWAYS`. When checkpoints exist but the fingerprint has changed and `resume=IF_POSSIBLE`, there is no transition shown. Standard resume semantics would fall back to `Fresh`, discarding committed candidate progress, but that branch is absent. An implementer who reads only this diagram could silently re-run from scratch without any signal to the caller that prior work was abandoned, or could raise an error when the mode was intended to be lenient.
### Issue 3 of 3
plans/790/engine-native-record-selection.md:586-612
**Metadata accounting invariant breaks when rows fail before predicate evaluation**
The plan states "Failed/dropped candidate slots count against `max_candidate_records`; they consumed an attempt and potentially model cost," but no `failed_generation_records` counter appears in the proposed `record_selection` metadata block. The example JSON validates as `accepted (4 217) + rejected (7 769) + null_predicate (14) == candidate_records_generated (12 000)`, which holds only when all candidates reach the predicate step. If a batch has partial generation failures, the invariant silently breaks and the discrepancy becomes invisible to callers reading `metadata.json` or the batch markers.
Reviews (1): Last reviewed commit: "docs: add record selection design plan" | Re-trigger Greptile
| V1 must choose and document one of these approaches: | ||
|
|
||
| 1. **Preferred:** write candidate media under a candidate-batch-scoped temporary directory and commit only accepted paths. | ||
| 2. Track media paths produced per row and delete those associated exclusively with rejected rows. | ||
| 3. Explicitly allow orphan media until run cleanup, then garbage-collect paths not referenced by accepted output. | ||
|
|
There was a problem hiding this comment.
Unresolved V1 media artifact strategy
The section states "V1 must choose and document one of these approaches," but then presents three options without selecting one. Open question 5 also defers the decision. Because the artifact layout section fully specifies every other path, leaving the media strategy unresolved means implementers could pick different approaches, which would make the artifacts, test cases, and cleanup behavior inconsistent. The three options have meaningfully different ownership implications for MediaStorage: option 1 requires candidate-scoped staging directories that don't exist today; option 2 requires per-row path tracking in the scheduler; option 3 adds a GC pass at cleanup time. A decision here should be made before implementation of Phase 2 begins.
Prompt To Fix With AI
This is a comment left during a code review.
Path: plans/790/engine-native-record-selection.md
Line: 694-699
Comment:
**Unresolved V1 media artifact strategy**
The section states "V1 must choose and document one of these approaches," but then presents three options without selecting one. Open question 5 also defers the decision. Because the artifact layout section fully specifies every other path, leaving the media strategy unresolved means implementers could pick different approaches, which would make the artifacts, test cases, and cleanup behavior inconsistent. The three options have meaningfully different ownership implications for `MediaStorage`: option 1 requires candidate-scoped staging directories that don't exist today; option 2 requires per-row path tracking in the scheduler; option 3 adds a GC pass at cleanup time. A decision here should be made before implementation of Phase 2 begins.
How can I resolve this? If you propose a fix, please make it concise.| ```mermaid | ||
| stateDiagram-v2 | ||
| [*] --> LoadConfig | ||
| LoadConfig --> Fresh: no selection checkpoints | ||
| LoadConfig --> Reconstruct: checkpoints exist and fingerprint matches | ||
| LoadConfig --> Incompatible: fingerprint mismatch with resume=ALWAYS | ||
|
|
||
| Reconstruct --> ValidateMarkers | ||
| ValidateMarkers --> Corrupt: missing or invalid committed artifact | ||
| ValidateMarkers --> Satisfied: accepted >= target | ||
| ValidateMarkers --> Exhausted: candidates >= cap and accepted < target | ||
| ValidateMarkers --> GenerateNext: accepted < target and budget remains | ||
|
|
||
| Fresh --> GenerateNext | ||
| GenerateNext --> CommitBatch | ||
| CommitBatch --> Satisfied: accepted >= target | ||
| CommitBatch --> Exhausted: candidate cap reached | ||
| CommitBatch --> GenerateNext: more candidates needed | ||
|
|
||
| Satisfied --> [*] | ||
| Exhausted --> [*] | ||
| Incompatible --> [*] | ||
| Corrupt --> [*] | ||
| ``` |
There was a problem hiding this comment.
Resume state machine missing
IF_POSSIBLE fingerprint-mismatch path
The state machine only shows LoadConfig --> Incompatible for fingerprint mismatch when resume=ALWAYS. When checkpoints exist but the fingerprint has changed and resume=IF_POSSIBLE, there is no transition shown. Standard resume semantics would fall back to Fresh, discarding committed candidate progress, but that branch is absent. An implementer who reads only this diagram could silently re-run from scratch without any signal to the caller that prior work was abandoned, or could raise an error when the mode was intended to be lenient.
Prompt To Fix With AI
This is a comment left during a code review.
Path: plans/790/engine-native-record-selection.md
Line: 634-657
Comment:
**Resume state machine missing `IF_POSSIBLE` fingerprint-mismatch path**
The state machine only shows `LoadConfig --> Incompatible` for fingerprint mismatch when `resume=ALWAYS`. When checkpoints exist but the fingerprint has changed and `resume=IF_POSSIBLE`, there is no transition shown. Standard resume semantics would fall back to `Fresh`, discarding committed candidate progress, but that branch is absent. An implementer who reads only this diagram could silently re-run from scratch without any signal to the caller that prior work was abandoned, or could raise an error when the mode was intended to be lenient.
How can I resolve this? If you propose a fix, please make it concise.|
|
||
| ```json | ||
| { | ||
| "target_num_records": 5000, | ||
| "actual_num_records": 4217, | ||
| "record_selection": { | ||
| "predicate_column": "meets_criteria", | ||
| "max_candidate_records": 20000, | ||
| "on_exhausted": "raise", | ||
| "candidate_records_generated": 12000, | ||
| "candidate_batches_completed": 12, | ||
| "accepted_records": 4217, | ||
| "rejected_records": 7769, | ||
| "null_predicate_records": 14, | ||
| "trimmed_accepted_records": 0, | ||
| "acceptance_rate": 0.3514167, | ||
| "selection_satisfied": false, | ||
| "selection_exhausted": false, | ||
| "next_candidate_batch_id": 12, | ||
| "next_candidate_offset": 12000 | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| The candidate-batch marker directory is the filesystem source of truth. Global metadata is a convenient summary and may lag | ||
| by one checkpoint during a crash, just as current metadata may lag parquet writes. | ||
|
|
There was a problem hiding this comment.
Metadata accounting invariant breaks when rows fail before predicate evaluation
The plan states "Failed/dropped candidate slots count against max_candidate_records; they consumed an attempt and potentially model cost," but no failed_generation_records counter appears in the proposed record_selection metadata block. The example JSON validates as accepted (4 217) + rejected (7 769) + null_predicate (14) == candidate_records_generated (12 000), which holds only when all candidates reach the predicate step. If a batch has partial generation failures, the invariant silently breaks and the discrepancy becomes invisible to callers reading metadata.json or the batch markers.
Prompt To Fix With AI
This is a comment left during a code review.
Path: plans/790/engine-native-record-selection.md
Line: 586-612
Comment:
**Metadata accounting invariant breaks when rows fail before predicate evaluation**
The plan states "Failed/dropped candidate slots count against `max_candidate_records`; they consumed an attempt and potentially model cost," but no `failed_generation_records` counter appears in the proposed `record_selection` metadata block. The example JSON validates as `accepted (4 217) + rejected (7 769) + null_predicate (14) == candidate_records_generated (12 000)`, which holds only when all candidates reach the predicate step. If a batch has partial generation failures, the invariant silently breaks and the discrepancy becomes invisible to callers reading `metadata.json` or the batch markers.
How can I resolve this? If you propose a fix, please make it concise.
Code Review: PR #792 —
|
andreatgretel
left a comment
There was a problem hiding this comment.
The engine-native direction makes sense to me: exact accepted-row selection is a stage-local data-plane operation, while workflow chaining remains responsible for separate generate, judge, enrich, and transform stages.
The main design points I think need resolving before implementation are:
- preserving explicit candidate offsets through the current row-group-plan normalization;
- keeping selection checkpoint artifacts valid after after-generation processing; and
- defining a successful zero-row
return_partialpath through the interface and profiler.
The remaining comments are API and lifecycle clarifications rather than objections to the overall direction.
| ... | ||
| ``` | ||
|
|
||
| Add a plan type capable of preserving explicit start offsets: |
There was a problem hiding this comment.
CandidateBatchPlan as sketched will lose its explicit start_offset in the current scheduler wiring. _prepare_async_run() accepts RowGroupInput, and normalize_row_group_plan() preserves only CompactRowGroupPlan and ExplicitRowGroupPlan. Any other iterable is wrapped in ExplicitRowGroupPlan, which recomputes offsets starting from zero.
Because each candidate plan contains one row group, every candidate batch would expose offset zero to the ordered seed generator and replay the beginning of the seed dataset.
Could the plan specify either a complete RowGroupPlanLike implementation that is preserved by the normalizer, or a dedicated candidate-offset parameter? The regression test should assert that candidate batch 1 starts after candidate batch 0 rather than only checking output counts.
| For v1: | ||
|
|
||
| 1. Reject known row-count-changing after-generation processors at compile/runtime setup. | ||
| 2. Run allowed after-generation processors once over accepted output. |
There was a problem hiding this comment.
The resume model treats candidate-batch markers and their referenced parquet files as the source of truth. However, ProcessorRunner.run_after_generation() currently deletes parquet-files/ and re-chunks the combined dataset. Even a row-count-preserving processor can therefore remove marker paths or change their expected row counts, making a completed selection run appear corrupt on resume.
Could the plan explicitly separate immutable accepted-candidate partitions from the published postprocessed dataset? Selection markers could reference the immutable partitions, while terminal metadata points to the final processed output.
| ) | ||
| ``` | ||
|
|
||
| For `return_partial`, finalize the accepted output and record `selection_satisfied=false` and |
There was a problem hiding this comment.
return_partial is specified to complete successfully even when zero candidates pass, but DataDesigner.create() currently rejects every zero-row dataset before profiling. Since zero-acceptance batches intentionally write no parquet, the all-rejected path cannot currently return a DatasetCreationResults.
Could the plan explicitly require terminal empty-output handling, including bypassing the ordinary zero-row failure guard, materializing a schema-bearing empty dataset, and defining empty profiling behavior? Otherwise the empty-partial test described below will not be implementable.
|
|
||
| - It participates in normal DAG dependency discovery. | ||
| - It can be previewed and debugged like any other column. | ||
| - It can be generated by expressions, plugins, validators, or future generator types. |
There was a problem hiding this comment.
Could we clarify the scope of custom or plugin-generated predicates? A full-column predicate can perform batch-global filtering, but it cannot safely implement run-global deduplication, quotas, or ranking unless its cross-batch state is durable across resume. Checkpointed accepted rows also cannot later be revoked, which excludes selectors such as global top-N.
It may be worth stating that V1 predicates must be row-local or otherwise monotonic and resume-safe.
| builder.add_column( | ||
| dd.ExpressionColumnConfig( | ||
| name="meets_criteria", | ||
| expr="{{ quality.score >= 0.8 }}", |
There was a problem hiding this comment.
I think this predicate path is incomplete for LLMJudgeColumnConfig. Judge results are nested under the configured score name, so with Score(name="answer_quality", ...) the expression would be something like quality.answer_quality.score >= 0.8. quality.score does not match the current structured judge output.
Could the example define a concrete Score and use its full nested path?
| self._handle_selection_completion(controller) | ||
| ``` | ||
|
|
||
| Initialize generator instances once and reuse them across candidate batches. In particular: |
There was a problem hiding this comment.
A fresh scheduler per candidate batch currently means calling _prepare_async_run() per batch, and that method invokes every generator's log_pre_generation(). To satisfy the stated once-per-logical-build behavior, the implementation will need to move those calls outside scheduler preparation and invoke them before the candidate loop.
📋 Summary
Adds the source-of-truth design plan for engine-native record selection, allowing users to request an exact number of rows that satisfy a declared boolean criterion in one bounded, resumable DataDesigner run. The plan defines V1 as the complete user-facing feature and leaves concurrent batches and early cancellation as benchmark-driven optimizations.
🔗 Related Issue
Related to #790. The issue remains open to track implementation of the approved design.
🔄 Changes
RecordSelectionConfigAPI and exact accepted-row semantics.🧪 Testing
git diff --check origin/main..HEAD✅ Checklist