HIVE-29679: Update Tez AM K8s Operator Auto-Scaling to scale down idle AMs #6561

Open
tanishq-chugh wants to merge 8 commits into apache:master from tanishq-chugh:tezam-k8s-op-sd

tanishq-chugh commented Jun 24, 2026

What changes were proposed in this pull request?

Update the Tez AM auto-scaling logic so that scale-down removes idle AMs.

Why are the changes needed?

AM scale-down currently removes AMs by ordinal (highest first), which can terminate AMs that still have running DAGs. This change prevents that.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manual Testing

Helm command used:

helm install hive ./helm/hive-operator \
        --set cluster.database.type=postgres \
        --set cluster.database.url="jdbc:postgresql://postgres-postgresql:5432/metastore" \
        --set cluster.database.driver="org.postgresql.Driver" \
        --set cluster.database.username=hive \
        --set cluster.database.passwordSecretRef.name=hive-db-secret \
        --set cluster.database.passwordSecretRef.key=password \
        --set cluster.database.driverJarUrl="https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.5/postgresql-42.7.5.jar" \
        --set cluster.zookeeper.quorum="zookeeper:2181" \
        --set cluster.storage.coreSiteOverrides."fs\.defaultFS"="s3a://hive" \
        --set cluster.storage.coreSiteOverrides."fs\.s3a\.endpoint"="http://ozone-s3g-rest:9878/" \
        --set-string cluster.storage.coreSiteOverrides."fs\.s3a\.path\.style\.access"=true \
        --set 'cluster.storage.envVars[0].name=HADOOP_OPTIONAL_TOOLS' \
        --set 'cluster.storage.envVars[0].value=hadoop-aws' \
        --set 'cluster.storage.envVars[1].name=AWS_ACCESS_KEY_ID' \
        --set 'cluster.storage.envVars[1].value=ozone' \
        --set 'cluster.storage.envVars[2].name=AWS_SECRET_ACCESS_KEY' \
        --set 'cluster.storage.envVars[2].value=ozone' \
        --set cluster.hiveServer2.autoscaling.enabled=false \
        --set cluster.hiveServer2.replicas=2 \
        --set cluster.metastore.autoscaling.enabled=false \
        --set cluster.metastore.replicas=2 \
        --set cluster.autoSuspend.enabled=false \
        --set-string 'cluster.llapClusterRouting=user:alice=llap0\,user:bob=llap1\,default=llap2' \
        --set 'cluster.llapClusters[0].name=llap0' \
        --set 'cluster.llapClusters[0].replicas=2' \
        --set 'cluster.llapClusters[0].autoscaling.enabled=true' \
        --set 'cluster.llapClusters[0].autoscaling.minReplicas=0' \
        --set 'cluster.llapClusters[0].autoscaling.scaleUpThreshold=1' \
        --set-string 'cluster.llapClusters[0].configOverrides.hive\.llap\.daemon\.task\.scheduler\.wait\.queue\.size=1' \
        --set 'cluster.llapClusters[0].tezAm.replicas=2' \
        --set 'cluster.llapClusters[0].tezAm.autoscaling.enabled=true' \
        --set 'cluster.llapClusters[0].tezAm.autoscaling.minReplicas=0' \
        --set 'cluster.llapClusters[0].tezAm.autoscaling.scaleDownStabilizationSeconds=60' \
        --set 'cluster.llapClusters[0].tezAm.autoscaling.metricsScrapeIntervalSeconds=10' \
        --set 'cluster.llapClusters[1].name=llap1' \
        --set 'cluster.llapClusters[1].replicas=2' \
        --set 'cluster.llapClusters[1].autoscaling.enabled=true' \
        --set 'cluster.llapClusters[1].autoscaling.minReplicas=0' \
        --set 'cluster.llapClusters[1].autoscaling.scaleUpThreshold=1' \
        --set-string 'cluster.llapClusters[1].configOverrides.hive\.llap\.daemon\.task\.scheduler\.wait\.queue\.size=1' \
        --set 'cluster.llapClusters[1].tezAm.replicas=2' \
        --set 'cluster.llapClusters[1].tezAm.autoscaling.enabled=true' \
        --set 'cluster.llapClusters[1].tezAm.autoscaling.minReplicas=0' \
        --set 'cluster.llapClusters[1].tezAm.autoscaling.scaleDownStabilizationSeconds=60' \
        --set 'cluster.llapClusters[1].tezAm.autoscaling.metricsScrapeIntervalSeconds=10' \
        --set 'cluster.llapClusters[2].name=llap2' \
        --set 'cluster.llapClusters[2].replicas=2' \
        --set 'cluster.llapClusters[2].autoscaling.enabled=true' \
        --set 'cluster.llapClusters[2].autoscaling.minReplicas=0' \
        --set 'cluster.llapClusters[2].autoscaling.scaleUpThreshold=1' \
        --set-string 'cluster.llapClusters[2].configOverrides.hive\.llap\.daemon\.task\.scheduler\.wait\.queue\.size=1' \
        --set 'cluster.llapClusters[2].tezAm.replicas=2' \
        --set 'cluster.llapClusters[2].tezAm.autoscaling.enabled=true' \
        --set 'cluster.llapClusters[2].tezAm.autoscaling.minReplicas=0' \
        --set 'cluster.llapClusters[2].tezAm.autoscaling.scaleDownStabilizationSeconds=60' \
        --set 'cluster.llapClusters[2].tezAm.autoscaling.metricsScrapeIntervalSeconds=10'

Initial State:
image

After starting 2 beeline sessions in each of the llap-0 and llap-2 tenant spaces:
image

Pod to AppID Mappings:
image

Starting two long-running queries, one in each tenant space:
image
image

Scale-down (after the cooldown period) once the 2 idle beeline sessions in each tenant space are closed:
image

@tanishq-chugh

Hi @ayushtkn
Could you please help with a review on this PR?
Thanks!

Copilot AI left a comment

Pull request overview

This PR updates the Hive Kubernetes Operator’s Tez AM (Application Master) auto-scaling behavior to preferentially scale down idle AMs (instead of terminating AMs by ordinal), reducing the risk of killing AMs that are actively running DAGs. To enable this, Tez AM is migrated from a StatefulSet to a Deployment, with additional operator-managed DNS/EndpointSlice handling and ZooKeeper deregistration to stop HS2 routing to AMs being removed.

Changes:

  • Migrate per-LLAP TezAM from StatefulSet → Deployment and use pod-deletion-cost plus deferred scale-down to remove idle AMs first.
  • Add Tez AM “busy/idle” signal via new LLAP Tez metrics (SchedulerDagRunning / tez_am_dag_running), plus operator preStop draining and ZK deregistration.
  • Add serviceAccountName to the HiveCluster spec and wire it into component pods/jobs; extend Helm/CRD/RBAC accordingly.
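
The pod-deletion-cost idea in the first bullet can be sketched as follows. This is a minimal illustration, not the PR's code: the class name, the cost values, and the assumption that a gauge value of 1 means "DAG running" are all hypothetical. The annotation key itself is the standard Kubernetes one, and Kubernetes treats it as a best-effort hint where pods with a lower cost are preferred for removal.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; names and values are illustrative, not taken from the PR. */
final class DeletionCostSketch {
  /** Standard Kubernetes annotation consulted on ReplicaSet scale-down. */
  static final String DELETION_COST_ANNOTATION = "controller.kubernetes.io/pod-deletion-cost";

  // Assumed values: any pair works as long as idle < busy.
  static final int IDLE_COST = 0;
  static final int BUSY_COST = 1000;

  /** Maps the busy/idle gauge (assumed: > 0 means a DAG is running) to a deletion cost. */
  static int costFor(double dagRunningGauge) {
    return dagRunningGauge > 0 ? BUSY_COST : IDLE_COST;
  }

  /** Orders pod names by ascending cost (ties broken by name), i.e. idle pods first. */
  static List<String> removalOrder(Map<String, Double> gaugeByPod) {
    List<String> pods = new ArrayList<>(gaugeByPod.keySet());
    pods.sort(Comparator
        .comparingInt((String pod) -> costFor(gaugeByPod.get(pod)))
        .thenComparing(Comparator.naturalOrder()));
    return pods;
  }
}
```

In an operator, the value returned by costFor would be written to each pod under DELETION_COST_ANNOTATION before the replica count is lowered; removalOrder only models the preference that results.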

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java | Switch TezAM status/scaling expectations to Deployment; add TezAM EndpointSlice reconciliation and adjust GC logic. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java | Add serviceAccountName field to CR spec. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java | Set pod service account on schema-init Job. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java | Set pod service account on Metastore Deployment. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java | Set pod service account on HS2 Deployment. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java | Build TezAM Deployment + custom EndpointSlice; add TezAM metrics config + autoscaling lifecycle drain. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java | Update JMX exporter config to scrape new TezAM busy metric. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java | Apply deletion-cost to TezAM pods and defer scale-down while it propagates; integrate ZK deregistration. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java | New: remove idle AM ZK registration nodes before scale-down. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java | New: interpret TezAM busy/idle metric for deletion cost and scale-down safety logic. |
| packaging/src/kubernetes/pom.xml | Add Curator framework dependency for ZK operations. |
| packaging/src/kubernetes/helm/hive-operator/values.yaml | Add serviceAccountName; adjust TezAM defaults (replicas removed in diff). |
| packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml | Render serviceAccountName; remove TezAM replicas from rendered CR spec. |
| packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml | Add RBAC for EndpointSlice management. |
| packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml | Add CRD schema for serviceAccountName. |
| llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java | Add dagRunning gauge + setter and export it. |
| llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java | Add SchedulerDagRunning metric definition. |
| llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java | Toggle SchedulerDagRunning on DAG start/complete to reflect busy/idle. |

Comment on lines 639 to 643
int tezAmReplicas = resolveTezAmReplicaCount(resource, ns, clusterName, llapSpec);
String tezAmName = LlapResourceBuilder.tezAmResourceName(resource, llapSpec);
client.configMaps().inNamespace(ns)
.resource(LlapResourceBuilder.buildTezAmConfigMap(resource, llapSpec))
.serverSideApply();
Comment on lines 743 to 750
// Garbage-collect per-LLAP TezAM resources
Map<String, String> tezamSelector = Map.of(
Labels.MANAGED_BY, Labels.MANAGED_BY_VALUE,
Labels.APP_INSTANCE, clusterName,
Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM);

- client.apps().statefulSets().inNamespace(ns).withLabels(tezamSelector).list().getItems()
+ client.apps().deployments().inNamespace(ns).withLabels(tezamSelector).list().getItems()
.stream()
Comment on lines +281 to +285
boolean ready = isPodReady(pod);
endpoints.add(new EndpointBuilder()
.withHostname(pod.getMetadata().getName())
.withAddresses(ip)
.withNewConditions()
Comment on lines +304 to +307
.endMetadata()
.withAddressType("IPv4")
.withEndpoints(endpoints)
.build();
Comment thread packaging/src/kubernetes/helm/hive-operator/values.yaml

Comment on lines +78 to +81
int connTimeoutMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.connection.timeout", 15000);
int sessionTimeoutMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.session.timeout", 120000);
int baseSleepMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.connection.basesleeptime", 1000);
int maxRetries = getInt(hiveSiteConfig, "hive.zookeeper.connection.max.retries", 3);

Define these configs in ConfigUtils.

Comment on lines +84 to +91
CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString(zkQuorum)
    .connectionTimeoutMs(connTimeoutMs)
    .sessionTimeoutMs(sessionTimeoutMs)
    .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries))
    .build();
try {
  client.start();

A new CuratorFramework client is built, started, and closed every time deregisterIdlePods is called. Because the autoscaler evaluates state periodically (likely every few seconds), this will cause significant connection churn and overhead on the ZooKeeper quorum. Could we cache the client per HiveCluster/ZK quorum inside the operator, reuse the connection, and close it only when the cluster is deleted or reconfigured?
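
One way to picture the caching suggested here is a small per-quorum cache, sketched below under assumptions. The class and method names are hypothetical and not from the PR. The type parameter C stands in for a client such as CuratorFramework, which is Closeable, and the factory would be the place where the client is built and started.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache keyed by ZK quorum string; C stands in for a client type. */
final class QuorumClientCache<C extends Closeable> {
  private final Map<String, C> clients = new ConcurrentHashMap<>();
  private final Function<String, C> factory;

  QuorumClientCache(Function<String, C> factory) {
    this.factory = factory;
  }

  /** Returns the cached client for this quorum, building it at most once. */
  C get(String zkQuorum) {
    return clients.computeIfAbsent(zkQuorum, factory);
  }

  /** Closes and forgets the client, e.g. when the cluster is deleted or reconfigured. */
  void evict(String zkQuorum) {
    C client = clients.remove(zkQuorum);
    if (client == null) {
      return;
    }
    try {
      client.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Number of live cached clients. */
  int size() {
    return clients.size();
  }
}
```

With this shape, each autoscaler evaluation would call get(zkQuorum) instead of building a new client, and the reconciler's cleanup path would call evict. Whether a cached client needs extra handling after a lost ZooKeeper session is not covered by this sketch.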

}
String hostName = extractHostName(new String(data, StandardCharsets.UTF_8));
if (hostName != null) {
// hostName = "<podName>.<svcName>.<ns>.svc.cluster.local"

I am not sure this is always true. I believe it is a Docker Desktop thing, or it can be changed.
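
If only the pod name is needed from the registered host name, one option that avoids depending on the domain suffix is to take the first DNS label. The sketch below is a hypothetical helper, not code from the PR, and it still assumes that the host name begins with the pod name, as the code comment above states.

```java
/** Hypothetical helper; assumes the registered host name starts with the pod name. */
final class HostNames {
  /** Returns the first DNS label of a host name, or null if there is none. */
  static String firstLabel(String hostName) {
    if (hostName == null || hostName.isBlank()) {
      return null;
    }
    String trimmed = hostName.trim();
    int dot = trimmed.indexOf('.');
    String label = dot < 0 ? trimmed : trimmed.substring(0, dot);
    return label.isEmpty() ? null : label;
  }
}
```

Because nothing after the first dot is inspected, the result is the same whether the suffix is svc.cluster.local or a custom cluster domain.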

  SchedulerPendingPreemptionTaskCount("Total number of tasks pending for pre-emption"),
  SchedulerPreemptedTaskCount("Total number of tasks pre-empted"),
- SchedulerCompletedDagCount("Number of DAGs completed");
+ SchedulerCompletedDagCount("Number of DAGs completed"),

Don't change the name here; it would be incompatible.

  SchedulerPreemptedTaskCount("Total number of tasks pre-empted"),
- SchedulerCompletedDagCount("Number of DAGs completed");
+ SchedulerCompletedDagCount("Number of DAGs completed"),
+ SchedulerDagRunning("Binary that represents if the AM is idle or running a DAG");

Why Binary? Only we would know what 1 and 0 mean. Also, isn't "Binary that represents if the AM is idle or running a DAG" too long? Maybe "DAG Status" should be the name.

Comment on lines +169 to +182
  static int getInt(Map<String, String> config, String key, int defaultVal) {
    if (config == null) {
      return defaultVal;
    }
    String val = config.get(key);
    if (val == null) {
      return defaultVal;
    }
    try {
      return Integer.parseInt(val.trim());
    } catch (NumberFormatException e) {
      LOG.debug("Unparseable ZK config '{}' = '{}', using default {}", key, val, defaultVal);
      return defaultVal;
    }

ConfigUtils already has this, I believe:

  public static int getInt(Map<String, String> overrides,
      String key, String altKey, int defaultVal) {

Comment on lines +143 to +167
  static int getTimeMs(Map<String, String> config, String key, int defaultMs) {
    if (config == null) {
      return defaultMs;
    }
    String val = config.get(key);
    if (val == null) {
      return defaultMs;
    }
    val = val.trim();
    try {
      if (val.endsWith("ms")) {
        return Integer.parseInt(val.substring(0, val.length() - 2).trim());
      }
      if (val.endsWith("s")) {
        return (int) (Double.parseDouble(val.substring(0, val.length() - 1).trim()) * 1_000);
      }
      if (val.endsWith("m")) {
        return (int) (Double.parseDouble(val.substring(0, val.length() - 1).trim()) * 60_000);
      }
      return Integer.parseInt(val);
    } catch (NumberFormatException e) {
      LOG.debug("Unparseable ZK config '{}' = '{}', using default {}ms", key, val, defaultMs);
      return defaultMs;
    }
  }

Move this to ConfigUtils.

Comment on lines +185 to +194
  static String extractHostName(String json) {
    String marker = "\"hostName\":\"";
    int start = json.indexOf(marker);
    if (start < 0) {
      return null;
    }
    start += marker.length();
    int end = json.indexOf('"', start);
    return end > start ? json.substring(start, end) : null;
  }

Move it to a utils class. I am not sure this is the correct way to parse JSON; won't a space or something similar break your parsing? Jackson is already on the classpath.
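
The whitespace concern can be reproduced with the quoted method as-is. The sketch below copies extractHostName into a hypothetical demo class and feeds it two equivalent JSON documents; the sample host name is made up. A real JSON parser, for example Jackson's ObjectMapper.readTree followed by a field lookup, would treat both inputs the same way, though that variant is not shown here.

```java
/** Hypothetical demo class; extractHostName is copied from the snippet under review. */
final class HostNameMarkerDemo {
  static String extractHostName(String json) {
    String marker = "\"hostName\":\"";
    int start = json.indexOf(marker);
    if (start < 0) {
      return null;
    }
    start += marker.length();
    int end = json.indexOf('"', start);
    return end > start ? json.substring(start, end) : null;
  }

  public static void main(String[] args) {
    // Compact form: the marker matches and the value is extracted.
    System.out.println(extractHostName("{\"hostName\":\"am-0.example\"}"));
    // Same JSON with one space after the colon: the marker no longer matches.
    System.out.println(extractHostName("{\"hostName\": \"am-0.example\"}"));
  }
}
```

The second call returns null even though the document is semantically identical, which is the failure mode a marker-based scan has with pretty-printed or otherwise reformatted JSON.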

// --- TezAM resource builders (one TezAM per LLAP cluster) ---

- /** TezAM StatefulSet name for a specific LLAP cluster. */
+ /** TezAM Deployment/Service name for a specific LLAP cluster. */

I know what a Deployment is, but what does Service mean here? Isn't the Service TezAM?

return ip.indexOf(':') >= 0 ? "IPv6" : "IPv4";
}

private static boolean isPodReady(io.fabric8.kubernetes.api.model.Pod pod) {

The metrics scraper already has this method; we can reuse it. We can also import io.fabric8.kubernetes.api.model.Pod instead of using the fully qualified prefix.

4 participants