Commit

Feat: Add biweekly and bimonthly and fix translations (Koed00#36)
GDay authored Nov 13, 2022
1 parent 1720554 commit 542e04d
Showing 15 changed files with 929 additions and 635 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/release.yml
@@ -22,8 +22,11 @@ jobs:

- name: Install dependencies
run: |
python -m pip install -U pip setuptools poetry
python -m pip install poetry
apt-get update
apt-get -y install gettext
python -m pip install pip setuptools django poetry
# compile messages to get .mo files
django-admin compilemessages
- name: Build and publish package
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
run: poetry publish --build --username=__token__ --password=${{ secrets.PYPI_TOKEN }}
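These workflow steps follow from the other half of this PR: the compiled `.mo` catalogs are deleted from the repository (see the removed `django.mo` binary further down), so translations now have to be compiled from the `.po` sources at release time, which in turn needs the system `gettext` tools. The same compilation can be done from Python; a hedged sketch using Django's management API (a real project would point `DJANGO_SETTINGS_MODULE` at its own settings instead of the bare `settings.configure()` used here):

```python
# Hedged sketch: compile .po catalogs into the .mo files that ship
# with the package; assumes the gettext tools are installed, exactly
# like the apt-get step in the workflow above.
import django
from django.conf import settings
from django.core.management import call_command

settings.configure()  # minimal stand-in for a real settings module
django.setup()
call_command("compilemessages")  # same as `django-admin compilemessages`
```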
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -3,7 +3,9 @@
## [Unreleased](https://github.com/GDay/django-q2/tree/HEAD)

**Merged pull requests:**
- Fix: Deprecation warning for Django 5.x
- Fix: Deprecation warning for Django 5.x https://github.com/GDay/django-q2/pull/34
- Feat: Add biweekly and bimonthly https://github.com/GDay/django-q2/pull/36
- Fix: Fix all translation strings and remove compiled https://github.com/GDay/django-q2/pull/36

## [v1.4.3](https://github.com/GDay/django-q2/tree/v1.4.3) (2022-11-07)

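For context, here is how the two new schedule types are used once this commit lands; a minimal sketch assuming the standard django-q `Schedule` API (the `name` and `func` values below are hypothetical):

```python
# Hedged usage sketch for the new schedule types added in this commit.
from django_q.models import Schedule

Schedule.objects.create(
    name="fortnightly-report",        # hypothetical name
    func="myapp.tasks.build_report",  # hypothetical task path
    schedule_type=Schedule.BIWEEKLY,  # advance next_run by two weeks
)

Schedule.objects.create(
    name="bimonthly-cleanup",
    func="myapp.tasks.cleanup",
    schedule_type=Schedule.BIMONTHLY,  # advance next_run by two months
)
```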
70 changes: 37 additions & 33 deletions django_q/cluster.py
@@ -74,27 +74,27 @@ def start(self) -> int:
),
)
self.sentinel.start()
logger.info(_(f"Q Cluster {self.name} starting."))
logger.info(_("Q Cluster %(name)s starting.") % {'name': self.name})
while not self.start_event.is_set():
sleep(0.1)
return self.pid

def stop(self) -> bool:
if not self.sentinel.is_alive():
return False
logger.info(_(f"Q Cluster {self.name} stopping."))
logger.info(_("Q Cluster %(name)s stopping.") % {'name': self.name})
self.stop_event.set()
self.sentinel.join()
logger.info(_(f"Q Cluster {self.name} has stopped."))
logger.info(_("Q Cluster %(name)s has stopped.") % {'name': self.name})
self.start_event = None
self.stop_event = None
return True

def sig_handler(self, signum, frame):
logger.debug(
_(
f'{current_process().name} got signal {Conf.SIGNAL_NAMES.get(signum, "UNKNOWN")}'
)
'%(name)s got signal %(signal)s'
) % {'name': current_process().name, 'signal': Conf.SIGNAL_NAMES.get(signum, "UNKNOWN")}
)
self.stop()
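The change visible in this and every following hunk is the heart of the translation fix: `_(f"...")` evaluates the f-string before gettext sees it, so the already-interpolated string never matches a msgid extracted into the `.po` catalog and comes back untranslated. Passing a literal `%(...)s` format string to `_()` and interpolating afterwards keeps the msgid stable. A minimal sketch of the difference:

```python
# Why _(f"...") defeats gettext -- illustrative only.
from django.utils.translation import gettext as _

name = "Cluster-1"

# Broken: gettext is asked to look up "Q Cluster Cluster-1 starting.",
# a msgid that cannot exist in the catalog for every cluster name.
msg = _(f"Q Cluster {name} starting.")

# Fixed: the literal format string is the msgid that makemessages
# extracts; values are interpolated only after the lookup succeeds.
msg = _("Q Cluster %(name)s starting.") % {"name": name}
```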

@@ -216,21 +216,21 @@ def reincarnate(self, process):
db.connections.close_all()
if process == self.monitor:
self.monitor = self.spawn_monitor()
logger.error(_(f"reincarnated monitor {process.name} after sudden death"))
logger.error(_("reincarnated monitor %(name)s after sudden death") % {'name': process.name})
elif process == self.pusher:
self.pusher = self.spawn_pusher()
logger.error(_(f"reincarnated pusher {process.name} after sudden death"))
logger.error(_("reincarnated pusher %(name)s after sudden death") % {'name': process.name})
else:
self.pool.remove(process)
self.spawn_worker()
if process.timer.value == 0:
# only need to terminate on timeout, otherwise we risk destabilizing the queues
process.terminate()
logger.warning(_(f"reincarnated worker {process.name} after timeout"))
logger.warning(_("reincarnated worker %(name)s after timeout") % {'name': process.name})
elif int(process.timer.value) == -2:
logger.info(_(f"recycled worker {process.name}"))
logger.info(_("recycled worker %(name)s") % {'name': process.name})
else:
logger.error(_(f"reincarnated worker {process.name} after death"))
logger.error(_("reincarnated worker %(name)s after death") % {'name': process.name})

self.reincarnations += 1

@@ -253,12 +253,12 @@ def spawn_cluster(self):
def guard(self):
logger.info(
_(
f"{current_process().name} guarding cluster {humanize(self.cluster_id.hex)}"
)
"%(name)s guarding cluster %(cluster_name)s"
) % {'name': current_process().name, 'cluster_name': humanize(self.cluster_id.hex)}
)
self.start_event.set()
Stat(self).save()
logger.info(_(f"Q Cluster {humanize(self.cluster_id.hex)} running."))
logger.info(_("Q Cluster %(cluster_name)s running.") % {'cluster_name': humanize(self.cluster_id.hex)})
counter = 0
cycle = Conf.GUARD_CYCLE # guard loop sleep in seconds
# Guard loop. Runs at least once
@@ -292,7 +292,7 @@ def guard(self):
def stop(self):
Stat(self).save()
name = current_process().name
logger.info(_(f"{name} stopping cluster processes"))
logger.info(_("%(name)s stopping cluster processes") % {'name': name})
# Stopping pusher
self.event_out.set()
# Wait for it to stop
@@ -317,7 +317,7 @@ def stop(self):
self.result_queue.close()
# Wait for the result queue to empty
self.result_queue.join_thread()
logger.info(_(f"{name} waiting for the monitor."))
logger.info(_("%(name)s waiting for the monitor.") % {'name': name})
# Wait for everything to close or time out
count = 0
if not self.timeout:
@@ -339,7 +339,7 @@ def pusher(task_queue: Queue, event: Event, broker: Broker = None):
"""
if not broker:
broker = get_broker()
logger.info(_(f"{current_process().name} pushing tasks at {current_process().pid}"))
logger.info(_("%(process_name)s pushing tasks at %(id)s") % {'process_name': current_process().name, 'id': current_process().pid})
while True:
try:
task_set = broker.dequeue()
@@ -360,10 +360,10 @@ def pusher(task_queue: Queue, event: Event, broker: Broker = None):
continue
task["ack_id"] = ack_id
task_queue.put(task)
logger.debug(_(f"queueing from {broker.list_key}"))
logger.debug(_("queueing from %(list_key)s") % {'list_key': broker.list_key})
if event.is_set():
break
logger.info(_(f"{current_process().name} stopped pushing tasks"))
logger.info(_("%(name)s stopped pushing tasks") % {'name': current_process().name})


def monitor(result_queue: Queue, broker: Broker = None):
@@ -375,7 +375,7 @@ def monitor(result_queue: Queue, broker: Broker = None):
if not broker:
broker = get_broker()
name = current_process().name
logger.info(_(f"{name} monitoring at {current_process().pid}"))
logger.info(_("%(name)s monitoring at %(id)s") % {'name': name, 'id': current_process().pid})
for task in iter(result_queue.get, "STOP"):
# save the result
if task.get("cached", False):
@@ -392,11 +392,11 @@ def monitor(result_queue: Queue, broker: Broker = None):
info_name = get_func_repr(task['func'])
if task["success"]:
# log success
logger.info(_(f"Processed '{info_name}' ({task['name']})"))
logger.info(_("Processed '%(info_name)s' (%(task_name)s)") % {'info_name': info_name, 'task_name': task['name']})
else:
# log failure
logger.error(_(f"Failed '{info_name}' ({task['name']}) - {task['result']}"))
logger.info(_(f"{name} stopped monitoring results"))
logger.error(_("Failed '%(info_name)s' (%(task_name)s) - %(task_result)s") % {'info_name': info_name, 'task_name': task['name'], 'task_result': task['result']})
logger.info(_("%(name)s stopped monitoring results") % {'name': name})


def worker(
@@ -410,7 +410,7 @@ def worker(
:type timer: multiprocessing.Value
"""
proc_name = current_process().name
logger.info(_(f"{proc_name} ready for work at {current_process().pid}"))
logger.info(_("%(proc_name)s ready for work at %(id)s") % {'proc_name': proc_name, 'id': current_process().pid})
task_count = 0
if timeout is None:
timeout = -1
@@ -422,7 +422,7 @@ def worker(
# Get the function from the task
func = task["func"]
func_name = get_func_repr(func)
logger.info(_(f"{proc_name} processing '{func_name}' ({task['name']})"))
logger.info(_("%(proc_name)s processing '%(func_name)s' (%(task_name)s)") % {'proc_name': proc_name, 'func_name': func_name, 'task_name': task['name']})
f = task["func"]
# if it's not an instance try to get it from the string
if not callable(task["func"]):
@@ -437,7 +437,7 @@ def worker(
res = f(*task["args"], **task["kwargs"])
result = (res, True)
except Exception:
result = (f"Could not process '{func_name}'. Check the location of the function and the args/kwargs.", False)
result = (_("Could not process '%(func_name)s'. Check the location of the function and the args/kwargs.") % {'func_name': func_name}, False)
if error_reporter:
error_reporter.report()
if task.get("sync", False):
@@ -453,7 +453,7 @@ def worker(
if task_count == Conf.RECYCLE or rss_check():
timer.value = -2 # Recycled
break
logger.info(_(f"{proc_name} stopped doing work"))
logger.info(_("%(proc_name)s stopped doing work") % {'proc_name': proc_name})

def save_task(task, broker: Broker):
"""
@@ -632,8 +632,12 @@ def scheduler(broker: Broker = None):
next_run = next_run + timedelta(days=1)
elif s.schedule_type == s.WEEKLY:
next_run = next_run + timedelta(weeks=1)
elif s.schedule_type == s.BIWEEKLY:
next_run = next_run + timedelta(weeks=2)
elif s.schedule_type == s.MONTHLY:
next_run = add_months(next_run, 1)
elif s.schedule_type == s.BIMONTHLY:
next_run = add_months(next_run, 2)
elif s.schedule_type == s.QUARTERLY:
next_run = add_months(next_run, 3)
elif s.schedule_type == s.YEARLY:
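`timedelta` has no month unit, which is why the MONTHLY, BIMONTHLY, and QUARTERLY branches call `add_months` rather than adding a fixed number of days. The actual helper lives outside this hunk; a hedged sketch of the usual month arithmetic, with end-of-month clamping as an assumed detail:

```python
# Hedged sketch of add_months-style arithmetic (django-q2's real
# helper may differ); clamps the day so Jan 31 + 1 month gives
# Feb 28/29 instead of raising ValueError.
import calendar
from datetime import datetime


def add_months(timestamp: datetime, months: int) -> datetime:
    month = timestamp.month - 1 + months
    year = timestamp.year + month // 12
    month = month % 12 + 1
    day = min(timestamp.day, calendar.monthrange(year, month)[1])
    return timestamp.replace(year=year, month=month, day=day)
```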
@@ -665,14 +669,14 @@ def scheduler(broker: Broker = None):
if not s.task:
logger.error(
_(
f"{current_process().name} failed to create a task from schedule [{s.name or s.id}]"
)
"%(process_name)s failed to create a task from schedule [%(schedule)s]"
) % {'process_name': current_process().name, 'schedule': s.name or s.id}
)
else:
logger.info(
_(
f"{current_process().name} created a task from schedule [{s.name or s.id}]"
)
"%(process_name)s created a task from schedule [%(schedule)s]"
) % {'process_name': current_process().name, 'schedule': s.name or s.id}
)
# default behavior is to delete a ONCE schedule
if s.schedule_type == s.ONCE:
@@ -711,12 +715,12 @@ def set_cpu_affinity(n: int, process_ids: list, actual: bool = not Conf.TESTING)
"""
# check if we have the psutil module
if not psutil:
logger.warning("Skipping cpu affinity because psutil was not found.")
logger.warning(_("Skipping cpu affinity because psutil was not found."))
return
# check if the platform supports cpu_affinity
if actual and not hasattr(psutil.Process(process_ids[0]), "cpu_affinity"):
logger.warning(
"Faking cpu affinity because it is not supported on this platform"
_("Faking cpu affinity because it is not supported on this platform")
)
actual = False
# get the available processors
@@ -737,7 +741,7 @@ def set_cpu_affinity(n: int, process_ids: list, actual: bool = not Conf.TESTING)
p = psutil.Process(pid)
if actual:
p.cpu_affinity(affinity)
logger.info(_(f"{pid} will use cpu {affinity}"))
logger.info(_("%(pid)s will use cpu %(affinity)s") % {'pid': pid, 'affinity': affinity})


def rss_check():
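For reference on the psutil calls in the `set_cpu_affinity` hunk above: `psutil.Process(pid).cpu_affinity(cpus)` pins a process to the listed CPUs, and the zero-argument form reads the current pinning. An illustrative sketch of the standard psutil API (not part of this diff; `cpu_affinity` is unavailable on macOS, which is what the "faking" branch above handles):

```python
# Standard psutil affinity calls, mirroring what set_cpu_affinity
# does per worker pid; illustrative, not django-q2 code.
import os

import psutil

p = psutil.Process(os.getpid())
print(p.cpu_affinity())        # e.g. [0, 1, 2, 3]: currently allowed CPUs
p.cpu_affinity([0])            # pin this process to CPU 0
p.cpu_affinity(list(range(psutil.cpu_count())))  # restore all CPUs
```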
2 changes: 1 addition & 1 deletion django_q/conf.py
@@ -82,7 +82,7 @@ class Conf:

# Verify SAVE_LIMIT_PER is valid
if SAVE_LIMIT_PER not in ["group", "name", "func", None]:
warn(f"SAVE_LIMIT_PER ({SAVE_LIMIT_PER}) is not a valid option. Options are: 'group', 'name', 'func' and None. Default is None.")
warn(_("SAVE_LIMIT_PER (%(option)s) is not a valid option. Options are: 'group', 'name', 'func' and None. Default is None.") % {'option': SAVE_LIMIT_PER})

# Guard loop sleep in seconds. Should be between 0 and 60 seconds.
GUARD_CYCLE = conf.get("guard_cycle", 0.5)
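The warning above implies a matching cluster option; a hedged configuration sketch, with the lowercase `save_limit_per` key assumed from django-q's usual `Q_CLUSTER` naming convention:

```python
# settings.py -- hedged sketch; the 'save_limit_per' key name is
# assumed from the Conf attribute in the hunk above.
Q_CLUSTER = {
    "name": "myproject",
    "workers": 4,
    "save_limit": 250,          # cap on stored successful tasks
    "save_limit_per": "group",  # apply the cap per 'group', 'name', 'func', or None
}
```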
Binary file removed django_q/locale/de/LC_MESSAGES/django.mo
