Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -897,10 +897,42 @@
conf.setReadConsistencyLevel(readConsistencyLevel);
Optional.ofNullable(properties.getProperty("enable_topology_probing"))
.ifPresent(v -> conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
// Keep the ConfigNode's CommonConfig in sync so that SHOW VARIABLES / cluster-parameter
// consistency checks report the hot-reloaded value; the DataNode side additionally refreshes
// JVMCommonUtils where the ReadOnly disk guard actually consumes it.
commonDescriptor.loadHotModifiedDiskSpaceWarningThreshold(properties);
loadHotModifiedProcedureConfig(properties);
loadPipeHotModifiedProp(properties);
ConfigurationFileUtils.updateAppliedProperties(properties, true);
}

private void loadHotModifiedProcedureConfig(TrimProperties properties) throws IOException {
int procedureCompletedCleanInterval =
Integer.parseInt(
properties.getProperty(
"procedure_completed_clean_interval",
String.valueOf(conf.getProcedureCompletedCleanInterval())));
if (procedureCompletedCleanInterval <= 0) {
throw new IOException(
"procedure_completed_clean_interval should be greater than 0, but was "
+ procedureCompletedCleanInterval
+ ".");
}
int procedureCompletedEvictTTL =
Integer.parseInt(
properties.getProperty(
"procedure_completed_evict_ttl",
String.valueOf(conf.getProcedureCompletedEvictTTL())));
if (procedureCompletedEvictTTL <= 0) {
throw new IOException(
"procedure_completed_evict_ttl should be greater than 0, but was "
+ procedureCompletedEvictTTL
+ ".");

Check warning on line 930 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'procedureCompletedEvictTTL' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hVq-P9-TPZ9tM9EjZ&open=AZ8hVq-P9-TPZ9tM9EjZ&pullRequest=18091
}
conf.setProcedureCompletedCleanInterval(procedureCompletedCleanInterval);
conf.setProcedureCompletedEvictTTL(procedureCompletedEvictTTL);
}

private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException {
PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties, true);
PipePeriodicalLogReducer.update();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1787,6 +1787,8 @@
int previousSchemaRegionPerDataNode = CONF.getSchemaRegionPerDataNode();
int previousDataRegionPerDataNode = CONF.getDataRegionPerDataNode();
boolean wasTopologyProbingEnabled = CONF.isEnableTopologyProbing();
int previousProcedureCompletedCleanInterval = CONF.getProcedureCompletedCleanInterval();

Check warning on line 1790 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Distance between variable 'previousProcedureCompletedCleanInterval' declaration and its first usage is 6, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hVq9w9-TPZ9tM9EjV&open=AZ8hVq9w9-TPZ9tM9EjV&pullRequest=18091
int previousProcedureCompletedEvictTTL = CONF.getProcedureCompletedEvictTTL();

Check warning on line 1791 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Distance between variable 'previousProcedureCompletedEvictTTL' declaration and its first usage is 6, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hVq9w9-TPZ9tM9EjW&open=AZ8hVq9w9-TPZ9tM9EjW&pullRequest=18091

Check warning on line 1791 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'previousProcedureCompletedEvictTTL' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hVq9w9-TPZ9tM9EjX&open=AZ8hVq9w9-TPZ9tM9EjX&pullRequest=18091
if (configurationFileFound) {
File file = new File(url.getFile());
try {
Expand Down Expand Up @@ -1817,6 +1819,8 @@
handleRegionPerDataNodeHotReload(
previousSchemaRegionPerDataNode, previousDataRegionPerDataNode);
handleTopologyProbingHotReload(wasTopologyProbingEnabled);
handleProcedureCleanerHotReload(
previousProcedureCompletedCleanInterval, previousProcedureCompletedEvictTTL);
if (currentNodeId == req.getNodeId() || req.getNodeId() == NodeManager.APPLY_CONFIG_LOCALLY) {
return tsStatus;
}
Expand Down Expand Up @@ -1884,6 +1888,14 @@
}
}

private void handleProcedureCleanerHotReload(int previousCleanInterval, int previousEvictTTL) {

Check warning on line 1891 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'previousEvictTTL' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hVq9w9-TPZ9tM9EjY&open=AZ8hVq9w9-TPZ9tM9EjY&pullRequest=18091
if (previousCleanInterval == CONF.getProcedureCompletedCleanInterval()
&& previousEvictTTL == CONF.getProcedureCompletedEvictTTL()) {
return;
}
getProcedureManager().updateCompletedProcedureCleaner();
}

@Override
public TSStatus startRepairData() {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,20 @@ public void stopExecutor() {
}
}

/**
* Reload the completed-procedure cleaner with the current {@code
* procedure_completed_clean_interval} and {@code procedure_completed_evict_ttl}. Only takes
* effect on the running (leader) ConfigNode; on a follower the executor is stopped and the fresh
* values are picked up when {@link #startExecutor} runs after the next leader switch.
*/
public void updateCompletedProcedureCleaner() {
if (executor.isRunning()) {
executor.restartCompletedCleaner(
CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
}
}

public boolean isProcedureExecutionThread() {
return ProcedureExecutor.isProcedureExecutionThread();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.confignode.procedure;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;

import org.slf4j.Logger;
Expand All @@ -32,7 +33,7 @@
public class CompletedProcedureRecycler<Env> extends InternalProcedure<Env> {
private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureRecycler.class);
private static final int DEFAULT_BATCH_SIZE = 8;
private final long evictTTL;
private final long evictTTLInMs;

Check warning on line 36 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'evictTTLInMs' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hy-9wATBB7BRe_URD&open=AZ8hy-9wATBB7BRe_URD&pullRequest=18091
private final Map<Long, CompletedProcedureContainer<Env>> completed;
private final IProcedureStore<Env> store;

Expand All @@ -44,7 +45,14 @@
super(TimeUnit.SECONDS.toMillis(cleanTimeInterval));
this.completed = completedMap;
this.store = store;
this.evictTTL = evictTTL;
// evictTTL is configured in seconds, but isExpired compares it against a
// System.currentTimeMillis() delta, so it must be converted to milliseconds here.
this.evictTTLInMs = TimeUnit.SECONDS.toMillis(evictTTL);
}

@TestOnly
long getEvictTTLInMs() {

Check warning on line 54 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'getEvictTTLInMs' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hy-9wATBB7BRe_URE&open=AZ8hy-9wATBB7BRe_URE&pullRequest=18091
return evictTTLInMs;
}

@Override
Expand All @@ -66,7 +74,7 @@
final Map.Entry<Long, CompletedProcedureContainer<Env>> entry = it.next();
final CompletedProcedureContainer<Env> retainer = entry.getValue();
final Procedure<?> proc = retainer.getProcedure();
if (retainer.isExpired(now, evictTTL)) {
if (retainer.isExpired(now, evictTTLInMs)) {
// Failed procedures aren't persisted in WAL.
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@
private int corePoolSize;
private int maxPoolSize;

/**
* The internal cleaner that recycles completed procedures. Kept as a reference so that its clean
* interval / evict TTL can be reloaded at runtime (see {@link #restartCompletedCleaner}). All
* accesses ({@link #startCompletedCleaner} on leader transition, {@link #restartCompletedCleaner}
* on hot reload, and the test getter) are performed while holding this instance's monitor, so the
* field is guarded by {@code synchronized} for both mutual exclusion and cross-thread visibility.
*/
private CompletedProcedureRecycler<Env> completedProcedureRecycler;

private final ProcedureScheduler scheduler;

private final AtomicLong workId = new AtomicLong(0);
Expand Down Expand Up @@ -289,9 +298,28 @@
LOG.info(ProcedureMessages.PROCEDURE_WORKERS_ARE_STARTED, workerThreads.size());
}

public void startCompletedCleaner(long cleanTimeInterval, long cleanEvictTTL) {
addInternalProcedure(
new CompletedProcedureRecycler(store, completed, cleanTimeInterval, cleanEvictTTL));
public synchronized void startCompletedCleaner(long cleanTimeInterval, long cleanEvictTTL) {

Check warning on line 301 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'cleanEvictTTL' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hVq7C9-TPZ9tM9EjT&open=AZ8hVq7C9-TPZ9tM9EjT&pullRequest=18091
completedProcedureRecycler =
new CompletedProcedureRecycler<>(store, completed, cleanTimeInterval, cleanEvictTTL);
addInternalProcedure(completedProcedureRecycler);
}

/**
* Reload the completed-procedure cleaner with a new clean interval / evict TTL at runtime. The
* clean interval and evict TTL are captured by {@link CompletedProcedureRecycler} at
* construction, so applying the new values requires removing the current recycler and scheduling
* a fresh one.
*/
public synchronized void restartCompletedCleaner(long cleanTimeInterval, long cleanEvictTTL) {

Check warning on line 313 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'cleanEvictTTL' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hVq7C9-TPZ9tM9EjU&open=AZ8hVq7C9-TPZ9tM9EjU&pullRequest=18091
if (completedProcedureRecycler != null) {
removeInternalProcedure(completedProcedureRecycler);
}
startCompletedCleaner(cleanTimeInterval, cleanEvictTTL);
}

@TestOnly
synchronized CompletedProcedureRecycler<Env> getCompletedProcedureRecycler() {
return completedProcedureRecycler;
}

public void addInternalProcedure(InternalProcedure interalProcedure) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.procedure;

import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;

import org.junit.Assert;
import org.junit.Test;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class CompletedProcedureRecyclerTest {

@SuppressWarnings("unchecked")
private static IProcedureStore<TestProcEnv> runningStore() {
NoopProcedureStore store = new NoopProcedureStore();
store.setRunning(true);
return store;
}

private static CompletedProcedureContainer<TestProcEnv> completedAt(long lastUpdate) {
NoopProcedure procedure = new NoopProcedure();
procedure.setLastUpdate(lastUpdate);
return new CompletedProcedureContainer<>(procedure);
}

/** The evict TTL is configured in seconds but compared against a millisecond delta. */
@Test
public void evictTtlIsConvertedFromSecondsToMillis() {
CompletedProcedureRecycler<TestProcEnv> recycler =
new CompletedProcedureRecycler<>(runningStore(), new ConcurrentHashMap<>(), 30, 60);
Assert.assertEquals(TimeUnit.SECONDS.toMillis(60), recycler.getEvictTTLInMs());
}

@Test
public void freshCompletedProcedureIsRetainedWhileStaleOneIsEvicted() {
final long now = System.currentTimeMillis();
final Map<Long, CompletedProcedureContainer<TestProcEnv>> completed = new ConcurrentHashMap<>();
// Completed 1s ago: with a 100s evict TTL it must survive. Before the seconds->millis fix a
// 100s TTL was treated as 100ms, so this fresh entry was evicted almost immediately.
completed.put(1L, completedAt(now - TimeUnit.SECONDS.toMillis(1)));
// Completed 300s ago: with a 100s evict TTL it must be evicted.
completed.put(2L, completedAt(now - TimeUnit.SECONDS.toMillis(300)));

CompletedProcedureRecycler<TestProcEnv> recycler =
new CompletedProcedureRecycler<>(runningStore(), completed, 30, 100);
recycler.periodicExecute(null);

Assert.assertTrue(
"A procedure completed 1s ago must survive a 100s evict TTL", completed.containsKey(1L));
Assert.assertFalse(
"A procedure completed 300s ago must be evicted by a 100s evict TTL",
completed.containsKey(2L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws Interrupte
Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
}

@Test
public void testRestartCompletedCleanerAppliesNewEvictTtl() {
procExecutor.startCompletedCleaner(30, 60);
CompletedProcedureRecycler<TestProcEnv> first = procExecutor.getCompletedProcedureRecycler();
Assert.assertNotNull(first);
Assert.assertEquals(TimeUnit.SECONDS.toMillis(60), first.getEvictTTLInMs());

// Hot reload with a different interval / TTL replaces the recycler with a fresh instance.
procExecutor.restartCompletedCleaner(15, 120);
CompletedProcedureRecycler<TestProcEnv> second = procExecutor.getCompletedProcedureRecycler();
Assert.assertNotSame(first, second);
Assert.assertEquals(TimeUnit.SECONDS.toMillis(120), second.getEvictTTLInMs());
}

private int waitThreadCount(final int expectedThreads) {
long startTime = System.currentTimeMillis();
while (procExecutor.isRunning()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.JVMCommonUtils;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
Expand Down Expand Up @@ -2252,6 +2253,12 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
BinaryAllocator.getInstance().close(true);
}

// update disk_space_warning_threshold; also refresh the static copy in JVMCommonUtils that
// the ReadOnly disk guard reads, otherwise the new threshold would not take effect until
// restart. Parsing / validation is shared with the ConfigNode hot-reload path.
JVMCommonUtils.setDiskSpaceWarningThreshold(
commonDescriptor.loadHotModifiedDiskSpaceWarningThreshold(properties));

commonDescriptor
.getConfig()
.setTimestampPrecisionCheckEnabled(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ topology_probing_base_interval_in_ms=5000
topology_probing_timeout_ratio=0.5

# Disk remaining threshold at which DataNode is set to ReadOnly status
# effectiveMode: restart
# effectiveMode: hot_reload
# Datatype: double(percentage)
disk_space_warning_threshold=0.05

Expand Down Expand Up @@ -2185,12 +2185,12 @@ zombie_tsfile_writer_threshold=600000
procedure_core_worker_thread_count=4

# Default time interval of completed procedure cleaner work in, time unit is second
# effectiveMode: restart
# effectiveMode: hot_reload
# Datatype: int
procedure_completed_clean_interval=30

# Default ttl of completed procedure, time unit is second
# effectiveMode: restart
# effectiveMode: hot_reload
# Datatype: int
procedure_completed_evict_ttl=60

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,29 @@
"enable_retry_for_unknown_error"))));
}

/**
* Parse, validate and apply {@code disk_space_warning_threshold} for a runtime hot reload, then
* return the applied value. Shared by the ConfigNode and DataNode hot-reload paths so both use
* the same parsing / bounds rules. Callers on the DataNode must additionally refresh the {@code
* JVMCommonUtils} static copy that the ReadOnly disk guard actually consumes.
*/
public double loadHotModifiedDiskSpaceWarningThreshold(final TrimProperties properties)
throws IOException {

Check warning on line 593 in iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Expected @throws tag for 'IOException'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8hVq0H9-TPZ9tM9EjR&open=AZ8hVq0H9-TPZ9tM9EjR&pullRequest=18091
double diskSpaceWarningThreshold =
Double.parseDouble(
properties.getProperty(
"disk_space_warning_threshold",
String.valueOf(config.getDiskSpaceWarningThreshold())));
if (diskSpaceWarningThreshold < 0 || diskSpaceWarningThreshold >= 1) {
throw new IOException(
"disk_space_warning_threshold must be in [0, 1), but was "
+ diskSpaceWarningThreshold
+ ".");
}
config.setDiskSpaceWarningThreshold(diskSpaceWarningThreshold);
return diskSpaceWarningThreshold;
}

/**
* Reload only the subscription consensus properties that are intended to take effect on hot
* configuration reload.
Expand Down
Loading
Loading