From 7b7dc66ac6e014b45c528bfc15bd548ceb42be2d Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Fri, 19 Jun 2026 13:06:49 +0200 Subject: [PATCH 1/8] feat: improve CoalescingBuffer to park on idle, reducing CPU usage --- .../lib/core/services/adk/streaming.py | 50 +++++++++++++---- tests/lib/core/services/adk/test_streaming.py | 54 +++++++++++++++++++ 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 7215f084c..e5e2a4f99 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -166,7 +166,12 @@ def __init__(self, on_flush: Callable[[StreamTaskMessageDelta], Awaitable[object self._first_flushed = False self._closed = False self._lock = asyncio.Lock() - self._flush_signal = asyncio.Event() + # Two events so the ticker can park at zero CPU when idle: + # _wake — buffer went empty -> non-empty; the ticker should run + # _flush_now — flush immediately (first delta / size threshold / close), + # bypassing the coalescing window + self._wake = asyncio.Event() + self._flush_now = asyncio.Event() self._task: asyncio.Task[None] | None = None def start(self) -> None: @@ -177,22 +182,42 @@ async def add(self, update: StreamTaskMessageDelta) -> None: if self._closed: return async with self._lock: + was_empty = not self._buf self._buf.append(update) self._buf_chars += _delta_char_len(update.delta) if not self._first_flushed or self._buf_chars >= self.MAX_BUFFERED_CHARS: self._first_flushed = True - self._flush_signal.set() + self._flush_now.set() + # Wake the (possibly parked) ticker when the buffer goes from empty + # to non-empty; it then applies the coalescing window itself. + if was_empty: + self._wake.set() async def _run(self) -> None: try: while True: - try: - await asyncio.wait_for(self._flush_signal.wait(), timeout=self.FLUSH_INTERVAL_S) - except asyncio.TimeoutError: - pass + # Park at zero CPU until there is data to flush (or close()). + # This is the key change from a fixed-interval ticker: an idle + # or orphaned buffer blocks here instead of waking every + # FLUSH_INTERVAL_S forever — the latter leaked CPU when a buffer + # outlived its stream without close() running (one spinning task + # per such stream). + await self._wake.wait() + self._wake.clear() + # First delta / size threshold / close flush immediately; + # otherwise coalesce for up to FLUSH_INTERVAL_S so consecutive + # deltas batch into a single publish. + if not self._flush_now.is_set() and not self._closed: + try: + await asyncio.wait_for(self._flush_now.wait(), timeout=self.FLUSH_INTERVAL_S) + except asyncio.TimeoutError: + pass async with self._lock: - self._flush_signal.clear() + self._flush_now.clear() drained = self._drain_locked() + # Data that arrived during the flush keeps the ticker running. + if self._buf: + self._wake.set() for u in drained: try: await self._on_flush(u) @@ -215,12 +240,17 @@ async def close(self) -> None: # producing the duplicate-tail symptom seen on the UI stream. self._closed = True if self._task is not None: - self._flush_signal.set() + # Wake the parked ticker so it sees _closed and exits after its + # next drain. + self._wake.set() + self._flush_now.set() try: await self._task except asyncio.CancelledError: - # Propagate if our caller is being cancelled; the task itself - # swallows CancelledError so this only fires on outer cancel. + # Our caller is being cancelled. Force-cancel the ticker so it + # can never be orphaned into a parked/looping task, then + # propagate the cancellation. + self._task.cancel() raise self._task = None async with self._lock: diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index b07c55f74..cb762fea1 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -303,6 +303,60 @@ async def on_flush(u: StreamTaskMessageDelta) -> None: await buf.close() +class TestCoalescingBufferIdleParks: + """Regression: the ticker must park (block on its wake event) when there is + no buffered data, instead of waking every FLUSH_INTERVAL_S. The old + fixed-interval ticker spun at 1/FLUSH_INTERVAL forever, so a buffer that + outlived its stream (orphaned, close() not run) pinned worker CPU — one + spinning task per such stream. + """ + + @staticmethod + def _count_drains(buf: CoalescingBuffer) -> list[int]: + """Instrument _drain_locked to count ticker wake/drain cycles.""" + n = [0] + orig = buf._drain_locked + + def counting() -> list[StreamTaskMessageDelta]: + n[0] += 1 + return orig() + + buf._drain_locked = counting # type: ignore[method-assign] + return n + + @pytest.mark.asyncio + async def test_idle_buffer_does_not_spin(self) -> None: + """With no data ever added, the ticker must not drain at all over many + FLUSH_INTERVAL_S windows.""" + buf = CoalescingBuffer(on_flush=AsyncMock()) + drains = self._count_drains(buf) + buf.start() + try: + # ~8 windows at FLUSH_INTERVAL_S=0.050; a polling ticker would have + # woken ~8 times. A parked ticker drains 0 times. + await asyncio.sleep(0.4) + assert drains[0] == 0, f"idle ticker woke {drains[0]}x (must park at 0)" + finally: + await buf.close() + + @pytest.mark.asyncio + async def test_orphaned_buffer_parks_after_flush(self, task_message: TaskMessage) -> None: + """A buffer whose close() never runs (orphaned on an abnormal stream + exit) must still park at zero CPU once its data is drained — not spin. + This is the exact condition that previously leaked worker CPU.""" + buf = CoalescingBuffer(on_flush=AsyncMock()) + buf.start() + try: + await buf.add(_text(task_message, "hi")) # one immediate flush + await asyncio.sleep(0.020) # let it flush and park + drains = self._count_drains(buf) + # Deliberately do NOT close — simulate an orphaned buffer. + await asyncio.sleep(0.4) + assert drains[0] == 0, f"orphaned ticker woke {drains[0]}x (must park at 0)" + finally: + await buf.close() # cleanup only + + class TestCoalescingBufferClose: @pytest.mark.asyncio async def test_close_drains_remaining_buffered_items(self, task_message: TaskMessage) -> None: From f8f84272dd836cf6c599f2cac43a60692aa685c8 Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Mon, 22 Jun 2026 10:26:04 +0200 Subject: [PATCH 2/8] feat: ensure StreamTaskMessageFull closes coalescing buffer to prevent CPU leaks --- .../lib/core/services/adk/streaming.py | 27 ++++++--- tests/lib/core/services/adk/test_streaming.py | 56 ++++++++++++++++++- 2 files changed, 74 insertions(+), 9 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index e5e2a4f99..a232a82e0 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -215,9 +215,9 @@ async def _run(self) -> None: async with self._lock: self._flush_now.clear() drained = self._drain_locked() - # Data that arrived during the flush keeps the ticker running. - if self._buf: - self._wake.set() + # Deltas that arrive after this drain (e.g. during the _on_flush + # awaits below) re-arm the ticker via add()'s empty->non-empty + # _wake.set(), so the loop re-runs to flush them. for u in drained: try: await self._on_flush(u) @@ -450,15 +450,17 @@ async def close(self) -> TaskMessage: if not self.task_message: raise ValueError("Context not properly initialized - no task message") - if self._is_closed: - return self.task_message # Already done - - # Drain any buffered deltas before announcing DONE so consumers see the - # full sequence in order. + # Always reap the buffer ticker first, even if the context was already + # marked done by a Full/Done update on another path. close() is the last + # line of defense against an orphaned, forever-polling ticker, so it must + # never be short-circuited before stopping it. if self._buffer is not None: await self._buffer.close() self._buffer = None + if self._is_closed: + return self.task_message # Already done (buffer reaped above) + # Send the DONE event done_event = StreamTaskMessageDone( parent_task_message=self.task_message, @@ -522,6 +524,15 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | await self.close() return update elif isinstance(update, StreamTaskMessageFull): + # A full message supersedes any buffered deltas and ends the stream. + # Close the coalescing buffer (stopping its ticker) BEFORE marking + # the context done — otherwise __aexit__'s close() early-returns on + # _is_closed and the ticker is never stopped, polling forever and + # leaking CPU (mirrors the StreamTaskMessageDone branch, which closes + # via self.close()). + if self._buffer is not None: + await self._buffer.close() + self._buffer = None await self._agentex_client.messages.update( task_id=self.task_id, message_id=update.parent_task_message.id, # type: ignore[union-attr] diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index cb762fea1..86e21877f 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -22,7 +22,10 @@ ToolResponseDelta, ReasoningSummaryDelta, ) -from agentex.types.task_message_update import StreamTaskMessageDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDelta, + StreamTaskMessageFull, +) from agentex.lib.core.services.adk.streaming import ( CoalescingBuffer, StreamingTaskMessageContext, @@ -574,3 +577,54 @@ async def test_open_without_created_at_passes_omit(self) -> None: kwargs = client.messages.create.call_args.kwargs assert kwargs["created_at"] is omit + + +class TestFullMessageClosesBuffer: + """Regression: a StreamTaskMessageFull must stop the coalescing-buffer ticker. + + A ``StreamTaskMessageFull`` ends the stream and marks the context done. If it + marks ``_is_closed`` without closing the buffer, ``__aexit__``'s ``close()`` + early-returns on the ``_is_closed`` guard and the ticker is never stopped — + it polls every ``FLUSH_INTERVAL_S`` forever, one orphaned task per stream + (the OneEdge worker CPU leak). These tests pin both halves of the fix: + the Full branch closes the buffer, and ``close()`` reaps it even if the + context was already marked closed by another path. + """ + + @pytest.mark.asyncio + async def test_full_message_stops_ticker(self) -> None: + ctx, _svc, tm = await _make_context("coalesced") + # Stream a delta so the buffer and its background ticker are live. + await ctx.stream_update(_text(tm, "hello")) + buf = ctx._buffer + assert buf is not None + task = buf._task + assert task is not None and not task.done() + + # End-of-turn full message (OneEdge's pattern). + await ctx.stream_update( + StreamTaskMessageFull( + parent_task_message=tm, + content=TextContent(author="agent", content="final", format="markdown"), + type="full", + ) + ) + + assert ctx._buffer is None, "Full message left the buffer un-closed" + assert task.done(), "coalescing-buffer ticker still running after Full (orphaned)" + + @pytest.mark.asyncio + async def test_close_reaps_buffer_even_if_already_marked_closed(self) -> None: + # Defense-in-depth: if any path marks the context closed without closing + # the buffer, close() must still stop the ticker rather than short-circuit. + ctx, _svc, tm = await _make_context("coalesced") + await ctx.stream_update(_text(tm, "hi")) + buf = ctx._buffer + assert buf is not None + task = buf._task + assert task is not None and not task.done() + + ctx._is_closed = True # stray "already done" mark with a live buffer + await ctx.close() + + assert task.done(), "close() must reap the buffer even when already marked closed" From 316faf4eb69d23ba6988f3e4b016b0744ca0c89e Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Mon, 22 Jun 2026 10:38:04 +0200 Subject: [PATCH 3/8] feat: ensure Full message closes coalescing buffer to prevent stale duplicates --- .../lib/core/services/adk/streaming.py | 20 +++++++------ tests/lib/core/services/adk/test_streaming.py | 29 +++++++++++++++++++ 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index a232a82e0..1c2ea2054 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -518,21 +518,23 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | await self._buffer.add(update) return update + # A full message supersedes the streamed deltas and ends the stream. + # Drain and stop the coalescing buffer BEFORE publishing the Full, so any + # leftover buffered deltas land on the stream in order (deltas -> Full) + # rather than after the terminal Full — a consumer treating Full as the + # final message would otherwise see those trailing deltas as a stale + # duplicate tail. Closing here also stops the ticker, so it can't be + # orphaned when __aexit__'s close() later short-circuits on _is_closed. + if isinstance(update, StreamTaskMessageFull) and self._buffer is not None: + await self._buffer.close() + self._buffer = None + result = await self._streaming_service.stream_update(update) if isinstance(update, StreamTaskMessageDone): await self.close() return update elif isinstance(update, StreamTaskMessageFull): - # A full message supersedes any buffered deltas and ends the stream. - # Close the coalescing buffer (stopping its ticker) BEFORE marking - # the context done — otherwise __aexit__'s close() early-returns on - # _is_closed and the ticker is never stopped, polling forever and - # leaking CPU (mirrors the StreamTaskMessageDone branch, which closes - # via self.close()). - if self._buffer is not None: - await self._buffer.close() - self._buffer = None await self._agentex_client.messages.update( task_id=self.task_id, message_id=update.parent_task_message.id, # type: ignore[union-attr] diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index 86e21877f..ce376ba77 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -613,6 +613,35 @@ async def test_full_message_stops_ticker(self) -> None: assert ctx._buffer is None, "Full message left the buffer un-closed" assert task.done(), "coalescing-buffer ticker still running after Full (orphaned)" + @pytest.mark.asyncio + async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: + # Leftover buffered deltas must be drained BEFORE the Full hits the + # stream (deltas -> Full), never after it — a consumer treating Full as + # the final message would see a trailing delta as a stale duplicate tail. + ctx, svc, tm = await _make_context("coalesced") + # First delta flushes immediately; the second stays in the coalescing + # window, so it is still buffered when the Full arrives. + await ctx.stream_update(_text(tm, "alpha")) + await ctx.stream_update(_text(tm, "beta")) + + full = StreamTaskMessageFull( + parent_task_message=tm, + content=TextContent(author="agent", content="alphabeta", format="markdown"), + type="full", + ) + await ctx.stream_update(full) + + # Every publish (delta flushes + the Full) goes through the service mock. + published = [c.args[0] for c in svc.stream_update.await_args_list] + assert published, "nothing was published" + assert published[-1] is full, ( + f"Full must be the terminal publish; saw trailing " + f"{type(published[-1]).__name__} after it (stale duplicate tail)" + ) + assert any(isinstance(u, StreamTaskMessageDelta) for u in published[:-1]), ( + "expected the buffered deltas to be published before the Full" + ) + @pytest.mark.asyncio async def test_close_reaps_buffer_even_if_already_marked_closed(self) -> None: # Defense-in-depth: if any path marks the context closed without closing From 922c895a7bd9b02004c5053e8d3cca931ed9fec0 Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Mon, 22 Jun 2026 10:46:25 +0200 Subject: [PATCH 4/8] feat: enhance CoalescingBuffer to park on idle, preventing CPU leaks --- .../lib/core/services/adk/streaming.py | 66 +++++++------------ tests/lib/core/services/adk/test_streaming.py | 56 +++++----------- 2 files changed, 40 insertions(+), 82 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 1c2ea2054..671472832 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -166,10 +166,9 @@ def __init__(self, on_flush: Callable[[StreamTaskMessageDelta], Awaitable[object self._first_flushed = False self._closed = False self._lock = asyncio.Lock() - # Two events so the ticker can park at zero CPU when idle: - # _wake — buffer went empty -> non-empty; the ticker should run - # _flush_now — flush immediately (first delta / size threshold / close), - # bypassing the coalescing window + # _wake lets the ticker park at zero CPU when idle (set on empty -> + # non-empty); _flush_now bypasses the coalescing window (first delta / + # size threshold / close). self._wake = asyncio.Event() self._flush_now = asyncio.Event() self._task: asyncio.Task[None] | None = None @@ -188,25 +187,20 @@ async def add(self, update: StreamTaskMessageDelta) -> None: if not self._first_flushed or self._buf_chars >= self.MAX_BUFFERED_CHARS: self._first_flushed = True self._flush_now.set() - # Wake the (possibly parked) ticker when the buffer goes from empty - # to non-empty; it then applies the coalescing window itself. + # Unpark the ticker; it applies the coalescing window itself. if was_empty: self._wake.set() async def _run(self) -> None: try: while True: - # Park at zero CPU until there is data to flush (or close()). - # This is the key change from a fixed-interval ticker: an idle - # or orphaned buffer blocks here instead of waking every - # FLUSH_INTERVAL_S forever — the latter leaked CPU when a buffer - # outlived its stream without close() running (one spinning task - # per such stream). + # Park at zero CPU until there is data (or close()). A + # fixed-interval ticker instead leaked CPU on buffers orphaned + # without close() — one task spinning every FLUSH_INTERVAL_S. await self._wake.wait() self._wake.clear() - # First delta / size threshold / close flush immediately; - # otherwise coalesce for up to FLUSH_INTERVAL_S so consecutive - # deltas batch into a single publish. + # Coalesce for up to FLUSH_INTERVAL_S unless an immediate flush + # is already pending. if not self._flush_now.is_set() and not self._closed: try: await asyncio.wait_for(self._flush_now.wait(), timeout=self.FLUSH_INTERVAL_S) @@ -215,41 +209,32 @@ async def _run(self) -> None: async with self._lock: self._flush_now.clear() drained = self._drain_locked() - # Deltas that arrive after this drain (e.g. during the _on_flush - # awaits below) re-arm the ticker via add()'s empty->non-empty - # _wake.set(), so the loop re-runs to flush them. + # Deltas arriving during the _on_flush awaits below re-arm the + # ticker via add(), so they get flushed on the next loop. for u in drained: try: await self._on_flush(u) except Exception as e: logger.exception(f"CoalescingBuffer flush failed: {e}") - # Check _closed *after* draining so close() always gets a final - # in-loop flush pass. Exiting here (instead of being cancelled - # mid-flush) guarantees each in-flight item is published exactly - # once — close()'s final drain then only picks up items added - # after the last lock release. + # Check _closed *after* draining so close() gets a final flush + # pass and each item is published exactly once. if self._closed: return except asyncio.CancelledError: pass async def close(self) -> None: - # Signal the ticker to stop and let it exit naturally after its next - # drain. Cancelling mid-flush would risk re-publishing a delta whose - # Redis write already completed but whose await had not yet returned, - # producing the duplicate-tail symptom seen on the UI stream. + # Let the ticker exit after its next drain rather than cancelling + # mid-flush, which could re-publish a delta whose Redis write already + # completed (the duplicate-tail symptom seen on the UI stream). self._closed = True if self._task is not None: - # Wake the parked ticker so it sees _closed and exits after its - # next drain. self._wake.set() self._flush_now.set() try: await self._task except asyncio.CancelledError: - # Our caller is being cancelled. Force-cancel the ticker so it - # can never be orphaned into a parked/looping task, then - # propagate the cancellation. + # Outer cancel: force-cancel the ticker so it isn't orphaned. self._task.cancel() raise self._task = None @@ -450,10 +435,8 @@ async def close(self) -> TaskMessage: if not self.task_message: raise ValueError("Context not properly initialized - no task message") - # Always reap the buffer ticker first, even if the context was already - # marked done by a Full/Done update on another path. close() is the last - # line of defense against an orphaned, forever-polling ticker, so it must - # never be short-circuited before stopping it. + # Reap the buffer ticker before the _is_closed short-circuit, so a + # context already marked done by another path can't orphan it. if self._buffer is not None: await self._buffer.close() self._buffer = None @@ -518,13 +501,10 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | await self._buffer.add(update) return update - # A full message supersedes the streamed deltas and ends the stream. - # Drain and stop the coalescing buffer BEFORE publishing the Full, so any - # leftover buffered deltas land on the stream in order (deltas -> Full) - # rather than after the terminal Full — a consumer treating Full as the - # final message would otherwise see those trailing deltas as a stale - # duplicate tail. Closing here also stops the ticker, so it can't be - # orphaned when __aexit__'s close() later short-circuits on _is_closed. + # Drain and stop the buffer BEFORE publishing the Full, so leftover + # deltas land in order (deltas -> Full); publishing them after the + # terminal Full would look like a stale duplicate tail. This also stops + # the ticker so it isn't orphaned past __aexit__'s close(). if isinstance(update, StreamTaskMessageFull) and self._buffer is not None: await self._buffer.close() self._buffer = None diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index ce376ba77..72c2eb298 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -307,16 +307,12 @@ async def on_flush(u: StreamTaskMessageDelta) -> None: class TestCoalescingBufferIdleParks: - """Regression: the ticker must park (block on its wake event) when there is - no buffered data, instead of waking every FLUSH_INTERVAL_S. The old - fixed-interval ticker spun at 1/FLUSH_INTERVAL forever, so a buffer that - outlived its stream (orphaned, close() not run) pinned worker CPU — one - spinning task per such stream. - """ + """The ticker must park on its wake event when idle, not poll every + FLUSH_INTERVAL_S — a buffer orphaned without close() otherwise pins CPU.""" @staticmethod def _count_drains(buf: CoalescingBuffer) -> list[int]: - """Instrument _drain_locked to count ticker wake/drain cycles.""" + """Instrument _drain_locked to count drain cycles.""" n = [0] orig = buf._drain_locked @@ -329,14 +325,11 @@ def counting() -> list[StreamTaskMessageDelta]: @pytest.mark.asyncio async def test_idle_buffer_does_not_spin(self) -> None: - """With no data ever added, the ticker must not drain at all over many - FLUSH_INTERVAL_S windows.""" buf = CoalescingBuffer(on_flush=AsyncMock()) drains = self._count_drains(buf) buf.start() try: - # ~8 windows at FLUSH_INTERVAL_S=0.050; a polling ticker would have - # woken ~8 times. A parked ticker drains 0 times. + # ~8 FLUSH_INTERVAL_S windows; a polling ticker would drain ~8x. await asyncio.sleep(0.4) assert drains[0] == 0, f"idle ticker woke {drains[0]}x (must park at 0)" finally: @@ -344,20 +337,17 @@ async def test_idle_buffer_does_not_spin(self) -> None: @pytest.mark.asyncio async def test_orphaned_buffer_parks_after_flush(self, task_message: TaskMessage) -> None: - """A buffer whose close() never runs (orphaned on an abnormal stream - exit) must still park at zero CPU once its data is drained — not spin. - This is the exact condition that previously leaked worker CPU.""" + """An orphaned buffer (close() never runs) must still park once drained.""" buf = CoalescingBuffer(on_flush=AsyncMock()) buf.start() try: - await buf.add(_text(task_message, "hi")) # one immediate flush - await asyncio.sleep(0.020) # let it flush and park - drains = self._count_drains(buf) - # Deliberately do NOT close — simulate an orphaned buffer. + await buf.add(_text(task_message, "hi")) + await asyncio.sleep(0.020) # let the immediate flush land and park + drains = self._count_drains(buf) # count only post-flush cycles await asyncio.sleep(0.4) assert drains[0] == 0, f"orphaned ticker woke {drains[0]}x (must park at 0)" finally: - await buf.close() # cleanup only + await buf.close() class TestCoalescingBufferClose: @@ -580,28 +570,20 @@ async def test_open_without_created_at_passes_omit(self) -> None: class TestFullMessageClosesBuffer: - """Regression: a StreamTaskMessageFull must stop the coalescing-buffer ticker. - - A ``StreamTaskMessageFull`` ends the stream and marks the context done. If it - marks ``_is_closed`` without closing the buffer, ``__aexit__``'s ``close()`` - early-returns on the ``_is_closed`` guard and the ticker is never stopped — - it polls every ``FLUSH_INTERVAL_S`` forever, one orphaned task per stream - (the OneEdge worker CPU leak). These tests pin both halves of the fix: - the Full branch closes the buffer, and ``close()`` reaps it even if the - context was already marked closed by another path. - """ + """A StreamTaskMessageFull must stop the buffer ticker. If it marks the + context done without closing the buffer, close()'s _is_closed short-circuit + leaves the ticker orphaned (the worker CPU leak).""" @pytest.mark.asyncio async def test_full_message_stops_ticker(self) -> None: ctx, _svc, tm = await _make_context("coalesced") - # Stream a delta so the buffer and its background ticker are live. + # A delta makes the buffer and its ticker live. await ctx.stream_update(_text(tm, "hello")) buf = ctx._buffer assert buf is not None task = buf._task assert task is not None and not task.done() - # End-of-turn full message (OneEdge's pattern). await ctx.stream_update( StreamTaskMessageFull( parent_task_message=tm, @@ -615,12 +597,10 @@ async def test_full_message_stops_ticker(self) -> None: @pytest.mark.asyncio async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: - # Leftover buffered deltas must be drained BEFORE the Full hits the - # stream (deltas -> Full), never after it — a consumer treating Full as - # the final message would see a trailing delta as a stale duplicate tail. + # Buffered deltas must publish BEFORE the Full, never after (a trailing + # delta after the terminal Full reads as a stale duplicate tail). ctx, svc, tm = await _make_context("coalesced") - # First delta flushes immediately; the second stays in the coalescing - # window, so it is still buffered when the Full arrives. + # "alpha" flushes immediately; "beta" stays buffered in the window. await ctx.stream_update(_text(tm, "alpha")) await ctx.stream_update(_text(tm, "beta")) @@ -631,7 +611,6 @@ async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: ) await ctx.stream_update(full) - # Every publish (delta flushes + the Full) goes through the service mock. published = [c.args[0] for c in svc.stream_update.await_args_list] assert published, "nothing was published" assert published[-1] is full, ( @@ -644,8 +623,7 @@ async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: @pytest.mark.asyncio async def test_close_reaps_buffer_even_if_already_marked_closed(self) -> None: - # Defense-in-depth: if any path marks the context closed without closing - # the buffer, close() must still stop the ticker rather than short-circuit. + # close() must stop the ticker even when _is_closed is already set. ctx, _svc, tm = await _make_context("coalesced") await ctx.stream_update(_text(tm, "hi")) buf = ctx._buffer From 3c6c2a9ffa08b25d522b510d61613cc4967aaea5 Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Mon, 22 Jun 2026 10:49:48 +0200 Subject: [PATCH 5/8] style(streaming): sort test imports to satisfy ruff I001 Co-Authored-By: Claude Opus 4.8 --- tests/lib/core/services/adk/test_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index 72c2eb298..07f9872d8 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -23,8 +23,8 @@ ReasoningSummaryDelta, ) from agentex.types.task_message_update import ( - StreamTaskMessageDelta, StreamTaskMessageFull, + StreamTaskMessageDelta, ) from agentex.lib.core.services.adk.streaming import ( CoalescingBuffer, From a56d1815d0c5021df296ce97bc7b5a5c51008169 Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Mon, 22 Jun 2026 11:15:15 +0200 Subject: [PATCH 6/8] style(streaming): restore pre-existing comments trimmed in error The _closed-after-drain and close() exit comments are unchanged from next and should not have been shortened. Co-Authored-By: Claude Opus 4.8 --- src/agentex/lib/core/services/adk/streaming.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 671472832..09ab57506 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -216,17 +216,21 @@ async def _run(self) -> None: await self._on_flush(u) except Exception as e: logger.exception(f"CoalescingBuffer flush failed: {e}") - # Check _closed *after* draining so close() gets a final flush - # pass and each item is published exactly once. + # Check _closed *after* draining so close() always gets a final + # in-loop flush pass. Exiting here (instead of being cancelled + # mid-flush) guarantees each in-flight item is published exactly + # once — close()'s final drain then only picks up items added + # after the last lock release. if self._closed: return except asyncio.CancelledError: pass async def close(self) -> None: - # Let the ticker exit after its next drain rather than cancelling - # mid-flush, which could re-publish a delta whose Redis write already - # completed (the duplicate-tail symptom seen on the UI stream). + # Signal the ticker to stop and let it exit naturally after its next + # drain. Cancelling mid-flush would risk re-publishing a delta whose + # Redis write already completed but whose await had not yet returned, + # producing the duplicate-tail symptom seen on the UI stream. self._closed = True if self._task is not None: self._wake.set() From 9a1c8ba8944a35805c08c593807fbf07e1a2c5b5 Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Mon, 22 Jun 2026 11:23:20 +0200 Subject: [PATCH 7/8] fix(streaming): drain coalesced buffer before terminal Done, dedupe Done MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Done path published the terminal update before reaping the buffer, so a delta still in the coalescing window arrived after Done, and close() emitted a second Done — consumers saw a stale trailing delta and a duplicate terminal. Drain/stop the buffer before the terminal for Done as well as Full, and let close() be the sole Done emitter so it publishes exactly once. Co-Authored-By: Claude Opus 4.8 --- .../lib/core/services/adk/streaming.py | 19 +++++++++------- tests/lib/core/services/adk/test_streaming.py | 22 +++++++++++++++++++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 09ab57506..7789b72b3 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -505,20 +505,23 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | await self._buffer.add(update) return update - # Drain and stop the buffer BEFORE publishing the Full, so leftover - # deltas land in order (deltas -> Full); publishing them after the - # terminal Full would look like a stale duplicate tail. This also stops - # the ticker so it isn't orphaned past __aexit__'s close(). - if isinstance(update, StreamTaskMessageFull) and self._buffer is not None: + # Terminal Done/Full updates must drain and stop the buffer BEFORE the + # terminal event reaches consumers, so leftover deltas land in order + # (deltas -> terminal) instead of trailing it as a stale duplicate tail. + # This also stops the ticker so it isn't orphaned past __aexit__'s close(). + if isinstance(update, (StreamTaskMessageDone, StreamTaskMessageFull)) and self._buffer is not None: await self._buffer.close() self._buffer = None - result = await self._streaming_service.stream_update(update) - if isinstance(update, StreamTaskMessageDone): + # close() publishes the single terminal Done, persists, and marks the + # context closed — don't publish here too, that would duplicate it. await self.close() return update - elif isinstance(update, StreamTaskMessageFull): + + result = await self._streaming_service.stream_update(update) + + if isinstance(update, StreamTaskMessageFull): await self._agentex_client.messages.update( task_id=self.task_id, message_id=update.parent_task_message.id, # type: ignore[union-attr] diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index 07f9872d8..5f872fc86 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -23,6 +23,7 @@ ReasoningSummaryDelta, ) from agentex.types.task_message_update import ( + StreamTaskMessageDone, StreamTaskMessageFull, StreamTaskMessageDelta, ) @@ -621,6 +622,27 @@ async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: "expected the buffered deltas to be published before the Full" ) + @pytest.mark.asyncio + async def test_done_is_single_terminal_publish_no_trailing_deltas(self) -> None: + # Same guarantee as Full: buffered deltas publish BEFORE the terminal + # Done, and Done is published exactly once (not duplicated by close()). + ctx, svc, tm = await _make_context("coalesced") + # "alpha" flushes immediately; "beta" stays buffered in the window. + await ctx.stream_update(_text(tm, "alpha")) + await ctx.stream_update(_text(tm, "beta")) + + await ctx.stream_update(StreamTaskMessageDone(parent_task_message=tm, type="done")) + + published = [c.args[0] for c in svc.stream_update.await_args_list] + dones = [u for u in published if isinstance(u, StreamTaskMessageDone)] + assert len(dones) == 1, f"Done must publish exactly once, saw {len(dones)}" + assert isinstance(published[-1], StreamTaskMessageDone), ( + f"Done must be the terminal publish; saw trailing {type(published[-1]).__name__}" + ) + assert any(isinstance(u, StreamTaskMessageDelta) for u in published[:-1]), ( + "expected the buffered deltas to be published before the Done" + ) + @pytest.mark.asyncio async def test_close_reaps_buffer_even_if_already_marked_closed(self) -> None: # close() must stop the ticker even when _is_closed is already set. From 0aec588d68015b4b802aaf28a715dc8d7c611bf2 Mon Sep 17 00:00:00 2001 From: Endre Berki Date: Mon, 22 Jun 2026 11:43:02 +0200 Subject: [PATCH 8/8] fix(streaming): preserve Done metadata and shield ticker on cancelled close Two issues flagged in review: - The Done dedupe routed the streamed Done through close(), which synthesized a fresh StreamTaskMessageDone and dropped caller fields like `index`. close() now publishes the caller's Done as-is when provided, synthesizing only for implicit (__aexit__) closes. - CoalescingBuffer.close() awaited the ticker bare; a cancelled close() propagates the cancel into the ticker mid-flush, _run swallows it, and an already-drained batch is lost. Shield the await so the ticker finishes its in-flight flush and exits on _closed while the cancel still propagates. Co-Authored-By: Claude Opus 4.8 --- .../lib/core/services/adk/streaming.py | 51 ++++++++++--------- tests/lib/core/services/adk/test_streaming.py | 41 +++++++++++++-- 2 files changed, 64 insertions(+), 28 deletions(-) diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 7789b72b3..c6ab24503 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -235,12 +235,12 @@ async def close(self) -> None: if self._task is not None: self._wake.set() self._flush_now.set() - try: - await self._task - except asyncio.CancelledError: - # Outer cancel: force-cancel the ticker so it isn't orphaned. - self._task.cancel() - raise + # Shield the ticker: if close() is cancelled, awaiting the task bare + # would propagate the cancel into the ticker mid-flush, and _run + # swallows CancelledError — silently dropping an already-drained + # batch. Shielded, the ticker finishes its in-flight flush and exits + # on _closed; the cancel still propagates to our caller. + await asyncio.shield(self._task) self._task = None async with self._lock: drained = self._drain_locked() @@ -434,8 +434,14 @@ async def open(self) -> "StreamingTaskMessageContext": return self - async def close(self) -> TaskMessage: - """Close the streaming context.""" + async def close(self, done_event: StreamTaskMessageDone | None = None) -> TaskMessage: + """Close the streaming context. + + ``done_event`` is the caller-provided terminal update when close is + driven by a streamed ``StreamTaskMessageDone`` — published as-is so its + ``index``/``parent_task_message`` survive. An implicit close (``__aexit__``) + passes nothing and a terminal Done is synthesized. + """ if not self.task_message: raise ValueError("Context not properly initialized - no task message") @@ -448,12 +454,10 @@ async def close(self) -> TaskMessage: if self._is_closed: return self.task_message # Already done (buffer reaped above) - # Send the DONE event - done_event = StreamTaskMessageDone( - parent_task_message=self.task_message, - type="done", + # Send the DONE event (the caller's, if provided, so its metadata survives). + await self._streaming_service.stream_update( + done_event or StreamTaskMessageDone(parent_task_message=self.task_message, type="done") ) - await self._streaming_service.stream_update(done_event) # Update the task message with the final content has_deltas = ( @@ -505,20 +509,19 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate | await self._buffer.add(update) return update - # Terminal Done/Full updates must drain and stop the buffer BEFORE the - # terminal event reaches consumers, so leftover deltas land in order - # (deltas -> terminal) instead of trailing it as a stale duplicate tail. - # This also stops the ticker so it isn't orphaned past __aexit__'s close(). - if isinstance(update, (StreamTaskMessageDone, StreamTaskMessageFull)) and self._buffer is not None: - await self._buffer.close() - self._buffer = None - if isinstance(update, StreamTaskMessageDone): - # close() publishes the single terminal Done, persists, and marks the - # context closed — don't publish here too, that would duplicate it. - await self.close() + # close() drains the buffer first, then publishes this exact Done + # once (preserving its index/parent), persists, and marks closed. + await self.close(done_event=update) return update + # Full publishes below, so drain and stop the buffer first → leftover + # deltas land in order (deltas -> Full) instead of trailing the terminal + # Full as a stale duplicate tail. Also stops the ticker. + if isinstance(update, StreamTaskMessageFull) and self._buffer is not None: + await self._buffer.close() + self._buffer = None + result = await self._streaming_service.stream_update(update) if isinstance(update, StreamTaskMessageFull): diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index 5f872fc86..3145ef8a6 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -400,6 +400,35 @@ async def on_flush(u: StreamTaskMessageDelta) -> None: await buf.add(_text(task_message, "after")) assert flushed == [] + @pytest.mark.asyncio + async def test_cancelled_close_does_not_drop_in_flight_batch(self, task_message: TaskMessage) -> None: + """If close() is cancelled while the ticker is mid-flush, the already- + drained batch must still publish — not be lost to a force-cancel.""" + flushed: list[StreamTaskMessageDelta] = [] + gate = asyncio.Event() + entered = asyncio.Event() + + async def on_flush(u: StreamTaskMessageDelta) -> None: + entered.set() + await gate.wait() # block mid-flush, before the item is recorded + flushed.append(u) + + buf = CoalescingBuffer(on_flush=on_flush) + buf.start() + await buf.add(_text(task_message, "hi")) # first delta → immediate flush + await entered.wait() # ticker is now blocked inside on_flush + + close_task = asyncio.create_task(buf.close()) + await asyncio.sleep(0) # let close() reach `await self._task` + close_task.cancel() + with pytest.raises(asyncio.CancelledError): + await close_task + + gate.set() # release the in-flight flush + assert buf._task is not None + await buf._task # ticker finishes the batch and exits on _closed + assert len(flushed) == 1, "in-flight batch was dropped on cancelled close()" + class TestCoalescingBufferCloseDuringFlush: @pytest.mark.asyncio @@ -625,20 +654,24 @@ async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None: @pytest.mark.asyncio async def test_done_is_single_terminal_publish_no_trailing_deltas(self) -> None: # Same guarantee as Full: buffered deltas publish BEFORE the terminal - # Done, and Done is published exactly once (not duplicated by close()). + # Done, Done is published exactly once (not duplicated by close()), and + # the caller's Done is published as-is so its metadata (index) survives. ctx, svc, tm = await _make_context("coalesced") # "alpha" flushes immediately; "beta" stays buffered in the window. await ctx.stream_update(_text(tm, "alpha")) await ctx.stream_update(_text(tm, "beta")) - await ctx.stream_update(StreamTaskMessageDone(parent_task_message=tm, type="done")) + done = StreamTaskMessageDone(parent_task_message=tm, type="done", index=7) + await ctx.stream_update(done) published = [c.args[0] for c in svc.stream_update.await_args_list] dones = [u for u in published if isinstance(u, StreamTaskMessageDone)] assert len(dones) == 1, f"Done must publish exactly once, saw {len(dones)}" - assert isinstance(published[-1], StreamTaskMessageDone), ( - f"Done must be the terminal publish; saw trailing {type(published[-1]).__name__}" + assert published[-1] is done, ( + f"the caller's Done must be the terminal publish (metadata preserved); " + f"saw trailing {type(published[-1]).__name__}" ) + assert dones[0].index == 7, "caller's Done index must be preserved, not synthesized away" assert any(isinstance(u, StreamTaskMessageDelta) for u in published[:-1]), ( "expected the buffered deltas to be published before the Done" )