diff --git a/luigi/metrics.py b/luigi/metrics.py index cd3364e251..30d45164f4 100644 --- a/luigi/metrics.py +++ b/luigi/metrics.py @@ -66,6 +66,9 @@ def handle_task_disabled(self, task, config): def handle_task_done(self, task): pass + def handle_task_statistics(self, task, statistics): + pass + def generate_latest(self): return diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 81532d3c96..9aca3c90cd 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -1646,3 +1646,9 @@ def task_history(self): @rpc_method() def update_metrics_task_started(self, task): self._state._metrics_collector.handle_task_started(task) + + @rpc_method() + def report_task_statistics(self, task_id, statistics): + if self._state.has_task(task_id): + task = self._state.get_task(task_id) + self._state._metrics_collector.handle_task_statistics(task, statistics) diff --git a/luigi/worker.py b/luigi/worker.py index a11f808ae5..43e5dc0c0f 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -339,6 +339,9 @@ def update_progress_percentage(self, percentage): def decrease_running_resources(self, decrease_resources): self._scheduler.decrease_running_task_resources(self._task_id, decrease_resources) + def report_task_statistics(self, statistics): + self._scheduler.report_task_statistics(self._task_id, statistics) + class SchedulerMessage: """