diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java index 9ba9f1b37..f59d427f9 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java @@ -28,9 +28,8 @@ import java.net.InetSocketAddress; import java.time.Duration; -import java.util.ArrayList; import java.util.Deque; -import java.util.List; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; @@ -359,9 +358,6 @@ public void run(Timeout timeout) { int totalCount = 0; for (ConcurrentLinkedDeque partition : partitions.values()) { - - // store in intermediate unsynchronized lists to minimize - // the impact on the ConcurrentLinkedDeque if (LOGGER.isDebugEnabled()) { totalCount += partition.size(); } @@ -380,28 +376,38 @@ public void run(Timeout timeout) { } /** - * One pass over a partition. A channel is dropped from the deque when it is a removeAll - * tombstone, remotely closed, idle-timeout expired or TTL expired. Tombstoned/concurrently - * leased channels are only unlinked (their owner closes them); expired channels are closed - * here, but only after this cleaner exclusively claims them, so a channel that {@code poll()} - * is leasing concurrently is never closed. Returns the number of channels closed by this tick. + * One pass over a partition. A channel is dropped from the deque when it is a + * {@code removeAll(Channel)} tombstone, remotely closed, idle-timeout expired or TTL expired. + * Tombstoned/concurrently leased channels are only unlinked (their owner closes them); expired + * channels are closed here, but only after this cleaner exclusively claims them, so a channel that + * {@code poll()} is leasing concurrently is never closed. 
Returns the number of channels closed by + * this tick. + * + * <p>
Drop-worthy channels are unlinked in place through the iterator (O(1) amortized each) as the + * scan reaches them. The earlier approach collected them into a list and called + * {@link java.util.concurrent.ConcurrentLinkedDeque#removeAll(java.util.Collection) removeAll} after + * the scan, which re-walks every node doing an O(m) list {@code contains()} per node — O(n*m), + * degenerating toward O(n^2) when a whole partition is dropped in one tick (a load spike's + * connections idling out together, or a peer dropping many keep-alives at once). Unlinking via the + * iterator keeps the whole pass O(n). */ private int reapPartition(ConcurrentLinkedDeque partition, long now) { - List toRemove = null; int closed = 0; - for (Channel channel : partition) { + Iterator it = partition.iterator(); + while (it.hasNext()) { + Channel channel = it.next(); IdleState idleState = channel.attr(IDLE_STATE_ATTRIBUTE_KEY).get(); if (idleState == null) { continue; } if (idleState.isOwned()) { - // In-deque + owned ==> a removeAll() tombstone, or a node a concurrent poll() has - // already leased and unlinked. Either way: unlink, never close — the owner of the - // claim is responsible for closing it. removeAll() on an already-unlinked node is a - // harmless no-op. - toRemove = lazyAdd(toRemove, channel); + // In-deque + owned ==> a removeAll(Channel) tombstone, or a node a concurrent poll() + // has already leased and unlinked. Either way: unlink, never close — the owner of the + // claim is responsible for closing it. Unlinking an already-unlinked node through the + // iterator is a harmless no-op. + it.remove(); continue; } @@ -415,7 +421,7 @@ private int reapPartition(ConcurrentLinkedDeque partition, long now) { long startSnapshot = idleState.start(); // Claim before closing so we never close a channel poll() is leasing concurrently. 
if (!idleState.takeOwnership()) { - continue; // poll() (or removeAll) won the claim; that owner now handles the channel + continue; // poll() (or removeAll(Channel)) won the claim; that owner now handles the channel } if (idleState.start() != startSnapshot) { // The channel was leased and re-offered (fresh start) between the expiry check and @@ -428,21 +434,10 @@ private int reapPartition(ConcurrentLinkedDeque partition, long now) { channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired); close(channel); closed++; - toRemove = lazyAdd(toRemove, channel); + it.remove(); } - if (toRemove != null) { - partition.removeAll(toRemove); - } return closed; } - - private List lazyAdd(List list, Channel channel) { - if (list == null) { - list = new ArrayList<>(1); - } - list.add(channel); - return list; - } } } diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java index 8203d621d..68607630c 100644 --- a/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java +++ b/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java @@ -242,6 +242,103 @@ public void channelReofferedAfterExpiryIsNotReaped() throws Exception { pool.destroy(); } + // ---- reap pass unlinks many channels in a single tick (O(n) iterator-remove) ---- + + @Test + public void cleanerReapsManyIdleExpiredChannelsInOneTick() throws Exception { + // Exercises the reap pass unlinking MANY channels in a single tick — the O(n) iterator-remove + // path that replaced the old collect-then-ConcurrentLinkedDeque.removeAll (which was O(n*m)). + // All channels expire together, as they would when a load spike's connections idle out as a wave. 
+ CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = idlePool(timer, Duration.ofMillis(1)); + + final int count = 50; + Channel[] channels = new Channel[count]; + for (int i = 0; i < count; i++) { + channels[i] = new EmbeddedChannel(); + pool.offer(channels[i], KEY); + } + assertEquals(count, partitionSize(pool, KEY)); + Thread.sleep(40); // now - start >= 1ms for every channel + + timer.fire(); + + assertEquals(0, partitionSize(pool, KEY), "every idle-expired channel must be unlinked in one tick"); + for (Channel c : channels) { + assertFalse(c.isActive(), "each idle-expired channel must be closed"); + } + assertNull(pool.poll(KEY)); + + pool.destroy(); + } + + @Test + public void cleanerReapsExpiredButKeepsHealthyInSameTick() throws Exception { + // A single reap pass must drop the expired channels AND keep the fresh ones leasable: the + // iterator has to remove some nodes while continuing past the ones it keeps. + final long maxIdle = 200; + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = idlePool(timer, Duration.ofMillis(maxIdle)); + + final int expiredCount = 6; + Channel[] expired = new Channel[expiredCount]; + for (int i = 0; i < expiredCount; i++) { + expired[i] = new EmbeddedChannel(); + pool.offer(expired[i], KEY); + } + Thread.sleep(maxIdle + 150); // these are now well past maxIdleTime + + final int healthyCount = 6; + Channel[] healthy = new Channel[healthyCount]; + for (int i = 0; i < healthyCount; i++) { + healthy[i] = new EmbeddedChannel(); + pool.offer(healthy[i], KEY); // fresh start, comfortably inside maxIdleTime + } + assertEquals(expiredCount + healthyCount, partitionSize(pool, KEY)); + + timer.fire(); + + assertEquals(healthyCount, partitionSize(pool, KEY), "only the fresh channels must survive the tick"); + for (Channel c : expired) { + assertFalse(c.isActive(), "expired channels must be closed"); + } + for (Channel c : healthy) { + assertTrue(c.isActive(), "fresh channels must not be 
touched"); + } + int leased = 0; + while (pool.poll(KEY) != null) { + leased++; + } + assertEquals(healthyCount, leased, "every surviving channel must remain leasable"); + + pool.destroy(); + } + + @Test + public void cleanerUnlinksManyTombstonesInOneTick() throws Exception { + // Many tombstones (from removeAll(Channel)) must all be unlinked in a single pass, none closed. + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = ttlPool(timer); + + final int count = 40; + Channel[] channels = new Channel[count]; + for (int i = 0; i < count; i++) { + channels[i] = new EmbeddedChannel(); + pool.offer(channels[i], KEY); + assertTrue(pool.removeAll(channels[i])); + } + assertEquals(count, partitionSize(pool, KEY), "tombstones linger until the cleaner ticks"); + + timer.fire(); + + assertEquals(0, partitionSize(pool, KEY), "every tombstone must be unlinked in one tick"); + for (Channel c : channels) { + assertTrue(c.isActive(), "cleaner must not close tombstoned channels"); + } + + pool.destroy(); + } + // ---- concurrency: no leaked tombstones, never leases a claimed channel ---- @Test