Skip to content

Commit

Permalink
Feature/enable comment on maintenance mode (apache#46519)
Browse files Browse the repository at this point in the history
* Initial implementation

* modify register

* Fix database migration

* fix html

* new approach to cerate new column

* empty line

* no f string

* fix ""

* remove empty line

* Modify default value

* define max length

* increment version

* fix version in comment

* fix mypy

* Make html nicer

* modifiy maintenance comment to 1024

* Implement maintenance comment update

* fix static checks

* Apply review findings

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
  • Loading branch information
majorosdonat and Majoros Donat (XC-DX/EET2-Bp) authored Feb 10, 2025
1 parent f1f5212 commit bef4824
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 31 deletions.
6 changes: 3 additions & 3 deletions providers/edge/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
Package ``apache-airflow-providers-edge``

Release: ``0.13.1pre0``
Release: ``0.14.0pre0``


Handle edge workers on remote sites via HTTP(s) connection and orchestrates work over distributed sites
Expand All @@ -37,7 +37,7 @@ This is a provider package for ``edge`` provider. All classes for this provider
are in ``airflow.providers.edge`` python package.

You can find package information and changelog for the provider
in the `documentation <https://airflow.apache.org/docs/apache-airflow-providers-edge/0.13.1pre0/>`_.
in the `documentation <https://airflow.apache.org/docs/apache-airflow-providers-edge/0.14.0pre0/>`_.

Installation
------------
Expand All @@ -60,4 +60,4 @@ PIP package Version required
================== ===================

The changelog for the provider package can be found in the
`changelog <https://airflow.apache.org/docs/apache-airflow-providers-edge/0.13.1pre0/changelog.html>`_.
`changelog <https://airflow.apache.org/docs/apache-airflow-providers-edge/0.14.0pre0/changelog.html>`_.
9 changes: 9 additions & 0 deletions providers/edge/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
Changelog
---------

0.14.0pre0
..........

Misc
~~~~

* ``Add maintenance comment field, to make maintenance reason transparent.``


0.13.1pre0
..........

Expand Down
2 changes: 1 addition & 1 deletion providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ source-date-epoch: 1737371680

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.13.1pre0
- 0.14.0pre0

plugins:
- name: edge_executor
Expand Down
6 changes: 3 additions & 3 deletions providers/edge/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"

[project]
name = "apache-airflow-providers-edge"
version = "0.13.1pre0"
version = "0.14.0pre0"
description = "Provider package apache-airflow-providers-edge for Apache Airflow"
readme = "README.rst"
authors = [
Expand Down Expand Up @@ -61,8 +61,8 @@ dependencies = [
]

[project.urls]
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.13.1pre0"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.13.1pre0/changelog.html"
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.14.0pre0"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.14.0pre0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
Expand Down
2 changes: 1 addition & 1 deletion providers/edge/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.13.1pre0"
__version__ = "0.14.0pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ def _check_db_schema(self, engine: Engine) -> None:
if edge_job_columns and "concurrency_slots" not in edge_job_columns:
EdgeJobModel.metadata.drop_all(engine, tables=[EdgeJobModel.__table__])

edge_worker_columns = None
try:
edge_worker_columns = [column["name"] for column in inspector.get_columns("edge_worker")]
except NoSuchTableError:
pass

# version 0.14.0pre0 added new column maintenance_comment
if edge_worker_columns and "maintenance_comment" not in edge_worker_columns:
connection = engine.connect()
query = "ALTER TABLE edge_worker ADD maintenance_comment VARCHAR(1024);"
connection.execute(query)

@provide_session
def start(self, session: Session = NEW_SESSION):
"""If EdgeExecutor provider is loaded first time, ensure table exists."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def get_provider_info():
"description": "Handle edge workers on remote sites via HTTP(s) connection and orchestrates work over distributed sites\n",
"state": "not-ready",
"source-date-epoch": 1737371680,
"versions": ["0.13.1pre0"],
"versions": ["0.14.0pre0"],
"plugins": [
{
"name": "edge_executor",
Expand Down
19 changes: 18 additions & 1 deletion providers/edge/src/airflow/providers/edge/models/edge_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class EdgeWorkerModel(Base, LoggingMixin):
__tablename__ = "edge_worker"
worker_name = Column(String(64), primary_key=True, nullable=False)
state = Column(String(20))
maintenance_comment = Column(String(1024))
_queues = Column("queues", String(256))
first_online = Column(UtcDateTime)
last_update = Column(UtcDateTime)
Expand All @@ -91,12 +92,14 @@ def __init__(
queues: list[str] | None,
first_online: datetime | None = None,
last_update: datetime | None = None,
maintenance_comment: str | None = None,
):
self.worker_name = worker_name
self.state = state
self.queues = queues
self.first_online = first_online or timezone.utcnow()
self.last_update = last_update
self.maintenance_comment = maintenance_comment
super().__init__()

@property
Expand Down Expand Up @@ -184,11 +187,14 @@ def reset_metrics(worker_name: str) -> None:


@provide_session
def request_maintenance(worker_name: str, session: Session = NEW_SESSION) -> None:
def request_maintenance(
worker_name: str, maintenance_comment: str | None, session: Session = NEW_SESSION
) -> None:
"""Writes maintenance request to the db"""
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel = session.scalar(query)
worker.state = EdgeWorkerState.MAINTENANCE_REQUEST
worker.maintenance_comment = maintenance_comment


@provide_session
Expand All @@ -197,9 +203,20 @@ def exit_maintenance(worker_name: str, session: Session = NEW_SESSION) -> None:
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel = session.scalar(query)
worker.state = EdgeWorkerState.MAINTENANCE_EXIT
worker.maintenance_comment = None


@provide_session
def remove_worker(worker_name: str, session: Session = NEW_SESSION) -> None:
"""Remove a worker that is offline or just gone from DB"""
session.execute(delete(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name))


@provide_session
def change_maintenance_comment(
worker_name: str, maintenance_comment: str | None, session: Session = NEW_SESSION
) -> None:
"""Writes maintenance comment in the db."""
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel = session.scalar(query)
worker.maintenance_comment = maintenance_comment
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any

from flask import Blueprint, redirect, url_for
from flask import Blueprint, redirect, request, url_for
from flask_appbuilder import BaseView, expose
from sqlalchemy import select

Expand Down Expand Up @@ -120,7 +120,8 @@ def status(self, session: Session = NEW_SESSION):
def worker_to_maintenance(self, worker_name: str):
from airflow.providers.edge.models.edge_worker import request_maintenance

request_maintenance(worker_name)
maintenance_comment = request.form.get("maintenance_comment")
request_maintenance(worker_name, maintenance_comment)
return redirect(url_for("EdgeWorkerHosts.status"))

@expose("/status/maintenance/<string:worker_name>/off", methods=["POST"])
Expand All @@ -139,6 +140,15 @@ def remove_worker(self, worker_name: str):
remove_worker(worker_name)
return redirect(url_for("EdgeWorkerHosts.status"))

@expose("/status/maintenance/<string:worker_name>/change_comment", methods=["POST"])
@has_access_view(AccessView.JOBS)
def change_maintenance_comment(self, worker_name: str):
from airflow.providers.edge.models.edge_worker import change_maintenance_comment

maintenance_comment = request.form.get("maintenance_comment")
change_maintenance_comment(worker_name, maintenance_comment)
return redirect(url_for("EdgeWorkerHosts.status"))


# Check if EdgeExecutor is actually loaded
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ <h2>Edge Worker Hosts</h2>
<p>No Edge Workers connected or known currently.</p>
{% else %}

<script>
function showForm(worker_name) {
var button = document.getElementById("button_" + worker_name);
var form = document.getElementById("form_" + worker_name);
form.style.display = "block";
button.style.display = "none";
}
function showEditComment(worker_name) {
var display = document.getElementById("display_" + worker_name);
var textarea = document.getElementById("textarea_" + worker_name);
var button = document.getElementById("update_" + worker_name);
display.style.display = "none";
textarea.style.display = "block";
button.style.display = "inline";
}
</script>
<table class="table table-striped table-bordered">
<tr>
<th>Hostname</th>
Expand All @@ -43,7 +59,7 @@ <h2>Edge Worker Hosts</h2>
<th>Jobs Failed</th>
-->
<th>System Information</th>
<th>Maintenance mode</th>
<th>Operations</th>
</tr>
{% for host in hosts %}
<tr>
Expand Down Expand Up @@ -100,30 +116,57 @@ <h2>Edge Worker Hosts</h2>
{% endfor %}
</ul>
</td>
<td>
{%- if host.state in ["idle", "running"] -%}
<form action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/on" method="POST">
{%- if host.state in ["idle", "running"] -%}
<td>
<button id="button_{{ host.worker_name }}" onclick="showForm('{{ host.worker_name }}')" class="btn btn-sm btn-primary">
Enter Maintenance
</button>
<form id="form_{{ host.worker_name }}" style="display: none" action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/on" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<button type="submit" style="padding: 10px 20px; background-color: blue; color: white; border: none; border-radius: 5px;">
Enter
<div>
<label for="maintenance_comment">Maintenance Comment:</label>
</div>
<textarea name="maintenance_comment" rows="3" maxlength="1024" style="width: 100%; margin-bottom: 5px;" required></textarea>
<br />
<button type="submit" class="btn btn-sm btn-primary">
Confirm Maintenance
</button>
</form>
{%- elif host.state in ["maintenance pending", "maintenance mode", "maintenance request"] -%}
<form action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/off" method="POST">
</td>
</form>
{%- elif host.state in ["maintenance pending", "maintenance mode", "maintenance request"] -%}
<form action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/off" method="POST">
<td>
<div id="display_{{ host.worker_name }}">
{{ host.maintenance_comment }}
<a onclick="showEditComment('{{ host.worker_name }}')" class="btn btn-sm btn-default" data-toggle="tooltip" rel="tooltip" title="Edit maintenance comment">
<span class="sr-only">Edit</span>
<i class="fa fa-edit"></i>
</a>
</div>
<textarea id="textarea_{{ host.worker_name }}" name="maintenance_comment" rows="3" maxlength="1024" style="display: none; width:100%;" required>{{ host.maintenance_comment }}</textarea>
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<button type="submit" style="padding: 10px 20px; background-color: blue; color: white; border: none; border-radius: 5px;">
Exit
</button>
</form>
{%- elif host.state in ["offline", "unknown", "offline maintenance"] -%}
<form action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/remove" method="POST">
<div style="margin-top: 10px;">
<button type="submit" class="btn btn-sm btn-primary">
Exit Maintenance
</button>
<button id="update_{{ host.worker_name }}" type="submit" class="btn btn-sm btn-primary" style="display: none;" formaction="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/change_comment">
Update comment
</button>
</div>
</td>
</form>
{%- elif host.state in ["offline", "unknown", "offline maintenance"] -%}
<form action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/remove" method="POST">
<td>
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<button type="submit" style="padding: 10px 20px; background-color: blue; color: white; border: none; border-radius: 5px;">
<button type="submit" class="btn btn-sm btn-primary">
Remove
</button>
</form>
{% endif %}
</td>
</td>
</form>
{%- else -%}
<td></td>
{% endif %}
</tr>
{% endfor %}
</table>
Expand Down

0 comments on commit bef4824

Please sign in to comment.