Skip to content

Commit

Permalink
Celery task to embed new contentfiles (#2044)
Browse files Browse the repository at this point in the history
* adding initial task to embed new contentfiles

* adding test

* adding celery task to periodically pick up contentfiles
  • Loading branch information
shanbady authored Feb 14, 2025
1 parent 8b57ae8 commit 46ef314
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 7 deletions.
13 changes: 7 additions & 6 deletions main/settings_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,6 @@
"schedule": crontab(minute=30, hour=18), # 2:30pm EST
"kwargs": {"period": "daily", "subscription_type": "channel_subscription_type"},
},
"daily_embed_new_learning_resources": {
"task": "vector_search.tasks.embed_new_learning_resources",
"schedule": get_int(
"EMBED_NEW_RESOURCES_SCHEDULE_SECONDS", 60 * 30
), # default is every 30 minutes
},
"send-search-subscription-emails-every-1-days": {
"task": "learning_resources_search.tasks.send_subscription_emails",
"schedule": crontab(minute=0, hour=19), # 3:00pm EST
Expand All @@ -156,6 +150,13 @@
"EMBED_NEW_RESOURCES_SCHEDULE_SECONDS", 60 * 30
), # default is every 30 minutes
}
CELERY_BEAT_SCHEDULE["daily_embed_new_content_files"] = {
"task": "vector_search.tasks.embed_new_content_files",
"schedule": get_int(
"EMBED_NEW_CONTENT_FILES_SCHEDULE_SECONDS", 60 * 30
), # default is every 30 minutes
}


CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
Expand Down
27 changes: 27 additions & 0 deletions vector_search/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,30 @@ def embed_new_learning_resources(self):
embed_tasks = celery.group(tasks)

return self.replace(embed_tasks)


@app.task(bind=True)
def embed_new_content_files(self):
"""
Embed new content files from the last day
"""
log.info("Running content file embedding task")
delta = datetime.timedelta(days=1)
since = now_in_utc() - delta
new_content_files = ContentFile.objects.filter(
published=True,
created_on__gt=since,
run__published=True,
)

tasks = [
generate_embeddings.si(ids, CONTENT_FILE_TYPE, overwrite=False)
for ids in chunks(
new_content_files.values_list("id", flat=True),
chunk_size=settings.QDRANT_CHUNK_SIZE,
)
]

embed_tasks = celery.group(tasks)

return self.replace(embed_tasks)
39 changes: 38 additions & 1 deletion vector_search/tasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
LearningResourceRunFactory,
ProgramFactory,
)
from learning_resources.models import LearningResource
from learning_resources.models import ContentFile, LearningResource
from learning_resources_search.constants import (
COURSE_TYPE,
)
from main.utils import now_in_utc
from vector_search.tasks import (
embed_learning_resources_by_id,
embed_new_content_files,
embed_new_learning_resources,
start_embed_resources,
)
Expand Down Expand Up @@ -149,6 +150,42 @@ def test_embed_new_learning_resources(mocker, mocked_celery):
assert sorted(daily_resource_ids) == sorted(embedded_ids)


def test_embed_new_content_files(mocker, mocked_celery):
"""
embed_new_content_files should generate embeddings for new content files
created within the last day
"""
mocker.patch("vector_search.tasks.load_course_blocklist", return_value=[])

daily_since = now_in_utc() - datetime.timedelta(hours=5)

ContentFileFactory.create_batch(4, created_on=daily_since, published=True)
# create resources older than a day
ContentFileFactory.create_batch(
4,
created_on=now_in_utc() - datetime.timedelta(days=5),
published=True,
)

daily_content_file_ids = [
resource.id
for resource in ContentFile.objects.filter(
created_on__gt=now_in_utc() - datetime.timedelta(days=1)
)
]

generate_embeddings_mock = mocker.patch(
"vector_search.tasks.generate_embeddings", autospec=True
)

with pytest.raises(mocked_celery.replace_exception_class):
embed_new_content_files.delay()
list(mocked_celery.group.call_args[0][0])

embedded_ids = generate_embeddings_mock.si.mock_calls[0].args[0]
assert sorted(daily_content_file_ids) == sorted(embedded_ids)


def test_embed_learning_resources_by_id(mocker, mocked_celery):
"""
embed_learning_resources_by_id should generate embeddings for resources
Expand Down

0 comments on commit 46ef314

Please sign in to comment.