Fix excess memory usage in django upload (#726)
* Add get_memory_usage function

* Rewrite memory usage functions

* Added memory usage logging to finalise.py

* Fix associations memory leak

* Revert "Added memory usage logging to finalise.py"

This reverts commit 656ff1b.

* Revert "Rewrite memory usage functions"

This reverts commit bfd581d.

* Revert "Add get_memory_usage function"

This reverts commit bbae979.

* Remove hardcoded batch size

* Updated changelog
ddobie authored Jul 12, 2024
1 parent a6a7c91 commit 32230f7
Showing 2 changed files with 14 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

 #### Changed

+- Use chunks for associations upload [#726](https://github.com/askap-vast/vast-pipeline/pull/726)
 - Updated all FITS loading to use a wrapper that can handle compressed FITS files [#694](https://github.com/askap-vast/vast-pipeline/pull/694)
 - Downgrade ci-docs to python 3.8 [#702](https://github.com/askap-vast/vast-pipeline/pull/702)
 - Update Gr1N poetry to v8, force python 3.8.10 [#701](https://github.com/askap-vast/vast-pipeline/pull/701)
@@ -77,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

 #### Fixed

+- Fix memory leak in model upload [#726](https://github.com/askap-vast/vast-pipeline/pull/726)
 - Implemented sensible dask dataframe partitioning [#724](https://github.com/askap-vast/vast-pipeline/pull/724)
 - Fixed outdated `runpipeline` section on CLI docs page [#685](https://github.com/askap-vast/vast-pipeline/pull/685).
 - Fixed link to JupyterHub [#676](https://github.com/askap-vast/vast-pipeline/pull/676).
@@ -116,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

 #### List of PRs

+- [#726](https://github.com/askap-vast/vast-pipeline/pull/726): fix: Fix memory leak in model upload and use chunks for associations upload
 - [#724](https://github.com/askap-vast/vast-pipeline/pull/724): fix, feat: Implemented sensible dask dataframe partitioning
 - [#694](https://github.com/askap-vast/vast-pipeline/pull/694): feat: Handle compressed fits files.
 - [#702](https://github.com/askap-vast/vast-pipeline/pull/702): fix: Downgrade ci-docs to python 3.8.
16 changes: 11 additions & 5 deletions vast_pipeline/pipeline/loading.py
@@ -5,7 +5,7 @@

 from typing import List, Optional, Dict, Tuple, Generator, Iterable
 from itertools import islice
-from django.db import transaction, connection, models
+from django.db import transaction, connection, models, reset_queries

 from vast_pipeline.image.main import SelavyImage
 from vast_pipeline.pipeline.model_generator import (
@@ -29,7 +29,8 @@
 def bulk_upload_model(
     djmodel: models.Model,
     generator: Iterable[Generator[models.Model, None, None]],
-    batch_size: int=10_000, return_ids: bool=False
+    batch_size: int=10_000,
+    return_ids: bool=False,
 ) -> List[int]:
     '''
     Bulk upload a list of generator objects of django models to db.
@@ -49,6 +50,8 @@ def bulk_upload_model(
         None or a list of the database IDs of the uploaded objects.
     '''
+    reset_queries()
+
     bulk_ids = []
     while True:
         items = list(islice(generator, batch_size))
@@ -220,9 +223,12 @@ def make_upload_associations(associations_df: pd.DataFrame) -> None:
         None.
     """
     logger.info('Upload associations...')
-    bulk_upload_model(
-        Association, association_models_generator(associations_df)
-    )
+    assoc_chunk_size = 100000
+    for i in range(0,len(associations_df),assoc_chunk_size):
+        bulk_upload_model(
+            Association,
+            association_models_generator(associations_df[i:i+assoc_chunk_size])
+        )


 def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame:
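
The last hunk replaces a single upload call with a loop over fixed-size slices of `associations_df`, each of which is then drained in `batch_size` pieces by the `islice` loop in `bulk_upload_model`. A minimal, stdlib-only sketch of that two-level pattern follows. It is an illustration under stated assumptions, not the pipeline's code: a plain list stands in for the DataFrame, `rows_generator` stands in for `association_models_generator`, `fake_bulk_create` is a hypothetical replacement for the database write, and the loop's termination check is assumed because the diff does not show the rest of the loop body.

```python
from itertools import islice
from typing import Callable, Dict, Iterator, List, Sequence

Row = Dict[str, int]


def rows_generator(rows: Sequence[Row]) -> Iterator[Row]:
    """Hypothetical stand-in for association_models_generator: one object per row."""
    for row in rows:
        yield dict(row)


def upload_in_batches(
    generator: Iterator[Row],
    sink: Callable[[List[Row]], None],
    batch_size: int,
) -> int:
    """Drain the generator batch_size items at a time; return the number of sink calls."""
    n_calls = 0
    while True:
        items = list(islice(generator, batch_size))
        if not items:  # assumed termination check; not visible in the diff
            break
        sink(items)
        n_calls += 1
    return n_calls


def upload_in_chunks(
    rows: Sequence[Row],
    sink: Callable[[List[Row]], None],
    chunk_size: int,
    batch_size: int,
) -> int:
    """Outer loop from the last hunk: slice the input, then upload each slice."""
    n_calls = 0
    for i in range(0, len(rows), chunk_size):
        chunk = rows[i:i + chunk_size]
        n_calls += upload_in_batches(rows_generator(chunk), sink, batch_size)
    return n_calls


batch_sizes_seen: List[int] = []


def fake_bulk_create(items: List[Row]) -> None:
    """Hypothetical sink that records batch sizes instead of writing to a database."""
    batch_sizes_seen.append(len(items))


rows = [{"source_id": n} for n in range(25)]
n_calls = upload_in_chunks(rows, fake_bulk_create, chunk_size=10, batch_size=4)
print(n_calls, batch_sizes_seen)
```

With 25 rows, a chunk size of 10 and a batch size of 4, each full chunk produces batches of 4, 4 and 2, and the final chunk of 5 rows produces batches of 4 and 1. Batches never span a chunk boundary, so a chunk size that is not a multiple of the batch size leaves one short batch per chunk.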