diff --git a/galvani/BioLogic.py b/galvani/BioLogic.py
index acddd3a..44b1d07 100644
--- a/galvani/BioLogic.py
+++ b/galvani/BioLogic.py
@@ -5,7 +5,7 @@
 #
 # SPDX-License-Identifier: GPL-3.0-or-later
 
-__all__ = ['MPTfileCSV', 'MPTfile']
+__all__ = ["MPTfileCSV", "MPTfile"]
 
 import re
 import csv
@@ -21,19 +21,44 @@ def fieldname_to_dtype(fieldname):
     """Converts a column header from the MPT file into a tuple of
     canonical name and appropriate numpy dtype"""
 
-    if fieldname == 'mode':
-        return ('mode', np.uint8)
-    elif fieldname in ("ox/red", "error", "control changes", "Ns changes",
-                       "counter inc."):
+    if fieldname == "mode":
+        return ("mode", np.uint8)
+    elif fieldname in (
+        "ox/red",
+        "error",
+        "control changes",
+        "Ns changes",
+        "counter inc.",
+    ):
         return (fieldname, np.bool_)
-    elif fieldname in ("time/s", "P/W", "(Q-Qo)/mA.h", "x", "control/V",
-                       "control/mA", "control/V/mA", "(Q-Qo)/C", "dQ/C",
-                       "freq/Hz", "|Ewe|/V", "|I|/A", "Phase(Z)/deg",
-                       "|Z|/Ohm", "Re(Z)/Ohm", "-Im(Z)/Ohm"):
+    elif fieldname in (
+        "time/s",
+        "P/W",
+        "(Q-Qo)/mA.h",
+        "x",
+        "control/V",
+        "control/mA",
+        "control/V/mA",
+        "(Q-Qo)/C",
+        "dQ/C",
+        "freq/Hz",
+        "|Ewe|/V",
+        "|I|/A",
+        "Phase(Z)/deg",
+        "|Z|/Ohm",
+        "Re(Z)/Ohm",
+        "-Im(Z)/Ohm",
+    ):
         return (fieldname, np.float_)
-    elif fieldname in ("Q charge/discharge/mA.h", "step time/s",
-                       "Q charge/mA.h", "Q discharge/mA.h",
-                       "Temperature/°C", "Efficiency/%", "Capacity/mA.h"):
+    elif fieldname in (
+        "Q charge/discharge/mA.h",
+        "step time/s",
+        "Q charge/mA.h",
+        "Q discharge/mA.h",
+        "Temperature/°C",
+        "Efficiency/%",
+        "Capacity/mA.h",
+    ):
         return (fieldname, np.float_)
     elif fieldname in ("cycle number", "I Range", "Ns", "half cycle"):
         return (fieldname, np.int_)
@@ -43,12 +68,28 @@ def fieldname_to_dtype(fieldname):
         return ("I/mA", np.float_)
     elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V"):
         return ("Ewe/V", np.float_)
-    elif fieldname.endswith(("/s", "/Hz", "/deg",
-                             "/W", "/mW", "/W.h", "/mW.h",
-                             "/A", "/mA", "/A.h", "/mA.h",
-                             "/V", "/mV",
-                             "/F", "/mF", "/uF",
-                             "/C", "/Ohm",)):
+    elif fieldname.endswith(
+        (
+            "/s",
+            "/Hz",
+            "/deg",
+            "/W",
+            "/mW",
+            "/W.h",
+            "/mW.h",
+            "/A",
+            "/mA",
+            "/A.h",
+            "/mA.h",
+            "/V",
+            "/mV",
+            "/F",
+            "/mF",
+            "/uF",
+            "/C",
+            "/Ohm",
+        )
+    ):
         return (fieldname, np.float_)
     else:
         raise ValueError("Invalid column header: %s" % fieldname)
@@ -56,11 +97,11 @@ def fieldname_to_dtype(fieldname):
 
 
 def comma_converter(float_text):
     """Convert text to float whether the decimal point is '.' or ','"""
or ','""" - trans_table = bytes.maketrans(b',', b'.') + trans_table = bytes.maketrans(b",", b".") return float(float_text.translate(trans_table)) -def MPTfile(file_or_path, encoding='ascii'): +def MPTfile(file_or_path, encoding="ascii"): """Opens .mpt files as numpy record arrays Checks for the correct headings, skips any comments and returns a @@ -68,16 +109,15 @@ def MPTfile(file_or_path, encoding='ascii'): """ if isinstance(file_or_path, str): - mpt_file = open(file_or_path, 'rb') + mpt_file = open(file_or_path, "rb") else: mpt_file = file_or_path magic = next(mpt_file) - if magic not in (b'EC-Lab ASCII FILE\r\n', b'BT-Lab ASCII FILE\r\n'): + if magic not in (b"EC-Lab ASCII FILE\r\n", b"BT-Lab ASCII FILE\r\n"): raise ValueError("Bad first line for EC-Lab file: '%s'" % magic) - nb_headers_match = re.match(rb'Nb header lines : (\d+)\s*$', - next(mpt_file)) + nb_headers_match = re.match(rb"Nb header lines : (\d+)\s*$", next(mpt_file)) nb_headers = int(nb_headers_match.group(1)) if nb_headers < 3: raise ValueError("Too few header lines: %d" % nb_headers) @@ -86,14 +126,12 @@ def MPTfile(file_or_path, encoding='ascii'): # make three lines. Every additional line is a comment line. comments = [next(mpt_file) for i in range(nb_headers - 3)] - fieldnames = next(mpt_file).decode(encoding).strip().split('\t') + fieldnames = next(mpt_file).decode(encoding).strip().split("\t") record_type = np.dtype(list(map(fieldname_to_dtype, fieldnames))) # Must be able to parse files where commas are used for decimal points - converter_dict = dict(((i, comma_converter) - for i in range(len(fieldnames)))) - mpt_array = np.loadtxt(mpt_file, dtype=record_type, - converters=converter_dict) + converter_dict = dict(((i, comma_converter) for i in range(len(fieldnames)))) + mpt_array = np.loadtxt(mpt_file, dtype=record_type, converters=converter_dict) return mpt_array, comments @@ -106,15 +144,15 @@ def MPTfileCSV(file_or_path): """ if isinstance(file_or_path, str): - mpt_file = open(file_or_path, 'r') + mpt_file = open(file_or_path, "r") else: mpt_file = file_or_path magic = next(mpt_file) - if magic.rstrip() != 'EC-Lab ASCII FILE': + if magic.rstrip() != "EC-Lab ASCII FILE": raise ValueError("Bad first line for EC-Lab file: '%s'" % magic) - nb_headers_match = re.match(r'Nb header lines : (\d+)\s*$', next(mpt_file)) + nb_headers_match = re.match(r"Nb header lines : (\d+)\s*$", next(mpt_file)) nb_headers = int(nb_headers_match.group(1)) if nb_headers < 3: raise ValueError("Too few header lines: %d" % nb_headers) @@ -123,154 +161,206 @@ def MPTfileCSV(file_or_path): # make three lines. Every additional line is a comment line. 
     comments = [next(mpt_file) for i in range(nb_headers - 3)]
 
-    mpt_csv = csv.DictReader(mpt_file, dialect='excel-tab')
+    mpt_csv = csv.DictReader(mpt_file, dialect="excel-tab")
 
     expected_fieldnames = (
-        ["mode", "ox/red", "error", "control changes", "Ns changes",
-         "counter inc.", "time/s", "control/V/mA", "Ewe/V", "dq/mA.h",
-         "P/W", "<I>/mA", "(Q-Qo)/mA.h", "x"],
-        ['mode', 'ox/red', 'error', 'control changes', 'Ns changes',
-         'counter inc.', 'time/s', 'control/V', 'Ewe/V', 'dq/mA.h',
-         '<I>/mA', '(Q-Qo)/mA.h', 'x'],
-        ["mode", "ox/red", "error", "control changes", "Ns changes",
-         "counter inc.", "time/s", "control/V", "Ewe/V", "I/mA",
-         "dQ/mA.h", "P/W"],
-        ["mode", "ox/red", "error", "control changes", "Ns changes",
-         "counter inc.", "time/s", "control/V", "Ewe/V", "<I>/mA",
-         "dQ/mA.h", "P/W"])
+        [
+            "mode",
+            "ox/red",
+            "error",
+            "control changes",
+            "Ns changes",
+            "counter inc.",
+            "time/s",
+            "control/V/mA",
+            "Ewe/V",
+            "dq/mA.h",
+            "P/W",
+            "<I>/mA",
+            "(Q-Qo)/mA.h",
+            "x",
+        ],
+        [
+            "mode",
+            "ox/red",
+            "error",
+            "control changes",
+            "Ns changes",
+            "counter inc.",
+            "time/s",
+            "control/V",
+            "Ewe/V",
+            "dq/mA.h",
+            "<I>/mA",
+            "(Q-Qo)/mA.h",
+            "x",
+        ],
+        [
+            "mode",
+            "ox/red",
+            "error",
+            "control changes",
+            "Ns changes",
+            "counter inc.",
+            "time/s",
+            "control/V",
+            "Ewe/V",
+            "I/mA",
+            "dQ/mA.h",
+            "P/W",
+        ],
+        [
+            "mode",
+            "ox/red",
+            "error",
+            "control changes",
+            "Ns changes",
+            "counter inc.",
+            "time/s",
+            "control/V",
+            "Ewe/V",
+            "<I>/mA",
+            "dQ/mA.h",
+            "P/W",
+        ],
+    )
     if mpt_csv.fieldnames not in expected_fieldnames:
         raise ValueError("Unrecognised headers for MPT file format")
 
     return mpt_csv, comments
 
 
-VMPmodule_hdr = np.dtype([('shortname', 'S10'),
-                          ('longname', 'S25'),
-                          ('length', ' ??
-    9: ('Ece/V', '/mA', '/V', '/V', '/V', '/V', '/V', '/V', ' ??
+    9: ("Ece/V", "/mA", "/V", "/V", "/V", "/V", "/V", "/V", " 1:
-            unique_field_name = '%s %d' % (field_name, count)
+            unique_field_name = "%s %d" % (field_name, count)
         else:
             unique_field_name = field_name
         type_list.append((unique_field_name, field_type))
     else:
-        raise NotImplementedError("Column ID {cid} after column {prev} "
-                                  "is unknown"
-                                  .format(cid=colID,
-                                          prev=type_list[-1][0]))
+        raise NotImplementedError(
+            "Column ID {cid} after column {prev} "
+            "is unknown".format(cid=colID, prev=type_list[-1][0])
+        )
 
     return np.dtype(type_list), flags_dict
@@ -341,12 +433,13 @@ def read_VMP_modules(fileobj, read_module_data=True):
     N.B. the offset yielded is the offset to the start of the data i.e. after
     the end of the header. The data runs from (offset) to (offset+length)"""
     while True:
-        module_magic = fileobj.read(len(b'MODULE'))
+        module_magic = fileobj.read(len(b"MODULE"))
         if len(module_magic) == 0:  # end of file
             break
-        elif module_magic != b'MODULE':
-            raise ValueError("Found %r, expecting start of new VMP MODULE"
-                             % module_magic)
+        elif module_magic != b"MODULE":
+            raise ValueError(
+                "Found %r, expecting start of new VMP MODULE" % module_magic
+            )
 
         hdr_bytes = fileobj.read(VMPmodule_hdr.itemsize)
         if len(hdr_bytes) < VMPmodule_hdr.itemsize:
@@ -354,23 +447,24 @@
 
         hdr = np.frombuffer(hdr_bytes, dtype=VMPmodule_hdr, count=1)
         hdr_dict = dict(((n, hdr[n][0]) for n in VMPmodule_hdr.names))
-        hdr_dict['offset'] = fileobj.tell()
+        hdr_dict["offset"] = fileobj.tell()
         if read_module_data:
-            hdr_dict['data'] = fileobj.read(hdr_dict['length'])
-            if len(hdr_dict['data']) != hdr_dict['length']:
-                raise IOError("""Unexpected end of file while reading data
+            hdr_dict["data"] = fileobj.read(hdr_dict["length"])
+            if len(hdr_dict["data"]) != hdr_dict["length"]:
+                raise IOError(
+                    """Unexpected end of file while reading data
 current module: %s
 length read: %d
-length expected: %d""" % (hdr_dict['longname'],
-                          len(hdr_dict['data']),
-                          hdr_dict['length']))
+length expected: %d"""
+                    % (hdr_dict["longname"], len(hdr_dict["data"]), hdr_dict["length"])
+                )
             yield hdr_dict
         else:
             yield hdr_dict
-            fileobj.seek(hdr_dict['offset'] + hdr_dict['length'], SEEK_SET)
+            fileobj.seek(hdr_dict["offset"] + hdr_dict["length"], SEEK_SET)
 
 
-MPR_MAGIC = b'BIO-LOGIC MODULAR FILE\x1a'.ljust(48) + b'\x00\x00\x00\x00'
+MPR_MAGIC = b"BIO-LOGIC MODULAR FILE\x1a".ljust(48) + b"\x00\x00\x00\x00"
 
 
 class MPRfile:
@@ -392,41 +486,44 @@
     def __init__(self, file_or_path):
         self.loop_index = None
         if isinstance(file_or_path, str):
-            mpr_file = open(file_or_path, 'rb')
+            mpr_file = open(file_or_path, "rb")
         else:
             mpr_file = file_or_path
 
         magic = mpr_file.read(len(MPR_MAGIC))
         if magic != MPR_MAGIC:
-            raise ValueError('Invalid magic for .mpr file: %s' % magic)
+            raise ValueError("Invalid magic for .mpr file: %s" % magic)
 
         modules = list(read_VMP_modules(mpr_file))
         self.modules = modules
-        settings_mod, = (m for m in modules if m['shortname'] == b'VMP Set   ')
-        data_module, = (m for m in modules if m['shortname'] == b'VMP data  ')
-        maybe_loop_module = [m for m in modules if m['shortname'] == b'VMP loop  ']
-        maybe_log_module = [m for m in modules if m['shortname'] == b'VMP LOG   ']
-
-        n_data_points = np.frombuffer(data_module['data'][:4], dtype=' 40000 and ole_timestamp1 < 50000:
             ole_timestamp = ole_timestamp1
@@ -483,14 +584,16 @@
         ole_timedelta = timedelta(days=ole_timestamp[0])
         self.timestamp = ole_base + ole_timedelta
         if self.startdate != self.timestamp.date():
-            raise ValueError("Date mismatch:\n"
-                             + "  Start date: %s\n" % self.startdate
-                             + "  End date: %s\n" % self.enddate
-                             + "  Timestamp: %s\n" % self.timestamp)
+            raise ValueError(
+                "Date mismatch:\n"
+                + "  Start date: %s\n" % self.startdate
+                + "  End date: %s\n" % self.enddate
+                + "  Timestamp: %s\n" % self.timestamp
+            )
 
     def get_flag(self, flagname):
         if flagname in self.flags_dict:
             mask, dtype = self.flags_dict[flagname]
-            return np.array(self.data['flags'] & mask, dtype=dtype)
+            return np.array(self.data["flags"] & mask, dtype=dtype)
         else:
             raise AttributeError("Flag '%s' not present" % flagname)
diff --git a/galvani/__init__.py b/galvani/__init__.py
index e3445f3..621880e 100644
--- a/galvani/__init__.py
+++ b/galvani/__init__.py
@@ -4,4 +4,4 @@
 
 from .BioLogic import MPRfile, MPTfile
 
-__all__ = ['MPRfile', 'MPTfile']
+__all__ = ["MPRfile", "MPTfile"]
diff --git a/galvani/res2sqlite.py b/galvani/res2sqlite.py
index bd2da06..2329473 100755
--- a/galvani/res2sqlite.py
+++ b/galvani/res2sqlite.py
@@ -16,43 +16,43 @@
 # $ mdb-schema oracle
 
 mdb_tables = [
-    'Version_Table',
-    'Global_Table',
-    'Resume_Table',
-    'Channel_Normal_Table',
-    'Channel_Statistic_Table',
-    'Auxiliary_Table',
-    'Event_Table',
-    'Smart_Battery_Info_Table',
-    'Smart_Battery_Data_Table',
+    "Version_Table",
+    "Global_Table",
+    "Resume_Table",
+    "Channel_Normal_Table",
+    "Channel_Statistic_Table",
+    "Auxiliary_Table",
+    "Event_Table",
+    "Smart_Battery_Info_Table",
+    "Smart_Battery_Data_Table",
 ]
 mdb_5_23_tables = [
-    'MCell_Aci_Data_Table',
-    'Aux_Global_Data_Table',
-    'Smart_Battery_Clock_Stretch_Table',
+    "MCell_Aci_Data_Table",
+    "Aux_Global_Data_Table",
+    "Smart_Battery_Clock_Stretch_Table",
 ]
 mdb_5_26_tables = [
-    'Can_BMS_Info_Table',
-    'Can_BMS_Data_Table',
+    "Can_BMS_Info_Table",
+    "Can_BMS_Data_Table",
 ]
 
 mdb_tables_text = {
-    'Version_Table',
-    'Global_Table',
-    'Event_Table',
-    'Smart_Battery_Info_Table',
-    'Can_BMS_Info_Table',
+    "Version_Table",
+    "Global_Table",
+    "Event_Table",
+    "Smart_Battery_Info_Table",
+    "Can_BMS_Info_Table",
 }
 mdb_tables_numeric = {
-    'Resume_Table',
-    'Channel_Normal_Table',
-    'Channel_Statistic_Table',
-    'Auxiliary_Table',
-    'Smart_Battery_Data_Table',
-    'MCell_Aci_Data_Table',
-    'Aux_Global_Data_Table',
-    'Smart_Battery_Clock_Stretch_Table',
-    'Can_BMS_Data_Table',
+    "Resume_Table",
+    "Channel_Normal_Table",
+    "Channel_Statistic_Table",
+    "Auxiliary_Table",
+    "Smart_Battery_Data_Table",
+    "MCell_Aci_Data_Table",
+    "Aux_Global_Data_Table",
+    "Smart_Battery_Clock_Stretch_Table",
+    "Can_BMS_Data_Table",
 }
 
 mdb_create_scripts = {
@@ -191,7 +191,7 @@
             Event_Type INTEGER,
             Event_Describe TEXT
         );
     """,
-    "Smart_Battery_Info_Table": """
+    "Smart_Battery_Info_Table": """
         CREATE TABLE Smart_Battery_Info_Table
         (
             Test_ID INTEGER PRIMARY KEY REFERENCES Global_Table(Test_ID),
@@ -271,7 +271,7 @@
             REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
         );
     """,
     # The following tables are not present in version 1.14, but are in 5.23
-    'MCell_Aci_Data_Table': """
+    "MCell_Aci_Data_Table": """
         CREATE TABLE MCell_Aci_Data_Table
         (
             Test_ID INTEGER,
@@ -285,7 +285,7 @@
             FOREIGN KEY (Test_ID, Data_Point)
             REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
         );""",
-    'Aux_Global_Data_Table': """
+    "Aux_Global_Data_Table": """
         CREATE TABLE Aux_Global_Data_Table
         (
             Channel_Index INTEGER,
@@ -295,7 +295,7 @@
             Unit TEXT,
             PRIMARY KEY (Channel_Index, Auxiliary_Index, Data_Type)
         );""",
-    'Smart_Battery_Clock_Stretch_Table': """
+    "Smart_Battery_Clock_Stretch_Table": """
         CREATE TABLE Smart_Battery_Clock_Stretch_Table
         (
             Test_ID INTEGER,
@@ -344,7 +344,7 @@
             REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
         );""",
     # The following tables are not present in version 5.23, but are in 5.26
-    'Can_BMS_Info_Table': """
+    "Can_BMS_Info_Table": """
         CREATE TABLE "Can_BMS_Info_Table"
         (
             Channel_Index INTEGER PRIMARY KEY,
@@ -352,7 +352,7 @@
             CAN_Configuration TEXT
         );
     """,
-    'Can_BMS_Data_Table': """
+    "Can_BMS_Data_Table": """
         CREATE TABLE "Can_BMS_Data_Table"
         (
             Test_ID INTEGER,
@@ -371,7 +371,8 @@
 CREATE UNIQUE INDEX data_point_index ON Channel_Normal_Table (Test_ID, Data_Point);
 CREATE INDEX voltage_index ON Channel_Normal_Table (Test_ID, Voltage);
 CREATE INDEX test_time_index ON Channel_Normal_Table (Test_ID, Test_Time);
-"""}
+"""
+}
 
helper_table_script = """ CREATE TEMPORARY TABLE capacity_helper( @@ -438,17 +439,19 @@ def mdb_get_data_text(s3db, filename, table): print("Reading %s..." % table) insert_pattern = re.compile( - r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n', - re.IGNORECASE + r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n', re.IGNORECASE ) try: # Initialize values to avoid NameError in except clause - mdb_output = '' + mdb_output = "" insert_match = None - with sp.Popen(['mdb-export', '-I', 'postgres', filename, table], - bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE, - universal_newlines=True) as mdb_sql: - + with sp.Popen( + ["mdb-export", "-I", "postgres", filename, table], + bufsize=-1, + stdin=sp.DEVNULL, + stdout=sp.PIPE, + universal_newlines=True, + ) as mdb_sql: mdb_output = mdb_sql.stdout.read() while len(mdb_output) > 0: insert_match = insert_pattern.match(mdb_output) @@ -459,8 +462,10 @@ def mdb_get_data_text(s3db, filename, table): except OSError as e: if e.errno == 2: - raise RuntimeError('Could not locate the `mdb-export` executable. ' - 'Check that mdbtools is properly installed.') + raise RuntimeError( + "Could not locate the `mdb-export` executable. " + "Check that mdbtools is properly installed." + ) else: raise except BaseException: @@ -475,14 +480,18 @@ def mdb_get_data_text(s3db, filename, table): def mdb_get_data_numeric(s3db, filename, table): print("Reading %s..." % table) try: - with sp.Popen(['mdb-export', filename, table], - bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE, - universal_newlines=True) as mdb_sql: + with sp.Popen( + ["mdb-export", filename, table], + bufsize=-1, + stdin=sp.DEVNULL, + stdout=sp.PIPE, + universal_newlines=True, + ) as mdb_sql: mdb_csv = csv.reader(mdb_sql.stdout) mdb_headers = next(mdb_csv) quoted_headers = ['"%s"' % h for h in mdb_headers] - joined_headers = ', '.join(quoted_headers) - joined_placemarks = ', '.join(['?' for h in mdb_headers]) + joined_headers = ", ".join(quoted_headers) + joined_placemarks = ", ".join(["?" for h in mdb_headers]) insert_stmt = 'INSERT INTO "{0}" ({1}) VALUES ({2});'.format( table, joined_headers, @@ -492,8 +501,10 @@ def mdb_get_data_numeric(s3db, filename, table): s3db.commit() except OSError as e: if e.errno == 2: - raise RuntimeError('Could not locate the `mdb-export` executable. ' - 'Check that mdbtools is properly installed.') + raise RuntimeError( + "Could not locate the `mdb-export` executable. " + "Check that mdbtools is properly installed." 
+            )
         else:
             raise
 
@@ -504,7 +515,9 @@ def mdb_get_data(s3db, filename, table):
     elif table in mdb_tables_numeric:
         mdb_get_data_numeric(s3db, filename, table)
     else:
-        raise ValueError("'%s' is in neither mdb_tables_text nor mdb_tables_numeric" % table)
+        raise ValueError(
+            "'%s' is in neither mdb_tables_text nor mdb_tables_numeric" % table
+        )
 
 
 def mdb_get_version(filename):
@@ -514,9 +527,13 @@
     """
     print("Reading version number...")
     try:
-        with sp.Popen(['mdb-export', filename, 'Version_Table'],
-                      bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
-                      universal_newlines=True) as mdb_sql:
+        with sp.Popen(
+            ["mdb-export", filename, "Version_Table"],
+            bufsize=-1,
+            stdin=sp.DEVNULL,
+            stdout=sp.PIPE,
+            universal_newlines=True,
+        ) as mdb_sql:
             mdb_csv = csv.reader(mdb_sql.stdout)
             mdb_headers = next(mdb_csv)
             mdb_values = next(mdb_csv)
@@ -525,23 +542,31 @@
         except StopIteration:
             pass
         else:
-            raise ValueError('Version_Table of %s lists multiple versions' % filename)
+            raise ValueError(
+                "Version_Table of %s lists multiple versions" % filename
+            )
     except OSError as e:
         if e.errno == 2:
-            raise RuntimeError('Could not locate the `mdb-export` executable. '
-                               'Check that mdbtools is properly installed.')
+            raise RuntimeError(
+                "Could not locate the `mdb-export` executable. "
+                "Check that mdbtools is properly installed."
+            )
         else:
             raise
 
-    if 'Version_Schema_Field' not in mdb_headers:
-        raise ValueError('Version_Table of %s does not contain a Version_Schema_Field column'
-                         % filename)
+    if "Version_Schema_Field" not in mdb_headers:
+        raise ValueError(
+            "Version_Table of %s does not contain a Version_Schema_Field column"
+            % filename
+        )
     version_fields = dict(zip(mdb_headers, mdb_values))
-    version_text = version_fields['Version_Schema_Field']
-    version_match = re.fullmatch('Results File ([.0-9]+)', version_text)
+    version_text = version_fields["Version_Schema_Field"]
+    version_match = re.fullmatch("Results File ([.0-9]+)", version_text)
     if not version_match:
-        raise ValueError('File version "%s" did not match expected format' % version_text)
+        raise ValueError(
+            'File version "%s" did not match expected format' % version_text
+        )
     version_string = version_match.group(1)
-    version_tuple = tuple(map(int, version_string.split('.')))
+    version_tuple = tuple(map(int, version_string.split(".")))
     return version_tuple
 
 
@@ -581,12 +606,14 @@ def main(argv=None):
     parser = argparse.ArgumentParser(
         description="Convert Arbin .res files to sqlite3 databases using mdb-export",
     )
-    parser.add_argument('input_file', type=str)  # need file name to pass to sp.Popen
-    parser.add_argument('output_file', type=str)  # need file name to pass to sqlite3.connect
+    parser.add_argument("input_file", type=str)  # need file name to pass to sp.Popen
+    parser.add_argument(
+        "output_file", type=str
+    )  # need file name to pass to sqlite3.connect
     args = parser.parse_args(argv)
     convert_arbin_to_sqlite(args.input_file, args.output_file)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
diff --git a/setup.py b/setup.py
index 1be220b..ca4b80b 100644
--- a/setup.py
+++ b/setup.py
@@ -7,35 +7,35 @@
 
 from setuptools import setup
 
-with open(os.path.join(os.path.dirname(__file__), 'README.md')) as f:
+with open(os.path.join(os.path.dirname(__file__), "README.md")) as f:
     readme = f.read()
 
 setup(
-    name='galvani',
-    version='0.2.1',
-    description='Open and process battery charger log data files',
+    name="galvani",
+    version="0.2.1",
+    description="Open and process battery charger log data files",
log data files", long_description=readme, long_description_content_type="text/markdown", - url='https://github.com/echemdata/galvani', - author='Chris Kerr', - author_email='chris.kerr@mykolab.ch', - license='GPLv3+', + url="https://github.com/echemdata/galvani", + author="Chris Kerr", + author_email="chris.kerr@mykolab.ch", + license="GPLv3+", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'Intended Audience :: Science/Research', - 'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)', - 'Natural Language :: English', - 'Programming Language :: Python :: 3 :: Only', - 'Topic :: Scientific/Engineering :: Chemistry', + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Natural Language :: English", + "Programming Language :: Python :: 3 :: Only", + "Topic :: Scientific/Engineering :: Chemistry", ], - packages=['galvani'], + packages=["galvani"], entry_points={ - 'console_scripts': [ - 'res2sqlite = galvani.res2sqlite:main', + "console_scripts": [ + "res2sqlite = galvani.res2sqlite:main", ], }, - python_requires='>=3.6', - install_requires=['numpy'], - tests_require=['pytest'], + python_requires=">=3.6", + install_requires=["numpy"], + tests_require=["pytest"], ) diff --git a/tests/conftest.py b/tests/conftest.py index 95a8a47..0b63a0e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,7 @@ import pytest -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def testdata_dir(): """Path to the testdata directory.""" - return os.path.join(os.path.dirname(__file__), 'testdata') + return os.path.join(os.path.dirname(__file__), "testdata") diff --git a/tests/test_Arbin.py b/tests/test_Arbin.py index 2286b2d..d708d3f 100644 --- a/tests/test_Arbin.py +++ b/tests/test_Arbin.py @@ -13,8 +13,7 @@ from galvani import res2sqlite -have_mdbtools = (subprocess.call(['which', 'mdb-export'], - stdout=subprocess.DEVNULL) == 0) +have_mdbtools = subprocess.call(["which", "mdb-export"], stdout=subprocess.DEVNULL) == 0 def test_res2sqlite_help(): @@ -22,39 +21,47 @@ def test_res2sqlite_help(): This should work even when mdbtools is not installed. """ - help_output = subprocess.check_output(['res2sqlite', '--help']) - assert b'Convert Arbin .res files to sqlite3 databases' in help_output + help_output = subprocess.check_output(["res2sqlite", "--help"]) + assert b"Convert Arbin .res files to sqlite3 databases" in help_output -@pytest.mark.skipif(have_mdbtools, reason='This tests the failure when mdbtools is not installed') +@pytest.mark.skipif( + have_mdbtools, reason="This tests the failure when mdbtools is not installed" +) def test_convert_Arbin_no_mdbtools(testdata_dir, tmpdir): """Checks that the conversion fails with an appropriate error message.""" - res_file = os.path.join(testdata_dir, 'arbin1.res') - sqlite_file = os.path.join(str(tmpdir), 'arbin1.s3db') - with pytest.raises(RuntimeError, match="Could not locate the `mdb-export` executable."): + res_file = os.path.join(testdata_dir, "arbin1.res") + sqlite_file = os.path.join(str(tmpdir), "arbin1.s3db") + with pytest.raises( + RuntimeError, match="Could not locate the `mdb-export` executable." 
+    ):
         res2sqlite.convert_arbin_to_sqlite(res_file, sqlite_file)
 
 
-@pytest.mark.skipif(not have_mdbtools, reason='Reading the Arbin file requires MDBTools')
-@pytest.mark.parametrize('basename', ['arbin1', 'UM34_Test005E'])
+@pytest.mark.skipif(
+    not have_mdbtools, reason="Reading the Arbin file requires MDBTools"
+)
+@pytest.mark.parametrize("basename", ["arbin1", "UM34_Test005E"])
 def test_convert_Arbin_to_sqlite_function(testdata_dir, tmpdir, basename):
     """Convert an Arbin file to SQLite using the functional interface."""
-    res_file = os.path.join(testdata_dir, basename + '.res')
-    sqlite_file = os.path.join(str(tmpdir), basename + '.s3db')
+    res_file = os.path.join(testdata_dir, basename + ".res")
+    sqlite_file = os.path.join(str(tmpdir), basename + ".s3db")
     res2sqlite.convert_arbin_to_sqlite(res_file, sqlite_file)
     assert os.path.isfile(sqlite_file)
     with sqlite3.connect(sqlite_file) as conn:
-        csr = conn.execute('SELECT * FROM Channel_Normal_Table;')
+        csr = conn.execute("SELECT * FROM Channel_Normal_Table;")
         csr.fetchone()
 
 
-@pytest.mark.skipif(not have_mdbtools, reason='Reading the Arbin file requires MDBTools')
+@pytest.mark.skipif(
+    not have_mdbtools, reason="Reading the Arbin file requires MDBTools"
+)
 def test_convert_cmdline(testdata_dir, tmpdir):
     """Checks that the conversion fails with an appropriate error message."""
-    res_file = os.path.join(testdata_dir, 'arbin1.res')
-    sqlite_file = os.path.join(str(tmpdir), 'arbin1.s3db')
-    subprocess.check_call(['res2sqlite', res_file, sqlite_file])
+    res_file = os.path.join(testdata_dir, "arbin1.res")
+    sqlite_file = os.path.join(str(tmpdir), "arbin1.s3db")
+    subprocess.check_call(["res2sqlite", res_file, sqlite_file])
     assert os.path.isfile(sqlite_file)
     with sqlite3.connect(sqlite_file) as conn:
-        csr = conn.execute('SELECT * FROM Channel_Normal_Table;')
+        csr = conn.execute("SELECT * FROM Channel_Normal_Table;")
         csr.fetchone()
diff --git a/tests/test_BioLogic.py b/tests/test_BioLogic.py
index ad7be6f..df90b18 100644
--- a/tests/test_BioLogic.py
+++ b/tests/test_BioLogic.py
@@ -17,33 +17,55 @@
 
 
 def test_open_MPT(testdata_dir):
-    mpt1, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpt'))
+    mpt1, comments = MPTfile(os.path.join(testdata_dir, "bio_logic1.mpt"))
     assert comments == []
     assert mpt1.dtype.names == (
-        "mode", "ox/red", "error", "control changes", "Ns changes",
-        "counter inc.", "time/s", "control/V/mA", "Ewe/V", "dQ/mA.h", "P/W",
-        "I/mA", "(Q-Qo)/mA.h", "x",
+        "mode",
+        "ox/red",
+        "error",
+        "control changes",
+        "Ns changes",
+        "counter inc.",
+        "time/s",
+        "control/V/mA",
+        "Ewe/V",
+        "dQ/mA.h",
+        "P/W",
+        "I/mA",
+        "(Q-Qo)/mA.h",
+        "x",
     )
 
 
 def test_open_MPT_fails_for_bad_file(testdata_dir):
-    with pytest.raises(ValueError, match='Bad first line'):
-        MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpr'))
+    with pytest.raises(ValueError, match="Bad first line"):
+        MPTfile(os.path.join(testdata_dir, "bio_logic1.mpr"))
 
 
 def test_open_MPT_csv(testdata_dir):
-    mpt1, comments = MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpt'))
+    mpt1, comments = MPTfileCSV(os.path.join(testdata_dir, "bio_logic1.mpt"))
     assert comments == []
     assert mpt1.fieldnames == [
-        "mode", "ox/red", "error", "control changes", "Ns changes",
-        "counter inc.", "time/s", "control/V/mA", "Ewe/V", "dq/mA.h", "P/W",
-        "<I>/mA", "(Q-Qo)/mA.h", "x",
+        "mode",
+        "ox/red",
+        "error",
+        "control changes",
+        "Ns changes",
+        "counter inc.",
+        "time/s",
+        "control/V/mA",
+        "Ewe/V",
+        "dq/mA.h",
+        "P/W",
+        "<I>/mA",
+        "(Q-Qo)/mA.h",
+        "x",
     ]
 
 
 def test_open_MPT_csv_fails_for_bad_file(testdata_dir):
     with pytest.raises((ValueError, UnicodeDecodeError)):
-        MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpr'))
+        MPTfileCSV(os.path.join(testdata_dir, "bio_logic1.mpr"))
 
 
 def test_colID_map_uniqueness():
@@ -59,13 +81,16 @@
     assert not set(field_names).intersection(flag_names)
 
 
-@pytest.mark.parametrize('colIDs, expected', [
-    ([1, 2, 3], [('flags', 'u1')]),
-    ([4, 6], [('time/s', '