diff --git a/LICENSE.txt b/LICENSE.txt index dff3604f..ee0ff581 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2014-2023, the pyroSAR Developers. +# Copyright (c) 2014-2024, the pyroSAR Developers. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the diff --git a/pyroSAR/auxdata.py b/pyroSAR/auxdata.py index 59472ae1..422f3426 100644 --- a/pyroSAR/auxdata.py +++ b/pyroSAR/auxdata.py @@ -1,7 +1,7 @@ ############################################################################### # tools for handling auxiliary data in software pyroSAR -# Copyright (c) 2019-2023, the pyroSAR Developers. +# Copyright (c) 2019-2024, the pyroSAR Developers. # This file is part of the pyroSAR Project. It is subject to the # license terms in the LICENSE.txt file found in the top-level @@ -26,6 +26,7 @@ from lxml import etree as ET from packaging import version from pyroSAR.examine import ExamineSnap +from pyroSAR.ancillary import Lock from spatialist.raster import Raster, Dtype from spatialist.vector import bbox from spatialist.ancillary import dissolve, finder @@ -246,7 +247,8 @@ def dem_autoload(geometries, demType, vrt=None, buffer=None, username=None, def dem_create(src, dst, t_srs=None, tr=None, threads=None, geoid_convert=False, geoid='EGM96', nodata=None, - resampleAlg='bilinear', dtype=None, pbar=False, **kwargs): + resampleAlg='bilinear', dtype=None, pbar=False, + lock_timeout=600, **kwargs): """ Create a new DEM GeoTIFF file and optionally convert heights from geoid to ellipsoid. This is basically a convenience wrapper around :func:`osgeo.gdal.Warp` via :func:`spatialist.auxil.gdalwarp`. @@ -298,6 +300,8 @@ def dem_create(src, dst, t_srs=None, tr=None, threads=None, See :class:`spatialist.raster.Dtype`. pbar: bool add a progressbar? + lock_timeout: int + how long to wait to acquire a lock on `dst`? **kwargs additional keyword arguments to be passed to :func:`spatialist.auxil.gdalwarp`. See :func:`osgeo.gdal.WarpOptions` for options. The following arguments cannot @@ -314,6 +318,8 @@ def dem_create(src, dst, t_srs=None, tr=None, threads=None, """ + vrt_check_sources(src) + with Raster(src) as ras: if nodata is None: nodata = ras.nodata @@ -390,20 +396,23 @@ def dem_create(src, dst, t_srs=None, tr=None, threads=None, msg = "argument '{}' cannot be set via kwargs as it is set internally." raise RuntimeError(msg.format(key)) - try: - message = 'creating mosaic' - crs = gdalwarp_args['dstSRS'] - if crs != 'EPSG:4326': - message += ' and reprojecting to {}'.format(crs) - message += ': {}'.format(dst) - log.info(message) - gdalwarp(src=src, dst=dst, pbar=pbar, **gdalwarp_args) - except Exception: - if os.path.isfile(dst): - os.remove(dst) - raise - finally: - gdal.SetConfigOption('GDAL_NUM_THREADS', threads_system) + with Lock(dst, timeout=lock_timeout): + try: + if not os.path.isfile(dst): + message = 'creating mosaic' + crs = gdalwarp_args['dstSRS'] + if crs != 'EPSG:4326': + message += ' and reprojecting to {}'.format(crs) + log.info(f'{message}: {dst}') + gdalwarp(src=src, dst=dst, pbar=pbar, **gdalwarp_args) + else: + log.info(f'mosaic already exists: {dst}') + except Exception: + if os.path.isfile(dst): + os.remove(dst) + raise + finally: + gdal.SetConfigOption('GDAL_NUM_THREADS', threads_system) class DEMHandler: @@ -636,7 +645,7 @@ def __get_resolution(self, dem_type, y): return val @staticmethod - def __retrieve(url, filenames, outdir): + def __retrieve(url, filenames, outdir, lock_timeout=600): # check that base URL is reachable url_parse = urlparse(url) url_base = url_parse.scheme + '://' + url_parse.netloc @@ -651,27 +660,29 @@ def __retrieve(url, filenames, outdir): for i, file in enumerate(files): remote = '{}/{}'.format(url, file) local = os.path.join(outdir, os.path.basename(file)) - if not os.path.isfile(local): - r = requests.get(remote) - # a tile might not exist over ocean - if r.status_code == 404: + with Lock(local, timeout=lock_timeout): + if not os.path.isfile(local): + r = requests.get(remote) + # a tile might not exist over ocean + if r.status_code == 404: + r.close() + continue + msg = '[{i: >{w}}/{n}] {l} <<-- {r}' + log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local, r=remote)) + r.raise_for_status() + with open(local, 'wb') as output: + output.write(r.content) r.close() - continue - msg = '[{i: >{w}}/{n}] {l} <<-- {r}' - log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local, r=remote)) - r.raise_for_status() - with open(local, 'wb') as output: - output.write(r.content) - r.close() - else: - msg = '[{i: >{w}}/{n}] found local file: {l}' - log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local)) + else: + msg = '[{i: >{w}}/{n}] found local file: {l}' + log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local)) if os.path.isfile(local): locals.append(local) return sorted(locals) @staticmethod - def __retrieve_ftp(url, filenames, outdir, username, password, port=0, offline=False): + def __retrieve_ftp(url, filenames, outdir, username, password, + port=0, offline=False, lock_timeout=600): files = list(set(filenames)) os.makedirs(outdir, exist_ok=True) @@ -698,25 +709,26 @@ def __retrieve_ftp(url, filenames, outdir, username, password, port=0, offline=F ftp = None locals = [] n = len(files) - for i, product_remote in enumerate(files): - product_local = os.path.join(outdir, os.path.basename(product_remote)) - if not os.path.isfile(product_local) and not offline: - try: - targetlist = ftp.nlst(product_remote) - except ftplib.error_temp: - continue - address = '{}://{}/{}{}'.format(parsed.scheme, parsed.netloc, - parsed.path + '/' if parsed.path != '' else '', - product_remote) - msg = '[{i: >{w}}/{n}] {l} <<-- {r}' - log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=product_local, r=address)) - with open(product_local, 'wb') as myfile: - ftp.retrbinary('RETR {}'.format(product_remote), myfile.write) - else: - msg = '[{i: >{w}}/{n}] found local file: {l}' - log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=product_local)) - if os.path.isfile(product_local): - locals.append(product_local) + for i, remote in enumerate(files): + local = os.path.join(outdir, os.path.basename(remote)) + with Lock(local, timeout=lock_timeout): + if not os.path.isfile(local) and not offline: + try: + targetlist = ftp.nlst(remote) + except ftplib.error_temp: + continue + address = '{}://{}/{}{}'.format(parsed.scheme, parsed.netloc, + parsed.path + '/' if parsed.path != '' else '', + remote) + msg = '[{i: >{w}}/{n}] {l} <<-- {r}' + log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local, r=address)) + with open(local, 'wb') as myfile: + ftp.retrbinary('RETR {}'.format(remote), myfile.write) + else: + msg = '[{i: >{w}}/{n}] found local file: {l}' + log.info(msg.format(i=i + 1, w=len(str(n)), n=n, l=local)) + if os.path.isfile(local): + locals.append(local) if ftp is not None: ftp.close() return sorted(locals) @@ -949,7 +961,7 @@ def config(self): } def load(self, dem_type, vrt=None, buffer=None, username=None, - password=None, product='dem', crop=True): + password=None, product='dem', crop=True, lock_timeout=600): """ obtain DEM tiles for the given geometries and either return the file names in a list or combine them into a VRT mosaic. The VRT is cropped to the combined extent of the geometries @@ -1044,6 +1056,8 @@ def load(self, dem_type, vrt=None, buffer=None, username=None, or return the full extent of the DEM tiles? In the latter case, the common bounding box of the geometries is expanded so that the coordinates are multiples of the tile size of the respective DEM option. + lock_timeout: int + how long to wait to acquire a lock on downloaded files? Returns ------- @@ -1077,10 +1091,12 @@ def load(self, dem_type, vrt=None, buffer=None, username=None, locals = self.__retrieve_ftp(url=self.config[dem_type]['url'], filenames=candidates, outdir=outdir, username=username, - password=password, port=port) + password=password, port=port, + lock_timeout=lock_timeout) else: locals = self.__retrieve(url=self.config[dem_type]['url'], - filenames=candidates, outdir=outdir) + filenames=candidates, outdir=outdir, + lock_timeout=lock_timeout) resolution = None datatype = None @@ -1486,3 +1502,28 @@ def sock(self, value): if value is not None and not isinstance(value, ssl.SSLSocket): value = self.context.wrap_socket(value) self._sock = value + + +def vrt_check_sources(fname): + """ + check the sanity of all source files of a given VRT. + Currently does not check in-memory VRTs. + + Parameters + ---------- + fname: str + the VRT file name + + Returns + ------- + + Raises + ------ + RuntimeError + """ + if os.path.isfile(fname): + tree = etree.parse(fname) + sources = [x.text for x in tree.findall('.//SourceFilename')] + for source in sources: + if not os.path.isfile(source): + raise RuntimeError(f'missing VRT source file: {source}')