diff --git a/docs/articles/router-dealer.md b/docs/articles/router-dealer.md index 1b56cb5..2aefaa5 100644 --- a/docs/articles/router-dealer.md +++ b/docs/articles/router-dealer.md @@ -29,10 +29,4 @@ In the example below, multiple clients send periodic requests to an asynchronous ![Req-Router workflow](~/workflows/req-router.bonsai) ::: -The operator can be used to extract all identity frames from a multiple part request. Content frames are stored after the empty delimiter frame. For simple requests this is usually the last frame, which can be retrieved directly using the `Last` property. - -After processing the response, we can simply merge the identity frames with the response frame to generate an outgoing message ready to be routed. - -:::workflow -![Req-Router processing](~/workflows/req-router-processing.bonsai) -::: \ No newline at end of file +Content frames are stored after the empty delimiter frame. For simple requests this is usually the last frame, which can be retrieved directly using the `Last` property. The operator automatically takes care of pushing all identity frames from the request into the response to generate an outgoing message ready to be routed. \ No newline at end of file diff --git a/docs/workflows/req-rep.bonsai b/docs/workflows/req-rep.bonsai index c7b1776..54db439 100644 --- a/docs/workflows/req-rep.bonsai +++ b/docs/workflows/req-rep.bonsai @@ -46,9 +46,6 @@ reply {0} - - - @@ -56,7 +53,6 @@ - diff --git a/docs/workflows/req-router-processing.bonsai b/docs/workflows/req-router-processing.bonsai deleted file mode 100644 index d53cb33..0000000 --- a/docs/workflows/req-router-processing.bonsai +++ /dev/null @@ -1,47 +0,0 @@ - - - - - - Source1 - - - Last - - - - - - reply {0} - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/docs/workflows/req-router-processing.svg b/docs/workflows/req-router-processing.svg deleted file mode 100644 index 0df570f..0000000 --- a/docs/workflows/req-router-processing.svg +++ /dev/null @@ -1,3 +0,0 @@ - -]>WorkflowOutputToMessageMergeGetIdentityConvertToFrameFormatConvertToStringLastSource1 \ No newline at end of file diff --git a/docs/workflows/req-router.bonsai b/docs/workflows/req-router.bonsai index 79e3aab..9afab0e 100644 --- a/docs/workflows/req-router.bonsai +++ b/docs/workflows/req-router.bonsai @@ -66,30 +66,13 @@ reply {0} - - - - - - - - - - - - - - - - - diff --git a/src/Bonsai.ZeroMQ/SendResponse.cs b/src/Bonsai.ZeroMQ/SendResponse.cs index 373265b..866f7a0 100644 --- a/src/Bonsai.ZeroMQ/SendResponse.cs +++ b/src/Bonsai.ZeroMQ/SendResponse.cs @@ -56,9 +56,12 @@ public override Expression Build(IEnumerable arguments) { var selector = Expression.Lambda(selectorBody, inputParameter); var selectorObservableType = selector.ReturnType.GetGenericArguments()[0]; - if (selectorObservableType != typeof(NetMQMessage)) + if (selectorObservableType != typeof(NetMQMessage) && + selectorObservableType != typeof(NetMQFrame) && + selectorObservableType != typeof(string) && + selectorObservableType != typeof(byte[])) { - throw new InvalidOperationException($"The specified response workflow must have a single output of type {typeof(NetMQMessage)}."); + throw new InvalidOperationException($"The type of the response workflow is not compatible with {typeof(NetMQMessage)}."); } return Expression.Call(GetType(), nameof(Process), null, source, selector); @@ -72,5 +75,64 @@ static IObservable Process(IObservable source, Fu .LastAsync() .Do(context.Response)); } + + static IObservable Process( + IObservable source, + Func, IObservable> selector) + { + return Process(source, selector, (response, buffer) => response.Append(buffer)); + } + + static IObservable Process( + IObservable source, + Func, IObservable> selector) + { + return Process(source, selector, (response, message) => response.Append(message, SendReceiveConstants.DefaultEncoding)); + } + + static IObservable Process( + IObservable source, + Func, IObservable> selector) + { + return Process(source, selector, (response, frame) => response.Append(frame)); + } + + static IObservable Process( + IObservable source, + Func, IObservable> selector, + Action appendFrame) + { + return source.SelectMany(context => selector(Observable + .Return(context.Request)) + .ToList() + .Select(frames => + { + var delimiter = context.Request.FrameCount - 1; + for (; delimiter >= 0; delimiter--) + { + if (context.Request[delimiter].IsEmpty) + { + break; + } + } + + var response = new NetMQMessage(frames.Count + delimiter + 1); + if (delimiter >= 0) + { + for (int i = 0; i <= delimiter; i++) + { + response.Append(context.Request[i]); + } + } + + foreach (var frame in frames) + { + appendFrame(response, frame); + } + + return response; + }) + .Do(context.Response)); + } } }