From ed8f90a45ebfa4d5370013fe79599423e5850baf Mon Sep 17 00:00:00 2001 From: Robin Getz Date: Mon, 18 May 2020 14:10:51 -0400 Subject: [PATCH] python: python examples rewrite Rewrote iio_readdev.py and iio_writedev.py in a more pythonic fashion. Signed-off-by: Cristi Iacob --- bindings/python/examples/iio_readdev.py | 323 +++++++++++---------- bindings/python/examples/iio_writedev.py | 354 ++++++++++++----------- 2 files changed, 355 insertions(+), 322 deletions(-) diff --git a/bindings/python/examples/iio_readdev.py b/bindings/python/examples/iio_readdev.py index bfd875776..ede034c8f 100644 --- a/bindings/python/examples/iio_readdev.py +++ b/bindings/python/examples/iio_readdev.py @@ -17,173 +17,196 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -import iio import sys -import signal import argparse - -parser = argparse.ArgumentParser(description='iio_readdev') -parser.add_argument('-n', '--network', type=str, metavar='', - help='Use the network backend with the provided hostname.') -parser.add_argument('-u', '--uri', type=str, metavar='', - help='Use the context with the provided URI.') -parser.add_argument('-b', '--buffer-size', type=int, metavar='', - help='Size of the capture buffer. Default is 256.') -parser.add_argument('-s', '--samples', type=int, metavar='', - help='Number of samples to capture, 0 = infinite. Default is 0.') -parser.add_argument('-T', '--timeout', type=int, metavar='', - help='Buffer timeout in milliseconds. 0 = no timeout') -parser.add_argument('-a', '--auto', action='store_true', - help='Scan for available contexts and if only one is available use it.') -parser.add_argument('device', type=str, nargs=1) -parser.add_argument('channel', type=str, nargs='*') - -arg_ip = "" -arg_uri = "" -scan_for_context = False -buffer_size = 256 -num_samples = 0 -timeout = 0 -device_name = None -channels = None - - -def read_arguments(): - """ - Method for reading the command line parameters and setting the corresponding variables. - """ - global arg_ip, arg_uri, scan_for_context, buffer_size, num_samples, timeout, device_name, channels - - args = parser.parse_args() - - if args.network is not None: - arg_ip = str(args.network) - - if args.uri is not None: - arg_uri = str(args.uri) - - if args.auto is True: - scan_for_context = True - - if args.buffer_size is not None: - buffer_size = int(args.buffer_size) - - if args.samples is not None: - num_samples = int(args.samples) - - if args.timeout is not None: - timeout = int(args.timeout) - - device_name = args.device[0] - channels = args.channel - - -def create_context(scan_for_context, arg_uri, arg_ip): - """ - Method for creating the corresponding context. - - parameters: - scan_for_context: type=bool - Scan for available contexts and if only one is available use it. - arg_uri: type=string - The URI on which the program should look for a Context. - arg_ip: type=string - The IP on which the program should look for a Network Context. - - returns: type:iio.Context - The resulted context. - """ - ctx = None - - try: - if scan_for_context: - contexts = iio.scan_contexts() - if len(contexts) == 0: - sys.stderr.write("No IIO context found.\n") - exit(1) - elif len(contexts) == 1: - uri, _ = contexts.popitem() - ctx = iio.Context(_context=uri) - else: - print("Multiple contexts found. Please select one using --uri!") - - for uri, _ in contexts: - print(uri) - elif arg_uri != "": - ctx = iio.Context(_context=arg_uri) - elif arg_ip != "": - ctx = iio.NetworkContext(arg_ip) - else: - ctx = iio.Context() - except FileNotFoundError: - sys.stderr.write('Unable to create IIO context\n') - exit(1) - - return ctx +import iio -def keyboard_interrupt_handler(signal, frame): - sys.exit(0) +class Arguments: + """Class for parsing the input arguments.""" + + def __init__(self): + """Arguments class constructor.""" + self.parser = argparse.ArgumentParser(description='iio_readdev') + self._add_parser_arguments() + args = self.parser.parse_args() + + self.network = str(args.network) if args.network else None + self.arg_uri = str(args.uri) if args.uri else None + self.scan_for_context = args.auto + self.buffer_size = int(args.buffer_size) if args.buffer_size else 256 + self.num_samples = int(args.samples) if args.samples else 0 + self.timeout = int(args.timeout) if args.timeout else 0 + self.device_name = args.device[0] + self.channels = args.channel + + def _add_parser_arguments(self): + self.parser.add_argument( + '-n', '--network', type=str, metavar='', help='Use the network backend with the provided hostname.') + self.parser.add_argument( + '-u', '--uri', type=str, metavar='', help='Use the context with the provided URI.') + self.parser.add_argument( + '-b', '--buffer-size', type=int, metavar='', help='Size of the capture buffer. Default is 256.') + self.parser.add_argument( + '-s', '--samples', type=int, metavar='', help='Number of samples to capture, 0 = infinite. Default is 0.') + self.parser.add_argument( + '-T', '--timeout', type=int, metavar='', help='Buffer timeout in milliseconds. 0 = no timeout') + self.parser.add_argument( + '-a', '--auto', action='store_true', + help='Scan for available contexts and if only one is available use it.' + ) + self.parser.add_argument('device', type=str, nargs=1) + self.parser.add_argument('channel', type=str, nargs='*') + + +class ContextBuilder: + """Class for creating the requested context.""" + + def __init__(self, arguments): + """ + ContextBuilder class constructor. + Args: + arguments: type=Arguments + Contains the input arguments. + """ + self.ctx = None + self.arguments = arguments + + def _timeout(self): + if self.arguments.timeout >= 0: + self.ctx.timeout = self.arguments.timeout + return self + + def _auto(self): + contexts = iio.scan_contexts() + if len(contexts) == 0: + raise Exception('No IIO context found.\n') + if len(contexts) == 1: + uri, _ = contexts.popitem() + self.ctx = iio.Context(_context=uri) + else: + print('Multiple contexts found. Please select one using --uri!') + for uri, _ in contexts.items(): + print(uri) + sys.exit(0) + + return self + + def _uri(self): + self.ctx = iio.Context(_context=self.arguments.arg_uri) + return self + + def _network(self): + self.ctx = iio.NetworkContext(self.arguments.network) + return self + + def _default(self): + self.ctx = iio.Context() + return self + + def create(self): + """Create the requested context.""" + try: + if self.arguments.scan_for_context: + self._auto() + elif self.arguments.arg_uri: + self._uri() + elif self.arguments.arg_ip: + self._network() + else: + self._default() + except FileNotFoundError: + raise Exception('Unable to create IIO context!\n') + self._timeout() -signal.signal(signal.SIGINT, keyboard_interrupt_handler) + return self.ctx -def read_data(buffer, num_samples): - """ - Method for reading data from the buffer. +class BufferBuilder: + """Class for creating the buffer.""" - parameters: - buffer: type=iio.Buffer - Current buffer. - num_samples: type=int - Number of samples to capture, 0 = infinite. Default is 0. + def __init__(self, ctx, arguments): + """ + BufferBuilder class constructor. + Args: + ctx: type=iio.Context + This buffer's context. + arguments: type=Arguments + Contains the input arguments. + """ + self.ctx = ctx + self.arguments = arguments + self.dev = None - returns: type=None - Reads data from buffer. - """ - if buffer is None: - sys.stderr.write('Unable to create buffer!\n') - exit(1) + def _device(self): + self.dev = self.ctx.find_device(self.arguments.device_name) - while True: - buffer.refill() - samples = buffer.read() + if self.dev is None: + raise Exception('Device %s not found!' % self.arguments.device_name) - if num_samples > 0: - sys.stdout.buffer.write(samples[:min(num_samples, len(samples))]) - num_samples -= min(num_samples, len(samples)) + return self - if num_samples == 0: - break + def _channels(self): + if len(self.arguments.channels) == 0: + for channel in self.dev.channels: + channel.enabled = True else: - sys.stdout.buffer.write(bytes(samples)) + for channel_idx in self.arguments.channels: + self.dev.channels[int(channel_idx)].enabled = True + + return self + + def create(self): + """Create the IIO buffer.""" + self._device() + self._channels() + buffer = iio.Buffer(self.dev, self.arguments.buffer_size) + + if buffer is None: + raise Exception('Unable to create buffer!\n') + + return buffer + + +class DataReader: + """Class for reading samples from the device.""" + + def __init__(self, ctx, arguments): + """ + DataWriter class constructor. + Args: + ctx: type=iio.Context + Current context. + arguments: type=Arguments + Contains the input arguments. + """ + buffer_builder = BufferBuilder(ctx, arguments) + self.buffer = buffer_builder.create() + self.arguments = arguments + + def read(self): + """Read data from the buffer.""" + while True: + self.buffer.refill() + samples = self.buffer.read() + + if self.arguments.num_samples > 0: + sys.stdout.buffer.write(samples[:min(self.arguments.num_samples, len(samples))]) + self.arguments.num_samples -= min(self.arguments.num_samples, len(samples)) + + if self.arguments.num_samples == 0: + break + else: + sys.stdout.buffer.write(bytes(samples)) def main(): - read_arguments() - - ctx = create_context(scan_for_context, arg_uri, arg_ip) - - if timeout >= 0: - ctx.set_timeout(timeout) - - dev = ctx.find_device(device_name) - - if dev is None: - sys.stderr.write('Device %s not found!\n' % device_name) - exit(1) - - if len(channels) == 0: - for channel in dev.channels: - channel.enabled = True - else: - for channel_idx in channels: - dev.channels[int(channel_idx)].enabled = True - - buffer = iio.Buffer(dev, buffer_size) - - read_data(buffer, num_samples) + """Module's main method.""" + arguments = Arguments() + context_builder = ContextBuilder(arguments) + reader = DataReader(context_builder.create(), arguments) + reader.read() if __name__ == '__main__': diff --git a/bindings/python/examples/iio_writedev.py b/bindings/python/examples/iio_writedev.py index b35400d0a..dfc899bd3 100644 --- a/bindings/python/examples/iio_writedev.py +++ b/bindings/python/examples/iio_writedev.py @@ -16,208 +16,218 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -import time -import iio +import time import sys -import signal import argparse +import iio -parser = argparse.ArgumentParser() -parser.add_argument('-n', '--network', type=str, metavar='', - help='Use the network backend with the provided hostname.') -parser.add_argument('-u', '--uri', type=str, metavar='', - help='Use the context with the provided URI.') -parser.add_argument('-b', '--buffer-size', type=int, metavar='', - help='Size of the capture buffer. Default is 256.') -parser.add_argument('-s', '--samples', type=int, metavar='', - help='Number of samples to capture, 0 = infinite. Default is 0.') -parser.add_argument('-T', '--timeout', type=int, metavar='', - help='Buffer timeout in milliseconds. 0 = no timeout') -parser.add_argument('-a', '--auto', action='store_true', - help='Scan for available contexts and if only one is available use it.') -parser.add_argument('-c', '--cyclic', action='store_true', - help='Use cyclic buffer mode.') -parser.add_argument('device', type=str, nargs=1) -parser.add_argument('channel', type=str, nargs='*') - -arg_ip = '' -arg_uri = '' -scan_for_context = False -buffer_size = 256 -num_samples = 0 -timeout = 0 -cyclic = False -device_name = None -channels = None - - -def read_arguments(): - """ - Method for reading the command line parameters and setting the corresponding variables. - """ - global arg_ip, arg_uri, scan_for_context, buffer_size, num_samples, timeout, cyclic, device_name, channels - - args = parser.parse_args() - - if args.network is not None: - arg_ip = str(args.network) - - if args.uri is not None: - arg_uri = str(args.uri) - - if args.auto is True: - scan_for_context = True - - if args.buffer_size is not None: - buffer_size = int(args.buffer_size) - - if args.samples is not None: - num_samples = int(args.samples) - - if args.timeout is not None: - timeout = int(args.timeout) - - if args.cyclic is True: - cyclic = True - - device_name = args.device[0] - channels = args.channel - - -def create_context(scan_for_context, arg_uri, arg_ip): - """ - Method for creating the corresponding context. - - parameters: - scan_for_context: type=bool - Scan for available contexts and if only one is available use it. - arg_uri: type=string - The URI on which the program should look for a Context. - arg_ip: type=string - The IP on which the program should look for a Network Context. - - returns: type:iio.Context - The resulted context. - """ - ctx = None - - try: - if scan_for_context: - contexts = iio.scan_contexts() - if len(contexts) == 0: - sys.stderr.write('No IIO context found.') - exit(1) - elif len(contexts) == 1: - uri, _ = contexts.popitem() - ctx = iio.Context(_context=uri) - else: - print('Multiple contexts found. Please select one using --uri!') - - for uri, _ in contexts: - print(uri) - elif arg_uri != '': - ctx = iio.Context(_context=arg_uri) - elif arg_ip != '': - ctx = iio.NetworkContext(arg_ip) - else: - ctx = iio.Context() - except FileNotFoundError: - sys.stderr.write('Unable to create IIO context') - exit(1) - - return ctx +class Arguments: + """Class for parsing the input arguments.""" + + def __init__(self): + """Arguments class constructor.""" + self.parser = argparse.ArgumentParser(description='iio_writedev') + self._add_parser_arguments() + args = self.parser.parse_args() + + self.network = str(args.network) if args.network else None + self.arg_uri = str(args.uri) if args.uri else None + self.scan_for_context = args.auto + self.buffer_size = int(args.buffer_size) if args.buffer_size else 256 + self.num_samples = int(args.samples) if args.samples else 0 + self.timeout = int(args.timeout) if args.timeout else 0 + self.cyclic = args.cyclic + self.device_name = args.device[0] + self.channels = args.channel + + def _add_parser_arguments(self): + self.parser.add_argument( + '-n', '--network', type=str, metavar='', help='Use the network backend with the provided hostname.') + self.parser.add_argument( + '-u', '--uri', type=str, metavar='', help='Use the context with the provided URI.') + self.parser.add_argument( + '-b', '--buffer-size', type=int, metavar='', help='Size of the capture buffer. Default is 256.') + self.parser.add_argument( + '-s', '--samples', type=int, metavar='', help='Number of samples to capture, 0 = infinite. Default is 0.') + self.parser.add_argument( + '-T', '--timeout', type=int, metavar='', help='Buffer timeout in milliseconds. 0 = no timeout') + self.parser.add_argument( + '-a', '--auto', action='store_true', + help='Scan for available contexts and if only one is available use it.' + ) + self.parser.add_argument( + '-c', '--cyclic', action='store_true', help='Use cyclic buffer mode.') + self.parser.add_argument('device', type=str, nargs=1) + self.parser.add_argument('channel', type=str, nargs='*') + + +class ContextBuilder: + """Class for creating the requested context.""" + + def __init__(self, arguments): + """ + ContextBuilder class constructor. + Args: + arguments: type=Arguments + Contains the input arguments. + """ + self.ctx = None + self.arguments = arguments + + def _timeout(self): + if self.arguments.timeout >= 0: + self.ctx.timeout = self.arguments.timeout + return self + + def _auto(self): + contexts = iio.scan_contexts() + if len(contexts) == 0: + raise Exception('No IIO context found.\n') + if len(contexts) == 1: + uri, _ = contexts.popitem() + self.ctx = iio.Context(_context=uri) + else: + print('Multiple contexts found. Please select one using --uri!') + for uri, _ in contexts.items(): + print(uri) + sys.exit(0) + + return self + + def _uri(self): + self.ctx = iio.Context(_context=self.arguments.arg_uri) + return self + + def _network(self): + self.ctx = iio.NetworkContext(self.arguments.network) + return self + + def _default(self): + self.ctx = iio.Context() + return self + + def create(self): + """Create the requested context.""" + try: + if self.arguments.scan_for_context: + self._auto() + elif self.arguments.arg_uri: + self._uri() + elif self.arguments.arg_ip: + self._network() + else: + self._default() + except FileNotFoundError: + raise Exception('Unable to create IIO context!\n') -def keyboard_interrupt_handler(signal, frame): - sys.exit(0) + self._timeout() + return self.ctx -signal.signal(signal.SIGINT, keyboard_interrupt_handler) +class BufferBuilder: + """Class for creating the buffer.""" -def write_data(dev, buffer, num_samples, buffer_size, cyclic): - """ - Method for pushing data to the buffer. + def __init__(self, ctx, arguments): + """ + BufferBuilder class constructor. + Args: + ctx: type=iio.Context + This buffer's context. + arguments: type=Arguments + Contains the input arguments. + """ + self.ctx = ctx + self.arguments = arguments + self.dev = None - parameters: - dev: type=iio.Device - Current device. - buffer: type=iio.Buffer - Current buffer. - num_samples: type=int - Number of samples to capture, 0 = infinite. Default is 0. - buffer_size: type=int - Size of the capture buffer. Default is 256. - cyclic: type=bool - Use cyclic buffer mode. + def _device(self): + self.dev = self.ctx.find_device(self.arguments.device_name) - returns: type=None - Push data to buffer. - """ - app_running = True - num_samples_set = num_samples > 0 + if self.dev is None: + raise Exception('Device %s not found!' % self.arguments.device_name) - while app_running: - bytes_to_read = buffer._length if not num_samples_set else min(buffer._length, num_samples * dev.sample_size) - write_len = bytes_to_read - data = [] + return self - while bytes_to_read > 0: - read_data = sys.stdin.buffer.read(bytes_to_read) - bytes_to_read -= len(read_data) - data.extend(read_data) + def _channels(self): + if len(self.arguments.channels) == 0: + for channel in self.dev.channels: + channel.enabled = True + else: + for channel_idx in self.arguments.channels: + self.dev.channels[int(channel_idx)].enabled = True - if len(read_data) == 0: - exit(0) + return self - if num_samples_set: - num_samples -= int(write_len / dev.sample_size) + def create(self): + """Create the IIO buffer.""" + self._device() + self._channels() + buffer = iio.Buffer(self.dev, self.arguments.buffer_size, self.arguments.cyclic) - if num_samples == 0 and write_len < buffer_size * dev.sample_size: - exit(0) + if buffer is None: + raise Exception('Unable to create buffer!\n') - ret = buffer.write(bytearray(data)) - buffer.push() + return buffer - if ret == 0: - if app_running: - sys.stderr.write('Unable to push buffer!') - exit(1) - break - while app_running and cyclic: - time.sleep(1) +class DataWriter: + """Class for writing samples to the device.""" + def __init__(self, ctx, arguments): + """ + DataWriter class constructor. + Args: + ctx: type=iio.Context + Current context. + arguments: type=Arguments + Contains the input arguments. + """ + buffer_builder = BufferBuilder(ctx, arguments) + self.buffer = buffer_builder.create() + self.device = buffer_builder.dev + self.arguments = arguments -def main(): - read_arguments() + def write(self): + """Push data into the buffer.""" + app_running = True + num_samples = self.arguments.num_samples - ctx = create_context(scan_for_context, arg_uri, arg_ip) + while app_running: + bytes_to_read = self.buffer._length if num_samples == 0 \ + else min(self.buffer._length, num_samples * self.device.sample_size) + write_len = bytes_to_read + data = [] - if timeout >= 0: - ctx.set_timeout(timeout) + while bytes_to_read > 0: + read_data = sys.stdin.buffer.read(bytes_to_read) + if len(read_data) == 0: + exit(0) + bytes_to_read -= len(read_data) + data.extend(read_data) - dev = ctx.find_device(device_name) + if self.buffer.write(bytearray(data)) == 0: + raise Exception('Unable to push buffer!') - if dev is None: - sys.stderr.write('Device %s not found!' % device_name) - exit(1) + self.buffer.push() - if len(channels) == 0: - for channel in dev.channels: - channel.enabled = True - else: - for channel_idx in channels: - dev.channels[int(channel_idx)].enabled = True + while app_running and self.arguments.cyclic: + time.sleep(1) - buffer = iio.Buffer(dev, buffer_size, cyclic=cyclic) + if num_samples > 0: + num_samples -= write_len // self.device.sample_size + if num_samples == 0: + app_running = False - if buffer is None: - sys.stderr.write('Unable to create buffer!') - exit(1) - write_data(dev, buffer, num_samples, buffer_size, cyclic) +def main(): + """Module's main method.""" + arguments = Arguments() + context_builder = ContextBuilder(arguments) + writer = DataWriter(context_builder.create(), arguments) + writer.write() if __name__ == '__main__':