From 35ab0ea62738f75e36b3585f03e58de8b47beec0 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 29 Jan 2015 23:54:04 +0800 Subject: [PATCH] for #250, use buffer to cache bytes, for system will split the udp packet. --- trunk/src/app/srs_app_mpegts_udp.cpp | 38 ++++++++++++++++++++++++---- trunk/src/app/srs_app_mpegts_udp.hpp | 2 ++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index 32a5d2454f..1006c4d0bf 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -35,6 +35,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_STREAM_CASTER @@ -42,11 +43,13 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) { stream = new SrsStream(); context = new SrsTsContext(); + buffer = new SrsSimpleBuffer(); output = _srs_config->get_stream_caster_output(c); } SrsMpegtsOverUdp::~SrsMpegtsOverUdp() { + srs_freep(buffer); srs_freep(stream); srs_freep(context); } @@ -58,16 +61,36 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) std::string peer_ip = inet_ntoa(from->sin_addr); int peer_port = ntohs(from->sin_port); + // append to buffer. + buffer->append(buf, nb_buf); + + // find the sync byte of mpegts. + char* p = buffer->bytes(); + for (int i = 0; i < buffer->length(); i++) { + if (p[i] != 0x47) { + continue; + } + + if (i > 0) { + buffer->erase(i); + } + break; + } + // drop ts packet when size not modulus by 188 - if (nb_buf < SRS_TS_PACKET_SIZE || (nb_buf % SRS_TS_PACKET_SIZE) != 0) { - srs_warn("udp: drop %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf); + if (buffer->length() < SRS_TS_PACKET_SIZE) { + srs_info("udp: wait %s:%d packet %d/%d bytes", + peer_ip.c_str(), peer_port, nb_buf, buffer->length()); return ret; } - srs_info("udp: got %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf); + srs_info("udp: got %s:%d packet %d/%d bytes", + peer_ip.c_str(), peer_port, nb_buf, buffer->length()); // use stream to parse ts packet. - for (int i = 0; i < nb_buf; i += SRS_TS_PACKET_SIZE) { - if ((ret = stream->initialize(buf + i, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) { + int nb_packet = buffer->length() / SRS_TS_PACKET_SIZE; + for (int i = 0; i < nb_packet; i++) { + char* p = buffer->bytes() + (i * SRS_TS_PACKET_SIZE); + if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) { return ret; } @@ -80,6 +103,11 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) } srs_info("mpegts: parse udp packet completed"); + // erase consumed bytes + if (nb_packet > 0) { + buffer->erase(nb_packet * SRS_TS_PACKET_SIZE); + } + return ret; } diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index b30c2886a8..304ea836a8 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -36,6 +36,7 @@ class sockaddr_in; class SrsStream; class SrsTsContext; class SrsConfDirective; +class SrsSimpleBuffer; #ifdef SRS_AUTO_STREAM_CASTER @@ -49,6 +50,7 @@ class SrsMpegtsOverUdp : public ISrsTsHandler private: SrsStream* stream; SrsTsContext* context; + SrsSimpleBuffer* buffer; std::string output; public: SrsMpegtsOverUdp(SrsConfDirective* c);