From 261917d6fb49f1fa653df61bf30d5ddfd131ce9a Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 2 Jul 2026 13:23:57 +0800 Subject: [PATCH 1/3] Add pipe receiver memory protection --- .../thrift/IoTDBDataNodeReceiver.java | 123 +++++++++++++++--- .../pipe/receiver/IoTDBFileReceiver.java | 35 ++++- .../common/PipeTransferSliceReqHandler.java | 37 ++++++ .../request/PipeTransferCompressedReq.java | 15 +++ .../PipeTransferSliceReqBuilderTest.java | 14 ++ .../PipeTransferCompressedReqTest.java | 5 + 6 files changed, 213 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 978d924c05651..99fbaa534f817 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -43,6 +43,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; @@ -192,6 +193,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); private PipeMemoryBlock allocatedMemoryBlock; + private final List allocatedSliceMemoryBlocks = new ArrayList<>(); private final Set autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet(); private final Set conflictedTreeDatabases = ConcurrentHashMap.newKeySet(); @@ -219,7 +221,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { if (PipeRequestType.isValidatedRequestType(rawRequestType)) { final PipeRequestType requestType = PipeRequestType.valueOf(rawRequestType); if (requestType != PipeRequestType.TRANSFER_SLICE) { - sliceReqHandler.clear(); + clearSliceReqHandler(); } switch (requestType) { case HANDSHAKE_DATANODE_V1: @@ -442,8 +444,18 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { } case TRANSFER_COMPRESSED: { + long requestedMemorySizeInBytes = 0; try { - return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + requestedMemorySizeInBytes = + PipeTransferCompressedReq.getMaxDecompressedLengthInBytes(req); + try (final PipeMemoryBlock ignored = + tryAllocateReceiverMemory(requestedMemorySizeInBytes)) { + return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + } + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + return new TPipeTransferResp( + getReceiverTemporaryUnavailableStatus( + "decompressing pipe transfer request", requestedMemorySizeInBytes, e)); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferCompressedTimer(System.nanoTime() - startTime); @@ -823,22 +835,102 @@ private String getConfigReceiverId() { } private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTransferSliceReq) { - final boolean isInorder = sliceReqHandler.receiveSlice(pipeTransferSliceReq); - if (!isInorder) { + final long sliceBodySizeInBytes = getSliceBodySizeInBytes(pipeTransferSliceReq); + long requestedMemorySizeInBytes = sliceBodySizeInBytes; + String memoryAction = "buffering sliced pipe transfer request"; + PipeMemoryBlock sliceMemoryBlock = null; + try { + sliceMemoryBlock = tryAllocateReceiverMemory(sliceBodySizeInBytes); + + final boolean isInorder = sliceReqHandler.receiveSlice(pipeTransferSliceReq); + if (!isInorder) { + closeMemoryBlock(sliceMemoryBlock); + clearSliceReqHandler(); + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER, + "Slice request is out of order, please check the request sequence.")); + } + + allocatedSliceMemoryBlocks.add(sliceMemoryBlock); + sliceMemoryBlock = null; + + if (pipeTransferSliceReq.getSliceIndex() + 1 < pipeTransferSliceReq.getSliceCount()) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.SUCCESS_STATUS, + "Slice received, waiting for more slices to complete the request.")); + } + + memoryAction = "assembling sliced pipe transfer request"; + requestedMemorySizeInBytes = pipeTransferSliceReq.getOriginBodySize(); + try (final PipeMemoryBlock ignored = tryAllocateReceiverMemory(requestedMemorySizeInBytes)) { + final Optional req = sliceReqHandler.makeReqIfComplete(); + if (!req.isPresent()) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.SUCCESS_STATUS, + "Slice received, waiting for more slices to complete the request.")); + } + clearSliceReqHandler(); + return receive(req.get()); + } + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + closeMemoryBlock(sliceMemoryBlock); + clearSliceReqHandler(); return new TPipeTransferResp( - RpcUtils.getStatus( - TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER, - "Slice request is out of order, please check the request sequence.")); + getReceiverTemporaryUnavailableStatus(memoryAction, requestedMemorySizeInBytes, e)); + } catch (final RuntimeException e) { + closeMemoryBlock(sliceMemoryBlock); + clearSliceReqHandler(); + throw e; } - final Optional req = sliceReqHandler.makeReqIfComplete(); - if (!req.isPresent()) { - return new TPipeTransferResp( - RpcUtils.getStatus( - TSStatusCode.SUCCESS_STATUS, - "Slice received, waiting for more slices to complete the request.")); + } + + private long getSliceBodySizeInBytes(final PipeTransferSliceReq pipeTransferSliceReq) { + return pipeTransferSliceReq.getSliceBody() == null + ? 0 + : pipeTransferSliceReq.getSliceBody().length; + } + + private void clearSliceReqHandler() { + sliceReqHandler.clear(); + allocatedSliceMemoryBlocks.forEach(this::closeMemoryBlock); + allocatedSliceMemoryBlocks.clear(); + } + + private void closeMemoryBlock(final PipeMemoryBlock memoryBlock) { + if (Objects.nonNull(memoryBlock)) { + memoryBlock.close(); } - // sliceReqHandler will be cleared in the receive(req) method - return receive(req.get()); + } + + private PipeMemoryBlock tryAllocateReceiverMemory(final long requestedMemorySizeInBytes) + throws PipeRuntimeOutOfMemoryCriticalException { + return PipeDataNodeResourceManager.memory() + .forceAllocate(Math.max(requestedMemorySizeInBytes, 0)); + } + + @Override + protected AutoCloseable tryAllocateMemoryForFilePiece(final PipeTransferFilePieceReq req) + throws PipeRuntimeOutOfMemoryCriticalException { + return tryAllocateReceiverMemory(req.getFilePiece() == null ? 0 : req.getFilePiece().length); + } + + @Override + protected TSStatus getReceiverTemporaryUnavailableStatus( + final String action, + final long requestedMemorySizeInBytes, + final PipeRuntimeOutOfMemoryCriticalException e) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage( + String.format( + "Temporarily out of memory when %s. Requested memory: %d bytes, used memory: %d bytes, free memory: %d bytes, total non-floating memory: %d bytes", + action, + requestedMemorySizeInBytes, + PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(), + PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(), + PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes())); } /** @@ -1388,6 +1480,7 @@ private TSStatus executeStatementForTableModelWithPermissionCheck( @Override public synchronized void handleExit() { + clearSliceReqHandler(); if (Objects.nonNull(configReceiverId.get())) { try { ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 8304161c9d0db..0eabcf75f7c99 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.i18n.PipeMessages; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; @@ -386,7 +387,7 @@ protected final TPipeTransferResp handleTransferFilePiece( final PipeTransferFilePieceReq req, final boolean isRequestThroughAirGap, final boolean isSingleFile) { - try { + try (final AutoCloseable ignored = tryAllocateMemoryForFilePiece(req)) { updateWritingFileIfNeeded(req.getFileName(), isSingleFile); // If the request is through air gap, the sender will resend the file piece from the beginning @@ -419,6 +420,18 @@ protected final TPipeTransferResp handleTransferFilePiece( writingFileWriter.write(req.getFilePiece()); return PipeTransferFilePieceResp.toTPipeTransferResp( RpcUtils.SUCCESS_STATUS, writingFileWriter.length()); + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + final TSStatus status = + getReceiverTemporaryUnavailableStatus( + "receiving pipe file piece", getFilePieceSizeInBytes(req), e); + PipeLogger.log( + LOGGER::warn, e, PipeMessages.RECEIVER_FAILED_WRITE_FILE_PIECE, receiverId.get(), req); + try { + return PipeTransferFilePieceResp.toTPipeTransferResp( + status, PipeTransferFilePieceResp.ERROR_END_OFFSET); + } catch (Exception ex) { + return PipeTransferFilePieceResp.toTPipeTransferResp(status); + } } catch (final Exception e) { PipeLogger.log( LOGGER::warn, e, PipeMessages.RECEIVER_FAILED_WRITE_FILE_PIECE, receiverId.get(), req); @@ -435,6 +448,26 @@ protected final TPipeTransferResp handleTransferFilePiece( } } + protected AutoCloseable tryAllocateMemoryForFilePiece(final PipeTransferFilePieceReq req) + throws PipeRuntimeOutOfMemoryCriticalException { + return () -> {}; + } + + protected TSStatus getReceiverTemporaryUnavailableStatus( + final String action, + final long requestedMemorySizeInBytes, + final PipeRuntimeOutOfMemoryCriticalException e) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage( + String.format( + "Temporarily out of memory when %s. Requested memory: %d bytes. Root cause: %s", + action, requestedMemorySizeInBytes, e.getMessage())); + } + + private static long getFilePieceSizeInBytes(final PipeTransferFilePieceReq req) { + return req.getFilePiece() == null ? 0 : req.getFilePiece().length; + } + protected final void updateWritingFileIfNeeded(final String fileName, final boolean isSingleFile) throws IOException { if (isFileExistedAndNameCorrect(fileName)) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqHandler.java index 450cedd526cf0..00b783db20459 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqHandler.java @@ -43,8 +43,14 @@ public class PipeTransferSliceReqHandler { private int sliceCount = -1; private final List sliceBodies = new ArrayList<>(); + private long receivedBodySize = 0; public boolean receiveSlice(final PipeTransferSliceReq req) { + if (!isValidSliceReq(req)) { + clear(); + return false; + } + if (orderId == -1 || originReqType == -1 || originBodySize == -1 @@ -59,6 +65,7 @@ public boolean receiveSlice(final PipeTransferSliceReq req) { originReqType = req.getOriginReqType(); originBodySize = req.getOriginBodySize(); sliceCount = req.getSliceCount(); + receivedBodySize = 0; } else { LOGGER.warn( PipeMessages.INVALID_STATE_SLICE, @@ -105,10 +112,39 @@ public boolean receiveSlice(final PipeTransferSliceReq req) { return false; } + if (receivedBodySize + req.getSliceBody().length > originBodySize) { + LOGGER.warn( + "Received slice body size {} exceeds origin body size {}", + receivedBodySize + req.getSliceBody().length, + originBodySize); + clear(); + return false; + } + sliceBodies.add(req.getSliceBody()); + receivedBodySize += req.getSliceBody().length; + + if (sliceBodies.size() == sliceCount && receivedBodySize != originBodySize) { + LOGGER.warn( + "Received slice body size {} is not equal to origin body size {}", + receivedBodySize, + originBodySize); + clear(); + return false; + } + return true; } + private boolean isValidSliceReq(final PipeTransferSliceReq req) { + return req.getOriginBodySize() >= 0 + && req.getSliceCount() > 0 + && req.getSliceIndex() >= 0 + && req.getSliceIndex() < req.getSliceCount() + && req.getSliceBody() != null + && req.getSliceBody().length <= req.getOriginBodySize(); + } + public Optional makeReqIfComplete() { if (sliceBodies.size() != sliceCount) { return Optional.empty(); @@ -132,5 +168,6 @@ public void clear() { originBodySize = -1; sliceCount = -1; sliceBodies.clear(); + receivedBodySize = 0; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReq.java index 73bac4c05b401..ad12244713207 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReq.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReq.java @@ -112,6 +112,21 @@ public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq trans return decompressedReq; } + /** Get the largest intermediate decompressed body size without consuming the request body. */ + public static int getMaxDecompressedLengthInBytes(final TPipeTransferReq transferReq) { + final ByteBuffer compressedBuffer = transferReq.body.duplicate(); + + int maxDecompressedLength = compressedBuffer.remaining(); + final int compressorsSize = ReadWriteIOUtils.readByte(compressedBuffer); + for (int i = 0; i < compressorsSize; ++i) { + ReadWriteIOUtils.readByte(compressedBuffer); + final int decompressedLength = ReadWriteIOUtils.readInt(compressedBuffer); + checkDecompressedLength(decompressedLength); + maxDecompressedLength = Math.max(maxDecompressedLength, decompressedLength); + } + return maxDecompressedLength; + } + /** This method is used to prevent decompression bomb attacks. */ private static void checkDecompressedLength(final int decompressedLength) throws IllegalArgumentException { diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java index cc836cae945f9..7ed84be8eb350 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java @@ -123,6 +123,20 @@ public void testPipeTransferSliceReqFromLegacyV13Body() throws IOException { Assert.assertEquals(2, sliceReq.getSliceCount()); } + @Test + public void testSliceReqHandlerRejectsOversizedSlices() throws IOException { + final TPipeTransferReq req = createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 6); + final PipeTransferSliceReq firstSlice = + PipeTransferSliceReq.toTPipeTransferReq(7, req.getType(), 0, 2, req.body.duplicate(), 0, 4); + final PipeTransferSliceReq oversizedSecondSlice = + PipeTransferSliceReq.toTPipeTransferReq(7, req.getType(), 1, 2, req.body.duplicate(), 2, 6); + + final PipeTransferSliceReqHandler handler = new PipeTransferSliceReqHandler(); + Assert.assertTrue(handler.receiveSlice(firstSlice)); + Assert.assertFalse(handler.receiveSlice(oversizedSecondSlice)); + Assert.assertFalse(handler.makeReqIfComplete().isPresent()); + } + private static TPipeTransferReq createReq(final byte version, final int bodySize) { final byte[] body = new byte[bodySize]; for (int i = 0; i < body.length; ++i) { diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java index ccdbc5086a5c9..a6d888b320000 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java @@ -51,6 +51,11 @@ public void testPipeTransferCompressedReq() throws IOException { Collections.singletonList( PipeCompressorFactory.getCompressor( PipeCompressor.PipeCompressionType.GZIP.getIndex()))); + Assert.assertEquals( + Math.max(compressedReq.body.remaining(), originalReq.body.remaining() + 3), + PipeTransferCompressedReq.getMaxDecompressedLengthInBytes(compressedReq)); + Assert.assertEquals(0, compressedReq.body.position()); + final TPipeTransferReq decompressedReq = PipeTransferCompressedReq.fromTPipeTransferReq(compressedReq); From 78ced86c182bb5af5204b9ce2527438e9aa502ee Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 3 Jul 2026 09:57:54 +0800 Subject: [PATCH 2/3] Add pipe receiver memory protection tests --- .../pipe/receiver/IoTDBFileReceiverTest.java | 88 +++++++++++++++++++ .../PipeTransferSliceReqBuilderTest.java | 38 ++++++++ 2 files changed, 126 insertions(+) diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java index 5a1ed3e653aec..7af0be3028705 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java @@ -22,10 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV1Req; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; @@ -138,10 +141,54 @@ public void testSealFileV1FailureDeletesTransferredFile() throws Exception { } } + @Test + public void testFilePieceMemoryAllocationFailureReturnsTemporaryUnavailable() throws Exception { + final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test"); + final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile()); + try { + receiver.setFailFilePieceMemoryAllocation(true); + + final TPipeTransferResp response = + receiver.writeFilePiece("normal.tsfile", 0, new byte[] {1, 2, 3}); + final PipeTransferFilePieceResp filePieceResp = + PipeTransferFilePieceResp.fromTPipeTransferResp(response); + + Assert.assertEquals( + TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), + response.getStatus().getCode()); + Assert.assertEquals( + PipeTransferFilePieceResp.ERROR_END_OFFSET, filePieceResp.getEndWritingOffset()); + Assert.assertFalse(receiver.getWritingFileInBaseDir("normal.tsfile").exists()); + } finally { + receiver.handleExit(); + } + } + + @Test + public void testFilePieceMemoryAllocationIsClosedAfterWrite() throws Exception { + final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test"); + final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile()); + try { + final TPipeTransferResp response = + receiver.writeFilePiece("normal.tsfile", 0, new byte[] {1, 2, 3}); + final PipeTransferFilePieceResp filePieceResp = + PipeTransferFilePieceResp.fromTPipeTransferResp(response); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), response.getStatus().getCode()); + Assert.assertEquals(3, filePieceResp.getEndWritingOffset()); + Assert.assertEquals(1, receiver.getFilePieceMemoryCloseCount()); + } finally { + receiver.handleExit(); + } + } + private static class DummyFileReceiver extends IoTDBFileReceiver { private final File receiverFileBaseDir; private TSStatus loadFileV1Status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + private boolean failFilePieceMemoryAllocation = false; + private int filePieceMemoryCloseCount = 0; DummyFileReceiver(final File baseDir) { receiverFileBaseDir = baseDir; @@ -166,6 +213,23 @@ void setLoadFileV1Status(final TSStatus status) { loadFileV1Status = status; } + void setFailFilePieceMemoryAllocation(final boolean failFilePieceMemoryAllocation) { + this.failFilePieceMemoryAllocation = failFilePieceMemoryAllocation; + } + + int getFilePieceMemoryCloseCount() { + return filePieceMemoryCloseCount; + } + + TPipeTransferResp writeFilePiece( + final String fileName, final long startWritingOffset, final byte[] filePiece) + throws IOException { + return handleTransferFilePiece( + DummyFilePieceReq.toTPipeTransferReq(fileName, startWritingOffset, filePiece), + false, + true); + } + TPipeTransferResp sealFileV1(final String fileName, final long fileLength) throws IOException { return handleTransferFileSealV1(DummyFileSealReqV1.toTPipeTransferReq(fileName, fileLength)); } @@ -228,6 +292,14 @@ protected TSStatus login() { return new TSStatus(200); } + @Override + protected AutoCloseable tryAllocateMemoryForFilePiece(final PipeTransferFilePieceReq req) { + if (failFilePieceMemoryAllocation) { + throw new PipeRuntimeOutOfMemoryCriticalException("no memory for file piece"); + } + return () -> filePieceMemoryCloseCount++; + } + @Override protected TSStatus loadFileV1( final PipeTransferFileSealReqV1 req, final String fileAbsolutePath) { @@ -252,6 +324,22 @@ public TPipeTransferResp receive(TPipeTransferReq req) { } } + private static class DummyFilePieceReq extends PipeTransferFilePieceReq { + + static DummyFilePieceReq toTPipeTransferReq( + final String fileName, final long startWritingOffset, final byte[] filePiece) + throws IOException { + return (DummyFilePieceReq) + new DummyFilePieceReq() + .convertToTPipeTransferReq(fileName, startWritingOffset, filePiece); + } + + @Override + protected PipeRequestType getPlanType() { + return PipeRequestType.TRANSFER_TS_FILE_PIECE; + } + } + private static class DummyHandshakeReq extends PipeTransferHandshakeV1Req { static DummyHandshakeReq toTPipeTransferReq(final String timestampPrecision) diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java index 7ed84be8eb350..40ec6d2d62c02 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java @@ -37,6 +37,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Optional; public class PipeTransferSliceReqBuilderTest { @@ -137,6 +138,43 @@ public void testSliceReqHandlerRejectsOversizedSlices() throws IOException { Assert.assertFalse(handler.makeReqIfComplete().isPresent()); } + @Test + public void testSliceReqHandlerAssemblesCompleteRequest() throws IOException { + final TPipeTransferReq req = createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 6); + final PipeTransferSliceReq firstSlice = + PipeTransferSliceReq.toTPipeTransferReq(7, req.getType(), 0, 2, req.body.duplicate(), 0, 4); + final PipeTransferSliceReq secondSlice = + PipeTransferSliceReq.toTPipeTransferReq(7, req.getType(), 1, 2, req.body.duplicate(), 4, 6); + + final PipeTransferSliceReqHandler handler = new PipeTransferSliceReqHandler(); + Assert.assertTrue(handler.receiveSlice(firstSlice)); + Assert.assertFalse(handler.makeReqIfComplete().isPresent()); + Assert.assertTrue(handler.receiveSlice(secondSlice)); + + final Optional completedReq = handler.makeReqIfComplete(); + Assert.assertTrue(completedReq.isPresent()); + + final TPipeTransferReq assembledReq = completedReq.get(); + Assert.assertEquals(IoTDBSinkRequestVersion.VERSION_1.getVersion(), assembledReq.getVersion()); + Assert.assertEquals(req.getType(), assembledReq.getType()); + + final byte[] assembledBody = new byte[assembledReq.body.remaining()]; + assembledReq.body.get(assembledBody); + Assert.assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5}, assembledBody); + } + + @Test + public void testSliceReqHandlerRejectsInvalidMetadata() throws IOException { + final TPipeTransferReq req = createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 2); + final PipeTransferSliceReq invalidSlice = + PipeTransferSliceReq.toTPipeTransferReq(7, req.getType(), 0, 0, req.body.duplicate(), 0, 1); + + final PipeTransferSliceReqHandler handler = new PipeTransferSliceReqHandler(); + + Assert.assertFalse(handler.receiveSlice(invalidSlice)); + Assert.assertFalse(handler.makeReqIfComplete().isPresent()); + } + private static TPipeTransferReq createReq(final byte version, final int bodySize) { final byte[] body = new byte[bodySize]; for (int i = 0; i < body.length; ++i) { From 9213e481eb4de937fae60706fc5a534bdbf94256 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 3 Jul 2026 13:36:34 +0800 Subject: [PATCH 3/3] Address receiver memory review comments --- .../iotdb/db/i18n/DataNodePipeMessages.java | 3 ++ .../iotdb/db/i18n/DataNodePipeMessages.java | 3 ++ .../thrift/IoTDBDataNodeReceiver.java | 11 ++---- .../iotdb/commons/i18n/PipeMessages.java | 2 ++ .../iotdb/commons/i18n/PipeMessages.java | 2 ++ .../pipe/receiver/IoTDBFileReceiver.java | 6 ++-- .../request/PipeTransferCompressedReq.java | 6 ++++ .../pipe/receiver/IoTDBFileReceiverTest.java | 1 + .../PipeTransferCompressedReqTest.java | 34 ++++++++++++++++++- 9 files changed, 56 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 00efec4d3a806..a43985c26895c 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -1232,6 +1232,9 @@ public final class DataNodePipeMessages { public static final String RECEIVER_ID_UNSUPPORTED_STATEMENT_TYPE_FOR_REDIRECTION = "Receiver id = {}: Unsupported statement type {} for redirection."; public static final String RECEIVER_IS_READY = "Receiver-{} is ready"; + public static final String RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT = + "Temporarily out of memory when %s. Requested memory: %d bytes, used memory: %d bytes, " + + "free memory: %d bytes, total non-floating memory: %d bytes"; public static final String REGISTER_WITH_INTERVAL_IN_SECONDS_SUCCESSFULLY = "Register {} with interval in seconds {} successfully."; public static final String SOCKET_CLOSED_WHEN_EXECUTING_READTILLFULL = diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 3c53b1909bbcd..b63c3d4e11cd0 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -1185,6 +1185,9 @@ public final class DataNodePipeMessages { public static final String RECEIVER_ID_UNSUPPORTED_STATEMENT_TYPE_FOR_REDIRECTION = "接收器 id = {}:不支持的 statement type {} for redirection。"; public static final String RECEIVER_IS_READY = "Receiver-{} is ready"; + public static final String RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT = + "执行 %s 时暂时内存不足。请求内存:%d bytes,已用内存:%d bytes,可用内存:%d bytes," + + "总非浮动内存:%d bytes"; public static final String REGISTER_WITH_INTERVAL_IN_SECONDS_SUCCESSFULLY = "Register {} with interval in seconds {} successfully."; public static final String SOCKET_CLOSED_WHEN_EXECUTING_READTILLFULL = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 99fbaa534f817..d8ab3e4aa9417 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -43,7 +43,6 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq; -import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; @@ -447,7 +446,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { long requestedMemorySizeInBytes = 0; try { requestedMemorySizeInBytes = - PipeTransferCompressedReq.getMaxDecompressedLengthInBytes(req); + PipeTransferCompressedReq.getMaxAdditionalDecompressedLengthInBytes(req); try (final PipeMemoryBlock ignored = tryAllocateReceiverMemory(requestedMemorySizeInBytes)) { return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); @@ -911,12 +910,6 @@ private PipeMemoryBlock tryAllocateReceiverMemory(final long requestedMemorySize .forceAllocate(Math.max(requestedMemorySizeInBytes, 0)); } - @Override - protected AutoCloseable tryAllocateMemoryForFilePiece(final PipeTransferFilePieceReq req) - throws PipeRuntimeOutOfMemoryCriticalException { - return tryAllocateReceiverMemory(req.getFilePiece() == null ? 0 : req.getFilePiece().length); - } - @Override protected TSStatus getReceiverTemporaryUnavailableStatus( final String action, @@ -925,7 +918,7 @@ protected TSStatus getReceiverTemporaryUnavailableStatus( return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) .setMessage( String.format( - "Temporarily out of memory when %s. Requested memory: %d bytes, used memory: %d bytes, free memory: %d bytes, total non-floating memory: %d bytes", + DataNodePipeMessages.RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT, action, requestedMemorySizeInBytes, PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(), diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java index f0086f2689855..888be7e1a5257 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java @@ -588,6 +588,8 @@ public final class PipeMessages { "Receiver id = %s: Handshake failed, response status = %s."; public static final String RECEIVER_HANDSHAKE_FAILED_LOGIN = "Receiver id = %s: Handshake failed because login failed, response status = %s."; + public static final String RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT = + "Temporarily out of memory when %s. Requested memory: %d bytes. Root cause: %s"; public static final String RECEIVER_USER_LOGIN_SUCCESS = "Receiver id = {}: User {} login successfully."; public static final String RECEIVER_EXITED = diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java index d081a8fd4cf89..8ff8c6490b4c7 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java @@ -571,6 +571,8 @@ public final class PipeMessages { "接收器 id = %s:握手失败,响应状态 = %s。"; public static final String RECEIVER_HANDSHAKE_FAILED_LOGIN = "接收器 id = %s:因登录失败导致握手失败,响应状态 = %s。"; + public static final String RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT = + "执行 %s 时暂时内存不足。请求内存:%d bytes。根因:%s"; public static final String RECEIVER_USER_LOGIN_SUCCESS = "接收器 id = {}:用户 {} 登录成功。"; public static final String RECEIVER_EXITED = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 0eabcf75f7c99..f05e5069eed42 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -460,8 +460,10 @@ protected TSStatus getReceiverTemporaryUnavailableStatus( return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) .setMessage( String.format( - "Temporarily out of memory when %s. Requested memory: %d bytes. Root cause: %s", - action, requestedMemorySizeInBytes, e.getMessage())); + PipeMessages.RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT, + action, + requestedMemorySizeInBytes, + e.getMessage())); } private static long getFilePieceSizeInBytes(final PipeTransferFilePieceReq req) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReq.java index ad12244713207..7aa639140beca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReq.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReq.java @@ -127,6 +127,12 @@ public static int getMaxDecompressedLengthInBytes(final TPipeTransferReq transfe return maxDecompressedLength; } + /** Get the largest additional decompressed body size beyond the current transfer frame. */ + public static int getMaxAdditionalDecompressedLengthInBytes(final TPipeTransferReq transferReq) { + final int transferFrameBodySize = transferReq.body.duplicate().remaining(); + return Math.max(0, getMaxDecompressedLengthInBytes(transferReq) - transferFrameBodySize); + } + /** This method is used to prevent decompression bomb attacks. */ private static void checkDecompressedLength(final int decompressedLength) throws IllegalArgumentException { diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java index 7af0be3028705..f774d725bae72 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java @@ -156,6 +156,7 @@ public void testFilePieceMemoryAllocationFailureReturnsTemporaryUnavailable() th Assert.assertEquals( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), response.getStatus().getCode()); + Assert.assertTrue(response.getStatus().getMessage().contains("no memory for file piece")); Assert.assertEquals( PipeTransferFilePieceResp.ERROR_END_OFFSET, filePieceResp.getEndWritingOffset()); Assert.assertFalse(receiver.getWritingFileInBaseDir("normal.tsfile").exists()); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java index a6d888b320000..a00530b8c6800 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java @@ -51,9 +51,15 @@ public void testPipeTransferCompressedReq() throws IOException { Collections.singletonList( PipeCompressorFactory.getCompressor( PipeCompressor.PipeCompressionType.GZIP.getIndex()))); + final int compressedBodySize = compressedReq.body.remaining(); + final int maxDecompressedLength = + Math.max(compressedBodySize, originalReq.body.remaining() + 3); Assert.assertEquals( - Math.max(compressedReq.body.remaining(), originalReq.body.remaining() + 3), + maxDecompressedLength, PipeTransferCompressedReq.getMaxDecompressedLengthInBytes(compressedReq)); + Assert.assertEquals( + maxDecompressedLength - compressedBodySize, + PipeTransferCompressedReq.getMaxAdditionalDecompressedLengthInBytes(compressedReq)); Assert.assertEquals(0, compressedReq.body.position()); final TPipeTransferReq decompressedReq = @@ -66,6 +72,32 @@ public void testPipeTransferCompressedReq() throws IOException { Assert.assertArrayEquals(originalReq.getBody(), decompressedReq.getBody()); } + @Test + public void testAdditionalDecompressedLengthExcludesTransferFrameBody() throws IOException { + final TPipeTransferReq originalReq = new TPipeTransferReq(); + originalReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + originalReq.type = PipeRequestType.TRANSFER_TABLET_BINARY.getType(); + final byte[] highlyCompressibleBody = new byte[16 * 1024]; + Arrays.fill(highlyCompressibleBody, (byte) 1); + originalReq.body = ByteBuffer.wrap(highlyCompressibleBody); + + final TPipeTransferReq compressedReq = + PipeTransferCompressedReq.toTPipeTransferReq( + originalReq, + Collections.singletonList( + PipeCompressorFactory.getCompressor( + PipeCompressor.PipeCompressionType.GZIP.getIndex()))); + final int compressedBodySize = compressedReq.body.remaining(); + final int maxDecompressedLength = + PipeTransferCompressedReq.getMaxDecompressedLengthInBytes(compressedReq); + + Assert.assertTrue(maxDecompressedLength > compressedBodySize); + Assert.assertEquals( + maxDecompressedLength - compressedBodySize, + PipeTransferCompressedReq.getMaxAdditionalDecompressedLengthInBytes(compressedReq)); + Assert.assertEquals(0, compressedReq.body.position()); + } + @Test public void testPipeTransferCompressedReqFromLegacyV13Body() throws IOException { final TPipeTransferReq originalReq = new TPipeTransferReq();