Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,9 @@ public final class DataNodePipeMessages {
public static final String RECEIVER_ID_UNSUPPORTED_STATEMENT_TYPE_FOR_REDIRECTION =
"Receiver id = {}: Unsupported statement type {} for redirection.";
public static final String RECEIVER_IS_READY = "Receiver-{} is ready";
public static final String RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT =
"Temporarily out of memory when %s. Requested memory: %d bytes, used memory: %d bytes, "
+ "free memory: %d bytes, total non-floating memory: %d bytes";
public static final String REGISTER_WITH_INTERVAL_IN_SECONDS_SUCCESSFULLY =
"Register {} with interval in seconds {} successfully.";
public static final String SOCKET_CLOSED_WHEN_EXECUTING_READTILLFULL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,9 @@ public final class DataNodePipeMessages {
public static final String RECEIVER_ID_UNSUPPORTED_STATEMENT_TYPE_FOR_REDIRECTION =
"接收器 id = {}:不支持的 statement type {} for redirection。";
public static final String RECEIVER_IS_READY = "Receiver-{} is ready";
public static final String RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT =
"执行 %s 时暂时内存不足。请求内存:%d bytes,已用内存:%d bytes,可用内存:%d bytes,"
+ "总非浮动内存:%d bytes";
public static final String REGISTER_WITH_INTERVAL_IN_SECONDS_SUCCESSFULLY =
"Register {} with interval in seconds {} successfully.";
public static final String SOCKET_CLOSED_WHEN_EXECUTING_READTILLFULL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();

private PipeMemoryBlock allocatedMemoryBlock;
private final List<PipeMemoryBlock> allocatedSliceMemoryBlocks = new ArrayList<>();
private final Set<String> autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet();
private final Set<String> conflictedTreeDatabases = ConcurrentHashMap.newKeySet();

Expand Down Expand Up @@ -219,7 +220,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
final PipeRequestType requestType = PipeRequestType.valueOf(rawRequestType);
if (requestType != PipeRequestType.TRANSFER_SLICE) {
sliceReqHandler.clear();
clearSliceReqHandler();
}
switch (requestType) {
case HANDSHAKE_DATANODE_V1:
Expand Down Expand Up @@ -442,8 +443,18 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
}
case TRANSFER_COMPRESSED:
{
long requestedMemorySizeInBytes = 0;
try {
return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
requestedMemorySizeInBytes =
PipeTransferCompressedReq.getMaxAdditionalDecompressedLengthInBytes(req);
try (final PipeMemoryBlock ignored =
tryAllocateReceiverMemory(requestedMemorySizeInBytes)) {
return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
}
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
return new TPipeTransferResp(
getReceiverTemporaryUnavailableStatus(
"decompressing pipe transfer request", requestedMemorySizeInBytes, e));
Comment on lines +455 to +457

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i18n

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in 9213e48: the receiver OOM status text now uses the DataNode pipe i18n messages with English and Chinese entries.

} finally {
PipeDataNodeReceiverMetrics.getInstance()
.recordTransferCompressedTimer(System.nanoTime() - startTime);
Expand Down Expand Up @@ -823,22 +834,96 @@ private String getConfigReceiverId() {
}

private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTransferSliceReq) {
final boolean isInorder = sliceReqHandler.receiveSlice(pipeTransferSliceReq);
if (!isInorder) {
final long sliceBodySizeInBytes = getSliceBodySizeInBytes(pipeTransferSliceReq);
long requestedMemorySizeInBytes = sliceBodySizeInBytes;
String memoryAction = "buffering sliced pipe transfer request";
PipeMemoryBlock sliceMemoryBlock = null;
try {
sliceMemoryBlock = tryAllocateReceiverMemory(sliceBodySizeInBytes);

final boolean isInorder = sliceReqHandler.receiveSlice(pipeTransferSliceReq);
if (!isInorder) {
closeMemoryBlock(sliceMemoryBlock);
clearSliceReqHandler();
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER,
"Slice request is out of order, please check the request sequence."));
}

allocatedSliceMemoryBlocks.add(sliceMemoryBlock);
sliceMemoryBlock = null;

if (pipeTransferSliceReq.getSliceIndex() + 1 < pipeTransferSliceReq.getSliceCount()) {
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.SUCCESS_STATUS,
"Slice received, waiting for more slices to complete the request."));
}

memoryAction = "assembling sliced pipe transfer request";
requestedMemorySizeInBytes = pipeTransferSliceReq.getOriginBodySize();
try (final PipeMemoryBlock ignored = tryAllocateReceiverMemory(requestedMemorySizeInBytes)) {
final Optional<TPipeTransferReq> req = sliceReqHandler.makeReqIfComplete();
if (!req.isPresent()) {
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.SUCCESS_STATUS,
"Slice received, waiting for more slices to complete the request."));
}
clearSliceReqHandler();
return receive(req.get());
}
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
closeMemoryBlock(sliceMemoryBlock);
clearSliceReqHandler();
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER,
"Slice request is out of order, please check the request sequence."));
getReceiverTemporaryUnavailableStatus(memoryAction, requestedMemorySizeInBytes, e));
} catch (final RuntimeException e) {
closeMemoryBlock(sliceMemoryBlock);
clearSliceReqHandler();
throw e;
}
final Optional<TPipeTransferReq> req = sliceReqHandler.makeReqIfComplete();
if (!req.isPresent()) {
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.SUCCESS_STATUS,
"Slice received, waiting for more slices to complete the request."));
}

private long getSliceBodySizeInBytes(final PipeTransferSliceReq pipeTransferSliceReq) {
return pipeTransferSliceReq.getSliceBody() == null
? 0
: pipeTransferSliceReq.getSliceBody().length;
}

private void clearSliceReqHandler() {
sliceReqHandler.clear();
allocatedSliceMemoryBlocks.forEach(this::closeMemoryBlock);
allocatedSliceMemoryBlocks.clear();
}

private void closeMemoryBlock(final PipeMemoryBlock memoryBlock) {
if (Objects.nonNull(memoryBlock)) {
memoryBlock.close();
}
// sliceReqHandler will be cleared in the receive(req) method
return receive(req.get());
}

private PipeMemoryBlock tryAllocateReceiverMemory(final long requestedMemorySizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
return PipeDataNodeResourceManager.memory()
.forceAllocate(Math.max(requestedMemorySizeInBytes, 0));
}

@Override
protected TSStatus getReceiverTemporaryUnavailableStatus(
final String action,
final long requestedMemorySizeInBytes,
final PipeRuntimeOutOfMemoryCriticalException e) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(
String.format(
DataNodePipeMessages.RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT,
action,
requestedMemorySizeInBytes,
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()));
}

/**
Expand Down Expand Up @@ -1388,6 +1473,7 @@ private TSStatus executeStatementForTableModelWithPermissionCheck(

@Override
public synchronized void handleExit() {
clearSliceReqHandler();
if (Objects.nonNull(configReceiverId.get())) {
try {
ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ public final class PipeMessages {
"Receiver id = %s: Handshake failed, response status = %s.";
public static final String RECEIVER_HANDSHAKE_FAILED_LOGIN =
"Receiver id = %s: Handshake failed because login failed, response status = %s.";
public static final String RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT =
"Temporarily out of memory when %s. Requested memory: %d bytes. Root cause: %s";
public static final String RECEIVER_USER_LOGIN_SUCCESS =
"Receiver id = {}: User {} login successfully.";
public static final String RECEIVER_EXITED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,8 @@ public final class PipeMessages {
"接收器 id = %s:握手失败,响应状态 = %s。";
public static final String RECEIVER_HANDSHAKE_FAILED_LOGIN =
"接收器 id = %s:因登录失败导致握手失败,响应状态 = %s。";
public static final String RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT =
"执行 %s 时暂时内存不足。请求内存:%d bytes。根因:%s";
public static final String RECEIVER_USER_LOGIN_SUCCESS =
"接收器 id = {}:用户 {} 登录成功。";
public static final String RECEIVER_EXITED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.i18n.PipeMessages;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
Expand Down Expand Up @@ -386,7 +387,7 @@ protected final TPipeTransferResp handleTransferFilePiece(
final PipeTransferFilePieceReq req,
final boolean isRequestThroughAirGap,
final boolean isSingleFile) {
try {
try (final AutoCloseable ignored = tryAllocateMemoryForFilePiece(req)) {
updateWritingFileIfNeeded(req.getFileName(), isSingleFile);

// If the request is through air gap, the sender will resend the file piece from the beginning
Expand Down Expand Up @@ -419,6 +420,18 @@ protected final TPipeTransferResp handleTransferFilePiece(
writingFileWriter.write(req.getFilePiece());
return PipeTransferFilePieceResp.toTPipeTransferResp(
RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
final TSStatus status =
getReceiverTemporaryUnavailableStatus(
"receiving pipe file piece", getFilePieceSizeInBytes(req), e);
PipeLogger.log(
LOGGER::warn, e, PipeMessages.RECEIVER_FAILED_WRITE_FILE_PIECE, receiverId.get(), req);
try {
return PipeTransferFilePieceResp.toTPipeTransferResp(
status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
} catch (Exception ex) {
return PipeTransferFilePieceResp.toTPipeTransferResp(status);
}
Comment on lines +432 to +434

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception is not reflected in the status?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in 9213e48: the returned status now keeps the receiver OOM context and includes the root-cause exception message instead of dropping it.

} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn, e, PipeMessages.RECEIVER_FAILED_WRITE_FILE_PIECE, receiverId.get(), req);
Expand All @@ -435,6 +448,28 @@ protected final TPipeTransferResp handleTransferFilePiece(
}
}

protected AutoCloseable tryAllocateMemoryForFilePiece(final PipeTransferFilePieceReq req)
throws PipeRuntimeOutOfMemoryCriticalException {
return () -> {};
}

protected TSStatus getReceiverTemporaryUnavailableStatus(
final String action,
final long requestedMemorySizeInBytes,
final PipeRuntimeOutOfMemoryCriticalException e) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(
String.format(
PipeMessages.RECEIVER_TEMPORARILY_OUT_OF_MEMORY_FORMAT,
action,
requestedMemorySizeInBytes,
e.getMessage()));
}

private static long getFilePieceSizeInBytes(final PipeTransferFilePieceReq req) {
return req.getFilePiece() == null ? 0 : req.getFilePiece().length;
}

protected final void updateWritingFileIfNeeded(final String fileName, final boolean isSingleFile)
throws IOException {
if (isFileExistedAndNameCorrect(fileName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ public class PipeTransferSliceReqHandler {

private int sliceCount = -1;
private final List<byte[]> sliceBodies = new ArrayList<>();
private long receivedBodySize = 0;

public boolean receiveSlice(final PipeTransferSliceReq req) {
if (!isValidSliceReq(req)) {
clear();
return false;
}

if (orderId == -1
|| originReqType == -1
|| originBodySize == -1
Expand All @@ -59,6 +65,7 @@ public boolean receiveSlice(final PipeTransferSliceReq req) {
originReqType = req.getOriginReqType();
originBodySize = req.getOriginBodySize();
sliceCount = req.getSliceCount();
receivedBodySize = 0;
} else {
LOGGER.warn(
PipeMessages.INVALID_STATE_SLICE,
Expand Down Expand Up @@ -105,10 +112,39 @@ public boolean receiveSlice(final PipeTransferSliceReq req) {
return false;
}

if (receivedBodySize + req.getSliceBody().length > originBodySize) {
LOGGER.warn(
"Received slice body size {} exceeds origin body size {}",
receivedBodySize + req.getSliceBody().length,
originBodySize);
clear();
return false;
}

sliceBodies.add(req.getSliceBody());
receivedBodySize += req.getSliceBody().length;

if (sliceBodies.size() == sliceCount && receivedBodySize != originBodySize) {
LOGGER.warn(
"Received slice body size {} is not equal to origin body size {}",
receivedBodySize,
originBodySize);
clear();
return false;
}

return true;
}

private boolean isValidSliceReq(final PipeTransferSliceReq req) {
return req.getOriginBodySize() >= 0
&& req.getSliceCount() > 0
&& req.getSliceIndex() >= 0
&& req.getSliceIndex() < req.getSliceCount()
&& req.getSliceBody() != null
&& req.getSliceBody().length <= req.getOriginBodySize();
}

public Optional<TPipeTransferReq> makeReqIfComplete() {
if (sliceBodies.size() != sliceCount) {
return Optional.empty();
Expand All @@ -132,5 +168,6 @@ public void clear() {
originBodySize = -1;
sliceCount = -1;
sliceBodies.clear();
receivedBodySize = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,27 @@ public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq trans
return decompressedReq;
}

/** Get the largest intermediate decompressed body size without consuming the request body. */
public static int getMaxDecompressedLengthInBytes(final TPipeTransferReq transferReq) {
final ByteBuffer compressedBuffer = transferReq.body.duplicate();

int maxDecompressedLength = compressedBuffer.remaining();
final int compressorsSize = ReadWriteIOUtils.readByte(compressedBuffer);
for (int i = 0; i < compressorsSize; ++i) {
ReadWriteIOUtils.readByte(compressedBuffer);
final int decompressedLength = ReadWriteIOUtils.readInt(compressedBuffer);
checkDecompressedLength(decompressedLength);
maxDecompressedLength = Math.max(maxDecompressedLength, decompressedLength);
}
return maxDecompressedLength;
}

/** Get the largest additional decompressed body size beyond the current transfer frame. */
public static int getMaxAdditionalDecompressedLengthInBytes(final TPipeTransferReq transferReq) {
final int transferFrameBodySize = transferReq.body.duplicate().remaining();
return Math.max(0, getMaxDecompressedLengthInBytes(transferReq) - transferFrameBodySize);
}

/** This method is used to prevent decompression bomb attacks. */
private static void checkDecompressedLength(final int decompressedLength)
throws IllegalArgumentException {
Expand Down
Loading
Loading