diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 43996574723a..81f31330595d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -897,10 +897,42 @@ public void loadHotModifiedProps(TrimProperties properties) throws IOException { conf.setReadConsistencyLevel(readConsistencyLevel); Optional.ofNullable(properties.getProperty("enable_topology_probing")) .ifPresent(v -> conf.setEnableTopologyProbing(Boolean.parseBoolean(v))); + // Keep the ConfigNode's CommonConfig in sync so that SHOW VARIABLES / cluster-parameter + // consistency checks report the hot-reloaded value; the DataNode side additionally refreshes + // JVMCommonUtils where the ReadOnly disk guard actually consumes it. + commonDescriptor.loadHotModifiedDiskSpaceWarningThreshold(properties); + loadHotModifiedProcedureConfig(properties); loadPipeHotModifiedProp(properties); ConfigurationFileUtils.updateAppliedProperties(properties, true); } + private void loadHotModifiedProcedureConfig(TrimProperties properties) throws IOException { + int procedureCompletedCleanInterval = + Integer.parseInt( + properties.getProperty( + "procedure_completed_clean_interval", + String.valueOf(conf.getProcedureCompletedCleanInterval()))); + if (procedureCompletedCleanInterval <= 0) { + throw new IOException( + "procedure_completed_clean_interval should be greater than 0, but was " + + procedureCompletedCleanInterval + + "."); + } + int procedureCompletedEvictTTL = + Integer.parseInt( + properties.getProperty( + "procedure_completed_evict_ttl", + String.valueOf(conf.getProcedureCompletedEvictTTL()))); + if (procedureCompletedEvictTTL <= 0) { + throw new IOException( + "procedure_completed_evict_ttl should be greater than 0, but was " + + procedureCompletedEvictTTL + + "."); + } + conf.setProcedureCompletedCleanInterval(procedureCompletedCleanInterval); + conf.setProcedureCompletedEvictTTL(procedureCompletedEvictTTL); + } + private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException { PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties, true); PipePeriodicalLogReducer.update(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index b11c83d5784e..dfdafd0f97da 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1787,6 +1787,8 @@ public TSStatus setConfiguration(TSetConfigurationReq req) { int previousSchemaRegionPerDataNode = CONF.getSchemaRegionPerDataNode(); int previousDataRegionPerDataNode = CONF.getDataRegionPerDataNode(); boolean wasTopologyProbingEnabled = CONF.isEnableTopologyProbing(); + int previousProcedureCompletedCleanInterval = CONF.getProcedureCompletedCleanInterval(); + int previousProcedureCompletedEvictTTL = CONF.getProcedureCompletedEvictTTL(); if (configurationFileFound) { File file = new File(url.getFile()); try { @@ -1817,6 +1819,8 @@ public TSStatus setConfiguration(TSetConfigurationReq req) { handleRegionPerDataNodeHotReload( previousSchemaRegionPerDataNode, previousDataRegionPerDataNode); handleTopologyProbingHotReload(wasTopologyProbingEnabled); + handleProcedureCleanerHotReload( + previousProcedureCompletedCleanInterval, previousProcedureCompletedEvictTTL); if (currentNodeId == req.getNodeId() || req.getNodeId() == NodeManager.APPLY_CONFIG_LOCALLY) { return tsStatus; } @@ -1884,6 +1888,14 @@ private void handleTopologyProbingHotReload(boolean wasEnabled) { } } + private void handleProcedureCleanerHotReload(int previousCleanInterval, int previousEvictTTL) { + if (previousCleanInterval == CONF.getProcedureCompletedCleanInterval() + && previousEvictTTL == CONF.getProcedureCompletedEvictTTL()) { + return; + } + getProcedureManager().updateCompletedProcedureCleaner(); + } + @Override public TSStatus startRepairData() { TSStatus status = confirmLeader(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 28514043188f..33bd1810a5df 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -263,6 +263,20 @@ public void stopExecutor() { } } + /** + * Reload the completed-procedure cleaner with the current {@code + * procedure_completed_clean_interval} and {@code procedure_completed_evict_ttl}. Only takes + * effect on the running (leader) ConfigNode; on a follower the executor is stopped and the fresh + * values are picked up when {@link #startExecutor} runs after the next leader switch. + */ + public void updateCompletedProcedureCleaner() { + if (executor.isRunning()) { + executor.restartCompletedCleaner( + CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(), + CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL()); + } + } + public boolean isProcedureExecutionThread() { return ProcedureExecutor.isProcedureExecutionThread(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java index 179563cc3b3e..3c524a62d505 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.procedure; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.procedure.store.IProcedureStore; import org.slf4j.Logger; @@ -32,7 +33,7 @@ public class CompletedProcedureRecycler extends InternalProcedure { private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureRecycler.class); private static final int DEFAULT_BATCH_SIZE = 8; - private final long evictTTL; + private final long evictTTLInMs; private final Map> completed; private final IProcedureStore store; @@ -44,7 +45,14 @@ public CompletedProcedureRecycler( super(TimeUnit.SECONDS.toMillis(cleanTimeInterval)); this.completed = completedMap; this.store = store; - this.evictTTL = evictTTL; + // evictTTL is configured in seconds, but isExpired compares it against a + // System.currentTimeMillis() delta, so it must be converted to milliseconds here. + this.evictTTLInMs = TimeUnit.SECONDS.toMillis(evictTTL); + } + + @TestOnly + long getEvictTTLInMs() { + return evictTTLInMs; } @Override @@ -66,7 +74,7 @@ protected void periodicExecute(final Env env) { final Map.Entry> entry = it.next(); final CompletedProcedureContainer retainer = entry.getValue(); final Procedure proc = retainer.getProcedure(); - if (retainer.isExpired(now, evictTTL)) { + if (retainer.isExpired(now, evictTTLInMs)) { // Failed procedures aren't persisted in WAL. batchIds[batchCount++] = entry.getKey(); if (batchCount == batchIds.length) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index bdaace3d768d..51953703603c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -76,6 +76,15 @@ public class ProcedureExecutor { private int corePoolSize; private int maxPoolSize; + /** + * The internal cleaner that recycles completed procedures. Kept as a reference so that its clean + * interval / evict TTL can be reloaded at runtime (see {@link #restartCompletedCleaner}). All + * accesses ({@link #startCompletedCleaner} on leader transition, {@link #restartCompletedCleaner} + * on hot reload, and the test getter) are performed while holding this instance's monitor, so the + * field is guarded by {@code synchronized} for both mutual exclusion and cross-thread visibility. + */ + private CompletedProcedureRecycler completedProcedureRecycler; + private final ProcedureScheduler scheduler; private final AtomicLong workId = new AtomicLong(0); @@ -289,9 +298,28 @@ public void startWorkers() { LOG.info(ProcedureMessages.PROCEDURE_WORKERS_ARE_STARTED, workerThreads.size()); } - public void startCompletedCleaner(long cleanTimeInterval, long cleanEvictTTL) { - addInternalProcedure( - new CompletedProcedureRecycler(store, completed, cleanTimeInterval, cleanEvictTTL)); + public synchronized void startCompletedCleaner(long cleanTimeInterval, long cleanEvictTTL) { + completedProcedureRecycler = + new CompletedProcedureRecycler<>(store, completed, cleanTimeInterval, cleanEvictTTL); + addInternalProcedure(completedProcedureRecycler); + } + + /** + * Reload the completed-procedure cleaner with a new clean interval / evict TTL at runtime. The + * clean interval and evict TTL are captured by {@link CompletedProcedureRecycler} at + * construction, so applying the new values requires removing the current recycler and scheduling + * a fresh one. + */ + public synchronized void restartCompletedCleaner(long cleanTimeInterval, long cleanEvictTTL) { + if (completedProcedureRecycler != null) { + removeInternalProcedure(completedProcedureRecycler); + } + startCompletedCleaner(cleanTimeInterval, cleanEvictTTL); + } + + @TestOnly + synchronized CompletedProcedureRecycler getCompletedProcedureRecycler() { + return completedProcedureRecycler; } public void addInternalProcedure(InternalProcedure interalProcedure) { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecyclerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecyclerTest.java new file mode 100644 index 000000000000..eb951641e611 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecyclerTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure; + +import org.apache.iotdb.confignode.procedure.entity.NoopProcedure; +import org.apache.iotdb.confignode.procedure.env.TestProcEnv; +import org.apache.iotdb.confignode.procedure.store.IProcedureStore; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +public class CompletedProcedureRecyclerTest { + + @SuppressWarnings("unchecked") + private static IProcedureStore runningStore() { + NoopProcedureStore store = new NoopProcedureStore(); + store.setRunning(true); + return store; + } + + private static CompletedProcedureContainer completedAt(long lastUpdate) { + NoopProcedure procedure = new NoopProcedure(); + procedure.setLastUpdate(lastUpdate); + return new CompletedProcedureContainer<>(procedure); + } + + /** The evict TTL is configured in seconds but compared against a millisecond delta. */ + @Test + public void evictTtlIsConvertedFromSecondsToMillis() { + CompletedProcedureRecycler recycler = + new CompletedProcedureRecycler<>(runningStore(), new ConcurrentHashMap<>(), 30, 60); + Assert.assertEquals(TimeUnit.SECONDS.toMillis(60), recycler.getEvictTTLInMs()); + } + + @Test + public void freshCompletedProcedureIsRetainedWhileStaleOneIsEvicted() { + final long now = System.currentTimeMillis(); + final Map> completed = new ConcurrentHashMap<>(); + // Completed 1s ago: with a 100s evict TTL it must survive. Before the seconds->millis fix a + // 100s TTL was treated as 100ms, so this fresh entry was evicted almost immediately. + completed.put(1L, completedAt(now - TimeUnit.SECONDS.toMillis(1))); + // Completed 300s ago: with a 100s evict TTL it must be evicted. + completed.put(2L, completedAt(now - TimeUnit.SECONDS.toMillis(300))); + + CompletedProcedureRecycler recycler = + new CompletedProcedureRecycler<>(runningStore(), completed, 30, 100); + recycler.periodicExecute(null); + + Assert.assertTrue( + "A procedure completed 1s ago must survive a 100s evict TTL", completed.containsKey(1L)); + Assert.assertFalse( + "A procedure completed 300s ago must be evicted by a 100s evict TTL", + completed.containsKey(2L)); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java index 75a7168c1f61..ab6ec55f4d78 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java @@ -140,6 +140,20 @@ public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws Interrupte Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure)); } + @Test + public void testRestartCompletedCleanerAppliesNewEvictTtl() { + procExecutor.startCompletedCleaner(30, 60); + CompletedProcedureRecycler first = procExecutor.getCompletedProcedureRecycler(); + Assert.assertNotNull(first); + Assert.assertEquals(TimeUnit.SECONDS.toMillis(60), first.getEvictTTLInMs()); + + // Hot reload with a different interval / TTL replaces the recycler with a fresh instance. + procExecutor.restartCompletedCleaner(15, 120); + CompletedProcedureRecycler second = procExecutor.getCompletedProcedureRecycler(); + Assert.assertNotSame(first, second); + Assert.assertEquals(TimeUnit.SECONDS.toMillis(120), second.getEvictTTLInMs()); + } + private int waitThreadCount(final int expectedThreads) { long startTime = System.currentTimeMillis(); while (procExecutor.isRunning() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 147c3340bb38..df84e525ec80 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.commons.utils.JVMCommonUtils; import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.confignode.rpc.thrift.TCQConfig; import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig; @@ -2252,6 +2253,12 @@ public synchronized void loadHotModifiedProps(TrimProperties properties) BinaryAllocator.getInstance().close(true); } + // update disk_space_warning_threshold; also refresh the static copy in JVMCommonUtils that + // the ReadOnly disk guard reads, otherwise the new threshold would not take effect until + // restart. Parsing / validation is shared with the ConfigNode hot-reload path. + JVMCommonUtils.setDiskSpaceWarningThreshold( + commonDescriptor.loadHotModifiedDiskSpaceWarningThreshold(properties)); + commonDescriptor .getConfig() .setTimestampPrecisionCheckEnabled( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 23036c78414f..0324c21bd573 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -768,7 +768,7 @@ topology_probing_base_interval_in_ms=5000 topology_probing_timeout_ratio=0.5 # Disk remaining threshold at which DataNode is set to ReadOnly status -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: double(percentage) disk_space_warning_threshold=0.05 @@ -2185,12 +2185,12 @@ zombie_tsfile_writer_threshold=600000 procedure_core_worker_thread_count=4 # Default time interval of completed procedure cleaner work in, time unit is second -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: int procedure_completed_clean_interval=30 # Default ttl of completed procedure, time unit is second -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: int procedure_completed_evict_ttl=60 diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 9d7c6bdffc26..7f2802e0ed07 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -583,6 +583,29 @@ public void loadRetryProperties(TrimProperties properties) throws IOException { "enable_retry_for_unknown_error")))); } + /** + * Parse, validate and apply {@code disk_space_warning_threshold} for a runtime hot reload, then + * return the applied value. Shared by the ConfigNode and DataNode hot-reload paths so both use + * the same parsing / bounds rules. Callers on the DataNode must additionally refresh the {@code + * JVMCommonUtils} static copy that the ReadOnly disk guard actually consumes. + */ + public double loadHotModifiedDiskSpaceWarningThreshold(final TrimProperties properties) + throws IOException { + double diskSpaceWarningThreshold = + Double.parseDouble( + properties.getProperty( + "disk_space_warning_threshold", + String.valueOf(config.getDiskSpaceWarningThreshold()))); + if (diskSpaceWarningThreshold < 0 || diskSpaceWarningThreshold >= 1) { + throw new IOException( + "disk_space_warning_threshold must be in [0, 1), but was " + + diskSpaceWarningThreshold + + "."); + } + config.setDiskSpaceWarningThreshold(diskSpaceWarningThreshold); + return diskSpaceWarningThreshold; + } + /** * Reload only the subscription consensus properties that are intended to take effect on hot * configuration reload. diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/conf/CommonDescriptorDiskSpaceWarningThresholdTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/conf/CommonDescriptorDiskSpaceWarningThresholdTest.java new file mode 100644 index 000000000000..a28f4b825d33 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/conf/CommonDescriptorDiskSpaceWarningThresholdTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.conf; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class CommonDescriptorDiskSpaceWarningThresholdTest { + + private static final String KEY = "disk_space_warning_threshold"; + + @Test + public void validValueIsAppliedAndReturned() throws IOException { + final CommonDescriptor descriptor = CommonDescriptor.getInstance(); + final double original = descriptor.getConfig().getDiskSpaceWarningThreshold(); + try { + final TrimProperties properties = new TrimProperties(); + properties.setProperty(KEY, "0.1"); + final double applied = descriptor.loadHotModifiedDiskSpaceWarningThreshold(properties); + Assert.assertEquals(0.1, applied, 0.0); + Assert.assertEquals(0.1, descriptor.getConfig().getDiskSpaceWarningThreshold(), 0.0); + } finally { + descriptor.getConfig().setDiskSpaceWarningThreshold(original); + } + } + + @Test + public void absentKeyKeepsCurrentValue() throws IOException { + final CommonDescriptor descriptor = CommonDescriptor.getInstance(); + final double original = descriptor.getConfig().getDiskSpaceWarningThreshold(); + try { + descriptor.getConfig().setDiskSpaceWarningThreshold(0.07); + final double applied = + descriptor.loadHotModifiedDiskSpaceWarningThreshold(new TrimProperties()); + Assert.assertEquals(0.07, applied, 0.0); + } finally { + descriptor.getConfig().setDiskSpaceWarningThreshold(original); + } + } + + @Test + public void outOfRangeValueIsRejectedWithoutMutatingConfig() { + final CommonDescriptor descriptor = CommonDescriptor.getInstance(); + final double original = descriptor.getConfig().getDiskSpaceWarningThreshold(); + try { + for (final String badValue : new String[] {"1.0", "1.5", "-0.1"}) { + final TrimProperties properties = new TrimProperties(); + properties.setProperty(KEY, badValue); + Assert.assertThrows( + IOException.class, + () -> descriptor.loadHotModifiedDiskSpaceWarningThreshold(properties)); + } + // A rejected value must not mutate the config. + Assert.assertEquals(original, descriptor.getConfig().getDiskSpaceWarningThreshold(), 0.0); + } finally { + descriptor.getConfig().setDiskSpaceWarningThreshold(original); + } + } +}