Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -93,6 +92,10 @@ public abstract class AbstractOperatePipeProcedureV2
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;

// Only used to release global locks before retrying the same state. Do not serialize it because a
// recovered procedure is already re-scheduled by the procedure framework.
private transient boolean shouldYieldAfterExecution;

private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
"Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";

Expand Down Expand Up @@ -162,15 +165,17 @@ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
LOGGER.warn("ProcedureId {} release lock. No need to release pipe lock.", getProcId());
} else {
LOGGER.debug("ProcedureId {} release lock. Pipe lock will be released.", getProcId());
if (this instanceof PipeMetaSyncProcedure) {
if (isSuccess() && this instanceof PipeMetaSyncProcedure) {
configNodeProcedureEnv
.getConfigManager()
.getPipeManager()
.getPipeTaskCoordinator()
.updateLastSyncedVersion();
}
PipeProcedureMetrics.getInstance()
.updateTimer(this.getOperation().getName(), this.elapsedTime());
if (isFinished()) {
PipeProcedureMetrics.getInstance()
.updateTimer(this.getOperation().getName(), this.elapsedTime());
}
releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
}
}
Expand All @@ -196,7 +201,7 @@ private void releasePipeTaskCoordinatorLock(ConfigNodeProcedureEnv configNodePro
public abstract void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env);

/**
* Execute at state {@link OperatePipeTaskState#WRITE_CONFIG_NODE_CONSENSUS}.
* Execute at state {@link OperatePipeTaskState#WRITE_CONFIG_NODE_CONSENSUS}.
*
* @throws PipeException if configNode consensus write failed
*/
Expand All @@ -215,6 +220,7 @@ public abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState state)
throws InterruptedException {
shouldYieldAfterExecution = false;
if (pipeTaskInfo == null) {
LOGGER.warn(
"ProcedureId {}: Pipe lock is not acquired, executeFromState's execution will be skipped.",
Expand Down Expand Up @@ -262,8 +268,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState
RETRY_THRESHOLD,
e);
setNextState(getCurrentState());
// Wait 3s for next retry
TimeUnit.MILLISECONDS.sleep(3000L);
shouldYieldAfterExecution = true;
} else {
LOGGER.warn(
"ProcedureId {}: All {} retries failed when trying to {} at state [{}], will rollback...",
Expand All @@ -283,6 +288,11 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState
return Flow.HAS_MORE_STATE;
}

@Override
protected boolean isYieldAfterExecution(final ConfigNodeProcedureEnv env) {
return shouldYieldAfterExecution;
}

@Override
protected boolean isRollbackSupported(OperatePipeTaskState state) {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.procedure.impl.pipe;

import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class AbstractOperatePipeProcedureV2Test {

@Test
public void testSuccessfulStateDoesNotYield() throws Exception {
final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure();

Assert.assertEquals(
StateMachineProcedure.Flow.HAS_MORE_STATE,
procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK));

Assert.assertFalse(procedure.isYieldAfterExecution(null));
Assert.assertEquals(1, procedure.validateExecutionCount);
}

@Test
public void testRetryStateYieldsAndResetsAfterNextExecution() throws Exception {
final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure();
procedure.failValidation = true;

Assert.assertEquals(
StateMachineProcedure.Flow.HAS_MORE_STATE,
procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK));

Assert.assertTrue(procedure.isYieldAfterExecution(null));
Assert.assertEquals(1, procedure.validateExecutionCount);

procedure.failValidation = false;
Assert.assertEquals(
StateMachineProcedure.Flow.HAS_MORE_STATE,
procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK));

Assert.assertFalse(procedure.isYieldAfterExecution(null));
Assert.assertEquals(2, procedure.validateExecutionCount);
}

@Test
public void testRetryStateYieldsOnlyBeforeRetryThreshold() throws Exception {
final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure();

final Procedure<?>[] validateSubProcedures = procedure.runOnce();
Assert.assertEquals(1, validateSubProcedures.length);
Assert.assertSame(procedure, validateSubProcedures[0]);
Assert.assertFalse(procedure.isYieldAfterExecution(null));

procedure.failCalculation = true;
final Procedure<?>[] calculateSubProcedures = procedure.runOnce();
Assert.assertEquals(1, calculateSubProcedures.length);
Assert.assertSame(procedure, calculateSubProcedures[0]);
Assert.assertTrue(procedure.isYieldAfterExecution(null));
Assert.assertEquals(1, procedure.calculateExecutionCount);

Assert.assertNull(procedure.runOnce());
Assert.assertTrue(procedure.hasException());
Assert.assertFalse(procedure.isYieldAfterExecution(null));
Assert.assertEquals(2, procedure.calculateExecutionCount);
}

private static class TestOperatePipeProcedure extends AbstractOperatePipeProcedureV2 {

private int validateExecutionCount;
private int calculateExecutionCount;
private boolean failValidation;
private boolean failCalculation;

private TestOperatePipeProcedure() {
pipeTaskInfo = new AtomicReference<>(new PipeTaskInfo());
}

private Procedure<?>[] runOnce() throws InterruptedException {
return execute(null);
}

@Override
protected PipeTaskOperation getOperation() {
return PipeTaskOperation.START_PIPE;
}

@Override
public boolean executeFromValidateTask(
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env)
throws PipeException {
validateExecutionCount++;
if (failValidation) {
throw new PipeException("retry");
}
return true;
}

@Override
public void executeFromCalculateInfoForTask(
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
calculateExecutionCount++;
if (failCalculation) {
throw new RuntimeException("retry");
}
}

@Override
public void executeFromWriteConfigNodeConsensus(
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
// Do nothing
}

@Override
public void executeFromOperateOnDataNodes(
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
// Do nothing
}

@Override
public void rollbackFromValidateTask(
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
// Do nothing
}

@Override
public void rollbackFromCalculateInfoForTask(
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
// Do nothing
}

@Override
public void rollbackFromWriteConfigNodeConsensus(
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
// Do nothing
}

@Override
public void rollbackFromOperateOnDataNodes(
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env)
throws IOException {
// Do nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1155,49 +1155,20 @@ public TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req) {

@Override
public TPushPipeMetaResp pushMultiPipeMeta(TPushMultiPipeMetaReq req) {
boolean hasException = false;
// If there is any exception, we use the size of exceptionMessages to record the fail index
List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
try {
if (req.isSetPipeNamesToDrop()) {
for (String pipeNameToDrop : req.getPipeNamesToDrop()) {
TPushPipeMetaRespExceptionMessage message =
PipeDataNodeAgent.task().handleDropPipe(pipeNameToDrop);
exceptionMessages.add(message);
if (message != null) {
// If there is any exception, skip the remaining pipes
hasException = true;
break;
}
}
} else if (req.isSetPipeMetas()) {
for (ByteBuffer byteBuffer : req.getPipeMetas()) {
final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(byteBuffer);
TPushPipeMetaRespExceptionMessage message =
PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
exceptionMessages.add(message);
if (message != null) {
// If there is any exception, skip the remaining pipes
hasException = true;
break;
return PushMultiPipeMetaHelper.pushMultiPipeMeta(
req,
new PushMultiPipeMetaHelper.Handler() {
@Override
public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) {
return PipeDataNodeAgent.task().handleDropPipe(pipeName);
}
}
} else {
throw new Exception("Invalid TPushMultiPipeMetaReq");
}

return hasException
? new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages)
: new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
} catch (Exception e) {
LOGGER.warn("Error occurred when pushing multi pipe meta", e);
return new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages);
}
@Override
public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta(final ByteBuffer pipeMeta) {
return PipeDataNodeAgent.task()
.handleSinglePipeMetaChanges(PipeMeta.deserialize4TaskAgent(pipeMeta));
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.protocol.thrift.impl;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class PushMultiPipeMetaHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(PushMultiPipeMetaHelper.class);

private PushMultiPipeMetaHelper() {
// Utility class
}

interface Handler {

TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) throws Exception;

TPushPipeMetaRespExceptionMessage handleSinglePipeMeta(ByteBuffer pipeMeta) throws Exception;
}

static TPushPipeMetaResp pushMultiPipeMeta(
final TPushMultiPipeMetaReq req, final Handler handler) {
final List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
try {
if (req.isSetPipeNamesToDrop()) {
for (final String pipeNameToDrop : req.getPipeNamesToDrop()) {
final TPushPipeMetaRespExceptionMessage message = handler.handleDropPipe(pipeNameToDrop);
if (message != null) {
exceptionMessages.add(message);
}
}
} else if (req.isSetPipeMetas()) {
for (final ByteBuffer pipeMeta : req.getPipeMetas()) {
final TPushPipeMetaRespExceptionMessage message = handler.handleSinglePipeMeta(pipeMeta);
if (message != null) {
exceptionMessages.add(message);
}
}
} else {
throw new Exception("Invalid TPushMultiPipeMetaReq");
}

return exceptionMessages.isEmpty()
? new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
: new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages);
} catch (final Exception e) {
LOGGER.warn("Error occurred when pushing multi pipe meta", e);
return new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages);
}
}
}
Loading
Loading