/*
* This file is part of mpv.
*
* mpv is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* mpv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with mpv. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <math.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "cache.h"
#include "config.h"
#include "options/m_config.h"
#include "options/m_option.h"
#include "mpv_talloc.h"
#include "common/av_common.h"
#include "common/msg.h"
#include "common/global.h"
#include "common/recorder.h"
#include "misc/charset_conv.h"
#include "misc/thread_tools.h"
#include "osdep/atomic.h"
#include "osdep/timer.h"
#include "osdep/threads.h"
#include "stream/stream.h"
#include "demux.h"
#include "timeline.h"
#include "stheader.h"
#include "cue.h"
// Demuxer list
extern const struct demuxer_desc demuxer_desc_edl;
extern const struct demuxer_desc demuxer_desc_cue;
extern const demuxer_desc_t demuxer_desc_rawaudio;
extern const demuxer_desc_t demuxer_desc_rawvideo;
extern const demuxer_desc_t demuxer_desc_mf;
extern const demuxer_desc_t demuxer_desc_matroska;
extern const demuxer_desc_t demuxer_desc_lavf;
extern const demuxer_desc_t demuxer_desc_playlist;
extern const demuxer_desc_t demuxer_desc_disc;
extern const demuxer_desc_t demuxer_desc_rar;
extern const demuxer_desc_t demuxer_desc_libarchive;
extern const demuxer_desc_t demuxer_desc_null;
extern const demuxer_desc_t demuxer_desc_timeline;
static const demuxer_desc_t *const demuxer_list[] = {
&demuxer_desc_disc,
&demuxer_desc_edl,
&demuxer_desc_cue,
&demuxer_desc_rawaudio,
&demuxer_desc_rawvideo,
&demuxer_desc_matroska,
#if HAVE_LIBARCHIVE
&demuxer_desc_libarchive,
#endif
&demuxer_desc_lavf,
&demuxer_desc_mf,
&demuxer_desc_playlist,
&demuxer_desc_null,
NULL
};
struct demux_opts {
int enable_cache;
int disk_cache;
int64_t max_bytes;
int64_t max_bytes_bw;
double min_secs;
int force_seekable;
double min_secs_cache;
int access_references;
int seekable_cache;
int create_ccs;
char *record_file;
int video_back_preroll;
int audio_back_preroll;
int back_batch[STREAM_TYPE_COUNT];
double back_seek_size;
char *meta_cp;
};
#define OPT_BASE_STRUCT struct demux_opts
#define MAX_BYTES MPMIN(INT64_MAX, SIZE_MAX / 2)
const struct m_sub_options demux_conf = {
.opts = (const struct m_option[]){
OPT_CHOICE("cache", enable_cache, 0,
({"no", 0}, {"auto", -1}, {"yes", 1})),
OPT_FLAG("cache-on-disk", disk_cache, 0),
OPT_DOUBLE("demuxer-readahead-secs", min_secs, M_OPT_MIN, .min = 0),
// (The MAX_BYTES sizes may not be accurate because the max field is
// of double type.)
OPT_BYTE_SIZE("demuxer-max-bytes", max_bytes, 0, 0, MAX_BYTES),
OPT_BYTE_SIZE("demuxer-max-back-bytes", max_bytes_bw, 0, 0, MAX_BYTES),
OPT_FLAG("force-seekable", force_seekable, 0),
OPT_DOUBLE("cache-secs", min_secs_cache, M_OPT_MIN, .min = 0),
OPT_FLAG("access-references", access_references, 0),
OPT_CHOICE("demuxer-seekable-cache", seekable_cache, 0,
({"auto", -1}, {"no", 0}, {"yes", 1})),
OPT_FLAG("sub-create-cc-track", create_ccs, 0),
OPT_STRING("stream-record", record_file, 0),
OPT_CHOICE_OR_INT("video-backward-overlap", video_back_preroll, 0, 0,
1024, ({"auto", -1})),
OPT_CHOICE_OR_INT("audio-backward-overlap", audio_back_preroll, 0, 0,
1024, ({"auto", -1})),
OPT_INTRANGE("video-backward-batch", back_batch[STREAM_VIDEO], 0, 0, 1024),
OPT_INTRANGE("audio-backward-batch", back_batch[STREAM_AUDIO], 0, 0, 1024),
OPT_DOUBLE("demuxer-backward-playback-step", back_seek_size, M_OPT_MIN,
.min = 0),
OPT_STRING("metadata-codepage", meta_cp, 0),
{0}
},
.size = sizeof(struct demux_opts),
.defaults = &(const struct demux_opts){
.enable_cache = -1, // auto
.max_bytes = 150 * 1024 * 1024,
.max_bytes_bw = 50 * 1024 * 1024,
.min_secs = 1.0,
.min_secs_cache = 10.0 * 60 * 60,
.seekable_cache = -1,
.access_references = 1,
.video_back_preroll = -1,
.audio_back_preroll = -1,
.back_seek_size = 60,
.back_batch = {
[STREAM_VIDEO] = 1,
[STREAM_AUDIO] = 10,
},
.meta_cp = "utf-8",
},
};
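// (For reference: these sub-options surface as mpv options of the same names,
// e.g. --cache, --demuxer-max-bytes, --demuxer-readahead-secs; the defaults
// above amount to a 150 MiB forward and a 50 MiB backward packet cache.)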
struct demux_internal {
struct mp_log *log;
struct mpv_global *global;
bool can_cache; // not a slave demuxer; caching makes sense
bool can_record; // stream recording is allowed
// The demuxer runs potentially in another thread, so we keep two demuxer
// structs; the real demuxer can access the shadow struct only.
struct demuxer *d_thread; // accessed by demuxer impl. (producer)
struct demuxer *d_user; // accessed by player (consumer)
// The lock protects the packet queues (struct demux_stream),
// and the fields below.
pthread_mutex_t lock;
pthread_cond_t wakeup;
pthread_t thread;
// -- All the following fields are protected by lock.
struct demux_opts *opts;
struct m_config_cache *opts_cache;
bool thread_terminate;
bool threading;
bool shutdown_async;
void (*wakeup_cb)(void *ctx);
void *wakeup_cb_ctx;
struct sh_stream **streams;
int num_streams;
char *meta_charset;
// If non-NULL, a stream which is used for global (timed) metadata. It will
// be an arbitrary stream, which hopefully will happen to work.
struct sh_stream *metadata_stream;
int events;
struct demux_cache *cache;
bool warned_queue_overflow;
bool last_eof; // last actual global EOF status
bool eof; // whether we're in EOF state (reset for retry)
bool idle;
double min_secs;
size_t max_bytes;
size_t max_bytes_bw;
bool seekable_cache;
bool using_network_cache_opts;
char *record_filename;
// At least one decoder actually requested data since init or the last seek.
// Tracked so that the decoder thread can select streams before demuxing starts.
bool reading;
// Set if we just performed a seek, without reading packets yet. Used to
// avoid a redundant initial seek after enabling streams. We could just
// allow it, but to avoid buggy seeking affecting normal playback, we don't.
bool after_seek;
// Set in addition to after_seek if we think we seeked to the start of the
// file (or if the demuxer was just opened).
bool after_seek_to_start;
// Demuxing backwards. Since demuxer implementations don't support this
// directly, it is emulated by seeking backwards for every packet run. Also,
// packets between keyframes are demuxed forwards (you can't decode that
// stuff otherwise), which adds complexity on top of it.
bool back_demuxing;
// For backward demuxing:
bool need_back_seek; // back-step seek needs to be triggered
bool back_any_need_recheck; // at least 1 ds->back_need_recheck set
bool tracks_switched; // thread needs to inform demuxer of this
bool seeking; // there's a seek queued
int seek_flags; // flags for next seek (if seeking==true)
double seek_pts;
// (fields for debugging)
double seeking_in_progress; // low level seek state
int low_level_seeks; // number of started low level seeks
double demux_ts; // last demuxed DTS or PTS
double ts_offset; // timestamp offset to apply to everything
// (sorted by least recent use: index 0 is least recently used)
struct demux_cached_range **ranges;
int num_ranges;
size_t total_bytes; // total sum of packet data buffered
// Range from which decoder is reading, and to which demuxer is appending.
// This is normally never NULL. This is always ranges[num_ranges - 1].
// It can be NULL during initialization or deinitialization.
struct demux_cached_range *current_range;
double highest_av_pts; // highest non-subtitle PTS seen - for duration
bool blocked;
// Transient state.
double duration;
// Cached state.
int64_t stream_size;
int64_t last_speed_query;
uint64_t bytes_per_second;
int64_t next_cache_update;
// demux user state (user thread, somewhat similar to reader/decoder state)
double last_playback_pts; // last playback_pts from demux_update()
bool force_metadata_update;
int cached_metadata_index; // speed up repeated lookups
struct mp_recorder *dumper;
int dumper_status;
bool owns_stream;
// -- Access from demuxer thread only
bool enable_recording;
struct mp_recorder *recorder;
int64_t slave_unbuffered_read_bytes; // value reported from demuxer impl.
int64_t hack_unbuffered_read_bytes; // for demux_get_bytes_read_hack()
int64_t cache_unbuffered_read_bytes; // for demux_reader_state.bytes_per_second
int64_t byte_level_seeks; // for demux_reader_state.byte_level_seeks
};
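#if 0
// Illustrative sketch only (not part of mpv; the helper name is made up): the
// locking convention described above. Code on the user thread takes in->lock
// before touching any lock-protected field, and signals in->wakeup if the
// demuxer thread may need to react. demux_set_ts_offset() further below
// follows the same pattern.
static void set_blocked_sketch(struct demuxer *demuxer, bool blocked)
{
    struct demux_internal *in = demuxer->in;
    pthread_mutex_lock(&in->lock);
    in->blocked = blocked;            // lock-protected field (see above)
    pthread_cond_signal(&in->wakeup); // wake the demuxer thread, if running
    pthread_mutex_unlock(&in->lock);
}
#endif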
struct timed_metadata {
double pts;
struct mp_tags *tags;
bool from_stream;
};
// A continuous range of cached packets for all enabled streams.
// (One demux_queue for each known stream.)
struct demux_cached_range {
// streams[] is indexed by demux_stream->index
struct demux_queue **streams;
int num_streams;
// Computed from the stream queue's values. These fields (unlike with
// demux_queue) are always either NOPTS, or fully valid.
double seek_start, seek_end;
bool is_bof; // set if the file begins with this range
bool is_eof; // set if the file ends with this range
struct timed_metadata **metadata;
int num_metadata;
};
#define QUEUE_INDEX_SIZE_MASK(queue) ((queue)->index_size - 1)
// Access the idx-th entry in the given demux_queue.
// Requirement: idx >= 0 && idx < queue->num_index
#define QUEUE_INDEX_ENTRY(queue, idx) \
((queue)->index[((queue)->index0 + (idx)) & QUEUE_INDEX_SIZE_MASK(queue)])
// Don't index packets whose timestamps are within this amount of time of the
// last index entry (it's better to seek to them manually).
#define INDEX_STEP_SIZE 1.0
struct index_entry {
double pts;
struct demux_packet *pkt;
};
// A continuous list of cached packets for a single stream/range. There is one
// for each stream and range. Also contains some state for use during demuxing
// (keeping it across seeks makes it easier to resume demuxing).
struct demux_queue {
struct demux_stream *ds;
struct demux_cached_range *range;
struct demux_packet *head;
struct demux_packet *tail;
uint64_t tail_cum_pos; // cumulative size including tail packet
bool correct_dts; // packet DTS is strictly monotonically increasing
bool correct_pos; // packet pos is strictly monotonically increasing
int64_t last_pos; // for determining correct_pos
double last_dts; // for determining correct_dts
double last_ts; // timestamp of the last packet added to queue
// for incrementally determining seek PTS range
struct demux_packet *keyframe_latest;
struct demux_packet *keyframe_first; // cached value of first KF packet
// incrementally maintained seek range, possibly invalid
double seek_start, seek_end;
double last_pruned; // timestamp of last pruned keyframe
bool is_bof; // started demuxing at beginning of file
bool is_eof; // received true EOF here
// Complete index, though it may skip some entries to reduce density.
struct index_entry *index; // ring buffer
size_t index_size; // size of index[] (0 or a power of 2)
size_t index0; // first index entry
size_t num_index; // number of index entries (wraps on index_size)
};
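#if 0
// Illustrative sketch only (not part of mpv; the helper name is made up): how
// the ring-buffer index above can be scanned with QUEUE_INDEX_ENTRY() to find
// the last index entry at or before a given PTS. This assumes entries are
// stored in ascending PTS order, spaced at least INDEX_STEP_SIZE apart.
static struct index_entry *index_lookup_sketch(struct demux_queue *queue,
                                               double pts)
{
    struct index_entry *best = NULL;
    for (size_t n = 0; n < queue->num_index; n++) {
        struct index_entry *e = &QUEUE_INDEX_ENTRY(queue, n);
        if (e->pts > pts)
            break;
        best = e;
    }
    return best; // NULL if no entry lies at or before pts
}
#endif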
struct demux_stream {
struct demux_internal *in;
struct sh_stream *sh; // ds->sh->ds == ds
enum stream_type type; // same as sh->type
int index; // same as sh->index
// --- all fields are protected by in->lock
void (*wakeup_cb)(void *ctx);
void *wakeup_cb_ctx;
// demuxer state
bool selected; // user wants packets from this stream
bool eager; // try to keep at least 1 packet queued
// if false, this stream is disabled, or passively
// read (like subtitles)
bool still_image; // stream has still video images
bool refreshing; // finding old position after track switches
bool eof; // end of demuxed stream? (true if no more packets)
bool global_correct_dts;// all observed so far
bool global_correct_pos;
// current queue - used both for reading and demuxing (this is never NULL)
struct demux_queue *queue;
// reader (decoder) state (bitrate calculations are part of it because we
// want to return the bitrate closest to the "current position")
double base_ts; // timestamp of the last packet returned to decoder
double last_br_ts; // timestamp of last packet bitrate was calculated
size_t last_br_bytes; // summed packet sizes since last bitrate calculation
double bitrate;
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
bool need_wakeup; // call wakeup_cb on next reader_head state change
// For demux_internal.dumper. Currently, this is used only temporarily
// during blocking dumping.
struct demux_packet *dump_pos;
// for refresh seeks: pos/dts of last packet returned to reader
int64_t last_ret_pos;
double last_ret_dts;
// Backwards demuxing.
bool back_need_recheck; // flag for incremental find_backward_restart_pos work
// pos/dts of the previous keyframe packet returned; always valid if back-
// demuxing is enabled, and back_restart_eof/back_restart_next are false.
int64_t back_restart_pos;
double back_restart_dts;
bool back_restart_eof; // restart position is at EOF; overrides pos/dts
bool back_restart_next; // restart before next keyframe; overrides above
bool back_restarting; // searching keyframe before restart pos
// Current PTS lower bound for back demuxing.
double back_seek_pos;
// pos/dts of the packet to resume demuxing from when another stream caused
// a seek backward to get more packets. reader_head will be reset to this
// packet as soon as it's encountered again.
int64_t back_resume_pos;
double back_resume_dts;
bool back_resuming; // resuming mode (above fields are valid/used)
// Set to true if the first packet (keyframe) of a range was returned.
bool back_range_started;
// Number of KF packets at start of range yet to return. -1 is used for BOF.
int back_range_count;
// Number of KF packets yet to return that are marked as preroll.
int back_range_preroll;
// Static packet preroll count.
int back_preroll;
// for closed captions (demuxer_feed_caption)
struct sh_stream *cc;
bool ignore_eof; // ignore stream in underrun detection
};
static void switch_to_fresh_cache_range(struct demux_internal *in);
static void demuxer_sort_chapters(demuxer_t *demuxer);
static void *demux_thread(void *pctx);
static void update_cache(struct demux_internal *in);
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp);
static struct demux_packet *advance_reader_head(struct demux_stream *ds);
static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
bool clear_back_state);
static struct demux_packet *compute_keyframe_times(struct demux_packet *pkt,
double *out_kf_min,
double *out_kf_max);
static void find_backward_restart_pos(struct demux_stream *ds);
static struct demux_packet *find_seek_target(struct demux_queue *queue,
double pts, int flags);
static void prune_old_packets(struct demux_internal *in);
static void dumper_close(struct demux_internal *in);
static void demux_convert_tags_charset(struct demuxer *demuxer);
static uint64_t get_foward_buffered_bytes(struct demux_stream *ds)
{
if (!ds->reader_head)
return 0;
return ds->queue->tail_cum_pos - ds->reader_head->cum_pos;
}
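// (For example, with reader_head->cum_pos == 1000 and tail_cum_pos == 5000,
// 4000 bytes of packet data, reader_head through tail inclusive, count as
// forward-buffered.)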
#if 0
// very expensive check for redundant cached queue state
static void check_queue_consistency(struct demux_internal *in)
{
uint64_t total_bytes = 0;
assert(in->current_range && in->num_ranges > 0);
assert(in->current_range == in->ranges[in->num_ranges - 1]);
for (int n = 0; n < in->num_ranges; n++) {
struct demux_cached_range *range = in->ranges[n];
int range_num_packets = 0;
assert(range->num_streams == in->num_streams);
for (int i = 0; i < range->num_streams; i++) {
struct demux_queue *queue = range->streams[i];
assert(queue->range == range);
size_t fw_bytes = 0;
bool is_forward = false;
bool kf_found = false;
bool kf1_found = false;
size_t next_index = 0;
uint64_t queue_total_bytes = 0;
for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
is_forward |= dp == queue->ds->reader_head;
kf_found |= dp == queue->keyframe_latest;
kf1_found |= dp == queue->keyframe_first;
size_t bytes = demux_packet_estimate_total_size(dp);
total_bytes += bytes;
queue_total_bytes += bytes;
if (is_forward) {
fw_bytes += bytes;
assert(range == in->current_range);
assert(queue->ds->queue == queue);
}
range_num_packets += 1;
if (!dp->next)
assert(queue->tail == dp);
if (next_index < queue->num_index &&
QUEUE_INDEX_ENTRY(queue, next_index).pkt == dp)
next_index += 1;
}
if (!queue->head)
assert(!queue->tail);
assert(next_index == queue->num_index);
uint64_t queue_total_bytes2 = 0;
if (queue->head)
queue_total_bytes2 = queue->tail_cum_pos - queue->head->cum_pos;
assert(queue_total_bytes == queue_total_bytes2);
// If the queue is currently used...
if (queue->ds->queue == queue) {
// ...reader_head and others must be in the queue.
assert(is_forward == !!queue->ds->reader_head);
assert(kf_found == !!queue->keyframe_latest);
uint64_t fw_bytes2 = get_foward_buffered_bytes(queue->ds);
assert(fw_bytes == fw_bytes2);
}
assert(kf1_found == !!queue->keyframe_first);
if (range != in->current_range) {
assert(fw_bytes == 0);
}
if (queue->keyframe_latest)
assert(queue->keyframe_latest->keyframe);
total_bytes += queue->index_size * sizeof(struct index_entry);
}
// Invariant needed by pruning; violation has worse effects than just
// e.g. broken seeking due to incorrect seek ranges.
if (range->seek_start != MP_NOPTS_VALUE)
assert(range_num_packets > 0);
}
assert(in->total_bytes == total_bytes);
}
#endif
// (this doesn't do most required things for a switch, like updating ds->queue)
static void set_current_range(struct demux_internal *in,
struct demux_cached_range *range)
{
in->current_range = range;
// Move to in->ranges[in->num_ranges-1] (for LRU sorting/invariant)
for (int n = 0; n < in->num_ranges; n++) {
if (in->ranges[n] == range) {
MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
break;
}
}
MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
}
static void prune_metadata(struct demux_cached_range *range)
{
int first_needed = 0;
if (range->seek_start == MP_NOPTS_VALUE) {
first_needed = range->num_metadata;
} else {
for (int n = 0; n < range->num_metadata ; n++) {
if (range->metadata[n]->pts > range->seek_start)
break;
first_needed = n;
}
}
// Always preserve the last entry.
first_needed = MPMIN(first_needed, range->num_metadata - 1);
// (Could make this significantly more efficient for large first_needed,
// however that might be very rare and even then it might not matter.)
for (int n = 0; n < first_needed; n++) {
talloc_free(range->metadata[0]);
MP_TARRAY_REMOVE_AT(range->metadata, range->num_metadata, 0);
}
}
// Refresh range->seek_start/end. Idempotent.
static void update_seek_ranges(struct demux_cached_range *range)
{
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
range->is_bof = true;
range->is_eof = true;
double min_start_pts = MP_NOPTS_VALUE;
double max_end_pts = MP_NOPTS_VALUE;
for (int n = 0; n < range->num_streams; n++) {
struct demux_queue *queue = range->streams[n];
if (queue->ds->selected && queue->ds->eager) {
if (queue->is_bof) {
min_start_pts = MP_PTS_MIN(min_start_pts, queue->seek_start);
} else {
range->seek_start =
MP_PTS_MAX(range->seek_start, queue->seek_start);
}
if (queue->is_eof) {
max_end_pts = MP_PTS_MAX(max_end_pts, queue->seek_end);
} else {
range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end);
}
range->is_eof &= queue->is_eof;
range->is_bof &= queue->is_bof;
bool empty = queue->is_eof && !queue->head;
if (queue->seek_start >= queue->seek_end && !empty)
goto broken;
}
}
if (range->is_eof)
range->seek_end = max_end_pts;
if (range->is_bof)
range->seek_start = min_start_pts;
// Sparse (subtitle) stream behavior is not very clearly defined, but
// usually we don't want it to restrict the range of other streams. For
// example, if there are subtitle packets at position 5 and 10 seconds, and
// the demuxer demuxed the other streams until position 7 seconds, the seek
// range end position is 7.
// Assume that reading a non-sparse (audio/video) packet gets all sparse
// packets that are needed before that non-sparse packet.
// This is incorrect in any of these cases:
// - sparse streams only (it's unknown how to determine an accurate range)
// - if sparse streams have non-keyframe packets (we set queue->last_pruned
// to the start of the pruned keyframe range - we'd need the end or so)
// We also assume that ds->eager corresponds to a stream not being sparse
// (usually true, except if only sparse streams are selected).
// We also rely on the fact that the demuxer position will always be ahead
// of the seek_end for audio/video, because they need to prefetch at least
// 1 packet to detect the end of a keyframe range. This means that there's
// a reasonably strong guarantee that all sparse (subtitle) packets are within
// the seekable range.
// As a consequence, the code _never_ checks queue->seek_end for a sparse
// queue, as the end of it is implied by the highest PTS of a non-sparse
// stream (i.e. the latest demuxer position).
// On the other hand, if a sparse packet was pruned, and that packet has
// a higher PTS than seek_start for non-sparse queues, that packet is
// missing. So the range's seek_start needs to be adjusted accordingly.
for (int n = 0; n < range->num_streams; n++) {
struct demux_queue *queue = range->streams[n];
if (queue->ds->selected && !queue->ds->eager &&
queue->last_pruned != MP_NOPTS_VALUE &&
range->seek_start != MP_NOPTS_VALUE)
{
// (last_pruned is _exclusive_ to the seekable range, so add a small
// value to exclude it from the valid range.)
range->seek_start =
MP_PTS_MAX(range->seek_start, queue->last_pruned + 0.1);
}
}
if (range->seek_start >= range->seek_end)
goto broken;
prune_metadata(range);
return;
broken:
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
prune_metadata(range);
}
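// A concrete example of the combination done in update_seek_ranges() above:
// with an eager video queue covering [2s, 9s] and an eager audio queue
// covering [3s, 11s] (neither at BOF/EOF), the joint range becomes
// seek_start = max(2, 3) = 3 and seek_end = min(9, 11) = 9, i.e. only the
// span covered by all eager streams is considered seekable.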
// Remove queue->head from the queue.
static void remove_head_packet(struct demux_queue *queue)
{
struct demux_packet *dp = queue->head;
assert(queue->ds->reader_head != dp);
if (queue->keyframe_first == dp)
queue->keyframe_first = NULL;
if (queue->keyframe_latest == dp)
queue->keyframe_latest = NULL;
queue->is_bof = false;
uint64_t end_pos = dp->next ? dp->next->cum_pos : queue->tail_cum_pos;
queue->ds->in->total_bytes -= end_pos - dp->cum_pos;
if (queue->num_index && queue->index[queue->index0].pkt == dp) {
queue->index0 = (queue->index0 + 1) & QUEUE_INDEX_SIZE_MASK(queue);
queue->num_index -= 1;
}
queue->head = dp->next;
if (!queue->head)
queue->tail = NULL;
talloc_free(dp);
}
static void free_index(struct demux_queue *queue)
{
struct demux_stream *ds = queue->ds;
struct demux_internal *in = ds->in;
in->total_bytes -= queue->index_size * sizeof(queue->index[0]);
queue->index_size = 0;
queue->index0 = 0;
queue->num_index = 0;
TA_FREEP(&queue->index);
}
static void clear_queue(struct demux_queue *queue)
{
struct demux_stream *ds = queue->ds;
struct demux_internal *in = ds->in;
if (queue->head)
in->total_bytes -= queue->tail_cum_pos - queue->head->cum_pos;
free_index(queue);
struct demux_packet *dp = queue->head;
while (dp) {
struct demux_packet *dn = dp->next;
assert(ds->reader_head != dp);
talloc_free(dp);
dp = dn;
}
queue->head = queue->tail = NULL;
queue->keyframe_first = NULL;
queue->keyframe_latest = NULL;
queue->seek_start = queue->seek_end = queue->last_pruned = MP_NOPTS_VALUE;
queue->correct_dts = queue->correct_pos = true;
queue->last_pos = -1;
queue->last_ts = queue->last_dts = MP_NOPTS_VALUE;
queue->is_eof = false;
queue->is_bof = false;
}
static void clear_cached_range(struct demux_internal *in,
struct demux_cached_range *range)
{
for (int n = 0; n < range->num_streams; n++)
clear_queue(range->streams[n]);
for (int n = 0; n < range->num_metadata; n++)
talloc_free(range->metadata[n]);
range->num_metadata = 0;
update_seek_ranges(range);
}
// Remove ranges with no data (except in->current_range). Also remove excessive
// ranges.
static void free_empty_cached_ranges(struct demux_internal *in)
{
while (1) {
struct demux_cached_range *worst = NULL;
int end = in->num_ranges - 1;
// (Not set during early init or late destruction.)
if (in->current_range) {
assert(in->current_range && in->num_ranges > 0);
assert(in->current_range == in->ranges[in->num_ranges - 1]);
end -= 1;
}
for (int n = end; n >= 0; n--) {
struct demux_cached_range *range = in->ranges[n];
if (range->seek_start == MP_NOPTS_VALUE || !in->seekable_cache) {
clear_cached_range(in, range);
MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
for (int i = 0; i < range->num_streams; i++)
talloc_free(range->streams[i]);
talloc_free(range);
} else {
if (!worst || (range->seek_end - range->seek_start <
worst->seek_end - worst->seek_start))
worst = range;
}
}
if (in->num_ranges <= MAX_SEEK_RANGES || !worst)
break;
clear_cached_range(in, worst);
}
}
static void ds_clear_reader_queue_state(struct demux_stream *ds)
{
ds->reader_head = NULL;
ds->eof = false;
ds->need_wakeup = true;
}
static void ds_clear_reader_state(struct demux_stream *ds,
bool clear_back_state)
{
ds_clear_reader_queue_state(ds);
ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE;
ds->last_br_bytes = 0;
ds->bitrate = -1;
ds->skip_to_keyframe = false;
ds->attached_picture_added = false;
ds->last_ret_pos = -1;
ds->last_ret_dts = MP_NOPTS_VALUE;
if (clear_back_state) {
ds->back_restart_pos = -1;
ds->back_restart_dts = MP_NOPTS_VALUE;
ds->back_restart_eof = false;
ds->back_restart_next = ds->in->back_demuxing;
ds->back_restarting = ds->in->back_demuxing && ds->eager;
ds->back_seek_pos = MP_NOPTS_VALUE;
ds->back_resume_pos = -1;
ds->back_resume_dts = MP_NOPTS_VALUE;
ds->back_resuming = false;
ds->back_range_started = false;
ds->back_range_count = 0;
ds->back_range_preroll = 0;
}
}
// called locked, from user thread only
static void clear_reader_state(struct demux_internal *in,
bool clear_back_state)
{
for (int n = 0; n < in->num_streams; n++)
ds_clear_reader_state(in->streams[n]->ds, clear_back_state);
in->warned_queue_overflow = false;
in->d_user->filepos = -1; // implicitly synchronized
in->blocked = false;
in->need_back_seek = false;
}
// Call if the observed reader state on this stream somehow changes. The wakeup
// is skipped if the reader successfully read a packet, because that means we
// expect it to come back and ask for more.
static void wakeup_ds(struct demux_stream *ds)
{
if (ds->need_wakeup) {
if (ds->wakeup_cb) {
ds->wakeup_cb(ds->wakeup_cb_ctx);
} else if (ds->in->wakeup_cb) {
ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
}
ds->need_wakeup = false;
pthread_cond_signal(&ds->in->wakeup);
}
}
static void update_stream_selection_state(struct demux_internal *in,
struct demux_stream *ds)
{
ds->eof = false;
ds->refreshing = false;
// We still have to go over the whole stream list to update ds->eager for
// other streams too, because they depend on the other streams' selections.
bool any_av_streams = false;
bool any_streams = false;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *s = in->streams[n]->ds;
s->still_image = s->sh->still_image;
s->eager = s->selected && !s->sh->attached_picture;
if (s->eager && !s->still_image)
any_av_streams |= s->type != STREAM_SUB;
any_streams |= s->selected;
}
// Subtitles are only eagerly read if there are no other eagerly read
// streams.
if (any_av_streams) {
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *s = in->streams[n]->ds;
if (s->type == STREAM_SUB)
s->eager = false;
}
}
if (!any_streams)
in->blocked = false;
ds_clear_reader_state(ds, true);
// Make sure any stream reselection or addition is reflected in the seek
// ranges, and also get rid of data that is not needed anymore (or
// rather, which can't be kept consistent). This has to happen after we've
// updated all the subtle state (like s->eager).
for (int n = 0; n < in->num_ranges; n++) {
struct demux_cached_range *range = in->ranges[n];
if (!ds->selected)
clear_queue(range->streams[ds->index]);
update_seek_ranges(range);
}
free_empty_cached_ranges(in);
wakeup_ds(ds);
}
void demux_set_ts_offset(struct demuxer *demuxer, double offset)
{
struct demux_internal *in = demuxer->in;
pthread_mutex_lock(&in->lock);
in->ts_offset = offset;
pthread_mutex_unlock(&in->lock);
}
static void add_missing_streams(struct demux_internal *in,
struct demux_cached_range *range)
{
for (int n = range->num_streams; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
struct demux_queue *queue = talloc_ptrtype(NULL, queue);
*queue = (struct demux_queue){
.ds = ds,
.range = range,
};
clear_queue(queue);
MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
assert(range->streams[ds->index] == queue);
}
}
// Allocate a new sh_stream of the given type. It either has to be released
// with talloc_free(), or added to a demuxer with demux_add_sh_stream(). You
// cannot add or read packets from the stream before it has been added.
struct sh_stream *demux_alloc_sh_stream(enum stream_type type)
{
struct sh_stream *sh = talloc_ptrtype(NULL, sh);
*sh = (struct sh_stream) {
.type = type,
.index = -1,
.ff_index = -1, // may be overwritten by demuxer
.demuxer_id = -1, // ... same
.codec = talloc_zero(sh, struct mp_codec_params),
.tags = talloc_zero(sh, struct mp_tags),
};
sh->codec->type = type;
return sh;
}
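#if 0
// Illustrative usage sketch only (not part of mpv; the helper and the codec
// parameters are made up): a demuxer implementation typically allocates a
// stream, fills in codec parameters while it still owns it, and then publishes
// it with demux_add_sh_stream(), after which the stream is immutable and owned
// (and eventually freed) by the demuxer.
static void add_audio_track_sketch(struct demuxer *demuxer)
{
    struct sh_stream *sh = demux_alloc_sh_stream(STREAM_AUDIO);
    sh->codec->codec = "pcm_s16le"; // hypothetical codec name
    sh->codec->samplerate = 48000;
    demux_add_sh_stream(demuxer, sh);
}
#endif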
// Add a new sh_stream to the demuxer. Note that as soon as the stream has been
// added, it must be immutable, and must not be released (this will happen when
// the demuxer is destroyed).
static void demux_add_sh_stream_locked(struct demux_internal *in,
struct sh_stream *sh)
{
assert(!sh->ds); // must not be added yet
sh->index = in->num_streams;
sh->ds = talloc(sh, struct demux_stream);
*sh->ds = (struct demux_stream) {
.in = in,
.sh = sh,
.type = sh->type,
.index = sh->index,
.global_correct_dts = true,
.global_correct_pos = true,
};
struct demux_stream *ds = sh->ds;
if (!sh->codec->codec)
sh->codec->codec = "";
if (sh->ff_index < 0)
sh->ff_index = sh->index;
MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
assert(in->streams[sh->index] == sh);
if (in->current_range) {
for (int n = 0; n < in->num_ranges; n++)
add_missing_streams(in, in->ranges[n]);
sh->ds->queue = in->current_range->streams[sh->ds->index];
}