Skip to content

Commit

Permalink
Mux modification (#63)
Browse files Browse the repository at this point in the history
Solves issues: #67, #68

* Increase muxing capacity up to 4*4096 ABR segments (more than 8 hours).
* Add capability to mux to MP4 in addition to fMP4. This can be chosen using the format in transcoding parameters.
* During muxing adjust output PTS/DTS and start from zero.
* Better EOF detection when muxing multiple streams.
* Start muxing after the first keyframe detection.
* Some enhancements and fixes in mux output seeker interface.
* Fix a memory leak in avpipe mux when muxing is filished
  • Loading branch information
elv-reza authored Nov 22, 2024
1 parent 8e3de0b commit 2d9c98a
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 106 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ The following repositories can be checked out in any directory, but for better o
- `gsutil -m cp 'gs://eluvio-test-assets/*' .`
- Inside _<avpipe-path>_ run
- `go test -timeout 2000s`
- Instead of the above commands, you can run the following scripts:
- `run_tests.sh` to run avpipe core functionality and transcoding tests.
- `run_live_tests.sh`: to run avipe live-streaming functionality tests.

# Design
- Avpipe library has been built on top of the different libraries of ffmpeg, the most important ones are libx264, libx265, libavcodec, libavformat, libavfilter and libswresample. But in order to achieve all the features and capabilities some parts of ffmpeg library have been changed. Avpipe library is capable of transcoding or probing an input source (i.e a media file, or an UDP/RTMP stream) and producing output media or probe results. In order to start a transcoding job the transcoding parameters have to be set.
Expand Down Expand Up @@ -152,7 +155,7 @@ typedef struct xcparams_t {
- **Extracting images:** avpipe library can extract images either using a time interval or specific timestamps.
- **HDR support:** avpipe library allows to create HDR output while transcoding with H.265 encoder. To make an HDR content two parameters max_cll and master_display have to be set.
- **Bypass feature:** setting bypass_transcoding to 1, would avoid transcoding and copies the input packets to output. This feature is very useful (saves a lot of CPU and time) when input data matches with output and we can skip transcoding.
- **Muxing audio/video ABR segments and creating MP4 files:** this feature allows the creation of MP4 files from transcoded audio/video segments. In order to do this a muxing spec has to be made to tell avpipe which ABR segments should be stitched together to produce the final MP4. To make this feature working xc_type should be set to xc_mux and the mux_spec param should point to a buffer containing muxing spec.
- **Muxing audio/video ABR segments and creating fMP4/MP4 files:** this feature allows the creation of fMP4/MP4 files from transcoded audio/video segments. In order to do this a muxing spec has to be made to tell avpipe which ABR segments should be stitched together to produce the final fMP4/MP4. To make this feature working xc_type should be set to xc_mux and the mux_spec param should point to a buffer containing muxing spec. If the format is 'fmp4-segment' the output will be fMP4, otherwise MP4.
- **Transcoding from specific timebase offset:** the parameter start_time_ts can be used to skip some input and transcode from specified TS in start_time_ts. This feature is also very useful to start transcoding from a certain point and not from the beginning of file/stream.
- **Audio join/pan/merge filters:**
- setting xc_type = xc_audio_join would join 2 or more audio inputs and create a new audio output (for example joining two mono streams and creating one stereo).
Expand Down
22 changes: 11 additions & 11 deletions avpipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ int64_t AVPipeOpenOutput(int64_t, int, int, int64_t, int);
int64_t AVPipeOpenMuxOutput(char *, int);
int AVPipeWriteOutput(int64_t, int64_t, uint8_t *, int);
int AVPipeWriteMuxOutput(int64_t, uint8_t *, int);
int AVPipeSeekOutput(int64_t, int64_t, int64_t, int);
int AVPipeSeekMuxOutput(int64_t, int64_t, int);
int64_t AVPipeSeekOutput(int64_t, int64_t, int64_t, int);
int64_t AVPipeSeekMuxOutput(int64_t, int64_t, int);
int AVPipeCloseOutput(int64_t, int64_t);
int AVPipeCloseMuxOutput(int64_t);
int AVPipeStatOutput(int64_t, int64_t, int, avpipe_buftype_t, avp_stat_t, void *);
Expand Down Expand Up @@ -473,7 +473,7 @@ udp_in_read_packet(
free(udp_packet);
}
if (debug_frame_level)
elv_dbg("IN READ UDP read=%d pos=%"PRId64" total=%"PRId64", url=%s", r, c->read_pos, c->read_bytes, c->url);
elv_dbg("IN READ UDP read=%d, pos=%"PRId64", total=%"PRId64", url=%s", r, c->read_pos, c->read_bytes, c->url);
}

return r;
Expand Down Expand Up @@ -607,7 +607,7 @@ out_opener(
return -1;
}

outctx->opaque = (int *) malloc(sizeof(int64_t));
outctx->opaque = (int64_t *) malloc(sizeof(int64_t));
*((int64_t *)(outctx->opaque)) = fd;

return 0;
Expand Down Expand Up @@ -651,7 +651,7 @@ out_write_packet(
}

if (xcparams && xcparams->debug_frame_level)
elv_dbg("OUT WRITE stream_index=%d, fd=%"PRId64", size=%d written=%d pos=%d total=%d",
elv_dbg("OUT WRITE stream_index=%d, fd=%"PRId64", size=%d, written=%d, pos=%d, total=%d",
outctx->stream_index, fd, buf_size, bwritten, outctx->write_pos, outctx->written_bytes);

return buf_size;
Expand All @@ -667,7 +667,7 @@ out_seek(
ioctx_t *inctx = outctx->inctx;
int64_t h = *((int64_t *)(inctx->opaque));
int64_t fd = *(int64_t *)outctx->opaque;
int rc = AVPipeSeekOutput(h, fd, offset, whence);
int64_t rc = AVPipeSeekOutput(h, fd, offset, whence);
whence = whence & 0xFFFF; /* Mask out AVSEEK_SIZE and AVSEEK_FORCE */
switch (whence) {
case SEEK_SET:
Expand All @@ -680,7 +680,7 @@ out_seek(
elv_dbg("OUT SEEK - weird seek\n");
}

elv_dbg("OUT SEEK fd=%"PRId64" offset=%d whence=%d", fd, offset, whence);
elv_dbg("OUT SEEK fd=%"PRId64", offset=%d, whence=%d, rc=%"PRId64, fd, offset, whence, rc);

return rc;
}
Expand Down Expand Up @@ -1100,18 +1100,18 @@ in_mux_read_packet(
if (index == 0) {
/* Reached end of videos */
if (in_mux_ctx->video.index >= in_mux_ctx->video.n_parts)
return -1;
return AVERROR_EOF;
filepath = in_mux_ctx->video.parts[in_mux_ctx->video.index];
in_mux_ctx->video.index++;
} else if (index <= in_mux_ctx->last_audio_index) {
if (in_mux_ctx->audios[index-1].index >= in_mux_ctx->audios[index-1].n_parts)
return -1;
return AVERROR_EOF;
filepath = in_mux_ctx->audios[index-1].parts[in_mux_ctx->audios[index-1].index];
in_mux_ctx->audios[index-1].index++;
} else if (index <= in_mux_ctx->last_audio_index+in_mux_ctx->last_caption_index) {
i = index - in_mux_ctx->last_audio_index - 1;
if (in_mux_ctx->captions[i].index >= in_mux_ctx->captions[i].n_parts)
return -1;
return AVERROR_EOF;
filepath = in_mux_ctx->captions[i].parts[in_mux_ctx->captions[i].index];
in_mux_ctx->captions[i].index++;
} else {
Expand Down Expand Up @@ -1255,7 +1255,7 @@ out_mux_seek(
ioctx_t *inctx = outctx->inctx;
xcparams_t *xcparams = (inctx != NULL) ? inctx->params : NULL;
int64_t fd = *(int64_t *)outctx->opaque;
int rc = AVPipeSeekMuxOutput(fd, offset, whence);
int64_t rc = AVPipeSeekMuxOutput(fd, offset, whence);
if (xcparams != NULL && xcparams->debug_frame_level)
elv_dbg("OUT MUX SEEK fd=%"PRId64" offset=%d whence=%d", fd, offset, whence);
return rc;
Expand Down
25 changes: 17 additions & 8 deletions avpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import "C"
import (
"encoding/json"
"fmt"
"io"
"math/big"
"sync"
"unsafe"
Expand Down Expand Up @@ -118,6 +119,13 @@ const (
XcProfileH264Heigh10 = C.FF_PROFILE_H264_HIGH_10 // 110
)

type SeekReadWriteCloser interface {
io.Seeker
io.Reader
io.Writer
io.Closer
}

func XcTypeFromString(xcTypeStr string) XcType {
var xcType XcType
switch xcTypeStr {
Expand Down Expand Up @@ -1007,36 +1015,36 @@ func (h *ioHandler) OutWriter(fd C.int64_t, buf []byte) (int, error) {
}

//export AVPipeSeekOutput
func AVPipeSeekOutput(handler C.int64_t, fd C.int64_t, offset C.int64_t, whence C.int) C.int {
func AVPipeSeekOutput(handler C.int64_t, fd C.int64_t, offset C.int64_t, whence C.int) C.int64_t {
gMutex.Lock()
h := gHandlers[int64(handler)]
if h == nil {
gMutex.Unlock()
return C.int(-1)
return C.int64_t(-1)
}
gMutex.Unlock()
n, err := h.OutSeeker(fd, offset, whence)
if err != nil {
return C.int(-1)
return C.int64_t(-1)
}
return C.int(n)
return C.int64_t(n)
}

//export AVPipeSeekMuxOutput
func AVPipeSeekMuxOutput(fd C.int64_t, offset C.int64_t, whence C.int) C.int {
func AVPipeSeekMuxOutput(fd C.int64_t, offset C.int64_t, whence C.int) C.int64_t {
gMutex.Lock()
outHandler := gMuxHandlers[int64(fd)]
if outHandler == nil {
gMutex.Unlock()
return C.int(-1)
return C.int64_t(-1)
}
gMutex.Unlock()

n, err := outHandler.Seek(int64(offset), int(whence))
if err != nil {
return C.int(-1)
return C.int64_t(-1)
}
return C.int(n)
return C.int64_t(n)
}

func (h *ioHandler) OutSeeker(fd C.int64_t, offset C.int64_t, whence C.int) (int64, error) {
Expand Down Expand Up @@ -1380,6 +1388,7 @@ func Mux(params *XcParams) error {
return EAV_PARAM
}

params.XcType = XcMux
cparams, err := getCParams(params)
if err != nil {
log.Error("Muxing failed", err, "url", params.Url)
Expand Down
6 changes: 6 additions & 0 deletions elvxc/cmd/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func InitMux(cmdRoot *cobra.Command) error {

cmdTranscode.PersistentFlags().StringP("filename", "f", "", "(mandatory) muxing output filename.")
cmdTranscode.PersistentFlags().String("mux-spec", "", "(mandatory) muxing spec file.")
cmdTranscode.PersistentFlags().StringP("format", "", "fmp4-segment", "package format, can be 'dash', 'hls', 'mp4', 'fmp4', 'segment', 'fmp4-segment', or 'image2'.")

return nil
}
Expand All @@ -208,6 +209,10 @@ func doMux(cmd *cobra.Command, args []string) error {
return fmt.Errorf("mux-spec is needed to do muxing")
}

format := cmd.Flag("format").Value.String()
if format != "dash" && format != "hls" && format != "mp4" && format != "fmp4" && format != "segment" && format != "fmp4-segment" && format != "image2" {
return fmt.Errorf("Package format is not valid, can be 'dash', 'hls', 'mp4', 'fmp4', 'segment', 'fmp4-segment', or 'image2'")
}
muxSpec, err := ioutil.ReadFile(muxSpecFile)
if err != nil {
return fmt.Errorf("Could not read mux-spec file %s", muxSpecFile)
Expand All @@ -218,6 +223,7 @@ func doMux(cmd *cobra.Command, args []string) error {
MuxingSpec: string(muxSpec),
Url: filename,
DebugFrameLevel: true,
Format: format,
}

avpipe.InitUrlMuxIOHandler(filename, &AVCmdMuxInputOpener{URL: filename}, &AVCmdMuxOutputOpener{})
Expand Down
10 changes: 5 additions & 5 deletions exc/elv_mux.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,18 @@ in_mux_read_packet(
if (index == 0) {
/* Reached end of videos */
if (in_mux_ctx->video.index >= in_mux_ctx->video.n_parts)
return -1;
return AVERROR_EOF;
filepath = in_mux_ctx->video.parts[in_mux_ctx->video.index];
in_mux_ctx->video.index++;
} else if (index <= in_mux_ctx->last_audio_index) {
if (in_mux_ctx->audios[index-1].index >= in_mux_ctx->audios[index-1].n_parts)
return -1;
return AVERROR_EOF;
filepath = in_mux_ctx->audios[index-1].parts[in_mux_ctx->audios[index-1].index];
in_mux_ctx->audios[index-1].index++;
} else if (index <= in_mux_ctx->last_audio_index+in_mux_ctx->last_caption_index) {
if (in_mux_ctx->captions[index - in_mux_ctx->last_audio_index - 1].index >=
in_mux_ctx->captions[index - in_mux_ctx->last_audio_index - 1].n_parts)
return -1;
return AVERROR_EOF;
filepath = in_mux_ctx->captions[index - in_mux_ctx->last_audio_index - 1].parts[in_mux_ctx->captions[index - in_mux_ctx->last_audio_index - 1].index];
in_mux_ctx->captions[index - in_mux_ctx->last_audio_index - 1].index++;
} else {
Expand Down Expand Up @@ -266,7 +266,7 @@ out_mux_seek(
ioctx_t *outctx = (ioctx_t *)opaque;
int fd = *(int *)outctx->opaque;

int rc = lseek(fd, offset, whence);
int64_t rc = lseek(fd, offset, whence);
whence = whence & 0xFFFF; /* Mask out AVSEEK_SIZE and AVSEEK_FORCE */
switch (whence) {
case SEEK_SET:
Expand All @@ -281,7 +281,7 @@ out_mux_seek(
elv_err("OUT MUX SEEK - weird seek\n");
}

elv_dbg("OUT MUX SEEK offset=%"PRId64" whence=%d rc=%d", offset, whence, rc);
elv_dbg("OUT MUX SEEK offset=%"PRId64" whence=%d rc=%"PRId64, offset, whence, rc);
return rc;
}

Expand Down
4 changes: 2 additions & 2 deletions libavpipe/include/avpipe_xc.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "elv_channel.h"

#define MAX_STREAMS 64
#define MAX_MUX_IN_STREAM 4096
#define MAX_MUX_IN_STREAM (4*4096) // Up to 4*4096 ABR segments

#define AVIO_OUT_BUF_SIZE (1*1024*1024) // avio output buffer size
#define AVIO_IN_BUF_SIZE (1*1024*1024) // avio input buffer size
Expand Down Expand Up @@ -120,7 +120,7 @@ typedef struct udp_packet_t {
typedef struct mux_input_ctx_t {
int n_parts; /* Number of input parts */
int index; /* Index of current input part that should be processed */
char *parts[MAX_MUX_IN_STREAM]; /* All the input parts */
char **parts; /* All the input parts */
int header_size;
} mux_input_ctx_t;

Expand Down
Loading

0 comments on commit 2d9c98a

Please sign in to comment.