Merge pull request #153 from rabbitmq/first-chunk-data-loss-fix
Better handle data loss in first chunk
michaelklishin authored Dec 1, 2023
2 parents 94450a4 + 1f6733e commit 023626a
Showing 3 changed files with 85 additions and 8 deletions.
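
The substance of the change is in src/osiris_log.erl below: when the writer re-initialises a log that is effectively empty (for example because the first chunk was lost), it now seeks to just past the segment's file header and truncates, rather than positioning at eof and appending after any surviving trailing bytes. A minimal standalone sketch of that repair step, assuming an 8-byte header as implied by the <<"OSIL", _V:4/binary>> match further down in this diff:

%% Sketch only, not the osiris implementation: discard anything that
%% trails a segment's file header. The size 8 is an assumption
%% ("OSIL" magic plus a 4-byte version field).
reset_past_header(Fd) ->
    {ok, _} = file:position(Fd, 8),  %% seek to just past the header
    ok = file:truncate(Fd).          %% drop all bytes after that position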
8 changes: 4 additions & 4 deletions .github/workflows/rabbitmq-oci.yaml
@@ -104,10 +104,10 @@ jobs:
           path: ${{ steps.resolve-artifact-path.outputs.ARTIFACT_PATH }}
 
       - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v1
+        uses: docker/setup-buildx-action@v3
 
       - name: Cache Docker layers
-        uses: actions/cache@v2
+        uses: actions/cache@v3
         with:
           path: /tmp/.buildx-cache
           key: ${{ runner.os }}-${{ matrix.image_tag_suffix }}-buildx-${{ github.event.pull_request.head.sha || github.sha }}
@@ -125,7 +125,7 @@ jobs:
       - name: Login to DockerHub
         if: steps.authorized.outputs.PUSH == 'true'
-        uses: docker/login-action@v1
+        uses: docker/login-action@v3
         with:
           username: ${{ secrets.DOCKERHUB_USERNAME }}
           password: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -160,7 +160,7 @@ jobs:
           echo "::set-output name=TAG_4::${TAG_4}"
       - name: Build and push
-        uses: docker/build-push-action@v2
+        uses: docker/build-push-action@v5
         with:
           context: rabbitmq-server/packaging/docker-image
           pull: true
35 changes: 31 additions & 4 deletions src/osiris_log.erl
@@ -53,7 +53,9 @@
          make_counter/1]).
 
 -export([dump_init/1,
+         dump_init_idx/1,
          dump_chunk/1,
+         dump_index/1,
          dump_crc_check/1]).
 %% for testing
 -export([
@@ -597,10 +599,12 @@ init(#{dir := Dir,
             %% the empty log case
             {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE),
             {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
-            %% TODO: do we potentially need to truncate the segment
-            %% here too?
-            {ok, _} = file:position(SegFd, eof),
-            {ok, _} = file:position(IdxFd, eof),
+            {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE),
+            %% the segment could potentially have trailing data here so we'll
+            %% do a truncate just in case. The index would have been truncated
+            %% earlier
+            ok = file:truncate(SegFd),
+            {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE),
             osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
             #?MODULE{cfg = Cfg,
                      mode =
@@ -3029,6 +3033,29 @@ dump_init(File) ->
     {ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE),
     Fd.
 
+dump_init_idx(File) ->
+    {ok, Fd} = file:open(File, [raw, binary, read]),
+    {ok, <<"OSII", _V:4/binary>> } = file:read(Fd, ?IDX_HEADER_SIZE),
+    Fd.
+
+dump_index(Fd) ->
+    case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
+        {ok,
+         <<ChunkId:64/unsigned,
+           Timestamp:64/signed,
+           Epoch:64/unsigned,
+           FilePos:32/unsigned,
+           ChType:8/unsigned>>} ->
+            #{chunk_id => ChunkId,
+              timestamp => Timestamp,
+              epoch => Epoch,
+              file_pos => FilePos,
+              type => ChType};
+        Err ->
+            Err
+    end.
+
+
 dump_chunk(Fd) ->
     {ok, Pos} = file:position(Fd, cur),
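
The new dump helpers give a record-by-record view of an index file. The binary pattern above implies each record is 29 bytes (64 + 64 + 64 + 32 + 8 bits), presumably the value of ?INDEX_RECORD_SIZE_B. A usage sketch from an Erlang shell; the index file name mirrors the segment naming used in the test suite and is an assumption:

%% Sketch: walk an index file; dump_index/1 returns one map per record
%% and passes through eof (from file:read/2) once the file is exhausted.
Fd = osiris_log:dump_init_idx("00000000000000000000.index"),
#{chunk_id := ChunkId, epoch := Epoch, file_pos := FilePos} =
    osiris_log:dump_index(Fd),
eof = osiris_log:dump_index(Fd).  %% assuming the file held exactly one record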
50 changes: 50 additions & 0 deletions test/osiris_SUITE.erl
@@ -31,6 +31,7 @@ all() ->
 
 all_tests() ->
     [single_node_write,
+     single_node_write_sub_batch_restart,
      single_node_uncorrelated_write,
      cluster_write_replication_plain,
      cluster_write_replication_tls,
@@ -178,6 +179,49 @@ single_node_write(Config) ->
     ?assertEqual(42, osiris:fetch_writer_seq(Leader, Wid)),
     ok.
 
+single_node_write_sub_batch_restart(Config) ->
+    Name = ?config(cluster_name, Config),
+    Dir = ?config(priv_dir, Config),
+    Conf0 =
+        #{name => Name,
+          reference => Name,
+          epoch => 1,
+          leader_node => node(),
+          replica_nodes => [],
+          tracking_max_writers => 255,
+          dir => Dir},
+    SDir = filename:join(Dir, Name),
+    {ok, #{leader_pid := Leader}} = osiris:start_cluster(Conf0),
+    Entries = [simple(<<"abcdefghikjlmn">>) || _ <- lists:seq(1, 11)],
+    Batch = {batch, 11, 0, iolist_size(Entries), Entries},
+    ok = osiris:write(Leader, undefined, 42, Batch),
+    receive
+        {osiris_written, _Name, _WriterId, [42]} ->
+            ok = osiris_writer:stop(Conf0),
+            %% simulate data loss of first chunk, only header remains
+            truncate(filename:join(SDir, "00000000000000000000.segment"), 56),
+            {ok, Leader1} = osiris_writer:start(Conf0#{epoch => 3}),
+            osiris_writer:read_tracking(Leader1),
+            ok = osiris:write(Leader1, undefined, 43, Batch),
+            receive
+                {osiris_written, _, _, [43]} ->
+                    ok = osiris_writer:stop(Conf0),
+                    {ok, Leader2} = osiris_writer:start(Conf0#{epoch => 4}),
+                    %% read tracking to ensure write is actually running ok
+                    #{} = osiris_writer:read_tracking(Leader2),
+                    ok
+            after 2000 ->
+                flush(),
+                exit(osiris_written_timeout)
+            end,
+            timer:sleep(1000),
+            ok
+    after 2000 ->
+        flush(),
+        exit(osiris_written_timeout)
+    end,
+    ok.
+
 single_node_uncorrelated_write(Config) ->
     Name = ?config(cluster_name, Config),
     Conf0 =
@@ -2113,3 +2157,9 @@ wildcard(Wc) when is_list(Wc) ->
 wildcard(Wc) ->
     wildcard(unicode:characters_to_list(Wc)).
 
+truncate(File, Sz) ->
+    {ok, Fd} = file:open(File, [raw, binary, read, write]),
+    {ok, _} = file:position(Fd, Sz),
+    ok = file:truncate(Fd),
+    ok = file:close(Fd),
+    ok.

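In brief, the new test writes one sub-batch of 11 entries, stops the writer, chops the segment down to 56 bytes (per its comment, the point at which only the header remains), restarts at a higher epoch, and verifies that a later write still completes. The truncate/2 helper is generic enough to simulate any partial loss; a usage sketch with an illustrative path:

%% Sketch: keep only the first 56 bytes of a segment file, discarding
%% the rest (the path here is illustrative, not taken from the test).
ok = truncate("priv_dir/stream-1/00000000000000000000.segment", 56).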