Add Databricks-native retry settings to task operators #69182

Open
Beat-Nick wants to merge 2 commits into apache:main from Beat-Nick:databricks-expose-task-repair-params

Beat-Nick commented Jun 30, 2026


Add Databricks-native retry settings to task operators

Summary

Adds first-class Databricks task retry settings to DatabricksNotebookOperator and DatabricksTaskOperator: max_retries, min_retry_interval_millis, and retry_on_timeout.

These are Databricks task-level retries, not Airflow task retries. Databricks reruns the failed task attempt inside the same job run; Airflow retries rerun the operator.

This follows the recovery-model discussion in apache/airflow#68358: native task retries handle transient task failures first, while workflow repair remains separate follow-up work for run-level recovery.

Details

The retry fields live on Databricks Jobs API tasks, so the implementation sits in DatabricksTaskBaseOperator and applies to both standalone submits and tasks inside DatabricksWorkflowTaskGroup.

For standalone operators, _get_run_json() switches to the tasks[] submit form only when a retry field is configured through operator arguments or, for DatabricksTaskOperator, task_config. Existing standalone submits keep their current payload unless users opt in.
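The opt-in reshape described above can be sketched in plain Python. The helper name `build_submit_payload`, its arguments, and the notebook path are hypothetical; this is not the operator's actual `_get_run_json()` code. Only the three retry field names and the `tasks` array are taken from the description.

```python
from typing import Any

RETRY_FIELDS = ("max_retries", "min_retry_interval_millis", "retry_on_timeout")


def build_submit_payload(
    run_name: str,
    task_key: str,
    task_body: dict[str, Any],
    retry_settings: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper: use the tasks[] form only when a retry field is set."""
    # Drop unset (None) retry fields so they are omitted from the payload.
    configured = {
        key: value
        for key, value in retry_settings.items()
        if key in RETRY_FIELDS and value is not None
    }
    if not configured:
        # No opt-in: keep the flat single-task payload unchanged.
        return {"run_name": run_name, **task_body}
    # Opt-in: retry fields live on the task entry, so wrap the task in tasks[].
    return {
        "run_name": run_name,
        "tasks": [{"task_key": task_key, **task_body, **configured}],
    }


body = {"notebook_task": {"notebook_path": "/Shared/example"}}
flat = build_submit_payload("demo", "t1", body, {"max_retries": None})
reshaped = build_submit_payload("demo", "t1", body, {"max_retries": 2})
```

In this sketch `flat` has no `tasks` key, while `reshaped` carries `max_retries` on its single `tasks[]` entry.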

Monitoring becomes retry-aware only when the effective Databricks max_retries permits another native attempt (-1 or a positive integer). In that mode:

  • Standalone operators wait on the submit run, whose terminal state includes all Databricks retry attempts.
  • Workflow task operators re-resolve the latest attempt for the same task_key and fail as soon as a finite max_retries is exhausted.
  • Unlimited workflow task retries (max_retries=-1) keep waiting for the parent workflow run to become terminal.
  • Deferrable workflow monitoring passes workflow_run_id, databricks_task_key, and max_retries to DatabricksExecutionTrigger, so exhausted retries are detected in the trigger and on_kill can cancel the latest retry attempt instead of a stale attempt id.
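The condition that switches monitoring into retry-aware mode can be illustrated with a small predicate. This is a sketch under assumptions: the function name `permits_native_retry` is made up, and the handling of booleans and strings only mirrors the excerpts quoted later in this review, not the merged code.

```python
def permits_native_retry(max_retries: object) -> bool:
    """Return True when Databricks may start another native attempt."""
    # bool is a subclass of int, so exclude it explicitly; None means "not configured".
    if isinstance(max_retries, bool) or max_retries is None:
        return False
    if isinstance(max_retries, str):
        # A rendered template arrives as a string; treat an unparsable one as "no retries".
        try:
            max_retries = int(max_retries)
        except ValueError:
            return False
    if not isinstance(max_retries, int):
        return False
    # -1 means retry indefinitely; any positive value allows at least one retry.
    return max_retries == -1 or max_retries > 0
```

With this predicate, `max_retries=0` or an unset value keeps the existing single-attempt monitoring.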

Changes

  • Adds retry settings to DatabricksNotebookOperator and DatabricksTaskOperator.
  • Preserves DatabricksTaskOperator precedence: direct operator arguments override matching task_config fields, and the operator-managed task_key cannot be shadowed by task_config.
  • Adds the new operator retry params to template_fields and coerces rendered operator-param strings before submitting to Databricks.
  • Updates sync and deferrable monitoring to detect exhausted Databricks retries.
  • Accepts WAITING_FOR_RETRY and BLOCKED as non-terminal RunState life cycle states.
  • Adds tests for payload generation, argument precedence, templating, sync and deferrable monitoring, trigger serialization, and WAITING_FOR_RETRY.
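The precedence rule for DatabricksTaskOperator in the list above can be sketched as a dictionary merge. The function `merge_task_settings` and its argument names are hypothetical and chosen for illustration; the actual operator code may be structured differently.

```python
from typing import Any


def merge_task_settings(
    task_key: str,
    task_config: dict[str, Any],
    operator_args: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical merge: operator args beat task_config, task_key is operator-managed."""
    merged = dict(task_config)
    # Operator arguments win over matching task_config fields when they are set.
    merged.update({key: value for key, value in operator_args.items() if value is not None})
    # The operator-managed task_key is applied last so task_config cannot shadow it.
    merged["task_key"] = task_key
    return merged


result = merge_task_settings(
    "managed_key",
    {"task_key": "user_key", "max_retries": 1, "retry_on_timeout": True},
    {"max_retries": 3, "retry_on_timeout": None},
)
```

Here `max_retries` comes from the operator argument, `retry_on_timeout` falls through from `task_config` because the operator argument is unset, and the `task_key` supplied in `task_config` is ignored.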

DatabricksSubmitRunOperator and DatabricksCreateJobsOperator remain raw payload pass-through operators; users can already set per-task retry fields in their task payloads.
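For the pass-through operators, a payload along these lines already carries per-task retry fields. All values here (run name, task key, notebook path, cluster id placeholder, retry numbers) are illustrative assumptions; a dict of this shape would be handed to the operator's `json` argument.

```python
# Illustrative raw payload; every value below is a placeholder.
submit_run_json = {
    "run_name": "example-run",
    "tasks": [
        {
            "task_key": "example_task",
            "notebook_task": {"notebook_path": "/Shared/example"},
            "existing_cluster_id": "<cluster-id>",
            # Databricks task-level retry fields, set directly by the user.
            "max_retries": 2,
            "min_retry_interval_millis": 60000,
            "retry_on_timeout": False,
        }
    ],
}
```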

Was generative AI tooling used to co-author this PR?
  • Yes - Claude Code (Opus 4.8), Codex (GPT-5)

Generated-by: Claude Code (Opus 4.8), Codex (GPT-5) following the guidelines

moomindani left a comment

Contributor

Thanks. This lands the first half of the split we discussed on #68358 (native task retries first; repair as the scoped follow-up for what retries can't reach), and it follows the cluster-lifecycle framing from that thread. The docs also address the Airflow-retries-vs-native-retries interplay that came up there, including how Airflow retries behave differently for standalone vs workflow-group operators, which is good.

I validated the core semantic assumption against a live Databricks workspace, since the monitoring design depends on how native max_retries attempts surface in the Jobs API.

The design's core assumption holds. A one-time runs/submit with an always-failing task and max_retries=1, observed by polling runs/get, produced:

t=  0s  run=RUNNING | task[retry_probe] run_id=942417452203963 attempt=0 PENDING
t= 30s  run=RUNNING | attempt=0 TERMINATED/FAILED | run_id=381198635536160 attempt=1 RUNNING
t= 40s  run=INTERNAL_ERROR/FAILED | both attempts TERMINATED/FAILED

Each retry appears as a separate tasks[] entry under the same task_key with a new run_id and an incremented attempt_number — so re-resolving the latest attempt on each poll is the right model, and treating a failed attempt as inconclusive while the parent run is still active is justified. Also verified: the three new fields are real Jobs API task-level fields; both injection paths (standalone _get_run_json reshape and _convert_to_databricks_workflow_task) are covered; None correctly omits fields; and the BLOCKED/WAITING_FOR_RETRY life-cycle-state additions are needed (previously RunState raised on them).
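Re-resolving the latest attempt on each poll can be sketched as follows. The helper name `latest_attempt` is hypothetical, and the response is reduced to the fields named above (`task_key`, `run_id`, `attempt_number`); the run ids are the ones from the probe.

```python
from typing import Any, Optional


def latest_attempt(run: dict[str, Any], task_key: str) -> Optional[dict[str, Any]]:
    """Return the tasks[] entry with the highest attempt_number for task_key."""
    attempts = [task for task in run.get("tasks", []) if task.get("task_key") == task_key]
    if not attempts:
        return None
    # Each retry is a separate entry, so the newest attempt has the largest attempt_number.
    return max(attempts, key=lambda task: task.get("attempt_number", 0))


run = {
    "tasks": [
        {"task_key": "retry_probe", "run_id": 942417452203963, "attempt_number": 0},
        {"task_key": "retry_probe", "run_id": 381198635536160, "attempt_number": 1},
    ]
}
current = latest_attempt(run, "retry_probe")
```

Here `current` is the `attempt_number=1` entry, which is also the run id a cancel would need to target.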

One substantive improvement and two minor test gaps:

  • The documented tradeoff (a failed attempt inside a workflow group waits for the whole parent run) looks avoidable — see inline; the same probe shows why.
  • The workflow-path conversion test covers only operator-arg retry settings on DatabricksNotebookOperator; task_config-supplied values and the operator-arg-over-task_config precedence are untested for the workflow path.
  • The reshape-without-native-retries case (min_retry_interval_millis alone → tasks: [...] payload but _monitor_single_attempt monitoring) has no test confirming _get_current_databricks_task resolves on the reshaped submit run.

With this in, the repair follow-up from #68358 stays valuable for the cases retries can't reach (fresh-cluster recovery on a terminal run) — worth keeping that scoped as discussed.

Non-binding review.


Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting

if run_state.is_successful:
    return True
parent_state = RunState(**self._hook.get_run(workflow_run_id)["state"])
return parent_state.is_terminal

Contributor

This wait-for-parent gate is more conservative than it needs to be. runs/get exposes attempt_number per attempt (verified on a live workspace: attempts arrive as attempt=0, attempt=1, ... under the same task_key), and the operator knows its own max_retries (operator arg or task_config). So when the latest attempt is terminal-failed and attempt_number >= max_retries (with max_retries != -1), retries are exhausted and the failure is already conclusive — no need to keep the Airflow task waiting (or the trigger deferring) until every sibling task finishes, which with long-running siblings delays downstream failure handling by the length of the whole run. The parent-terminal fallback would then only be needed for max_retries=-1. If you take this, the "there is no per-task retries-exhausted signal" paragraph in notebook.rst/task.rst and this docstring should be updated to match.
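A minimal sketch of the suggested check, assuming the attempt state and the parent run state have already been reduced to booleans. The function name `outcome_is_conclusive` and its parameters are hypothetical.

```python
def outcome_is_conclusive(
    attempt_terminal: bool,
    attempt_successful: bool,
    attempt_number: int,
    max_retries: int,
    parent_terminal: bool,
) -> bool:
    """Hypothetical check: can monitoring stop waiting on this workflow task?"""
    if not attempt_terminal:
        # The latest attempt is still running or pending.
        return False
    if attempt_successful:
        return True
    # Finite retries: a failed attempt is final once attempt_number reaches max_retries.
    if max_retries != -1 and attempt_number >= max_retries:
        return True
    # Unlimited retries, or retries remaining: only a terminal parent run is conclusive.
    return parent_terminal
```

With `max_retries=1`, a failed `attempt_number=1` is conclusive even while sibling tasks keep the parent run active.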


Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting

Author

Done, thanks for pointing this out. I changed workflow task monitoring to treat a terminal failed attempt as conclusive when finite retries are exhausted (attempt_number >= max_retries), while keeping the parent-terminal wait only for unlimited retries. The same max_retries value is now passed into DatabricksExecutionTrigger, and the docs/docstring were updated to describe the finite-vs-unlimited behavior.


Drafted-by: Codex (GPT-5) (no human review before posting)


if isinstance(max_retries, bool) or max_retries is None:
    return False
if isinstance(max_retries, str):

Contributor

This str-coercion implies templated values were anticipated, but the new params are not in template_fields, so a Jinja string would reach the API unrendered. Either add the three params to template_fields (and keep this branch) or drop the str handling — as-is it only masks a misconfiguration.


Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting

Author

Done. I added the three retry params to template_fields on both DatabricksNotebookOperator and DatabricksTaskOperator, kept the coercion path, and added coverage that rendered Jinja values make it into the Databricks payload as the expected typed values.
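Jinja renders template fields to strings by default, so the coercion step might look roughly like the sketch below. The helper names and the accepted boolean spellings are assumptions for illustration, not the code in `_retry_settings()`.

```python
from typing import Any, Optional


def coerce_int(value: Any) -> Optional[int]:
    """Turn a rendered value such as "3" into an int; None stays unset."""
    if value is None:
        return None
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    return int(str(value).strip())  # raises ValueError on an unparsable render


def coerce_bool(value: Any) -> Optional[bool]:
    """Turn a rendered value such as "True" into a bool; None stays unset."""
    if value is None or isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("true", "1"):
        return True
    if text in ("false", "0"):
        return False
    raise ValueError(f"Cannot interpret {value!r} as a boolean")


def coerce_retry_settings(rendered: dict[str, Any]) -> dict[str, Any]:
    """Return typed retry settings, omitting fields that are unset."""
    typed = {
        "max_retries": coerce_int(rendered.get("max_retries")),
        "min_retry_interval_millis": coerce_int(rendered.get("min_retry_interval_millis")),
        "retry_on_timeout": coerce_bool(rendered.get("retry_on_timeout")),
    }
    return {key: value for key, value in typed.items() if value is not None}
```

For example, rendered values `"3"`, `"60000"`, and `"True"` would reach the payload as `3`, `60000`, and `True`.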


Drafted-by: Codex (GPT-5) (no human review before posting)

@Beat-Nick

Author

Good point on ending tasks early. I made the updates from your comments:

  • Workflow task monitoring now treats a terminal failed attempt as conclusive once finite Databricks retries are exhausted (attempt_number >= max_retries). Unlimited retries still wait for the parent workflow run to terminate.
  • The deferrable trigger now uses the same exhausted-retry behavior, with max_retries serialized into DatabricksExecutionTrigger.
  • Retry params are templated on both Databricks task operators, and rendered operator-param values are coerced before building the Databricks payload.
  • Added tests for workflow task_config retry settings, operator-argument precedence, submit payloads without retry attempts, templating, and exhausted-retry monitoring in sync and deferrable paths.
  • Updated the notebook/task docs to describe finite-vs-unlimited retry behavior.
  • Validated the attempt_number-dependent path with Databricks test Dags.

Drafted-by: Codex (GPT-5); reviewed by @Beat-Nick before posting

moomindani left a comment

Contributor

Thanks for the quick turnaround — verified the update.

  • _workflow_task_is_conclusive / _run_workflow_task now report failure as soon as attempt_number >= max_retries for finite retries, with the parent-run wait preserved only for max_retries=-1. Sync and deferrable paths implement the same check symmetrically.
  • max_retries/min_retry_interval_millis/retry_on_timeout are now in template_fields on both operators, with rendered values coerced in _retry_settings().
  • New tests cover both gaps I'd flagged: task_config-supplied workflow retry settings and operator-arg-over-task_config precedence in the workflow path.
  • Docs updated to describe the finite-vs-unlimited distinction accurately.

CI is green. LGTM — approving. Small non-blocking nit inline.


Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting

if isinstance(max_retries, str):
    try:
        return int(max_retries)
    except ValueError:

Contributor

Minor: an unparsable max_retries string (e.g. a bad template render) silently resolves to "no retry limit configured" instead of surfacing the misconfiguration, so monitoring falls back to waiting for the parent run rather than failing fast. Not blocking, since the degraded behavior is the safe direction, but a log warning here would help a user notice the bad value.
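One possible shape for that warning, as a sketch only: the function name `parse_max_retries`, the logger setup, and the message wording are assumptions, and the surrounding branches are simplified from the quoted excerpt.

```python
import logging
from typing import Optional

log = logging.getLogger(__name__)


def parse_max_retries(value: object) -> Optional[int]:
    """Return max_retries as an int, or None when it is unset or unusable."""
    if isinstance(value, bool) or value is None:
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        try:
            return int(value)
        except ValueError:
            # Keep the safe fallback, but make the bad value visible in the task log.
            log.warning(
                "Ignoring unparsable max_retries value %r; treating retries as not configured.",
                value,
            )
            return None
    return None
```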


Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting
