From fd8c49ac02236784bff4c560f96f816a42607c0e Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Mon, 15 Jul 2024 18:46:41 -0500 Subject: [PATCH] Implement S/G IO for batched sends and eliminate another frame copy --- src/crypto.cpp | 15 ++++--- src/crypto.h | 11 ++++++ src/platform/common.h | 43 ++++++++++++++++++++- src/platform/linux/misc.cpp | 70 ++++++++++++++++++++++++--------- src/platform/windows/misc.cpp | 37 +++++++++++++++--- src/stream.cpp | 73 +++++++++++++++++++++++------------ 6 files changed, 193 insertions(+), 56 deletions(-) diff --git a/src/crypto.cpp b/src/crypto.cpp index c1583a9305f..b16ab08ab37 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -185,7 +185,7 @@ namespace crypto { * The resulting ciphertext and the GCM tag are written into the tagged_cipher buffer. */ int - gcm_t::encrypt(const std::string_view &plaintext, std::uint8_t *tagged_cipher, aes_t *iv) { + gcm_t::encrypt(const std::string_view &plaintext, std::uint8_t *tag, std::uint8_t *ciphertext, aes_t *iv) { if (!encrypt_ctx && init_encrypt_gcm(encrypt_ctx, &key, iv, padding)) { return -1; } @@ -196,18 +196,15 @@ namespace crypto { return -1; } - auto tag = tagged_cipher; - auto cipher = tag + tag_size; - int update_outlen, final_outlen; // Encrypt into the caller's buffer - if (EVP_EncryptUpdate(encrypt_ctx.get(), cipher, &update_outlen, (const std::uint8_t *) plaintext.data(), plaintext.size()) != 1) { + if (EVP_EncryptUpdate(encrypt_ctx.get(), ciphertext, &update_outlen, (const std::uint8_t *) plaintext.data(), plaintext.size()) != 1) { return -1; } // GCM encryption won't ever fill ciphertext here but we have to call it anyway - if (EVP_EncryptFinal_ex(encrypt_ctx.get(), cipher + update_outlen, &final_outlen) != 1) { + if (EVP_EncryptFinal_ex(encrypt_ctx.get(), ciphertext + update_outlen, &final_outlen) != 1) { return -1; } @@ -218,6 +215,12 @@ namespace crypto { return update_outlen + final_outlen; } + int + gcm_t::encrypt(const std::string_view &plaintext, std::uint8_t *tagged_cipher, aes_t *iv) { + // This overload handles the common case of [GCM tag][cipher text] buffer layout + return encrypt(plaintext, tagged_cipher, tagged_cipher + tag_size, iv); + } + int ecb_t::decrypt(const std::string_view &cipher, std::vector &plaintext) { auto fg = util::fail_guard([this]() { diff --git a/src/crypto.h b/src/crypto.h index 207e4ebf4b3..20651ecb1ae 100644 --- a/src/crypto.h +++ b/src/crypto.h @@ -125,6 +125,17 @@ namespace crypto { gcm_t(const crypto::aes_t &key, bool padding = true); + /** + * @brief Encrypts the plaintext using AES GCM mode. + * @param plaintext The plaintext data to be encrypted. + * @param tag The buffer where the GCM tag will be written. + * @param ciphertext The buffer where the resulting ciphertext will be written. + * @param iv The initialization vector to be used for the encryption. + * @return The total length of the ciphertext and GCM tag. Returns -1 in case of an error. + */ + int + encrypt(const std::string_view &plaintext, std::uint8_t *tag, std::uint8_t *ciphertext, aes_t *iv); + /** * @brief Encrypts the plaintext using AES GCM mode. * length of cipher must be at least: round_to_pkcs7_padded(plaintext.size()) + crypto::cipher::tag_size diff --git a/src/platform/common.h b/src/platform/common.h index 595b0cbecb0..cf25df46cde 100644 --- a/src/platform/common.h +++ b/src/platform/common.h @@ -606,15 +606,54 @@ namespace platf { void restart(); - struct batched_send_info_t { + struct buffer_descriptor_t { const char *buffer; - size_t block_size; + size_t size; + }; + + struct batched_send_info_t { + // Optional headers to be prepended to each packet + const char *headers; + size_t header_size; + + // One or more data buffers to use for the payloads + // + // NB: Data buffers must be aligned to payload size! + std::vector &payload_buffers; + size_t payload_size; + + // The offset (in header+payload message blocks) in the header and payload + // buffers to begin sending messages from + size_t block_offset; + + // The number of header+payload message blocks to send size_t block_count; std::uintptr_t native_socket; boost::asio::ip::address &target_address; uint16_t target_port; boost::asio::ip::address &source_address; + + /** + * @brief Returns a payload buffer descriptor for the given payload offset. + * @param offset The offset in the total payload data (bytes). + * @return Buffer descriptor describing the region at the given offset. + */ + buffer_descriptor_t + buffer_for_payload_offset(ptrdiff_t offset) { + for (const auto &desc : payload_buffers) { + if (offset < desc.size) { + return { + desc.buffer + offset, + desc.size - offset, + }; + } + else { + offset -= desc.size; + } + } + return {}; + } }; bool send_batch(batched_send_info_t &send_info); diff --git a/src/platform/linux/misc.cpp b/src/platform/linux/misc.cpp index 81b88f134a0..63c8bb6cebb 100644 --- a/src/platform/linux/misc.cpp +++ b/src/platform/linux/misc.cpp @@ -433,22 +433,48 @@ namespace platf { memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); } + auto const max_iovs_per_msg = send_info.payload_buffers.size() + (send_info.headers ? 1 : 0); + #ifdef UDP_SEGMENT { - struct iovec iov = {}; - - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - // UDP GSO on Linux currently only supports sending 64K or 64 segments at a time size_t seg_index = 0; const size_t seg_max = 65536 / 1500; + struct iovec iovs[(send_info.headers ? send_info.block_count : 1) * max_iovs_per_msg] = {}; + auto msg_size = send_info.header_size + send_info.payload_size; while (seg_index < send_info.block_count) { - iov.iov_base = (void *) &send_info.buffer[seg_index * send_info.block_size]; - iov.iov_len = send_info.block_size * std::min(send_info.block_count - seg_index, seg_max); + int iovlen = 0; + auto segs_in_batch = std::min(send_info.block_count - seg_index, seg_max); + if (send_info.headers) { + // Interleave iovs for headers and payloads + for (auto i = 0; i < segs_in_batch; i++) { + iovs[iovlen].iov_base = (void *) &send_info.headers[(send_info.block_offset + seg_index + i) * send_info.header_size]; + iovs[iovlen].iov_len = send_info.header_size; + iovlen++; + auto payload_desc = send_info.buffer_for_payload_offset((send_info.block_offset + seg_index + i) * send_info.payload_size); + iovs[iovlen].iov_base = (void *) payload_desc.buffer; + iovs[iovlen].iov_len = send_info.payload_size; + iovlen++; + } + } + else { + // Translate buffer descriptors into iovs + auto payload_offset = (send_info.block_offset + seg_index) * send_info.payload_size; + auto payload_length = payload_offset + (segs_in_batch * send_info.payload_size); + while (payload_offset < payload_length) { + auto payload_desc = send_info.buffer_for_payload_offset(payload_offset); + iovs[iovlen].iov_base = (void *) payload_desc.buffer; + iovs[iovlen].iov_len = std::min(payload_desc.size, payload_length - payload_offset); + payload_offset += iovs[iovlen].iov_len; + iovlen++; + } + } + + msg.msg_iov = iovs; + msg.msg_iovlen = iovlen; // We should not use GSO if the data is <= one full block size - if (iov.iov_len > send_info.block_size) { + if (segs_in_batch > 1) { msg.msg_controllen = cmbuflen + CMSG_SPACE(sizeof(uint16_t)); // Enable GSO to perform segmentation of our buffer for us @@ -456,7 +482,7 @@ namespace platf { cm->cmsg_level = SOL_UDP; cm->cmsg_type = UDP_SEGMENT; cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); - *((uint16_t *) CMSG_DATA(cm)) = send_info.block_size; + *((uint16_t *) CMSG_DATA(cm)) = msg_size; } else { msg.msg_controllen = cmbuflen; @@ -483,10 +509,11 @@ namespace platf { continue; } + BOOST_LOG(warning) << "sendmsg() failed: "sv << errno; break; } - seg_index += bytes_sent / send_info.block_size; + seg_index += bytes_sent / msg_size; } // If we sent something, return the status and don't fall back to the non-GSO path. @@ -498,18 +525,25 @@ namespace platf { { // If GSO is not supported, use sendmmsg() instead. - struct mmsghdr msgs[send_info.block_count]; - struct iovec iovs[send_info.block_count]; + struct mmsghdr msgs[send_info.block_count] = {}; + struct iovec iovs[send_info.block_count * (send_info.headers ? 2 : 1)] = {}; + int iov_idx = 0; for (size_t i = 0; i < send_info.block_count; i++) { - iovs[i] = {}; - iovs[i].iov_base = (void *) &send_info.buffer[i * send_info.block_size]; - iovs[i].iov_len = send_info.block_size; + msgs[i].msg_hdr.msg_iov = &iovs[iov_idx]; + msgs[i].msg_hdr.msg_iovlen = send_info.headers ? 2 : 1; + + if (send_info.headers) { + iovs[iov_idx].iov_base = (void *) &send_info.headers[(send_info.block_offset + i) * send_info.header_size]; + iovs[iov_idx].iov_len = send_info.header_size; + iov_idx++; + } + auto payload_desc = send_info.buffer_for_payload_offset((send_info.block_offset + i) * send_info.payload_size); + iovs[iov_idx].iov_base = (void *) payload_desc.buffer; + iovs[iov_idx].iov_len = send_info.payload_size; + iov_idx++; - msgs[i] = {}; msgs[i].msg_hdr.msg_name = msg.msg_name; msgs[i].msg_hdr.msg_namelen = msg.msg_namelen; - msgs[i].msg_hdr.msg_iov = &iovs[i]; - msgs[i].msg_hdr.msg_iovlen = 1; msgs[i].msg_hdr.msg_control = cmbuf.buf; msgs[i].msg_hdr.msg_controllen = cmbuflen; } diff --git a/src/platform/windows/misc.cpp b/src/platform/windows/misc.cpp index 455771c6723..465a0085f5d 100644 --- a/src/platform/windows/misc.cpp +++ b/src/platform/windows/misc.cpp @@ -1452,12 +1452,37 @@ namespace platf { msg.namelen = sizeof(taddr_v4); } - WSABUF buf; - buf.buf = (char *) send_info.buffer; - buf.len = send_info.block_size * send_info.block_count; + auto const max_bufs_per_msg = send_info.payload_buffers.size() + (send_info.headers ? 1 : 0); - msg.lpBuffers = &buf; - msg.dwBufferCount = 1; + WSABUF bufs[(send_info.headers ? send_info.block_count : 1) * max_bufs_per_msg]; + DWORD bufcount = 0; + if (send_info.headers) { + // Interleave buffers for headers and payloads + for (auto i = 0; i < send_info.block_count; i++) { + bufs[bufcount].buf = (char *) &send_info.headers[(send_info.block_offset + i) * send_info.header_size]; + bufs[bufcount].len = send_info.header_size; + bufcount++; + auto payload_desc = send_info.buffer_for_payload_offset((send_info.block_offset + i) * send_info.payload_size); + bufs[bufcount].buf = (char *) payload_desc.buffer; + bufs[bufcount].len = send_info.payload_size; + bufcount++; + } + } + else { + // Translate buffer descriptors into WSABUFs + auto payload_offset = send_info.block_offset * send_info.payload_size; + auto payload_length = payload_offset + (send_info.block_count * send_info.payload_size); + while (payload_offset < payload_length) { + auto payload_desc = send_info.buffer_for_payload_offset(payload_offset); + bufs[bufcount].buf = (char *) payload_desc.buffer; + bufs[bufcount].len = std::min(payload_desc.size, payload_length - payload_offset); + payload_offset += bufs[bufcount].len; + bufcount++; + } + } + + msg.lpBuffers = bufs; + msg.dwBufferCount = bufcount; msg.dwFlags = 0; // At most, one DWORD option and one PKTINFO option @@ -1505,7 +1530,7 @@ namespace platf { cm->cmsg_level = IPPROTO_UDP; cm->cmsg_type = UDP_SEND_MSG_SIZE; cm->cmsg_len = WSA_CMSG_LEN(sizeof(DWORD)); - *((DWORD *) WSA_CMSG_DATA(cm)) = send_info.block_size; + *((DWORD *) WSA_CMSG_DATA(cm)) = send_info.header_size + send_info.payload_size; } msg.Control.len = cmbuflen; diff --git a/src/stream.cpp b/src/stream.cpp index 40adab09bd3..eebeebae8e7 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -126,11 +126,6 @@ namespace stream { }; struct video_packet_enc_prefix_t { - video_packet_raw_t * - payload() { - return (video_packet_raw_t *) (this + 1); - } - std::uint8_t iv[12]; // 12-byte IV is ideal for AES-GCM std::uint32_t frameNumber; std::uint8_t tag[16]; @@ -227,7 +222,6 @@ namespace stream { } constexpr std::size_t MAX_AUDIO_PACKET_SIZE = 1400; - using video_packet_t = util::c_ptr; using audio_aes_t = std::array; using av_session_id_t = std::variant; // IP address or SS-Ping-Payload from RTSP handshake @@ -619,15 +613,19 @@ namespace stream { size_t blocksize; size_t prefixsize; util::buffer_t shards; + util::buffer_t headers; + util::buffer_t shards_p; + + std::vector payload_buffers; char * data(size_t el) { - return &shards[(el + 1) * prefixsize + el * blocksize]; + return (char *) shards_p[el]; } char * prefix(size_t el) { - return &shards[el * (prefixsize + blocksize)]; + return prefixsize ? &headers[el * prefixsize] : nullptr; } size_t @@ -642,7 +640,8 @@ namespace stream { auto pad = payload_size % blocksize != 0; - auto data_shards = payload_size / blocksize + (pad ? 1 : 0); + auto aligned_data_shards = payload_size / blocksize; + auto data_shards = aligned_data_shards + (pad ? 1 : 0); auto parity_shards = (data_shards * fecpercentage + 99) / 100; // increase the FEC percentage for this frame if the parity shard minimum is not met @@ -655,27 +654,46 @@ namespace stream { auto nr_shards = data_shards + parity_shards; - util::buffer_t shards { nr_shards * (blocksize + prefixsize) }; + // If we need to store a zero-padded data shard, allocate that first to + // to keep the shards in order and reduce buffer fragmentation + auto parity_shard_offset = pad ? 1 : 0; + util::buffer_t shards { (parity_shard_offset + parity_shards) * blocksize }; util::buffer_t shards_p { nr_shards }; + std::vector payload_buffers; + payload_buffers.reserve(2); + // Point into the payload buffer for all except the final padded data shard auto next = std::begin(payload); - for (auto x = 0; x < nr_shards; ++x) { - shards_p[x] = (uint8_t *) &shards[(x + 1) * prefixsize + x * blocksize]; + for (auto x = 0; x < aligned_data_shards; ++x) { + shards_p[x] = (uint8_t *) next; + next += blocksize; + } + payload_buffers.push_back({ std::begin(payload), aligned_data_shards * blocksize }); + + // If the last data shard needs to be zero-padded, we must use the shards buffer + if (pad) { + shards_p[aligned_data_shards] = (uint8_t *) &shards[0]; // GCC doesn't figure out that std::copy_n() can be replaced with memcpy() here // and ends up compiling a horribly slow element-by-element copy loop, so we // help it by using memcpy()/memset() directly. auto copy_len = std::min(blocksize, std::end(payload) - next); - std::memcpy(shards_p[x], next, copy_len); + std::memcpy(shards_p[aligned_data_shards], next, copy_len); if (copy_len < blocksize) { // Zero any additional space after the end of the payload - std::memset(shards_p[x] + copy_len, 0, blocksize - copy_len); + std::memset(shards_p[aligned_data_shards] + copy_len, 0, blocksize - copy_len); } - - next += copy_len; } + // Add a payload buffer describing the shard buffer + payload_buffers.push_back(platf::buffer_descriptor_t { std::begin(shards), shards.size() }); + if (fecpercentage != 0) { + // Point into our allocated buffer for the parity shards + for (auto x = 0; x < parity_shards; ++x) { + shards_p[data_shards + x] = (uint8_t *) &shards[(parity_shard_offset + x) * blocksize]; + } + // packets = parity_shards + data_shards rs_t rs { reed_solomon_new(data_shards, parity_shards) }; @@ -688,7 +706,10 @@ namespace stream { fecpercentage, blocksize, prefixsize, - std::move(shards) + std::move(shards), + util::buffer_t { nr_shards * prefixsize }, + std::move(shards_p), + std::move(payload_buffers), }; } } // namespace fec @@ -1438,8 +1459,11 @@ namespace stream { auto peer_address = session->video.peer.address(); auto batch_info = platf::batched_send_info_t { - nullptr, - shards.prefixsize + shards.blocksize, + shards.headers.begin(), + shards.prefixsize, + shards.payload_buffers, + shards.blocksize, + 0, 0, (uintptr_t) sock.native_handle(), peer_address, @@ -1487,7 +1511,8 @@ namespace stream { auto *prefix = (video_packet_enc_prefix_t *) shards.prefix(x); prefix->frameNumber = packet->frame_index(); std::copy(std::begin(iv), std::end(iv), prefix->iv); - session->video.cipher->encrypt(std::string_view { (char *) inspect, (size_t) blocksize }, prefix->tag, &iv); + session->video.cipher->encrypt(std::string_view { (char *) inspect, (size_t) blocksize }, + prefix->tag, (uint8_t *) inspect, &iv); } if (x - next_shard_to_send + 1 >= send_batch_size || @@ -1510,7 +1535,7 @@ namespace stream { } size_t current_batch_size = x - next_shard_to_send + 1; - batch_info.buffer = shards.prefix(next_shard_to_send); + batch_info.block_offset = next_shard_to_send; batch_info.block_count = current_batch_size; frame_send_batch_latency_logger.first_point_now(); @@ -1520,10 +1545,10 @@ namespace stream { BOOST_LOG(verbose) << "Falling back to unbatched send"sv; for (auto y = 0; y < current_batch_size; y++) { auto send_info = platf::send_info_t { - nullptr, - 0, shards.prefix(next_shard_to_send + y), - shards.prefixsize + shards.blocksize, + shards.prefixsize, + shards.data(next_shard_to_send + y), + shards.blocksize, (uintptr_t) sock.native_handle(), peer_address, session->video.peer.port(),