Skip to content

Commit

Permalink
Merge pull request #1456 from glopesdev/streaming-serialread
Browse files Browse the repository at this point in the history
Add support for streaming binary serial read buffers
  • Loading branch information
glopesdev authored Jul 5, 2023
2 parents f9d12ca + 7d7712a commit 2e76051
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 11 deletions.
38 changes: 38 additions & 0 deletions Bonsai.System/IO/Ports/ObservableSerialPort.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,44 @@ namespace Bonsai.IO.Ports
{
static class ObservableSerialPort
{
public static IObservable<byte[]> Read(string portName, int count)
{
return Observable.Create<byte[]>((observer, cancellationToken) =>
{
return Task.Factory.StartNew(() =>
{
using var connection = SerialPortManager.ReserveConnection(portName);
using var cancellation = cancellationToken.Register(connection.Dispose);
var serialPort = connection.SerialPort;
while (!cancellationToken.IsCancellationRequested)
{
try
{
var bytesRead = 0;
var buffer = new byte[count];
while (bytesRead < count)
{
bytesRead += serialPort.Read(buffer, bytesRead, count - bytesRead);
}
observer.OnNext(buffer);
}
catch (Exception ex)
{
if (!cancellationToken.IsCancellationRequested)
{
observer.OnError(ex);
}

break;
}
}
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
});
}

public static IObservable<string> ReadLine(string portName, string newLine)
{
return Observable.Create<string>((observer, cancellationToken) =>
Expand Down
23 changes: 12 additions & 11 deletions Bonsai.System/IO/Ports/SerialRead.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.ComponentModel;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace Bonsai.IO.Ports
Expand Down Expand Up @@ -38,7 +39,7 @@ public class SerialRead : Source<byte[]>
/// </returns>
public override IObservable<byte[]> Generate()
{
return Generate(Observable.Return(Unit.Default));
return ObservableSerialPort.Read(PortName, Count);
}

/// <summary>
Expand All @@ -60,18 +61,18 @@ public IObservable<byte[]> Generate<TSource>(IObservable<TSource> source)
{
var count = Count;
return Observable.Using(
() => SerialPortManager.ReserveConnection(PortName),
connection =>
cancellationToken => Task.FromResult(SerialPortManager.ReserveConnection(PortName)),
(connection, cancellationToken) => Task.FromResult(source.Select(_ =>
{
var serialPort = connection.SerialPort;
return source.SelectMany(async (_, cancellationToken) =>
using var cancellation = cancellationToken.Register(connection.Dispose);
var bytesRead = 0;
var buffer = new byte[count];
while (bytesRead < count)
{
var buffer = new byte[count];
await serialPort.BaseStream.ReadAsync(
buffer, 0, buffer.Length, cancellationToken);
return buffer;
});
});
bytesRead += connection.SerialPort.Read(buffer, bytesRead, count - bytesRead);
}
return buffer;
})));
}
}
}

0 comments on commit 2e76051

Please sign in to comment.