%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_msg_store).
-behaviour(gen_server2).
-export([start_link/5, successfully_recovered_state/1,
client_init/3, client_terminate/1, client_delete_and_terminate/1,
client_pre_hibernate/1, client_ref/1,
write/4, write_flow/4, read/2, read_many/2, contains/2, remove/2]).
-export([compact_file/2, truncate_file/4, delete_file/2]). %% internal
-export([scan_file_for_valid_messages/1, scan_file_for_valid_messages/2]). %% salvage tool
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_call/4, prioritise_cast/3,
prioritise_info/3, format_message_queue/2]).
%%----------------------------------------------------------------------------
-include_lib("rabbit_common/include/rabbit.hrl").
-type(msg() :: any()).
-record(msg_location, {msg_id, ref_count, file, offset, total_size}).
%% We flush to disk at an interval to make sure we don't keep
%% the data in memory too long. Confirms are sent after the
%% data is flushed to disk.
-define(SYNC_INTERVAL, 200). %% Milliseconds.
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
-define(FILE_EXTENSION, ".rdq").
%% We keep track of flying messages for writes and removes. The idea is that
%% when a remove comes in before we could process the write, we skip the
%% write and send a publisher confirm immediately. We later skip the remove
%% as well since we didn't process the write.
%%
%% Flying events. They get added as things happen. The entry in the table
%% may get removed after the server write or after the server remove. When
%% the value is removed after a write, when the value is added again it
%% will be as if the value was never removed.
%%
%% So the possible values are only:
%% - 1: client write
%% - 3: client and server write
%% - 7: client and server write, client remove before entry could be deleted
%%
%% Values 1 and 7 indicate there is a message in flight: a write or a remove.
-define(FLYING_WRITE, 1). %% Message in transit for writing.
-define(FLYING_WRITE_DONE, 2). %% Message written in the store.
-define(FLYING_REMOVE, 4). %% Message removed from the store.
%% Useful states.
-define(FLYING_IS_WRITTEN, ?FLYING_WRITE + ?FLYING_WRITE_DONE). %% Write was handled.
-define(FLYING_IS_IGNORED, ?FLYING_WRITE + ?FLYING_REMOVE). %% Remove before write was handled.
-define(FLYING_IS_REMOVED, ?FLYING_WRITE + ?FLYING_WRITE_DONE + ?FLYING_REMOVE). %% Remove.
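%% As a worked example (values as they appear in the flying table;
%% transitions follow the defines above):
%%
%%   client write               -> 1 (?FLYING_WRITE)
%%   server processes the write -> 3 (?FLYING_IS_WRITTEN)
%%   client remove              -> 7 (?FLYING_IS_REMOVED)
%%
%% If the remove arrives before the server could process the write:
%%
%%   client write               -> 1 (?FLYING_WRITE)
%%   client remove              -> 5 (?FLYING_IS_IGNORED)
%%
%% and both the write and the remove are then skipped.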
%%----------------------------------------------------------------------------
-record(msstate,
{
%% store directory
dir :: file:filename(),
%% index table
index_ets,
%% current file name as number
current_file,
%% current file handle
current_file_handle,
%% current file offset
current_file_offset,
%% TRef for our interval timer
sync_timer_ref,
%% files that had removes
gc_candidates,
%% timer ref for checking gc_candidates
gc_check_timer,
%% pid of our GC
gc_pid,
%% tid of the shared file handles table
file_handles_ets,
%% tid of the file summary table
file_summary_ets,
%% tid of current file cache table
cur_file_cache_ets,
%% tid of writes/removes in flight
flying_ets,
%% set of dying clients
dying_clients,
%% map of references of all registered clients
%% to callbacks
clients,
%% boolean: did we recover state?
successfully_recovered,
%% how big are our files allowed to get?
file_size_limit,
%% client ref to synced messages mapping
cref_to_msg_ids,
%% See CREDIT_DISC_BOUND in rabbit.hrl
credit_disc_bound
}).
-record(client_msstate,
{ server,
client_ref,
reader,
index_ets,
dir,
file_handles_ets,
cur_file_cache_ets,
flying_ets,
credit_disc_bound
}).
-record(file_summary,
{file, valid_total_size, unused1, unused2, file_size, locked, unused3}).
-record(gc_state,
{ dir,
index_ets,
file_summary_ets,
file_handles_ets,
msg_store
}).
-record(dying_client,
{ client_ref,
file,
offset
}).
%%----------------------------------------------------------------------------
-export_type([gc_state/0, file_num/0]).
-type gc_state() :: #gc_state { dir :: file:filename(),
index_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
file_handles_ets :: ets:tid(),
msg_store :: server()
}.
-type server() :: pid() | atom().
-type client_ref() :: binary().
-type file_num() :: non_neg_integer().
-type client_msstate() :: #client_msstate {
server :: server(),
client_ref :: client_ref(),
reader :: undefined | {non_neg_integer(), file:fd()},
index_ets :: any(),
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary(),
file_handles_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid(),
flying_ets :: ets:tid(),
credit_disc_bound :: {pos_integer(), pos_integer()}}.
-type msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A}).
-type maybe_msg_id_fun() ::
'undefined' | fun ((sets:set(), 'written' | 'ignored') -> any()).
%%----------------------------------------------------------------------------
%% The message store is responsible for storing messages
%% on disk and loading them back, which includes locating
%% messages on disk and maintaining an index. It handles
%% both persistent and transient messages.
%%
%% There are two message stores per vhost: one for transient
%% and one for persistent messages.
%%
%% Queue processes interact with the stores via clients.
%%
%% The components:
%%
%% Index: this is a mapping from MsgId to #msg_location{}.
%% By default, it's in ETS, but other implementations can
%% be used.
%% FileSummary: this maps File to #file_summary{} and is stored
%% in ETS.
%%
%% The basic idea is that messages are appended to the current file up
%% until that file becomes too big (> file_size_limit). At that point,
%% the file is closed and a new file is created. Files are named
%% numerically ascending, thus the file with the lowest name is the
%% eldest file.
%%
%% We need to keep track of which messages are in which files (this is
%% the index) and how much useful data is in each file: this is the
%% purpose of the file summary table.
%%
%% As messages are removed from files, holes appear in these
%% files. The field ValidTotalSize contains the total amount of useful
%% data left in the file. This is needed for garbage collection.
%%
%% When we discover that a file is now empty, we delete it. When we
%% discover that a file has less valid data than removed data (or in
%% other words, the valid data accounts for less than half the total
%% size of the file), we start a garbage collection run concurrently,
%% which will compact the file. This keeps utilisation high.
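%% As a sketch, the "less valid data than removed data" test above
%% amounts to something like this (field names are from
%% #file_summary{}; the helper name is illustrative only):
%%
%%   needs_compaction(#file_summary{ valid_total_size = Valid,
%%                                   file_size        = Total }) ->
%%       Valid > 0 andalso Valid < Total div 2.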
%%
%% The discovery is done periodically on files that had messages
%% acked and removed. We deliberately do this lazily in order to
%% prevent doing GC on files which are soon to be emptied (and
%% hence deleted).
%%
%% Compaction is a two-step process: first the file gets compacted;
%% then it gets truncated.
%%
%% Compaction is done concurrently regardless of there being readers
%% for this file. The compaction process carefully moves the data
%% at the end of the file to holes at the beginning of the file,
%% without overwriting anything. Index information is updated once
%% that data is fully moved and so the message data is always
%% available to readers, including those reading the data from
%% the end of the file.
%%
%% Truncation gets scheduled afterwards and only happens when
%% we know there are no readers using the data from the end of
%% the file. We do this by keeping track of what time readers
%% started operating and comparing it to the time at which
%% truncation was scheduled. It is therefore possible for the
%% file to be truncated when there are readers, but only when
%% those readers are reading from the start of the file.
%%
%% While there is no need to lock the files for compaction or
%% truncation, we still mark the file as soft-locked in the file
%% summary table in order to benefit from optimisations related
%% to fan-out scenarios and transient stores, where the behaviour
%% changes depending on whether the file is currently being
%% compacted.
%%
%% On non-clean startup, we scan the files we discover, dealing with
%% the possibilities of a crash having occurred during a compaction
%% (we keep duplicate messages near the start of the file rather
%% than the end), and rebuild the file summary and index ETS tables.
%%
%%
%% Messages are reference-counted. When a message with the same msg id
%% is written several times we only store it once, and only remove it
%% from the store when it has been removed the same number of times.
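%% For example, when two queues each write and then remove the same
%% message:
%%
%%   write(MsgId)  -> ref_count = 1 (data appended to the current file)
%%   write(MsgId)  -> ref_count = 2 (no disk write; only the index updated)
%%   remove(MsgId) -> ref_count = 1
%%   remove(MsgId) -> ref_count = 0 (data becomes eligible for GC)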
%%
%% The reference counts do not persist. Therefore the initialisation
%% function must be provided with a generator that produces ref count
%% deltas for all recovered messages. This is only used on startup
%% when the shutdown was non-clean.
%%
%% Reads are always performed directly by clients without calling the
%% server. This is safe because multiple file handles can be used to
%% read files, and the compaction method ensures the data is always
%% available for reading by clients.
%%
%% When a message is removed, its reference count is decremented. Even
%% if the reference count becomes 0, its entry is not removed. This is
%% because in the event of the same message being sent to several
%% different queues, there is the possibility of one queue writing and
%% removing the message before other queues write it at all. Thus
%% accommodating 0-reference counts allows us to avoid unnecessary
%% writes here. Of course, there are complications: the file to which
%% the message has already been written could be locked pending
%% deletion or GC, which means we have to rewrite the message as the
%% original copy will now be lost. (Messages with 0 references get
%% deleted on GC as they're not considered valid data.)
%%
%% The current file to which messages are being written has a
%% write-back cache. This is written to immediately by clients and can
%% be read from by clients too. This means that there are only ever
%% writes made to the current file, thus eliminating delays due to
%% flushing write buffers in order to be able to safely read from the
%% current file. The one exception to this is that on start up, the
%% cache is not populated with msgs found in the current file, and
%% thus in this case only, reads may have to come from the file
%% itself. The effect of this is that even if the msg_store process is
%% heavily overloaded, clients can still write and read messages with
%% very low latency and not block at all.
%%
%% Clients of the msg_store are required to register before using the
%% msg_store. This provides them with the necessary client-side state
%% to allow them to directly access the various caches and files. When
%% they terminate, they should deregister. They can do this by calling
%% either client_terminate/1 or client_delete_and_terminate/1. The
%% differences are: (a) client_terminate is synchronous. As a result,
%% if the msg_store is badly overloaded and has lots of in-flight
%% writes and removes to process, this will take some time to
%% return. However, once it does return, you can be sure that all the
%% actions you've issued to the msg_store have been processed. (b) Not
%% only is client_delete_and_terminate/1 asynchronous, but it also
%% permits writes and subsequent removes from the current
%% (terminating) client which are still in flight to be safely
%% ignored. Thus from the point of view of the msg_store itself, and
%% all from the same client:
%%
%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N
%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 -->
%%
%% The client obviously sent T after all the other messages (up to
%% W4), but because the msg_store prioritises messages, the T can be
%% promoted and thus received early.
%%
%% Thus at the point of the msg_store receiving T, we have messages 1
%% and 2 with a refcount of 1. After T, W3 will be ignored because
%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
%% ignored because the messages that they refer to were already known
%% to the msg_store prior to T. However, it can be a little more
%% complex: after the first R2, the refcount of msg 2 is 0. At that
%% point, if a GC or file deletion occurs, msg 2 could vanish, which
%% would then mean that the subsequent W2 and R2 are ignored.
%%
%% The use case then for client_delete_and_terminate/1 is if the
%% client wishes to remove everything it's written to the msg_store:
%% it issues removes for all messages it's written and not removed,
%% and then calls client_delete_and_terminate/1. At that point, any
%% in-flight writes (and subsequent removes) can be ignored, but
%% removes and writes for messages the msg_store already knows about
%% will continue to be processed normally (which will normally just
%% involve modifying the reference count, which is fast). Thus we save
%% disk bandwidth for writes which are going to be immediately removed
%% again by the terminating client.
%%
%% We use a separate set to keep track of the dying clients in order
%% to keep that set, which is inspected on every write and remove, as
%% small as possible. Inspecting the set of all clients would degrade
%% performance with many healthy clients and few, if any, dying
%% clients, which is the typical case.
%% @todo Honestly if there wasn't gen_server2 we could just selective
%% receive everything and drop the messages instead of doing
%% all this.
%%
%% When the msg_store has a backlog (i.e. it has unprocessed messages
%% in its mailbox / gen_server priority queue), a further optimisation
%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
%% from the same client for the same message. A typical occurrence of
%% these is when an empty durable queue delivers persistent messages
%% to ack'ing consumers. The queue will asynchronously ask the
%% msg_store to 'write' such messages, and when they are acknowledged
%% it will issue a 'remove'. That 'remove' may be issued before the
%% msg_store has processed the 'write'. There is then no point going
%% ahead with the processing of that 'write'.
%%
%% To detect this situation a 'flying_ets' table is shared between the
%% clients and the server. The table is keyed on the combination of
%% client (reference) and msg ref, and the value is an integration of
%% all the writes and removes currently "in flight" for that message
%% between the client and server, expressed as a sum of the
%% ?FLYING_WRITE, ?FLYING_WRITE_DONE and ?FLYING_REMOVE flags defined
%% at the top of this module. (NB: the integration can never add up to
%% more than one 'write' or 'remove', since clients must not
%% write/remove a message more than once without first
%% removing/writing it).
%%
%% Maintaining this table poses two challenges: 1) both the clients
%% and the server access and update the table, which causes
%% concurrency issues, 2) we must ensure that entries do not stay in
%% the table forever, since that would constitute a memory leak. We
%% address the former by carefully modelling all operations as
%% sequences of atomic actions that produce valid results in all
%% possible interleavings. We address the latter by having the server
%% delete an entry once the operations it integrates have been fully
%% handled (see the state values above). If, OTOH, more operations
%% are in flight for that message, we get an opportunity to delete
%% the table entry later, when processing these.
%%
%% There are two further complications. We need to ensure that 1)
%% eliminated writes still get confirmed, and 2) the write-back cache
%% doesn't grow unbounded. These are quite straightforward to
%% address. See the comments in the code.
%%
%% For notes on Clean Shutdown and startup, see documentation in
%% rabbit_variable_queue.
%%----------------------------------------------------------------------------
%% public API
%%----------------------------------------------------------------------------
-spec start_link
(binary(), atom(), file:filename(), [binary()] | 'undefined',
{msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error().
start_link(VHost, Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
gen_server2:start_link(?MODULE,
[VHost, Type, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
-spec successfully_recovered_state(server()) -> boolean().
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-spec client_init(server(), client_ref(), maybe_msg_id_fun()) -> client_msstate().
client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
{IndexEts, Dir, FileHandlesEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
Server, {new_client_state, Ref, self(), MsgOnDiskFun},
infinity),
CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
?CREDIT_DISC_BOUND),
#client_msstate { server = Server,
client_ref = Ref,
reader = undefined,
index_ets = IndexEts,
dir = rabbit_file:filename_to_binary(Dir),
file_handles_ets = FileHandlesEts,
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
credit_disc_bound = CreditDiscBound }.
-spec client_terminate(client_msstate()) -> 'ok'.
client_terminate(CState = #client_msstate { client_ref = Ref, reader = Reader }) ->
ok = server_call(CState, {client_terminate, Ref}),
reader_close(Reader).
-spec client_delete_and_terminate(client_msstate()) -> 'ok'.
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref, reader = Reader }) ->
ok = server_cast(CState, {client_dying, Ref}),
ok = server_cast(CState, {client_delete, Ref}),
reader_close(Reader).
-spec client_pre_hibernate(client_msstate()) -> client_msstate().
client_pre_hibernate(CState = #client_msstate{ reader = Reader }) ->
reader_close(Reader),
CState#client_msstate{ reader = undefined }.
-spec client_ref(client_msstate()) -> client_ref().
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-spec write_flow(any(), rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
write_flow(MsgRef, MsgId, Msg,
CState = #client_msstate {
server = Server,
credit_disc_bound = CreditDiscBound }) ->
%% Here we are tracking messages sent by the
%% rabbit_amqqueue_process process via the
%% rabbit_variable_queue. We are accessing the
%% rabbit_amqqueue_process process dictionary.
credit_flow:send(Server, CreditDiscBound),
client_write(MsgRef, MsgId, Msg, flow, CState).
-spec write(any(), rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
%% This function is only used by tests.
write(MsgRef, MsgId, Msg, CState) -> client_write(MsgRef, MsgId, Msg, noflow, CState).
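%% A minimal client session, sketched under assumptions: Server is a
%% running message store, and MsgRef/MsgId/Msg are the caller's own;
%% queues obtain their client refs from rabbit_guid:gen/0. write/4 is
%% asynchronous, but the message is immediately readable via the
%% current file cache:
%%
%%   CState0 = rabbit_msg_store:client_init(Server, rabbit_guid:gen(), undefined),
%%   ok = rabbit_msg_store:write(MsgRef, MsgId, Msg, CState0),
%%   {{ok, Msg}, CState} = rabbit_msg_store:read(MsgId, CState0),
%%   ok = rabbit_msg_store:client_terminate(CState).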
-spec read(rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}.
read(MsgId, CState = #client_msstate { index_ets = IndexEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% Check the cur file cache
case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
%% @todo It's probably a bug if we don't get a positive ref count.
case index_lookup_positive_ref_count(IndexEts, MsgId) of
not_found -> {not_found, CState};
MsgLocation -> client_read3(MsgLocation, CState)
end;
[{MsgId, Msg, _CacheRefCount}] ->
{{ok, Msg}, CState}
end.
-spec read_many([rabbit_types:msg_id()], client_msstate())
-> {#{rabbit_types:msg_id() => msg()}, client_msstate()}.
read_many(MsgIds, CState) ->
%% We receive MsgIds in roughly younger->older order, so
%% we can look for messages in the cache directly.
read_many_cache(MsgIds, CState, #{}).
%% We read from the cache until we can't. Then we read from disk.
read_many_cache([MsgId|Tail], CState = #client_msstate{ cur_file_cache_ets = CurFileCacheEts }, Acc) ->
case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
read_many_disk([MsgId|Tail], CState, Acc);
[{MsgId, Msg, _CacheRefCount}] ->
read_many_cache(Tail, CState, Acc#{MsgId => Msg})
end;
read_many_cache([], CState, Acc) ->
{Acc, CState}.
%% We will read from disk one file at a time in no particular order.
read_many_disk([MsgId|Tail], CState = #client_msstate{ index_ets = IndexEts }, Acc) ->
case index_lookup_positive_ref_count(IndexEts, MsgId) of
%% We ignore this message if it's not found and will try
%% to read it individually later instead. We can't call
%% the message store and expect good performance here.
%% @todo Can this even happen? Isn't this a bug?
not_found -> read_many_disk(Tail, CState, Acc);
MsgLocation -> read_many_file2([MsgId|Tail], CState, Acc, MsgLocation#msg_location.file)
end;
read_many_disk([], CState, Acc) ->
{Acc, CState}.
read_many_file2(MsgIds0, CState = #client_msstate{ dir = Dir,
index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
reader = Reader0,
client_ref = Ref }, Acc0, File) ->
%% Mark file handle open.
mark_handle_open(FileHandlesEts, File, Ref),
%% Get index for all Msgids in File.
%% It's possible that we get no results here if compaction
%% was in progress. That's OK: we will try again with those
%% MsgIds to get them from the new file.
MsgLocations0 = index_select_from_file(IndexEts, MsgIds0, File),
case MsgLocations0 of
[] ->
read_many_file3(MsgIds0, CState, Acc0, File);
_ ->
%% At this point either the file is guaranteed to remain
%% for us to read from it or we got zero locations.
%%
%% First we must order the locations so that we can consolidate
%% the preads and improve read performance.
MsgLocations = lists:keysort(#msg_location.offset, MsgLocations0),
%% Then we can do the consolidation to get the pread LocNums.
LocNums = consolidate_reads(MsgLocations, []),
%% Read the data from the file.
Reader = reader_open(Reader0, Dir, File),
{ok, Msgs} = reader_pread(Reader, LocNums),
%% Before we continue the read_many calls we must remove the
%% MsgIds we have read from the list and add the messages to
%% the Acc.
{Acc, MsgIdsRead} = lists:foldl(
fun(Msg, {Acc1, MsgIdsAcc}) ->
MsgIdRead = mc:get_annotation(id, Msg),
{Acc1#{MsgIdRead => Msg}, [MsgIdRead|MsgIdsAcc]}
end, {Acc0, []}, Msgs),
MsgIds = MsgIds0 -- MsgIdsRead,
%% Unmark opened files and continue.
read_many_file3(MsgIds, CState#client_msstate{ reader = Reader }, Acc, File)
end.
consolidate_reads([#msg_location{offset=NextOffset, total_size=NextSize}|Locs], [{Offset, Size}|Acc])
when Offset + Size =:= NextOffset ->
consolidate_reads(Locs, [{Offset, Size + NextSize}|Acc]);
consolidate_reads([#msg_location{offset=NextOffset, total_size=NextSize}|Locs], Acc) ->
consolidate_reads(Locs, [{NextOffset, NextSize}|Acc]);
consolidate_reads([], Acc) ->
lists:reverse(Acc).
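%% For example, two contiguous reads followed by one after a hole
%% consolidate into two preads:
%%
%%   consolidate_reads([#msg_location{offset = 0,   total_size = 100},
%%                      #msg_location{offset = 100, total_size = 150},
%%                      #msg_location{offset = 400, total_size = 50}], [])
%%   => [{0, 250}, {400, 50}]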
%% Cleanup opened files and continue.
read_many_file3(MsgIds, CState = #client_msstate{ file_handles_ets = FileHandlesEts,
client_ref = Ref }, Acc, File) ->
mark_handle_closed(FileHandlesEts, File, Ref),
read_many_disk(MsgIds, CState, Acc).
-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().
contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
%% The returned list contains the ids whose writes were eliminated by
%% this remove (?FLYING_IS_IGNORED): the caller can confirm those
%% messages immediately, since the store will process neither the
%% write nor the remove.
-spec remove([{any(), rabbit_types:msg_id()}], client_msstate()) -> {'ok', [rabbit_types:msg_id()]}.
remove([], _CState) -> {ok, []};
remove(MsgIds, CState = #client_msstate { flying_ets = FlyingEts,
client_ref = CRef }) ->
%% If the entry was deleted we act as if it wasn't by using the right default.
Res = [{MsgId, ets:update_counter(FlyingEts, {CRef, MsgRef}, ?FLYING_REMOVE, {'', ?FLYING_IS_WRITTEN})}
|| {MsgRef, MsgId} <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}),
{ok, [MsgId || {MsgId, ?FLYING_IS_IGNORED} <- Res]}.
%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------
server_call(#client_msstate { server = Server }, Msg) ->
gen_server2:call(Server, Msg, infinity).
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
client_write(MsgRef, MsgId, Msg, Flow,
CState = #client_msstate { flying_ets = FlyingEts,
cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
%% We are guaranteed that the insert will succeed.
%% This is true even for queue crashes because CRef will change.
true = ets:insert_new(FlyingEts, {{CRef, MsgRef}, ?FLYING_WRITE}),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgRef, MsgId, Flow}).
%% We no longer check whether the message's file is locked because we
%% rely on the fact that the file gets removed only when there are no
%% readers. We mark the file as being opened and then check that
%% the message data is within this file; if that's the case, we are
%% guaranteed to be able to read from it until we release the file
%% handle. This is because compaction has been reworked to copy
%% the data before changing the index information: if the index
%% information points to the file we are expecting, we are good.
%% The file only gets deleted after all data has been copied, the
%% index updated and the file handles closed.
client_read3(#msg_location { msg_id = MsgId, file = File },
CState = #client_msstate { index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
client_ref = Ref }) ->
%% We immediately mark the handle open so that we don't get the
%% file truncated while we are reading from it. The file may still
%% be truncated past that point but that's OK because we do a second
%% index lookup to ensure that we get the updated message location.
mark_handle_open(FileHandlesEts, File, Ref),
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, ref_count = RefCount } = MsgLocation when RefCount > 0 ->
{Msg, CState1} = read_from_disk(MsgLocation, CState),
mark_handle_closed(FileHandlesEts, File, Ref),
{{ok, Msg}, CState1};
%% The message was moved to another file (by compaction) between
%% the two lookups: release this file and retry with the updated
%% location.
MsgLocation = #msg_location { ref_count = RefCount } when RefCount > 0 ->
mark_handle_closed(FileHandlesEts, File, Ref),
client_read3(MsgLocation, CState)
end.
read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
total_size = TotalSize }, State = #client_msstate{ reader = Reader0, dir = Dir }) ->
Reader = reader_open(Reader0, Dir, File),
Msg = case reader_pread(Reader, [{Offset, TotalSize}]) of
{ok, [Msg0]} ->
Msg0;
Other ->
{error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
{msg_id, MsgId},
{read, Other},
{proc_dict, get()}
]}}
end,
{Msg, State#client_msstate{ reader = Reader }}.
%%----------------------------------------------------------------------------
%% Reader functions. A reader is a file num + fd tuple.
%%----------------------------------------------------------------------------
%% The reader tries to keep the FD open for subsequent reads.
%% When we attempt to read we first check if the opened file
%% is the one we want. If not we close/open the right one.
%%
%% The FD will be closed when the queue hibernates to save
%% resources.
reader_open(Reader, Dir, File) ->
{ok, Fd} = case Reader of
{File, Fd0} ->
{ok, Fd0};
{_AnotherFile, Fd0} ->
ok = file:close(Fd0),
file:open(form_filename(Dir, filenum_to_name(File)),
[binary, read, raw]);
undefined ->
file:open(form_filename(Dir, filenum_to_name(File)),
[binary, read, raw])
end,
{File, Fd}.
reader_pread({_, Fd}, LocNums) ->
case file:pread(Fd, LocNums) of
{ok, DataL} -> {ok, reader_pread_parse(DataL)};
KO -> KO
end.
reader_pread_parse([<<Size:64,
_MsgId:16/binary,
Rest0/bits>>|Tail]) ->
BodyBinSize = Size - 16, %% Remove size of MsgId.
<<MsgBodyBin:BodyBinSize/binary,
255, %% OK marker.
Rest/bits>> = Rest0,
[binary_to_term(MsgBodyBin)|reader_pread_parse([Rest|Tail])];
reader_pread_parse([<<>>]) ->
[];
reader_pread_parse([<<>>|Tail]) ->
reader_pread_parse(Tail).
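%% From the pattern matches above, each message on disk is framed as:
%%
%%   <<Size:64, MsgId:16/binary, MsgBody:(Size - 16)/binary, 255>>
%%
%% where Size covers the msg id and the body, and the trailing 255 is
%% the "OK marker": it is only present once the message was fully
%% written, which lets the recovery scan distinguish complete messages
%% from partial writes.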
reader_close(Reader) ->
case Reader of
undefined -> ok;
{_File, Fd} -> ok = file:close(Fd)
end.
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
process_flag(trap_exit, true),
pg:join({?MODULE, VHost, Type}, self()),
Dir = filename:join(BaseDir, atom_to_list(Type)),
Name = filename:join(filename:basename(BaseDir), atom_to_list(Type)),
AttemptFileSummaryRecovery =
case ClientRefs of
%% Transient.
undefined -> ok = rabbit_file:recursive_delete([Dir]),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
false;
%% Persistent.
_ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
true
end,
%% Always attempt to recover the file summary for persistent store.
%% If the shutdown isn't clean we will delete all objects before
%% we start recovering messages from the files on disk.
{FileSummaryRecovered, FileSummaryEts} =
recover_file_summary(AttemptFileSummaryRecovery, Dir),
{CleanShutdown, IndexEts, ClientRefs1} =
recover_index_and_client_refs(FileSummaryRecovered,
ClientRefs, Dir, Name),
Clients = maps:from_list(
[{CRef, {undefined, undefined}} ||
CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
%% recovered correctly.
true = case {FileSummaryRecovered, CleanShutdown} of
{true, false} -> ets:delete_all_objects(FileSummaryEts);
_ -> true
end,
%% CleanShutdown <=> msg location index and file_summary both
%% recovered correctly.
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public,
{read_concurrency, true},
{write_concurrency, true}]),
FlyingEts = ets:new(rabbit_msg_store_flying, [set, public,
{read_concurrency, true},
{write_concurrency, true}]),
{ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit),
{ok, GCPid} = rabbit_msg_store_gc:start_link(
#gc_state { dir = Dir,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
msg_store = self()
}),
CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
?CREDIT_DISC_BOUND),
State = #msstate { dir = Dir,
index_ets = IndexEts,
current_file = 0,
current_file_handle = undefined,
current_file_offset = 0,
sync_timer_ref = undefined,
gc_candidates = #{},
gc_check_timer = undefined,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
dying_clients = #{},
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
cref_to_msg_ids = #{},
credit_disc_bound = CreditDiscBound
},
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
Cleanliness = case CleanShutdown of
true -> "clean";
false -> "unclean"
end,
rabbit_log:debug("Rebuilding message location index after ~ts shutdown...",
[Cleanliness]),
{CurOffset, State1 = #msstate { current_file = CurFile }} =
build_index(CleanShutdown, StartupFunState, State),
rabbit_log:debug("Finished rebuilding index", []),
%% Open the most recent file.
{ok, CurHdl} = writer_recover(Dir, CurFile, CurOffset),
{ok, State1 #msstate { current_file_handle = CurHdl,
current_file_offset = CurOffset },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
successfully_recovered_state -> 7;
{new_client_state, _Ref, _Pid, _MODC} -> 7;
_ -> 0
end.
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{compacted_file, _File} -> 8;
{client_dying, _Pid} -> 7;
_ -> 0
end.
prioritise_info(Msg, _Len, _State) ->
case Msg of
sync -> 8;
_ -> 0
end.
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
handle_call({new_client_state, CRef, CPid, MsgOnDiskFun}, _From,
State = #msstate { dir = Dir,
index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
clients = Clients }) ->
Clients1 = maps:put(CRef, {CPid, MsgOnDiskFun}, Clients),
erlang:monitor(process, CPid),
reply({IndexEts, Dir, FileHandlesEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
handle_call({contains, MsgId}, From, State) ->
State1 = contains_message(MsgId, From, State),
noreply(State1).
handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients,
current_file = CurFile,
current_file_offset = CurOffset }) ->
DyingClients1 = maps:put(CRef,
#dying_client{client_ref = CRef,
file = CurFile,
offset = CurOffset},
DyingClients),
noreply(State #msstate { dying_clients = DyingClients1 });
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = maps:remove(CRef, Clients) },
noreply(clear_client(CRef, State1));
handle_cast({write, CRef, MsgRef, MsgId, Flow},
State = #msstate { index_ets = IndexEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
credit_disc_bound = CreditDiscBound }) ->
case Flow of
flow -> {CPid, _} = maps:get(CRef, Clients),
%% We are going to process a message sent by the
%% rabbit_amqqueue_process. Now we are accessing the
%% msg_store process dictionary.
credit_flow:ack(CPid, CreditDiscBound);
noflow -> ok
end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case flying_write({CRef, MsgRef}, State) of
process ->
[{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
noreply(write_message(MsgId, Msg, CRef, State));
ignore ->
%% A 'remove' has already been issued and eliminated the
%% 'write'.
%%
%% If all writes get eliminated, cur_file_cache_ets could
%% grow unbounded. To prevent that we delete the cache
%% entry here, but only if the message isn't in the
%% current file. That way reads of the message can
%% continue to be done client side, from either the cache
%% or the non-current files. If the message *is* in the
%% current file then the cache entry will be removed by
%% the normal logic for that in write_message/4 and
%% flush_or_roll_to_new_file/2.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File }
when File == State #msstate.current_file ->
ok;
_ ->
true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0})
end,
noreply(State)
end;
handle_cast({remove, CRef, MsgIds}, State) ->
State1 =
lists:foldl(
fun ({MsgRef, MsgId}, State2) ->
case flying_remove({CRef, MsgRef}, State2) of
process -> remove_message(MsgId, CRef, State2);
ignore -> State2
end
end, State, MsgIds),
noreply(State1);
handle_cast({compacted_file, File},
State = #msstate { file_summary_ets = FileSummaryEts }) ->
%% This can return false if the file gets deleted immediately
%% after compaction ends, but before we can process this message.
%% So this will become a no-op and we can ignore the return value.
_ = ets:update_element(FileSummaryEts, File,
{#file_summary.locked, false}),
noreply(State).
handle_info(sync, State) ->
noreply(internal_sync(State));
handle_info(timeout, State) ->
noreply(internal_sync(State));
handle_info({timeout, TimerRef, {maybe_gc, Candidates0}},
State = #msstate{ gc_candidates = NewCandidates,
gc_check_timer = TimerRef }) ->
%% We do not want to consider candidates for GC that had
%% a message removed since we sent that maybe_gc message.
%% In that case we simply defer the GC to the next maybe_gc.
Candidates = maps:without(maps:keys(NewCandidates), Candidates0),
noreply(maybe_gc(Candidates, State));
%% @todo When a CQ crashes the message store does not remove
%% the client information and clean up. This eventually
%% leads to the queue running a full recovery on the next
%% message store restart because the store will keep the
%% crashed queue's ref in its persistent state and fail
%% to find the corresponding ref during start.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
%% similar to what happens in
%% rabbit_amqqueue_process:handle_ch_down but with a relation of
%% msg_store -> rabbit_amqqueue_process instead of
%% rabbit_amqqueue_process -> rabbit_channel.
credit_flow:peer_down(Pid),
noreply(State);
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
terminate(Reason, State = #msstate { index_ets = IndexEts,
current_file_handle = CurHdl,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
clients = Clients,
dir = Dir }) ->
{ExtraLog, ExtraLogArgs} = case Reason of
normal -> {"", []};
shutdown -> {"", []};
{shutdown, _} -> {"", []};
_ -> {" with reason ~0p", [Reason]}
end,
rabbit_log:info("Stopping message store for directory '~ts'" ++ ExtraLog, [Dir|ExtraLogArgs]),
%% stop the gc first, otherwise it could be working and we pull
%% out the ets tables from under it.
ok = rabbit_msg_store_gc:stop(GCPid),
State3 = case CurHdl of
undefined -> State;
_ -> State2 = internal_sync(State),
ok = writer_close(CurHdl),
State2
end,
case store_file_summary(FileSummaryEts, Dir) of
ok -> ok;
{error, FSErr} ->
rabbit_log:error("Unable to store file summary"
" for vhost message store for directory ~tp~n"
"Error: ~tp",
[Dir, FSErr])
end,
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
CurFileCacheEts, FlyingEts]],
index_terminate(IndexEts, Dir),
case store_recovery_terms([{client_refs, maps:keys(Clients)}], Dir) of
ok ->
rabbit_log:info("Message store for directory '~ts' is stopped", [Dir]),
ok;
{error, RTErr} ->
rabbit_log:error("Unable to save message store recovery terms"
" for directory ~tp~nError: ~tp",
[Dir, RTErr])
end,
State3 #msstate { current_file_handle = undefined,
current_file_offset = 0 }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------