-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds implementation of byte buffer. #622
Merged
Merged
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
9dea4d4
Adds byte_stream.md, a design documentation for a proposed feature.
balazsracz 5147a97
Initializes the raw buffer pool together with the main buffer pool.
balazsracz 90a72d2
Adds ByteBuffer header, matching the design document.
balazsracz 11d8484
Merge branch 'master' into bracz-bytebuffer
balazsracz 57d5229
Merge branch 'bracz-bytebuffer' into bracz-bytebuffer-impl
balazsracz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
# Byte Streams | ||
|
||
***WARNING*** this is currently a proposal that is not implemented yet. | ||
|
||
This document describes a mechanism by which OpenMRN components can exchange a | ||
unidirectional untyped data stream. The API is laid out for compatibility with | ||
the StateFlow concept and provides asynchronous implementations to limit memory | ||
usage and flow control. | ||
|
||
## Use-cases | ||
|
||
- OpenLCB Stream sending. The client component sends data to the stream | ||
transmitter using Byte Streams. | ||
|
||
- OpenLCB Stream receiving. The client component is receiving data from the | ||
Stream service using Byte Streams. | ||
|
||
- A fast implementation of a TCP Hub should use Byte Streams to represent the | ||
TCP messages for proxying between ports. | ||
|
||
## Non-goals | ||
|
||
- It is not a goal to use the Byte Stream API to write from a State Flow to an | ||
fd. (Neither sockets, pipes, nor physical files.) Writing to an fd should be | ||
done by the native StateFlow features like `write_repeated`. | ||
|
||
## Requirements | ||
|
||
- Memory allocation | ||
|
||
- Allocation has to avoid memory fragmentation. While the expectation is that | ||
larger blocks of memory are allocated in one go, these blocks should be | ||
reused between different users of Byte Streams and at different times, | ||
instead of returning to malloc/free. | ||
|
||
- Block allocation size should be 1 kbyte. This is small enough that even | ||
32kbyte MCUs can use the codebase, but large enough to capture meaningful | ||
TCP packet sizes. | ||
|
||
- Use and copy | ||
|
||
- The implementation should be zero-copy. Once some data is in a byte buffer, | ||
that data should not need to be copied in order to forward it to other byte | ||
buffers or byte streams. | ||
|
||
- Reference counting should be available when the payload needs to be used | ||
in multiple places. | ||
|
||
- It should be possible to take a subset of a buffer and send it to a | ||
different byte stream. (Routers will do this.) | ||
|
||
- A special zero-copy mechanism should be available when the source data is | ||
already in memory or in flash. Sending these bytes into the flow should not | ||
need block allocation and data copy. | ||
|
||
- Flow control | ||
|
||
- The data source has to be blocked in memory allocation when the sink has | ||
not yet consumed the data. | ||
|
||
- The amount of read-ahead should be configurable (i.e., how much memory does | ||
the source fill in before it gets blocked on waiting for the sink). | ||
|
||
|
||
## Implementation | ||
|
||
We define two buffer types. | ||
|
||
- `using RawBuffer = Buffer<char[1024]>;` | ||
|
||
This buffer holds the raw data. This buffer is reference counted, shared | ||
between all sinks, and never entered into a `Q`. All sinks consider this | ||
buffer as read-only. | ||
|
||
- `using ByteBuffer = Buffer<ByteChunk>;` | ||
|
||
This buffer holds a reference to a RawBuffer, and start/end pointers within | ||
that describe the exact range of bytes to use. This buffer is not shareable, | ||
it can be entered into a queue, i.e., sent to a `StateFlowWithQueue` / | ||
`FlowInterface<ByteBuffer>`. | ||
|
||
### Memory allocation | ||
|
||
The `RawBuffer` data blocks should come from `rawBufferPool`, which is a | ||
`DynamicPool` that is not the same as mainBufferPool, but instantiated with a | ||
single bucket of ~1 kbyte size (technically, `sizeof(RawBuffer)`). This ensures | ||
that the memory fragmentation requirements are met. When a RawBuffer is | ||
released, it goes back to the freelist of the `rawBufferPool`. | ||
|
||
To limit the amount of memory allocated, a `LimitedPool` can be instantiated by | ||
the data source which specifies a fixed number of 1kbyte blocks. The backing | ||
pool shall be set to `rawBufferPool`. | ||
|
||
### Memory ownership / deallocation | ||
|
||
`ByteChunk` contains an `BufferPtr<RawBuffer>`, which is a unique_ptr that | ||
unref's the buffer upon the destructor. This represents the ownership of a | ||
reference to a `RawBuffer`. The destructor of `ByteChunk` will automatically | ||
release this reference. | ||
|
||
It is optional for a ByteChunk to own a RawBuffer reference. A ByteChunk can | ||
also be created from externally-owned memory, such as flash. In this case the | ||
`unique_ptr` remains as nullptr, and no unref happens in the destructor. | ||
|
||
### Zero-copy | ||
|
||
To make a copy of a piece of data, a new ByteBuffer is allocated, and | ||
initialized with a new reference of the same RawBuffer. The copy can have the | ||
same data, or a contiguous substring of the data. Using a substring is helpful | ||
for example when we take the payload of an incoming TCP stream message and | ||
forward it to the stream reader client. It can also be helpful when taking an | ||
incoming message, taking off the header, prepending a new header, and sending | ||
it out on a different port. | ||
|
||
## Definition | ||
|
||
``` | ||
struct RawData | ||
{ | ||
uint8_t payload[1024]; | ||
}; | ||
|
||
using RawBuffer = Buffer<RawData>; | ||
|
||
struct ByteChunk | ||
{ | ||
BufferPtr<RawData> ownedData_; | ||
|
||
uint8_t* data_ {nullptr}; | ||
|
||
size_t size_ {0}; | ||
}; | ||
|
||
using ByteBuffer = Buffer<ByteChunk>; | ||
|
||
using ByteSink = FlowInterface<Buffer<ByteChunk>>; | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/** \copyright | ||
* Copyright (c) 2022, Balazs Racz | ||
* All rights reserved. | ||
* | ||
* Redistribution and use in source and binary forms, with or without | ||
* modification, are permitted provided that the following conditions are met: | ||
* | ||
* - Redistributions of source code must retain the above copyright notice, | ||
* this list of conditions and the following disclaimer. | ||
* | ||
* - Redistributions in binary form must reproduce the above copyright notice, | ||
* this list of conditions and the following disclaimer in the documentation | ||
* and/or other materials provided with the distribution. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | ||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE | ||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | ||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | ||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | ||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | ||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | ||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | ||
* POSSIBILITY OF SUCH DAMAGE. | ||
* | ||
* \file ByteBuffer.hxx | ||
* | ||
* Specialization of the Buffer / Pool infrastructure for untyped data | ||
* stream. See { \link doc/byte_buffer.md } | ||
* | ||
* @author Balazs Racz | ||
* @date 17 Apr 2022 | ||
*/ | ||
|
||
#ifndef _UTILS_BYTEBUFFER_HXX_ | ||
#define _UTILS_BYTEBUFFER_HXX_ | ||
|
||
#include "utils/Buffer.hxx" | ||
|
||
/// This is how many bytes we have in each raw buffer allocation. | ||
static constexpr unsigned RAWBUFFER_SIZE = 1024; | ||
|
||
/// Use this BufferPool to allocate raw buffers. | ||
extern Pool* rawBufferPool; | ||
|
||
/// Container for holding an arbitrary untyped data stream. | ||
struct RawData | ||
{ | ||
uint8_t payload[RAWBUFFER_SIZE]; | ||
}; | ||
|
||
/// Buffers of this type will be allocated from the rawBufferPool to hold the | ||
/// payloads of untyped data streams. These buffers are never enqueued into Q | ||
/// or QList objects. | ||
using RawBuffer = Buffer<RawData>; | ||
|
||
/// Holds a reference to a raw buffer, with the start and size information. | ||
struct ByteChunk | ||
{ | ||
/// Owns a ref for a RawData buffer. If this is nullptr, then the data | ||
/// references by this chunk is externally owned. | ||
BufferPtr<RawData> ownedData_; | ||
|
||
/// Points to the first byte of the useful data. | ||
uint8_t* data_ {nullptr}; | ||
|
||
/// How many bytes from data_ does this chunk represent. | ||
size_t size_ {0}; | ||
|
||
/// @return number of bytes pointed to by this chunk. | ||
size_t size() const | ||
{ | ||
return size_; | ||
} | ||
|
||
/// Moves forward the data beginning pointer by a given number of | ||
/// bytes. Represents that some number of bytes were consumed by the sink. | ||
/// @param num_bytes how much data was consumed. Must be <= size(). | ||
void advance(size_t num_bytes) | ||
{ | ||
HASSERT(num_bytes <= size_); | ||
data_ += num_bytes; | ||
size_ -= num_bytes; | ||
} | ||
}; | ||
|
||
/// Buffer type of references. These are enqueued for byte sinks. | ||
using ByteBuffer = Buffer<ByteChunk>; | ||
|
||
template<class T> class FlowInterface; | ||
|
||
/// Interface for sending a stream of data from a source to a sink. | ||
using ByteSink = FlowInterface<ByteBuffer>; | ||
|
||
#endif // _UTILS_BYTEBUFFER_HXX_ |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a potential race condition here between two threads calling this for the "first" time. I suspect in reality, this is initialized before multi-threading is started. It would be good to either make the check/allocate atomic or make a note on the expected startup behavior in the header file comment block.
You could alternatively use pthread_once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is true in abstract, but in the concrete case it is unnecessary. As you see from the surrounding code, the mainBufferPool is not protected by a pthread_once either. The reason this is not a problem is that this code runs at least once at static initialization time, and there are no threads at that point.