[Python] Bound Watch state with a timestamp cursor #39090

Draft
Eliaaazzz wants to merge 2 commits into apache:master from Eliaaazzz:python-watch-timestamp-cursor

@Eliaaazzz Eliaaazzz commented Jun 24, 2026

Contributor

Stacked on #39023 (the Watch transform MVP); the first commit belongs to that PR, so the change to review is the second commit. Relates to #18459 (the unbounded completed dedup set).

By default, Watch deduplicates by value identity and keeps one hash per distinct output, so the per-input state grows without bound. This PR adds an opt-in Watch.with_timestamp_cursor() that deduplicates by a high-water-mark timestamp instead: Watch keeps only the greatest event time it has emitted for an input, emits the polled outputs strictly past it, and then advances the cursor. No hash set is kept and the poll result is not hashed, so the per-input state and per-checkpoint encoding are O(1) regardless of how many outputs the input produces. The cursor is intended for sources whose outputs carry strictly increasing event-time timestamps; an output at or below the cursor is treated as already seen and dropped, so sources that relist arbitrarily or emit out of order should keep the default exact dedup. This PR also documents the event-time/watermark contract and adds a throttled late-emission warning.
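The cursor rule described above can be sketched in a few lines of plain Python. This is an illustration only, not the code in this PR: `emit_past_cursor` and the `(timestamp, value)` tuple shape are hypothetical names chosen for the example, and timestamps are plain numbers rather than Beam `Timestamp` objects.

```python
from typing import Any, List, Optional, Tuple

Output = Tuple[float, Any]  # (event-time timestamp, value); hypothetical shape


def emit_past_cursor(
    cursor: Optional[float], polled: List[Output]
) -> Tuple[List[Output], Optional[float]]:
    """Return the outputs strictly past the cursor and the advanced cursor.

    The only state carried between polls is `cursor`, a single timestamp.
    """
    fresh = [(ts, v) for ts, v in polled if cursor is None or ts > cursor]
    if fresh:
        # Advance to the greatest event time emitted so far.
        cursor = max(ts for ts, _ in fresh)
    return fresh, cursor


# First poll: nothing emitted yet, so everything is new.
first, cursor = emit_past_cursor(None, [(1, "a"), (2, "b")])
# Second poll relists the old outputs and adds one new one.
second, cursor = emit_past_cursor(cursor, [(1, "a"), (2, "b"), (3, "c")])
# An out-of-order output at or below the cursor is treated as already seen.
third, cursor = emit_past_cursor(cursor, [(2, "late")])
```

The third call shows the trade-off stated above: the late output is dropped, which is why the default exact dedup remains the right choice for out-of-order sources.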

Testing: 28 unit tests and 5 subtests pass on the in-memory DirectRunner; yapf 0.43.0, isort 7.0.0, and pylint are clean.

Load test: run on Dataflow Runner v2 (us-east1) against the default hash dedup, with 4 inputs emitting 15000 outputs per 1s round over a 240s budget. The cursor processed 6,825,000 outputs versus the default's 1,650,000 in the same budget (4.1x). The default re-serializes its growing dedup set on every checkpoint (it reached about 300-495K entries per input), while the cursor writes one timestamp, so the gap widens with runtime. Both stayed exactly-once and reached JOB_STATE_DONE. Jobs on apache-beam-testing: cursor 2026-06-24_08_45_05-17501676526076065162, default 2026-06-24_08_45_23-5060488636914380336.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Add an experimental Watch transform that watches a growing set of outputs
per input element. Watch.growth_of(poll_fn) runs a periodic poll loop as a
splittable DoFn and emits an unbounded PCollection of (input, output) pairs.

Each process() performs one poll round and self-checkpoints via
defer_remainder. New outputs are deduplicated with a stable 128-bit blake2b
hash of the encoded output, and a manual watermark estimator advances per
poll. Per-input termination supports never() and after_total_of(); polling
also stops when a poll returns PollResult.complete(). The DoFn is its own
RestrictionProvider, and restriction state serializes through a tagged
GrowthState coder.

Tests cover termination conditions, coder round-trips, the restriction
tracker claim/checkpoint/dedup logic, and DirectRunner end-to-end runs.
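
For readers unfamiliar with hash-based dedup, the idea of keying each output by a 128-bit blake2b digest of its encoded bytes can be sketched as follows. This is a minimal illustration using only `hashlib`; `output_key` and `dedup_round` are hypothetical helper names, not the functions in this commit, and the encoded bytes stand in for whatever the output coder produces.

```python
import hashlib
from typing import Iterable, List, Set


def output_key(encoded: bytes) -> bytes:
    # digest_size=16 gives a 128-bit digest that is stable across processes.
    return hashlib.blake2b(encoded, digest_size=16).digest()


def dedup_round(seen: Set[bytes], encoded_outputs: Iterable[bytes]) -> List[bytes]:
    """Return the outputs not seen before and record their keys in `seen`."""
    fresh = []
    for enc in encoded_outputs:
        key = output_key(enc)
        if key not in seen:
            seen.add(key)
            fresh.append(enc)
    return fresh


seen: Set[bytes] = set()
round1 = dedup_round(seen, [b"a", b"b"])
round2 = dedup_round(seen, [b"a", b"b", b"c"])  # relists a and b
```

Note that `seen` gains one 16-byte key per distinct output and never shrinks, which is the growth the follow-up commit addresses.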

Follow-up to the experimental Watch transform that documents the event-time
contract, makes late data observable, and adds an opt-in O(1) bound on the
per-input state.

Watermark/event-time:
- Document the event-time/watermark contract on the module and PollResult.
- Warn (throttled) when an output is emitted behind the current watermark, so
  out-of-order late data is observable instead of silently dropped, and point
  to PollResult.with_watermark for out-of-order sources.
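
One way to throttle such a warning is to log at most once per interval and count what was suppressed. The sketch below is an assumption about the general technique, not the implementation in this commit: the `ThrottledWarner` class, its 60-second default interval, and the injectable `clock` are all hypothetical.

```python
import logging
import time
from typing import Callable, Optional


class ThrottledWarner:
    """Logs a late-output warning at most once per `min_interval_secs`."""

    def __init__(
        self,
        min_interval_secs: float = 60.0,  # hypothetical default
        clock: Callable[[], float] = time.monotonic,
    ):
        self._min_interval = min_interval_secs
        self._clock = clock
        self._last_logged: Optional[float] = None
        self.suppressed = 0

    def warn_if_late(self, output_ts: float, watermark: float) -> bool:
        """Return True if a warning was actually logged."""
        if output_ts >= watermark:
            return False  # not behind the watermark, nothing to report
        now = self._clock()
        if self._last_logged is not None and now - self._last_logged < self._min_interval:
            self.suppressed += 1
            return False
        logging.warning(
            "Output at %s is behind watermark %s (%d similar warnings suppressed)",
            output_ts, watermark, self.suppressed)
        self._last_logged = now
        self.suppressed = 0
        return True
```

Injecting the clock keeps the throttle testable without sleeping.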

Scalability:
- Add opt-in Watch.with_timestamp_cursor(): dedup by a high-water-mark timestamp
  instead of by value identity. Watch keeps only the greatest event time emitted
  per input and emits the polled outputs strictly past it, so the per-input
  state is a single timestamp that never grows and the poll result is not
  hashed, giving O(1) state and per-checkpoint encoding. For sources whose
  outputs carry strictly increasing event-time timestamps; an output at or below
  the cursor is treated as already seen, so out-of-order or equal-timestamp
  outputs are dropped (use the default exact dedup otherwise).
- Reuse the parent dedup map on idle rounds to drop the O(N) per-round copy.
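
The last bullet, reusing the parent map on idle rounds, amounts to copying only when a round actually adds keys. A minimal sketch under that assumption follows; `child_dedup_map` is a hypothetical name, and the real restriction state in this commit may be structured differently.

```python
from typing import Dict, Iterable


def child_dedup_map(
    parent: Dict[bytes, bool], new_keys: Iterable[bytes]
) -> Dict[bytes, bool]:
    """Return the dedup map for the next round.

    An idle round (no new keys) shares the parent object, avoiding the
    O(N) copy. Sharing is only safe if neither side mutates the map later.
    """
    new_keys = list(new_keys)
    if not new_keys:
        return parent
    child = dict(parent)  # O(N) copy, paid only when the round adds keys
    for key in new_keys:
        child[key] = True
    return child


parent = {b"k1": True}
idle = child_dedup_map(parent, [])
busy = child_dedup_map(parent, [b"k2"])
```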

Tests cover the watermark contract, the out-of-order late warning, cursor
emit-after-high-water-mark, O(1) state, relist-once, complete, cursor coder
round-trip, and end-to-end cursor dedup. 26 tests and 5 subtests pass.
@Eliaaazzz Eliaaazzz force-pushed the python-watch-timestamp-cursor branch from 85476e4 to 71d6282 Compare June 24, 2026 15:43