Skip to content

Commit

Permalink
Run the task with the configured dag bundle (apache#44752)
Browse files Browse the repository at this point in the history
Ensures that dag runs are created with a reference to the bundle that was in effect at the time of creation, and that when a dag run has bundle info, the task is run with that dag bundle version.
  • Loading branch information
dstandish authored Jan 15, 2025
1 parent b5b2383 commit 6d048c4
Show file tree
Hide file tree
Showing 23 changed files with 751 additions and 617 deletions.
5 changes: 4 additions & 1 deletion airflow/cli/commands/remote_commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ def _get_ti(
# we do refresh_from_task so that if TI has come back via RPC, we ensure that ti.task
# is the original task object and not the result of the round trip
ti.refresh_from_task(task, pool_override=pool)

ti.dag_model # we must ensure dag model is loaded eagerly for bundle info

return ti, dr_created


Expand Down Expand Up @@ -286,7 +289,7 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
if executor.queue_workload.__func__ is not BaseExecutor.queue_workload: # type: ignore[attr-defined]
from airflow.executors import workloads

workload = workloads.ExecuteTask.make(ti, dag_path=dag.relative_fileloc)
workload = workloads.ExecuteTask.make(ti, dag_rel_path=dag.relative_fileloc)
with create_session() as session:
executor.queue_workload(workload, session)
else:
Expand Down
1 change: 1 addition & 0 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
@classmethod
def start( # type: ignore[override]
cls,
*,
path: str | os.PathLike[str],
callbacks: list[CallbackRequest],
target: Callable[[], None] = _parse_file_entrypoint,
Expand Down
3 changes: 2 additions & 1 deletion airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def _execute_work(log: logging.Logger, workload: workloads.ExecuteTask) -> None:
supervise(
# This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this.
ti=workload.ti, # type: ignore[arg-type]
dag_path=workload.dag_path,
dag_rel_path=workload.dag_rel_path,
bundle_info=workload.bundle_info,
token=workload.token,
server=conf.get("workers", "execution_api_server_url", fallback="http://localhost:9091/execution/"),
log_path=workload.log_path,
Expand Down
27 changes: 17 additions & 10 deletions airflow/executors/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class BaseActivity(BaseModel):
"""The identity token for this workload"""


class BundleInfo(BaseModel):
    """Schema for telling task which bundle to run with.

    Attached to an ``ExecuteTask`` workload so the worker can locate the dag
    file inside the correct dag bundle (and, when pinned, the correct bundle
    version) at execution time.
    """

    # Name of the dag bundle the dag was parsed from.
    name: str
    # Bundle version captured when the dag run was created; None means
    # "use the latest version of the bundle".
    version: str | None = None


class TaskInstance(BaseModel):
"""Schema for TaskInstance with minimal required fields needed for Executors and Task SDK."""

Expand Down Expand Up @@ -73,30 +80,30 @@ class ExecuteTask(BaseActivity):

ti: TaskInstance
"""The TaskInstance to execute"""
dag_path: os.PathLike[str]
dag_rel_path: os.PathLike[str]
"""The filepath where the DAG can be found (likely prefixed with `DAG_FOLDER/`)"""

bundle_info: BundleInfo

log_path: str | None
"""The rendered relative log filename template the task logs should be written to"""

kind: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")

@classmethod
def make(cls, ti: TIModel, dag_path: Path | None = None) -> ExecuteTask:
def make(cls, ti: TIModel, dag_rel_path: Path | None = None) -> ExecuteTask:
from pathlib import Path

from airflow.utils.helpers import log_filename_template_renderer

ser_ti = TaskInstance.model_validate(ti, from_attributes=True)

dag_path = dag_path or Path(ti.dag_run.dag_model.relative_fileloc)

if dag_path and not dag_path.is_absolute():
# TODO: What about multiple dag sub folders
dag_path = "DAGS_FOLDER" / dag_path

bundle_info = BundleInfo.model_construct(
name=ti.dag_model.bundle_name,
version=ti.dag_run.bundle_version,
)
path = dag_rel_path or Path(ti.dag_run.dag_model.relative_fileloc)
fname = log_filename_template_renderer()(ti=ti)
return cls(ti=ser_ti, dag_path=dag_path, token="", log_path=fname)
return cls(ti=ser_ti, dag_rel_path=path, token="", log_path=fname, bundle_info=bundle_info)


All = Union[ExecuteTask]
4 changes: 4 additions & 0 deletions airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,16 @@ def upgrade():
batch_op.create_foreign_key(
batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"]
)
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("bundle_version", sa.String(length=250), nullable=True))


def downgrade():
    """Revert the DagBundleModel migration.

    Removes the bundle-tracking columns from ``dag`` and ``dag_run`` and then
    drops the ``dag_bundle`` table itself.
    """
    with op.batch_alter_table("dag", schema=None) as batch_op:
        # The FK into dag_bundle must be dropped before the referenced
        # dag_bundle table is removed below.
        batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey")
        batch_op.drop_column("latest_bundle_version")
        batch_op.drop_column("bundle_name")
    with op.batch_alter_table("dag_run", schema=None) as batch_op:
        # Added by upgrade(): per-run pin of the bundle version in effect
        # when the run was created.
        batch_op.drop_column("bundle_version")

    op.drop_table("dag_bundle")
8 changes: 8 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ def _create_orm_dagrun(
session,
triggered_by,
):
bundle_version = session.scalar(
select(
DagModel.latest_bundle_version,
).where(
DagModel.dag_id == dag.dag_id,
)
)
run = DagRun(
dag_id=dag_id,
run_id=run_id,
Expand All @@ -276,6 +283,7 @@ def _create_orm_dagrun(
data_interval=data_interval,
triggered_by=triggered_by,
backfill_id=backfill_id,
bundle_version=bundle_version,
)
# Load defaults into the following two fields to ensure result can be serialized detached
run.log_template_id = int(session.scalar(select(func.max(LogTemplate.__table__.c.id))))
Expand Down
3 changes: 3 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class DagRun(Base, LoggingMixin):
"""
dag_version_id = Column(UUIDType(binary=False), ForeignKey("dag_version.id", ondelete="CASCADE"))
dag_version = relationship("DagVersion", back_populates="dag_runs")
bundle_version = Column(StringID())

# Remove this `if` after upgrading Sphinx-AutoAPI
if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
Expand Down Expand Up @@ -238,13 +239,15 @@ def __init__(
triggered_by: DagRunTriggeredByType | None = None,
backfill_id: int | None = None,
dag_version: DagVersion | None = None,
bundle_version: str | None = None,
):
if data_interval is None:
# Legacy: Only happen for runs created prior to Airflow 2.2.
self.data_interval_start = self.data_interval_end = None
else:
self.data_interval_start, self.data_interval_end = data_interval

self.bundle_version = bundle_version
self.dag_id = dag_id
self.run_id = run_id
self.logical_date = logical_date
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ca59d711e6304f8bfdb25f49339d455602430dd6b880e420869fc892faef0596
00d5d138d0773a6b700ada4650f5c60cc3972afefd3945ea434dea50abfda834
Loading

0 comments on commit 6d048c4

Please sign in to comment.