Skip to content

Commit

Permalink
Merge pull request #292 from johntruckenbrodt/feature/auxdata_file_lo…
Browse files Browse the repository at this point in the history
…cking

auxdata: file locking and VRT source checks
  • Loading branch information
johntruckenbrodt authored Apr 15, 2024
2 parents da3b3f7 + 79d62a5 commit 23a7f4b
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 55 deletions.
2 changes: 1 addition & 1 deletion LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2014-2023, the pyroSAR Developers.
# Copyright (c) 2014-2024, the pyroSAR Developers.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
Expand Down
149 changes: 95 additions & 54 deletions pyroSAR/auxdata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
###############################################################################
# tools for handling auxiliary data in software pyroSAR

# Copyright (c) 2019-2023, the pyroSAR Developers.
# Copyright (c) 2019-2024, the pyroSAR Developers.

# This file is part of the pyroSAR Project. It is subject to the
# license terms in the LICENSE.txt file found in the top-level
Expand All @@ -26,6 +26,7 @@
from lxml import etree as ET
from packaging import version
from pyroSAR.examine import ExamineSnap
from pyroSAR.ancillary import Lock
from spatialist.raster import Raster, Dtype
from spatialist.vector import bbox
from spatialist.ancillary import dissolve, finder
Expand Down Expand Up @@ -246,7 +247,8 @@ def dem_autoload(geometries, demType, vrt=None, buffer=None, username=None,

def dem_create(src, dst, t_srs=None, tr=None, threads=None,
geoid_convert=False, geoid='EGM96', nodata=None,
resampleAlg='bilinear', dtype=None, pbar=False, **kwargs):
resampleAlg='bilinear', dtype=None, pbar=False,
lock_timeout=600, **kwargs):
"""
Create a new DEM GeoTIFF file and optionally convert heights from geoid to ellipsoid.
This is basically a convenience wrapper around :func:`osgeo.gdal.Warp` via :func:`spatialist.auxil.gdalwarp`.
Expand Down Expand Up @@ -298,6 +300,8 @@ def dem_create(src, dst, t_srs=None, tr=None, threads=None,
See :class:`spatialist.raster.Dtype`.
pbar: bool
add a progressbar?
lock_timeout: int
how long to wait to acquire a lock on `dst`?
**kwargs
additional keyword arguments to be passed to :func:`spatialist.auxil.gdalwarp`.
See :func:`osgeo.gdal.WarpOptions` for options. The following arguments cannot
Expand All @@ -314,6 +318,8 @@ def dem_create(src, dst, t_srs=None, tr=None, threads=None,
"""

vrt_check_sources(src)

with Raster(src) as ras:
if nodata is None:
nodata = ras.nodata
Expand Down Expand Up @@ -390,20 +396,23 @@ def dem_create(src, dst, t_srs=None, tr=None, threads=None,
msg = "argument '{}' cannot be set via kwargs as it is set internally."
raise RuntimeError(msg.format(key))

try:
message = 'creating mosaic'
crs = gdalwarp_args['dstSRS']
if crs != 'EPSG:4326':
message += ' and reprojecting to {}'.format(crs)
message += ': {}'.format(dst)
log.info(message)
gdalwarp(src=src, dst=dst, pbar=pbar, **gdalwarp_args)
except Exception:
if os.path.isfile(dst):
os.remove(dst)
raise
finally:
gdal.SetConfigOption('GDAL_NUM_THREADS', threads_system)
with Lock(dst, timeout=lock_timeout):
try:
if not os.path.isfile(dst):
message = 'creating mosaic'
crs = gdalwarp_args['dstSRS']
if crs != 'EPSG:4326':
message += ' and reprojecting to {}'.format(crs)
log.info(f'{message}: {dst}')
gdalwarp(src=src, dst=dst, pbar=pbar, **gdalwarp_args)
else:
log.info(f'mosaic already exists: {dst}')
except Exception:
if os.path.isfile(dst):
os.remove(dst)
raise
finally:
gdal.SetConfigOption('GDAL_NUM_THREADS', threads_system)


class DEMHandler:
Expand Down Expand Up @@ -636,7 +645,7 @@ def __get_resolution(self, dem_type, y):
return val

@staticmethod
def __retrieve(url, filenames, outdir):
def __retrieve(url, filenames, outdir, lock_timeout=600):
# check that base URL is reachable
url_parse = urlparse(url)
url_base = url_parse.scheme + '://' + url_parse.netloc
Expand All @@ -651,27 +660,29 @@ def __retrieve(url, filenames, outdir):
for i, file in enumerate(files):
remote = '{}/{}'.format(url, file)
local = os.path.join(outdir, os.path.basename(file))
if not os.path.isfile(local):
r = requests.get(remote)
# a tile might not exist over ocean
if r.status_code == 404:
with Lock(local, timeout=lock_timeout):
if not os.path.isfile(local):
r = requests.get(remote)
# a tile might not exist over ocean
if r.status_code == 404:
r.close()
continue
msg = '[{i: >{w}}/{n}] {l} <<-- {r}'
log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local, r=remote))
r.raise_for_status()
with open(local, 'wb') as output:
output.write(r.content)
r.close()
continue
msg = '[{i: >{w}}/{n}] {l} <<-- {r}'
log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local, r=remote))
r.raise_for_status()
with open(local, 'wb') as output:
output.write(r.content)
r.close()
else:
msg = '[{i: >{w}}/{n}] found local file: {l}'
log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local))
else:
msg = '[{i: >{w}}/{n}] found local file: {l}'
log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local))
if os.path.isfile(local):
locals.append(local)
return sorted(locals)

@staticmethod
def __retrieve_ftp(url, filenames, outdir, username, password, port=0, offline=False):
def __retrieve_ftp(url, filenames, outdir, username, password,
port=0, offline=False, lock_timeout=600):
files = list(set(filenames))
os.makedirs(outdir, exist_ok=True)

Expand All @@ -698,25 +709,26 @@ def __retrieve_ftp(url, filenames, outdir, username, password, port=0, offline=F
ftp = None
locals = []
n = len(files)
for i, product_remote in enumerate(files):
product_local = os.path.join(outdir, os.path.basename(product_remote))
if not os.path.isfile(product_local) and not offline:
try:
targetlist = ftp.nlst(product_remote)
except ftplib.error_temp:
continue
address = '{}://{}/{}{}'.format(parsed.scheme, parsed.netloc,
parsed.path + '/' if parsed.path != '' else '',
product_remote)
msg = '[{i: >{w}}/{n}] {l} <<-- {r}'
log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=product_local, r=address))
with open(product_local, 'wb') as myfile:
ftp.retrbinary('RETR {}'.format(product_remote), myfile.write)
else:
msg = '[{i: >{w}}/{n}] found local file: {l}'
log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=product_local))
if os.path.isfile(product_local):
locals.append(product_local)
for i, remote in enumerate(files):
local = os.path.join(outdir, os.path.basename(remote))
with Lock(local, timeout=lock_timeout):
if not os.path.isfile(local) and not offline:
try:
targetlist = ftp.nlst(remote)
except ftplib.error_temp:
continue
address = '{}://{}/{}{}'.format(parsed.scheme, parsed.netloc,
parsed.path + '/' if parsed.path != '' else '',
remote)
msg = '[{i: >{w}}/{n}] {l} <<-- {r}'
log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local, r=address))
with open(local, 'wb') as myfile:
ftp.retrbinary('RETR {}'.format(remote), myfile.write)
else:
msg = '[{i: >{w}}/{n}] found local file: {l}'
log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local))
if os.path.isfile(local):
locals.append(local)
if ftp is not None:
ftp.close()
return sorted(locals)
Expand Down Expand Up @@ -949,7 +961,7 @@ def config(self):
}

def load(self, dem_type, vrt=None, buffer=None, username=None,
password=None, product='dem', crop=True):
password=None, product='dem', crop=True, lock_timeout=600):
"""
obtain DEM tiles for the given geometries and either return the file names in a list
or combine them into a VRT mosaic. The VRT is cropped to the combined extent of the geometries
Expand Down Expand Up @@ -1044,6 +1056,8 @@ def load(self, dem_type, vrt=None, buffer=None, username=None,
or return the full extent of the DEM tiles? In the latter case, the common
bounding box of the geometries is expanded so that the coordinates are
multiples of the tile size of the respective DEM option.
lock_timeout: int
how long to wait to acquire a lock on downloaded files?
Returns
-------
Expand Down Expand Up @@ -1077,10 +1091,12 @@ def load(self, dem_type, vrt=None, buffer=None, username=None,
locals = self.__retrieve_ftp(url=self.config[dem_type]['url'],
filenames=candidates,
outdir=outdir, username=username,
password=password, port=port)
password=password, port=port,
lock_timeout=lock_timeout)
else:
locals = self.__retrieve(url=self.config[dem_type]['url'],
filenames=candidates, outdir=outdir)
filenames=candidates, outdir=outdir,
lock_timeout=lock_timeout)

resolution = None
datatype = None
Expand Down Expand Up @@ -1486,3 +1502,28 @@ def sock(self, value):
if value is not None and not isinstance(value, ssl.SSLSocket):
value = self.context.wrap_socket(value)
self._sock = value


def vrt_check_sources(fname):
"""
check the sanity of all source files of a given VRT.
Currently does not check in-memory VRTs.
Parameters
----------
fname: str
the VRT file name
Returns
-------
Raises
------
RuntimeError
"""
if os.path.isfile(fname):
tree = etree.parse(fname)
sources = [x.text for x in tree.findall('.//SourceFilename')]
for source in sources:
if not os.path.isfile(source):
raise RuntimeError(f'missing VRT source file: {source}')

0 comments on commit 23a7f4b

Please sign in to comment.