Skip to content

Commit

Permalink
python: python examples rewrite
Browse files Browse the repository at this point in the history
Rewrote iio_readdev.py and iio_writedev.py in a more pythonic fashion.

Signed-off-by: Cristi Iacob <[email protected]>
  • Loading branch information
rgetz committed May 18, 2020
1 parent c7d589f commit ed8f90a
Show file tree
Hide file tree
Showing 2 changed files with 355 additions and 322 deletions.
323 changes: 173 additions & 150 deletions bindings/python/examples/iio_readdev.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,173 +17,196 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import iio
import sys
import signal
import argparse

parser = argparse.ArgumentParser(description='iio_readdev')
parser.add_argument('-n', '--network', type=str, metavar='',
help='Use the network backend with the provided hostname.')
parser.add_argument('-u', '--uri', type=str, metavar='',
help='Use the context with the provided URI.')
parser.add_argument('-b', '--buffer-size', type=int, metavar='',
help='Size of the capture buffer. Default is 256.')
parser.add_argument('-s', '--samples', type=int, metavar='',
help='Number of samples to capture, 0 = infinite. Default is 0.')
parser.add_argument('-T', '--timeout', type=int, metavar='',
help='Buffer timeout in milliseconds. 0 = no timeout')
parser.add_argument('-a', '--auto', action='store_true',
help='Scan for available contexts and if only one is available use it.')
parser.add_argument('device', type=str, nargs=1)
parser.add_argument('channel', type=str, nargs='*')

arg_ip = ""
arg_uri = ""
scan_for_context = False
buffer_size = 256
num_samples = 0
timeout = 0
device_name = None
channels = None


def read_arguments():
"""
Method for reading the command line parameters and setting the corresponding variables.
"""
global arg_ip, arg_uri, scan_for_context, buffer_size, num_samples, timeout, device_name, channels

args = parser.parse_args()

if args.network is not None:
arg_ip = str(args.network)

if args.uri is not None:
arg_uri = str(args.uri)

if args.auto is True:
scan_for_context = True

if args.buffer_size is not None:
buffer_size = int(args.buffer_size)

if args.samples is not None:
num_samples = int(args.samples)

if args.timeout is not None:
timeout = int(args.timeout)

device_name = args.device[0]
channels = args.channel


def create_context(scan_for_context, arg_uri, arg_ip):
"""
Method for creating the corresponding context.
parameters:
scan_for_context: type=bool
Scan for available contexts and if only one is available use it.
arg_uri: type=string
The URI on which the program should look for a Context.
arg_ip: type=string
The IP on which the program should look for a Network Context.
returns: type:iio.Context
The resulted context.
"""
ctx = None

try:
if scan_for_context:
contexts = iio.scan_contexts()
if len(contexts) == 0:
sys.stderr.write("No IIO context found.\n")
exit(1)
elif len(contexts) == 1:
uri, _ = contexts.popitem()
ctx = iio.Context(_context=uri)
else:
print("Multiple contexts found. Please select one using --uri!")

for uri, _ in contexts:
print(uri)
elif arg_uri != "":
ctx = iio.Context(_context=arg_uri)
elif arg_ip != "":
ctx = iio.NetworkContext(arg_ip)
else:
ctx = iio.Context()
except FileNotFoundError:
sys.stderr.write('Unable to create IIO context\n')
exit(1)

return ctx
import iio


def keyboard_interrupt_handler(signal, frame):
sys.exit(0)
class Arguments:
"""Class for parsing the input arguments."""

def __init__(self):
"""Arguments class constructor."""
self.parser = argparse.ArgumentParser(description='iio_readdev')
self._add_parser_arguments()
args = self.parser.parse_args()

self.network = str(args.network) if args.network else None
self.arg_uri = str(args.uri) if args.uri else None
self.scan_for_context = args.auto
self.buffer_size = int(args.buffer_size) if args.buffer_size else 256
self.num_samples = int(args.samples) if args.samples else 0
self.timeout = int(args.timeout) if args.timeout else 0
self.device_name = args.device[0]
self.channels = args.channel

def _add_parser_arguments(self):
self.parser.add_argument(
'-n', '--network', type=str, metavar='', help='Use the network backend with the provided hostname.')
self.parser.add_argument(
'-u', '--uri', type=str, metavar='', help='Use the context with the provided URI.')
self.parser.add_argument(
'-b', '--buffer-size', type=int, metavar='', help='Size of the capture buffer. Default is 256.')
self.parser.add_argument(
'-s', '--samples', type=int, metavar='', help='Number of samples to capture, 0 = infinite. Default is 0.')
self.parser.add_argument(
'-T', '--timeout', type=int, metavar='', help='Buffer timeout in milliseconds. 0 = no timeout')
self.parser.add_argument(
'-a', '--auto', action='store_true',
help='Scan for available contexts and if only one is available use it.'
)
self.parser.add_argument('device', type=str, nargs=1)
self.parser.add_argument('channel', type=str, nargs='*')


class ContextBuilder:
"""Class for creating the requested context."""

def __init__(self, arguments):
"""
ContextBuilder class constructor.
Args:
arguments: type=Arguments
Contains the input arguments.
"""
self.ctx = None
self.arguments = arguments

def _timeout(self):
if self.arguments.timeout >= 0:
self.ctx.timeout = self.arguments.timeout
return self

def _auto(self):
contexts = iio.scan_contexts()
if len(contexts) == 0:
raise Exception('No IIO context found.\n')
if len(contexts) == 1:
uri, _ = contexts.popitem()
self.ctx = iio.Context(_context=uri)
else:
print('Multiple contexts found. Please select one using --uri!')
for uri, _ in contexts.items():
print(uri)
sys.exit(0)

return self

def _uri(self):
self.ctx = iio.Context(_context=self.arguments.arg_uri)
return self

def _network(self):
self.ctx = iio.NetworkContext(self.arguments.network)
return self

def _default(self):
self.ctx = iio.Context()
return self

def create(self):
"""Create the requested context."""
try:
if self.arguments.scan_for_context:
self._auto()
elif self.arguments.arg_uri:
self._uri()
elif self.arguments.arg_ip:
self._network()
else:
self._default()
except FileNotFoundError:
raise Exception('Unable to create IIO context!\n')

self._timeout()

signal.signal(signal.SIGINT, keyboard_interrupt_handler)
return self.ctx


def read_data(buffer, num_samples):
"""
Method for reading data from the buffer.
class BufferBuilder:
"""Class for creating the buffer."""

parameters:
buffer: type=iio.Buffer
Current buffer.
num_samples: type=int
Number of samples to capture, 0 = infinite. Default is 0.
def __init__(self, ctx, arguments):
"""
BufferBuilder class constructor.
Args:
ctx: type=iio.Context
This buffer's context.
arguments: type=Arguments
Contains the input arguments.
"""
self.ctx = ctx
self.arguments = arguments
self.dev = None

returns: type=None
Reads data from buffer.
"""
if buffer is None:
sys.stderr.write('Unable to create buffer!\n')
exit(1)
def _device(self):
self.dev = self.ctx.find_device(self.arguments.device_name)

while True:
buffer.refill()
samples = buffer.read()
if self.dev is None:
raise Exception('Device %s not found!' % self.arguments.device_name)

if num_samples > 0:
sys.stdout.buffer.write(samples[:min(num_samples, len(samples))])
num_samples -= min(num_samples, len(samples))
return self

if num_samples == 0:
break
def _channels(self):
if len(self.arguments.channels) == 0:
for channel in self.dev.channels:
channel.enabled = True
else:
sys.stdout.buffer.write(bytes(samples))
for channel_idx in self.arguments.channels:
self.dev.channels[int(channel_idx)].enabled = True

return self

def create(self):
"""Create the IIO buffer."""
self._device()
self._channels()
buffer = iio.Buffer(self.dev, self.arguments.buffer_size)

if buffer is None:
raise Exception('Unable to create buffer!\n')

return buffer


class DataReader:
"""Class for reading samples from the device."""

def __init__(self, ctx, arguments):
"""
DataWriter class constructor.
Args:
ctx: type=iio.Context
Current context.
arguments: type=Arguments
Contains the input arguments.
"""
buffer_builder = BufferBuilder(ctx, arguments)
self.buffer = buffer_builder.create()
self.arguments = arguments

def read(self):
"""Read data from the buffer."""
while True:
self.buffer.refill()
samples = self.buffer.read()

if self.arguments.num_samples > 0:
sys.stdout.buffer.write(samples[:min(self.arguments.num_samples, len(samples))])
self.arguments.num_samples -= min(self.arguments.num_samples, len(samples))

if self.arguments.num_samples == 0:
break
else:
sys.stdout.buffer.write(bytes(samples))


def main():
read_arguments()

ctx = create_context(scan_for_context, arg_uri, arg_ip)

if timeout >= 0:
ctx.set_timeout(timeout)

dev = ctx.find_device(device_name)

if dev is None:
sys.stderr.write('Device %s not found!\n' % device_name)
exit(1)

if len(channels) == 0:
for channel in dev.channels:
channel.enabled = True
else:
for channel_idx in channels:
dev.channels[int(channel_idx)].enabled = True

buffer = iio.Buffer(dev, buffer_size)

read_data(buffer, num_samples)
"""Module's main method."""
arguments = Arguments()
context_builder = ContextBuilder(arguments)
reader = DataReader(context_builder.create(), arguments)
reader.read()


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit ed8f90a

Please sign in to comment.