HIVE-29679: Update Tez AM K8s Operator Auto-Scaling to scale down idle AMs#6561
HIVE-29679: Update Tez AM K8s Operator Auto-Scaling to scale down idle AMs#6561tanishq-chugh wants to merge 8 commits into
Conversation
|
Hi @ayushtkn |
There was a problem hiding this comment.
Pull request overview
This PR updates the Hive Kubernetes Operator’s Tez AM (Application Master) auto-scaling behavior to preferentially scale down idle AMs (instead of terminating AMs by ordinal), reducing the risk of killing AMs that are actively running DAGs. To enable this, Tez AM is migrated from a StatefulSet to a Deployment, with additional operator-managed DNS/EndpointSlice handling and ZooKeeper deregistration to stop HS2 routing to AMs being removed.
Changes:
- Migrate per-LLAP TezAM from StatefulSet → Deployment and use pod-deletion-cost plus deferred scale-down to remove idle AMs first.
- Add Tez AM “busy/idle” signal via new LLAP Tez metrics (
SchedulerDagRunning→tez_am_dag_running), plus operator preStop draining and ZK deregistration. - Add
serviceAccountNameto the HiveCluster spec and wire it into component pods/jobs; extend Helm/CRD/RBAC accordingly.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java | Switch TezAM status/scaling expectations to Deployment; add TezAM EndpointSlice reconciliation and adjust GC logic. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java | Add serviceAccountName field to CR spec. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java | Set pod service account on schema-init Job. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java | Set pod service account on Metastore Deployment. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java | Set pod service account on HS2 Deployment. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java | Build TezAM Deployment + custom EndpointSlice; add TezAM metrics config + autoscaling lifecycle drain. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java | Update JMX exporter config to scrape new TezAM busy metric. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java | Apply deletion-cost to TezAM pods and defer scale-down while it propagates; integrate ZK deregistration. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java | New: remove idle AM ZK registration nodes before scale-down. |
| packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java | New: interpret TezAM busy/idle metric for deletion cost and scale-down safety logic. |
| packaging/src/kubernetes/pom.xml | Add Curator framework dependency for ZK operations. |
| packaging/src/kubernetes/helm/hive-operator/values.yaml | Add serviceAccountName; adjust TezAM defaults (replicas removed in diff). |
| packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml | Render serviceAccountName; remove TezAM replicas from rendered CR spec. |
| packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml | Add RBAC for EndpointSlice management. |
| packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml | Add CRD schema for serviceAccountName. |
| llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java | Add dagRunning gauge + setter and export it. |
| llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java | Add SchedulerDagRunning metric definition. |
| llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java | Toggle SchedulerDagRunning on DAG start/complete to reflect busy/idle. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| int tezAmReplicas = resolveTezAmReplicaCount(resource, ns, clusterName, llapSpec); | ||
| String tezAmName = LlapResourceBuilder.tezAmResourceName(resource, llapSpec); | ||
| client.configMaps().inNamespace(ns) | ||
| .resource(LlapResourceBuilder.buildTezAmConfigMap(resource, llapSpec)) | ||
| .serverSideApply(); |
| // Garbage-collect per-LLAP TezAM resources | ||
| Map<String, String> tezamSelector = Map.of( | ||
| Labels.MANAGED_BY, Labels.MANAGED_BY_VALUE, | ||
| Labels.APP_INSTANCE, clusterName, | ||
| Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM); | ||
|
|
||
| client.apps().statefulSets().inNamespace(ns).withLabels(tezamSelector).list().getItems() | ||
| client.apps().deployments().inNamespace(ns).withLabels(tezamSelector).list().getItems() | ||
| .stream() |
| boolean ready = isPodReady(pod); | ||
| endpoints.add(new EndpointBuilder() | ||
| .withHostname(pod.getMetadata().getName()) | ||
| .withAddresses(ip) | ||
| .withNewConditions() |
| .endMetadata() | ||
| .withAddressType("IPv4") | ||
| .withEndpoints(endpoints) | ||
| .build(); |
fc70a87 to
82b804c
Compare
|
| int connTimeoutMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.connection.timeout", 15000); | ||
| int sessionTimeoutMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.session.timeout", 120000); | ||
| int baseSleepMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.connection.basesleeptime", 1000); | ||
| int maxRetries = getInt(hiveSiteConfig, "hive.zookeeper.connection.max.retries", 3); |
There was a problem hiding this comment.
Define these configs in ConfigUtils
| CuratorFramework client = CuratorFrameworkFactory.builder() | ||
| .connectString(zkQuorum) | ||
| .connectionTimeoutMs(connTimeoutMs) | ||
| .sessionTimeoutMs(sessionTimeoutMs) | ||
| .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries)) | ||
| .build(); | ||
| try { | ||
| client.start(); |
There was a problem hiding this comment.
a new CuratorFramework client is built, started, and closed every time deregisterIdlePods is called. Because the autoscaler evaluates state periodically (likely every few seconds), this will cause significant connection churn and overhead on the ZooKeeper quorum. Can we consider caching the client per HiveCluster/ZK-quorum inside the operator, reusing the connection pool, and only closing it when the cluster is deleted or reconfigured.
| } | ||
| String hostName = extractHostName(new String(data, StandardCharsets.UTF_8)); | ||
| if (hostName != null) { | ||
| // hostName = "<podName>.<svcName>.<ns>.svc.cluster.local" |
There was a problem hiding this comment.
I am not sure it is always true, it is like docker desktop thing I belive, or can be changed
| SchedulerPendingPreemptionTaskCount("Total number of tasks pending for pre-emption"), | ||
| SchedulerPreemptedTaskCount("Total number of tasks pre-empted"), | ||
| SchedulerCompletedDagCount("Number of DAGs completed"); | ||
| SchedulerCompletedDagCount("Number of DAGs completed"), |
There was a problem hiding this comment.
don't change the name here, it would be incompatible
| SchedulerPreemptedTaskCount("Total number of tasks pre-empted"), | ||
| SchedulerCompletedDagCount("Number of DAGs completed"); | ||
| SchedulerCompletedDagCount("Number of DAGs completed"), | ||
| SchedulerDagRunning("Binary that represents if the AM is idle or running a DAG"); |
There was a problem hiding this comment.
Why Binary? It would be like only we know what is 1 what is 0 and this isn't a Binary that represents if the AM is idle or running a DAG too big, maybe be DAG Status should be the name
| static int getInt(Map<String, String> config, String key, int defaultVal) { | ||
| if (config == null) { | ||
| return defaultVal; | ||
| } | ||
| String val = config.get(key); | ||
| if (val == null) { | ||
| return defaultVal; | ||
| } | ||
| try { | ||
| return Integer.parseInt(val.trim()); | ||
| } catch (NumberFormatException e) { | ||
| LOG.debug("Unparseable ZK config '{}' = '{}', using default {}", key, val, defaultVal); | ||
| return defaultVal; | ||
| } |
There was a problem hiding this comment.
ConfigUtils has this I believe
public static int getInt(Map<String, String> overrides,
String key, String altKey, int defaultVal) {
| static int getTimeMs(Map<String, String> config, String key, int defaultMs) { | ||
| if (config == null) { | ||
| return defaultMs; | ||
| } | ||
| String val = config.get(key); | ||
| if (val == null) { | ||
| return defaultMs; | ||
| } | ||
| val = val.trim(); | ||
| try { | ||
| if (val.endsWith("ms")) { | ||
| return Integer.parseInt(val.substring(0, val.length() - 2).trim()); | ||
| } | ||
| if (val.endsWith("s")) { | ||
| return (int) (Double.parseDouble(val.substring(0, val.length() - 1).trim()) * 1_000); | ||
| } | ||
| if (val.endsWith("m")) { | ||
| return (int) (Double.parseDouble(val.substring(0, val.length() - 1).trim()) * 60_000); | ||
| } | ||
| return Integer.parseInt(val); | ||
| } catch (NumberFormatException e) { | ||
| LOG.debug("Unparseable ZK config '{}' = '{}', using default {}ms", key, val, defaultMs); | ||
| return defaultMs; | ||
| } | ||
| } |
| static String extractHostName(String json) { | ||
| String marker = "\"hostName\":\""; | ||
| int start = json.indexOf(marker); | ||
| if (start < 0) { | ||
| return null; | ||
| } | ||
| start += marker.length(); | ||
| int end = json.indexOf('"', start); | ||
| return end > start ? json.substring(start, end) : null; | ||
| } |
There was a problem hiding this comment.
move it to utils class, I am not sure this is the correct way for parson Json, if space or something won't break your parsing, Jackson is already there in the classpath
| // --- TezAM resource builders (one TezAM per LLAP cluster) --- | ||
|
|
||
| /** TezAM StatefulSet name for a specific LLAP cluster. */ | ||
| /** TezAM Deployment/Service name for a specific LLAP cluster. */ |
There was a problem hiding this comment.
I know what is Deployment, what does Service here means? isn't Service TezAM?
| return ip.indexOf(':') >= 0 ? "IPv6" : "IPv4"; | ||
| } | ||
|
|
||
| private static boolean isPodReady(io.fabric8.kubernetes.api.model.Pod pod) { |
There was a problem hiding this comment.
metrics scrapper has this method, we can reuse. We can import it io.fabric8.kubernetes.api.model. instead of prefix



What changes were proposed in this pull request?
Update the Tez AM auto-scaling logic to scale-down AMs which are idle
Why are the changes needed?
To prevent AM scale-down removing AMs based on ordinals (in decreasing order) that can cause AMs with running DAGs to be terminated
Does this PR introduce any user-facing change?
No
How was this patch tested?
Manual Testing
Helm command used:
Initial State:

After starting 2 beeline sessions each in

llap-0andllap-2tenant space:Pod to AppID Mappings:

Starting two long running queries, one each in both tenant spaces:


Scale-down(after cooling periods) post closing 2 idle beeline sessions in each tenant space:
