Add mutual SSL support for pipe sinks#18080
Open
Caideyipi wants to merge 3 commits into
Open
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR extends Apache IoTDB Pipe Thrift SSL sink/connector configuration to support mutual TLS (mTLS) by adding optional client key-store parameters, propagating them through the sync sink/client stack (DataNode + ConfigNode), and adding UT/IT coverage to verify acceptance/rejection behavior and end-to-end mTLS transfer.
Changes:
- Add
connector.ssl.*aliases (enable/trust-store/key-store) and new*.ssl.key-store-*parameters, and wire them into sync thrift pipe sinks/connectors and legacy pipe sink. - Propagate optional key-store path/password through
IoTDBSslSyncSink -> IoTDBSyncClientManager -> IoTDBSyncClientand node-specific managers/sinks. - Add UTs for parameter validation/masking and an IT (
IoTDBPipeMutualSSLIT) that validates mutual-SSL pipe transfer.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java | Adds connector/sink SSL alias handling + passes key-store settings into sync client construction. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java | Stores and forwards key-store settings when creating sync clients. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java | Adds constructor overload + forwards key-store to SSL transport creation. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java | Introduces connector.ssl.* constants and new key-store constants. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java | Adds UTs for accepting/rejecting mTLS-related pipe parameters (sync/async/legacy). |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java | Propagates key-store settings with DataNode home-dir path resolution. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java | Explicitly rejects any SSL attributes (including new key-store ones) for async thrift sink. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java | Adds connector/sink alias support and passes key-store into both thrift client and SessionPool. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java | Threads key-store parameters through DataNode sync client manager constructor. |
| iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/IoTDBConfigRegionSinkTest.java | Adds UTs for ConfigNode sink accepting/rejecting mTLS parameters. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java | Propagates key-store settings with ConfigNode home-dir path resolution. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java | Threads key-store parameters through ConfigNode sync client manager constructor. |
| iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java | Uses parsed connection params for username (minor robustness improvement). |
| iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java | Adds UT ensuring ssl.key-store-pwd is masked by ValueHider. |
| iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java | Adds ssl.key-store-pwd to masked keys set. |
| integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java | New IT covering end-to-end pipe transfer through a receiver requiring Thrift mTLS client auth. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+100
to
+123
| validator | ||
| .validate( | ||
| args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) args[2]), | ||
| String.format( | ||
| "When ssl transport is enabled, %s and %s must be specified", | ||
| SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY), | ||
| IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName) | ||
| || IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(userSpecifiedConnectorName) | ||
| || parameters.getBooleanOrDefault( | ||
| Arrays.asList(CONNECTOR_IOTDB_SSL_ENABLE_KEY, SINK_IOTDB_SSL_ENABLE_KEY), | ||
| false), | ||
| parameters.hasAnyAttributes( | ||
| CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), | ||
| parameters.hasAnyAttributes( | ||
| CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)) | ||
| .validate( | ||
| args -> (boolean) args[0] == (boolean) args[1], | ||
| String.format( | ||
| "%s and %s must be specified together", | ||
| SINK_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY), | ||
| parameters.hasAnyAttributes( | ||
| CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PATH_KEY), | ||
| parameters.hasAnyAttributes( | ||
| CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY)); |
Comment on lines
+163
to
+168
| parameters.getBooleanOrDefault( | ||
| Arrays.asList(CONNECTOR_IOTDB_SSL_ENABLE_KEY, SINK_IOTDB_SSL_ENABLE_KEY), false), | ||
| parameters.hasAnyAttributes( | ||
| CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), | ||
| parameters.hasAnyAttributes( | ||
| CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR extends the pipe thrift SSL sink path to support mutual SSL authentication, following the general thrift client mTLS support added in #18026.
Changes
sink.ssl.key-store-pathsink.ssl.key-store-pwdconnector.ssl.key-store-pathconnector.ssl.key-store-pwdiotdb-thrift-ssl-sink/iotdb-thrift-ssl-connectorread both sink and connector SSL aliases for:IoTDBSslSyncSinkIoTDBSyncClientManagerIoTDBSyncClientSessionPool.ssl.key-store-pwdin pipe parameter value masking.Tests
ssl.key-store-pwdvalue hiding.IoTDBPipeMutualSSLITstarts a receiver with thrift SSL client auth enabled, creates aniotdb-thrift-ssl-sinkpipe with trust/key stores, and verifies pipe data transfer through a mutual-SSL receiver.Local Verification
mvn -Ddevelocity.off=true spotless:apply -pl iotdb-api/pipe-api,iotdb-core/datanode,iotdb-core/confignode,integration-test -P with-integration-testsgit diff --checkI also attempted targeted UT execution with
mvn -Ddevelocity.off=true test -pl iotdb-api/pipe-api -Dtest=PipeParametersTest ..., but the local Windows environment failed to start/continue the JVM due to native memory/pagefile exhaustion (There is insufficient memory for the Java Runtime Environment to continue). The same environment issue also affected broader compile/test attempts.