[Python] Bound Watch state with a timestamp cursor #39090
Draft
Eliaaazzz wants to merge 2 commits into
Add an experimental Watch transform that watches a growing set of outputs per input element. Watch.growth_of(poll_fn) runs a periodic poll loop as a splittable DoFn and emits an unbounded PCollection of (input, output) pairs. Each process() performs one poll round and self-checkpoints via defer_remainder. New outputs are deduplicated with a stable 128-bit blake2b hash of the encoded output, and a manual watermark estimator advances per poll. Per-input termination supports never() and after_total_of(); polling also stops when a poll returns PollResult.complete(). The DoFn is its own RestrictionProvider, and restriction state serializes through a tagged GrowthState coder. Tests cover termination conditions, coder round-trips, the restriction tracker claim/checkpoint/dedup logic, and DirectRunner end-to-end runs.
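The default dedup described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `output_key` and `new_outputs` are hypothetical names, and the outputs are assumed to be already encoded to bytes by whatever coder the transform uses. Only `hashlib.blake2b` with `digest_size=16` is a real standard-library call.

```python
import hashlib


def output_key(encoded_output: bytes) -> bytes:
  """Return a stable 128-bit digest of an already-encoded output."""
  # digest_size=16 bytes is 128 bits. Unlike the built-in hash(), blake2b
  # gives the same digest in every process, so it survives checkpoints.
  return hashlib.blake2b(encoded_output, digest_size=16).digest()


def new_outputs(polled, seen):
  """Yield polled outputs whose digest is not yet in `seen` (hypothetical)."""
  for encoded in polled:
    key = output_key(encoded)
    if key not in seen:
      seen.add(key)  # one digest is retained per distinct output
      yield encoded


seen_hashes = set()
first_round = list(new_outputs([b"a", b"b"], seen_hashes))
second_round = list(new_outputs([b"b", b"c"], seen_hashes))  # b"b" is relisted
```

Because one digest is kept per distinct output, `seen_hashes` grows with every new output, which is the unbounded state the follow-up commit addresses.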
Follow-up to the experimental Watch transform that documents the event-time contract, makes late data observable, and adds an opt-in O(1) bound on the per-input state.

Watermark/event-time:
- Document the event-time/watermark contract on the module and PollResult.
- Warn (throttled) when an output is emitted behind the current watermark, so out-of-order late data is observable instead of silently dropped, and point to PollResult.with_watermark for out-of-order sources.

Scalability:
- Add opt-in Watch.with_timestamp_cursor(): dedup by a high-water-mark timestamp instead of by value identity. Watch keeps only the greatest event time emitted per input and emits the polled outputs strictly past it, so the per-input state is a single timestamp that never grows and the poll result is not hashed, giving O(1) state and per-checkpoint encoding. For sources whose outputs carry strictly increasing event-time timestamps; an output at or below the cursor is treated as already seen, so out-of-order or equal-timestamp outputs are dropped (use the default exact dedup otherwise).
- Reuse the parent dedup map on idle rounds to drop the O(N) per-round copy.

Tests cover the watermark contract, the out-of-order late warning, cursor emit-after-high-water-mark, O(1) state, relist-once, complete, cursor coder round-trip, and end-to-end cursor dedup. 26 tests and 5 subtests pass.
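The timestamp-cursor dedup can be sketched as below. `CursorState` and `emit_past_cursor` are hypothetical names for illustration and are not taken from the PR. The sketch assumes each polled output is a `(value, event_time)` pair, and that a round is filtered against the cursor as it stood when the round started before the cursor advances.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class CursorState:
  # Greatest event time emitted so far for one input; None before any emit.
  high_water_mark: Optional[float] = None


def emit_past_cursor(
    state: CursorState,
    polled: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
  """Return outputs strictly past the cursor, then advance the cursor."""
  start = state.high_water_mark
  fresh = [(v, ts) for v, ts in polled if start is None or ts > start]
  if fresh:
    # The whole per-input state is this one timestamp, however many
    # outputs have been emitted.
    state.high_water_mark = max(ts for _, ts in fresh)
  return fresh


state = CursorState()
round_1 = emit_past_cursor(state, [("a", 1.0), ("b", 2.0)])
# A full relist: only the output past the cursor is emitted.
round_2 = emit_past_cursor(state, [("a", 1.0), ("b", 2.0), ("c", 3.0)])
# An out-of-order output at or below the cursor is treated as already seen.
round_3 = emit_past_cursor(state, [("late", 2.5)])
```

The last call shows the trade-off stated above: `("late", 2.5)` was never emitted, yet it is dropped because it sits below the cursor, which is why the default exact dedup remains the right choice for out-of-order sources.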
Stacked on #39023 (the Watch transform MVP); the first commit belongs to that PR, so the change to review is the second commit. Relates to #18459 (the unbounded `completed` dedup set).

By default Watch dedups by value identity and keeps one hash per distinct output, so the per-input state grows without bound. This adds an opt-in `Watch.with_timestamp_cursor()` that dedups by a high-water-mark timestamp instead: Watch keeps only the greatest event time it has emitted for an input and emits the polled outputs strictly past it, then advances the cursor. No hash set is kept and the poll result is not hashed, so the per-input state and per-checkpoint encoding are O(1) regardless of how many outputs the input produces. It is for sources whose outputs carry strictly increasing event-time timestamps; an output at or below the cursor is treated as already seen, so the default exact dedup remains for arbitrary-relisting or out-of-order sources. This also documents the event-time/watermark contract and adds a throttled late-emission warning.

Testing: 28 unit tests and 5 subtests on the in-memory DirectRunner; yapf 0.43.0, isort 7.0.0, and pylint clean. Load tested on Dataflow Runner v2 (us-east1) against the default hash dedup, 4 inputs emitting 15000 outputs per 1s round over a 240s budget: the cursor processed 6,825,000 outputs versus the default's 1,650,000 in the same budget (4.1x), because the default re-serializes its growing dedup set on every checkpoint (it reached about 300-495K entries per input) while the cursor writes one timestamp; the gap widens with runtime. Both stayed exactly-once and reached JOB_STATE_DONE. Jobs on apache-beam-testing: cursor
`2026-06-24_08_45_05-17501676526076065162`, default `2026-06-24_08_45_23-5060488636914380336`.
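As a rough back-of-the-envelope check on the load-test explanation, the snippet below estimates how much dedup state each mode has to write per checkpoint. It is a sketch under stated assumptions: it counts only the 16-byte digests for the hash mode (a lower bound that ignores any coder framing or map values) and assumes an 8-byte timestamp for the cursor. The function names are hypothetical; the entry counts and output totals are the figures reported above.

```python
HASH_BYTES = 16      # one 128-bit blake2b digest per distinct output
TIMESTAMP_BYTES = 8  # assumption: the cursor is one 64-bit timestamp


def hash_state_bytes(distinct_outputs: int) -> int:
  """Lower bound on per-input dedup state in the default hash mode."""
  return distinct_outputs * HASH_BYTES


def cursor_state_bytes(distinct_outputs: int) -> int:
  """Cursor state size; independent of how many outputs were emitted."""
  return TIMESTAMP_BYTES


# Reported range of dedup entries per input in the default run.
low_estimate = hash_state_bytes(300_000)   # 4,800,000 bytes
high_estimate = hash_state_bytes(495_000)  # 7,920,000 bytes
cursor_estimate = cursor_state_bytes(495_000)

# Reported totals: cursor vs default outputs in the same 240s budget.
throughput_ratio = 6_825_000 / 1_650_000
```

Under these assumptions the default mode rewrites several megabytes per input on every checkpoint while the cursor rewrites a few bytes, which is consistent with the reported 4.1x throughput gap widening as the dedup set grows.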