Simplify internal indexing (#652)
* Simplify _index function.

* Refactor _build_index for clarity/speed.

* Remove Project._sp_index and Project._index_cache.

* Remove Project._index.

* Avoid unnecessary data copying when updating Collection indexes.

* Revert optimizations to Collection.

* Inline include_job_document.
bdice authored and vyasr committed May 2, 2022
1 parent fff47ed commit f97ab4c
Showing 2 changed files with 30 additions and 99 deletions.
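
The net effect of the changes below is that every consumer now goes through a single generator, Project._build_index, which yields one flat dictionary per job. A rough standalone sketch of that index shape, assuming a signac-style workspace of <workspace>/<job_id>/signac_statepoint.json plus an optional signac_job_document.json (file names and layout are assumptions of this sketch, not taken from the diff):

import json
import os


def build_index(workspace, include_job_document=False):
    """Yield {"_id", "sp", and optionally "doc"} for each job directory."""
    for job_id in os.listdir(workspace):
        job_dir = os.path.join(workspace, job_id)
        if not os.path.isdir(job_dir):
            continue
        with open(os.path.join(job_dir, "signac_statepoint.json")) as fp:
            doc = {"_id": job_id, "sp": json.load(fp)}
        if include_job_document:
            try:
                with open(os.path.join(job_dir, "signac_job_document.json")) as fp:
                    doc["doc"] = json.load(fp)
            except FileNotFoundError:
                pass  # A missing job document is not an error; simply omit "doc".
        yield doc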
123 changes: 27 additions & 96 deletions signac/contrib/project.py
@@ -175,8 +175,7 @@ def __init__(self, config=None):
                 )
                 raise
 
-        # Internal caches
-        self._index_cache = {}
+        # Internal state point cache
         # Note that the state point cache is a superset of the jobs in the
         # project, and its contents cannot be invalidated. The cached mapping
         # of "id: statepoint" is valid even after a job has been removed, and
@@ -678,7 +677,7 @@ def detect_schema(self, exclude_const=False, subset=None):
         """
         from .schema import _build_job_statepoint_index
 
-        index = self._index(include_job_document=False)
+        index = self._build_index(include_job_document=False)
         if subset is not None:
             subset = {str(s) for s in subset}
             index = [doc for doc in index if doc["_id"] in subset]
@@ -720,10 +719,9 @@ def _find_job_ids(self, filter=None):
         if not filter:
             return list(self._job_dirs())
         filter = dict(parse_filter(_add_prefix("sp.", filter)))
-        if "doc" in _root_keys(filter):
-            index = self._index(include_job_document=True)
-        else:
-            index = self._sp_index()
+        index = list(
+            self._build_index(include_job_document="doc" in _root_keys(filter))
+        )
         return list(Collection(index, _trust=True)._find(filter))
 
     def find_jobs(self, filter=None, *args, **kwargs):
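
The rewritten branch above folds the document check into a single call: job documents are included in the index only when the filter actually references a top-level "doc" key. A toy version of that root-key test (root_keys below is a stand-in written for this sketch, not signac's _root_keys):

def root_keys(filter):
    """Return the top-level prefixes of a dotted-key filter."""
    return {key.split(".", 1)[0] for key in filter}


# Only the second filter forces job documents to be loaded into the index.
assert root_keys({"sp.a": 1}) == {"sp"}
assert "doc" in root_keys({"sp.a": 1, "doc.status": "done"})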
@@ -1339,24 +1337,6 @@ def repair(self, job_ids=None):
         if corrupted:
             raise JobsCorruptedError(corrupted)
 
-    def _sp_index(self):
-        """Update and return the state point index cache.
-        Returns
-        -------
-        dict
-            Dictionary containing ids and state points in the cache.
-        """
-        job_ids = set(self._job_dirs())
-        to_add = job_ids.difference(self._index_cache)
-        to_remove = set(self._index_cache).difference(job_ids)
-        for _id in to_remove:
-            del self._index_cache[_id]
-        for _id in to_add:
-            self._index_cache[_id] = dict(sp=self._get_statepoint(_id), _id=_id)
-        return self._index_cache.values()
-
     def _build_index(self, include_job_document=False):
         """Generate a basic state point index.
@@ -1366,22 +1346,30 @@
             Whether to include the job document in the index (Default value =
             False).
         Yields
         ------
         dict
             Dictionary with keys ``_id`` containing the job id, ``sp``
             containing the state point, and ``doc`` containing the job document
             if requested.
         """
-        wd = self.workspace if self.Job is Job else None
-        for _id in self._find_job_ids():
-            doc = dict(_id=_id, sp=self._get_statepoint(_id))
+        for job_id in self._find_job_ids():
+            doc = dict(_id=job_id, sp=self._get_statepoint(job_id))
             if include_job_document:
-                if wd is None:
-                    doc["doc"] = self.open_job(id=_id).document
-                else:  # use optimized path
-                    try:
-                        with open(
-                            os.path.join(wd, _id, self.Job.FN_DOCUMENT), "rb"
-                        ) as file:
-                            doc["doc"] = json.loads(file.read().decode())
-                    except OSError as error:
-                        if error.errno != errno.ENOENT:
-                            raise
+                try:
+                    # Performance-critical path. We can rely on the project
+                    # workspace, job id, and document file name to be
+                    # well-formed, so just use str.join with os.sep instead of
+                    # os.path.join for speed.
+                    fn_document = os.sep.join(
+                        (self.workspace, job_id, self.Job.FN_DOCUMENT)
+                    )
+                    with open(fn_document, "rb") as file:
+                        doc["doc"] = json.loads(file.read().decode())
+                except OSError as error:
+                    if error.errno != errno.ENOENT:
+                        raise
             yield doc
 
     def _update_in_memory_cache(self):
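
The new inline comment above argues that, with well-formed components, plain string joining produces the same path as os.path.join while doing less work. A quick way to check both claims (the path components and timings here are illustrative, not from signac's benchmarks):

import os
import timeit

parts = ("/tmp/workspace", "0123456789abcdef0123456789abcdef", "signac_job_document.json")

# Same result for clean components with no absolute-path overrides...
assert os.path.join(*parts) == os.sep.join(parts)

# ...but os.sep.join skips os.path.join's per-component handling.
print("os.path.join:", timeit.timeit(lambda: os.path.join(*parts), number=100_000))
print("os.sep.join: ", timeit.timeit(lambda: os.sep.join(parts), number=100_000))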
@@ -1476,63 +1464,6 @@ def _read_cache(self):
         logger.debug(f"Read cache in {delta:.3f} seconds.")
         return cache
 
-    def _index(self, *, include_job_document=True):
-        r"""Generate an index of the project's workspace.
-        This generator function indexes every file in the project's
-        workspace until the specified `depth`.
-        The job document if it exists, is always indexed, other
-        files need to be specified with the formats argument.
-        See :ref:`signac project -i <signac-cli-project>` for the command line equivalent.
-        Parameters
-        ----------
-        formats : str, dict
-            The format definitions as a pattern string (e.g. ``r'.*\.txt'``)
-            or a mapping from pattern strings to formats (e.g.
-            ``'TextFile'``). If None, only the job document is indexed
-            (Default value = None).
-        depth : int
-            Specifies the crawling depth. A value of 0 means no limit
-            (Default value = 0).
-        skip_errors : bool
-            Skip all errors which occur during indexing. This is useful when
-            trying to repair a broken workspace (Default value = False).
-        include_job_document : bool
-            Include the contents of job documents (Default value = True).
-        Yields
-        ------
-        dict
-            Index document.
-        """
-        root = self.workspace
-
-        def _full_doc(doc):
-            """Add `signac_id` and `root` to the index document.
-            Parameters
-            ----------
-            doc : dict
-                Index document.
-            Returns
-            -------
-            dict
-                Modified index document.
-            """
-            doc["signac_id"] = doc["_id"]
-            doc["root"] = root
-            return doc
-
-        docs = self._build_index(include_job_document=include_job_document)
-        docs = map(_full_doc, docs)
-        for doc in docs:
-            yield doc
-
     @contextmanager
     def temporary_project(self, name=None, dir=None):
         """Context manager for the initialization of a temporary project.
6 changes: 3 additions & 3 deletions tests/test_project.py
@@ -669,15 +669,15 @@ def test_repair_corrupted_workspace(self):
         logging.disable(logging.NOTSET)
 
     def test_index(self):
-        docs = list(self.project._index(include_job_document=True))
+        docs = list(self.project._build_index(include_job_document=True))
         assert len(docs) == 0
-        docs = list(self.project._index(include_job_document=False))
+        docs = list(self.project._build_index(include_job_document=False))
         assert len(docs) == 0
         statepoints = [{"a": i} for i in range(5)]
         for sp in statepoints:
             self.project.open_job(sp).document["test"] = True
         job_ids = {job.id for job in self.project.find_jobs()}
-        docs = list(self.project._index())
+        docs = list(self.project._build_index())
         job_ids_cmp = {doc["_id"] for doc in docs}
         assert job_ids == job_ids_cmp
         assert len(docs) == len(statepoints)
