
Successful RQ job execution does not get removed from StartedJobRegistry #692

Open
choo0166 opened this issue Jan 22, 2025 · 1 comment

@choo0166
I have defined a job using the job decorator syntax like:

@job("calcium", timeout=3600, on_success=report_success, on_failure=report_failure)
def my_task(payload):
    start = time.perf_counter()
    response = requests.post(
        url,  # endpoint URL, presumably defined elsewhere in the module
        headers={
            "Content-Type": "application/json",
        },
        json=payload,
    )
    response.raise_for_status()
    end = time.perf_counter()

    response_body = response.json()

    return response_body
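
For context, a function decorated this way is normally enqueued through the .delay() helper that the decorator attaches; the payload below is illustrative:

# enqueue the task onto the "calcium" queue; payload contents are made up
my_task.delay({"input": "..."})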

I have also defined the success callback handler:

import base64
import logging

from django.core.files.base import ContentFile

# Analysis, PredictionResult, SegmentationResult and AnalysisArtifact are
# the project's Django models (import path is illustrative)
from myapp.models import Analysis, PredictionResult, SegmentationResult, AnalysisArtifact

logger = logging.getLogger(__name__)


def report_success(job, connection, result, *args, **kwargs):
    try:
        analysis = Analysis.objects.get(id=job.id)
        analysis.status = Analysis.Status.COMPLETED
        if "prediction" in result:
            PredictionResult.objects.create(
                analysis=analysis, prediction=result["prediction"]
            )
        if "segmentation" in result:
            # handle decoding and storing of base64 encoded segmentation masks
            for tp, (name, data) in result["segmentation"].items():
                # decode base64 string into binary and save to filefield
                f = ContentFile(content=base64.b64decode(data), name=name)
                SegmentationResult.objects.create(analysis=analysis, segmentation_mask=f, mask_type=tp)
        if "artifacts" in result:
            # handle decoding and storing of intermediate model artifacts
            for tp, (name, data) in result["artifacts"].items():
                f = ContentFile(content=base64.b64decode(data), name=name)
                AnalysisArtifact.objects.create(analysis=analysis, artifact=f, artifact_type=tp)
        analysis.save()
        logger.info(f"Analysis {analysis.id} processed successfully")
    except Analysis.DoesNotExist:
        logger.error(f"Analysis with job id {job.id} not found")

In short, the callback decodes base64-encoded data from the job result and saves it into the corresponding Django model FileFields.
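
For clarity, the callback assumes the job result is a JSON payload shaped roughly like this (keys taken from the callback code above, values illustrative):

result = {
    "prediction": "...",
    "segmentation": {
        # mask type -> [filename, base64-encoded file contents]
        "binary": ["mask.png", "iVBORw0KGgo..."],
    },
    "artifacts": {
        "heatmap": ["heatmap.npy", "k05VTVBZAQB..."],
    },
}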

When the job execution completes, RQ fails to remove the job from the StartedJobRegistry:

ai_worker-1     | Analysis 8df26860-bdcc-49da-a537-43088e7bddca processed successfully
ai_worker-1     | Handling successful execution of job 8df26860-bdcc-49da-a537-43088e7bddca
ai_worker-1     | Saving job 8df26860-bdcc-49da-a537-43088e7bddca's successful execution result
ai_worker-1     | Removing job 8df26860-bdcc-49da-a537-43088e7bddca from StartedJobRegistry
ai_worker-1     | Saving job 8df26860-bdcc-49da-a537-43088e7bddca's successful execution result
ai_worker-1     | Sent heartbeat to prevent worker timeout. Next one should arrive in 90 seconds.
ai_worker-1     | Removing job 8df26860-bdcc-49da-a537-43088e7bddca from StartedJobRegistry
ai_worker-1     | Saving job 8df26860-bdcc-49da-a537-43088e7bddca's successful execution result

The last two log lines repeat until the job timeout is reached.

Any ideas to debug this issue?
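
One way to confirm the stuck state from a shell is to inspect the registry directly via rq's public API (queue name taken from the decorator above; connection details are assumptions):

from redis import Redis
from rq.registry import StartedJobRegistry

conn = Redis()  # adjust host/port/db for your deployment
registry = StartedJobRegistry("calcium", connection=conn)
# a finished job that was never cleaned up will keep appearing here
print(registry.get_job_ids())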

@choo0166 (Author)

I've found that the result JSON was too large to be saved in Redis; setting result_ttl=0 in the job decorator works around the issue. Is there a way to specify an option that skips saving the result in Redis but still saves the job execution details?
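
For reference, the workaround is just an extra keyword on the decorator (a sketch of the job definition above, assuming django-rq):

# result_ttl=0 makes RQ discard the return value immediately instead of persisting it
@job("calcium", timeout=3600, result_ttl=0, on_success=report_success, on_failure=report_failure)
def my_task(payload):
    ...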
