Skip to content

Commit

Permalink
Rename max_execution_date (apache#44236)
Browse files Browse the repository at this point in the history
* Rename max_execution_date and remove execution_date

* Update airflow/config_templates/config.yml

Co-authored-by: Wei Lee <weilee.rx@gmail.com>

---------

Co-authored-by: Wei Lee <weilee.rx@gmail.com>
  • Loading branch information
sunank200 and Lee-W authored Nov 21, 2024
1 parent d43052e commit fbf6116
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
4 changes: 2 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ logging:
Specify prefix pattern like mentioned below with stream handler ``TaskHandlerWithCustomFormatter``
version_added: 2.0.0
type: string
example: "{{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{ti.try_number}}"
example: "{{ti.dag_id}}-{{ti.task_id}}-{{logical_date}}-{{ti.try_number}}"
is_template: true
default: ""
log_filename_template:
Expand Down Expand Up @@ -2370,7 +2370,7 @@ scheduler:
Setting this to ``True`` will make first task instance of a task
ignore depends_on_past setting. A task instance will be considered
as the first task instance of a task when there is no task instance
in the DB with an execution_date earlier than it., i.e. no manual marking
in the DB with a logical_date earlier than it., i.e. no manual marking
success will be needed for a newly added task to be scheduled.
version_added: 2.3.0
type: boolean
Expand Down
6 changes: 3 additions & 3 deletions airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _get_latest_runs_stmt(dag_ids: Collection[str]) -> Select:
if len(dag_ids) == 1: # Index optimized fast path to avoid more complicated & slower groupby queryplan.
(dag_id,) = dag_ids
last_automated_runs_subq = (
select(func.max(DagRun.logical_date).label("max_execution_date"))
select(func.max(DagRun.logical_date).label("max_logical_date"))
.where(
DagRun.dag_id == dag_id,
DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)),
Expand All @@ -102,7 +102,7 @@ def _get_latest_runs_stmt(dag_ids: Collection[str]) -> Select:
)
else:
last_automated_runs_subq = (
select(DagRun.dag_id, func.max(DagRun.logical_date).label("max_execution_date"))
select(DagRun.dag_id, func.max(DagRun.logical_date).label("max_logical_date"))
.where(
DagRun.dag_id.in_(dag_ids),
DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)),
Expand All @@ -112,7 +112,7 @@ def _get_latest_runs_stmt(dag_ids: Collection[str]) -> Select:
)
query = select(DagRun).where(
DagRun.dag_id == last_automated_runs_subq.c.dag_id,
DagRun.logical_date == last_automated_runs_subq.c.max_execution_date,
DagRun.logical_date == last_automated_runs_subq.c.max_logical_date,
)
return query.options(
load_only(
Expand Down
12 changes: 6 additions & 6 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,21 +935,21 @@ def index(self):
dag_run_subquery = (
select(
DagRun.dag_id,
sqla.func.max(DagRun.logical_date).label("max_execution_date"),
sqla.func.max(DagRun.logical_date).label("max_logical_date"),
)
.group_by(DagRun.dag_id)
.subquery()
)
current_dags = current_dags.outerjoin(
dag_run_subquery, and_(dag_run_subquery.c.dag_id == DagModel.dag_id)
)
null_case = case((dag_run_subquery.c.max_execution_date.is_(None), 1), else_=0)
null_case = case((dag_run_subquery.c.max_logical_date.is_(None), 1), else_=0)
if arg_sorting_direction == "desc":
current_dags = current_dags.order_by(
null_case, dag_run_subquery.c.max_execution_date.desc()
null_case, dag_run_subquery.c.max_logical_date.desc()
)
else:
current_dags = current_dags.order_by(null_case, dag_run_subquery.c.max_execution_date)
current_dags = current_dags.order_by(null_case, dag_run_subquery.c.max_logical_date)
else:
sort_column = DagModel.__table__.c.get(arg_sorting_key)
if sort_column is not None:
Expand Down Expand Up @@ -1336,7 +1336,7 @@ def last_dagruns(self, session: Session = NEW_SESSION):
last_runs_subquery = (
select(
DagRun.dag_id,
sqla.func.max(DagRun.logical_date).label("max_execution_date"),
sqla.func.max(DagRun.logical_date).label("max_logical_date"),
)
.group_by(DagRun.dag_id)
.where(DagRun.dag_id.in_(filter_dag_ids)) # Only include accessible/selected DAGs.
Expand All @@ -1356,7 +1356,7 @@ def last_dagruns(self, session: Session = NEW_SESSION):
last_runs_subquery,
and_(
last_runs_subquery.c.dag_id == DagRun.dag_id,
last_runs_subquery.c.max_execution_date == DagRun.logical_date,
last_runs_subquery.c.max_logical_date == DagRun.logical_date,
),
)
)
Expand Down
6 changes: 3 additions & 3 deletions tests/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_statement_latest_runs_one_dag():
"dag_run.data_interval_start, dag_run.data_interval_end",
"FROM dag_run",
"WHERE dag_run.dag_id = :dag_id_1 AND dag_run.logical_date = ("
"SELECT max(dag_run.logical_date) AS max_execution_date",
"SELECT max(dag_run.logical_date) AS max_logical_date",
"FROM dag_run",
"WHERE dag_run.dag_id = :dag_id_2 AND dag_run.run_type IN (__[POSTCOMPILE_run_type_1]))",
]
Expand All @@ -55,10 +55,10 @@ def test_statement_latest_runs_many_dag():
"SELECT dag_run.id, dag_run.dag_id, dag_run.logical_date, "
"dag_run.data_interval_start, dag_run.data_interval_end",
"FROM dag_run, (SELECT dag_run.dag_id AS dag_id, "
"max(dag_run.logical_date) AS max_execution_date",
"max(dag_run.logical_date) AS max_logical_date",
"FROM dag_run",
"WHERE dag_run.dag_id IN (__[POSTCOMPILE_dag_id_1]) "
"AND dag_run.run_type IN (__[POSTCOMPILE_run_type_1]) GROUP BY dag_run.dag_id) AS anon_1",
"WHERE dag_run.dag_id = anon_1.dag_id AND dag_run.logical_date = anon_1.max_execution_date",
"WHERE dag_run.dag_id = anon_1.dag_id AND dag_run.logical_date = anon_1.max_logical_date",
]
assert actual == expected, compiled_stmt

0 comments on commit fbf6116

Please sign in to comment.