Skip to content

Commit

Permalink
driver/reader: return the next offset instead of sarama.OffsetNewest (#…
Browse files Browse the repository at this point in the history
…198) (#203)

Suppose the ts in queue is
1 2 3
we use ts = 3 to seek the offset, it return sarama.OffsetNewest
at the same time some data pushed to queue it become
1 2 3 4
we use sarama.OffsetNewest to consume, so we lost the msg ts = 4
  • Loading branch information
july2993 authored Feb 22, 2019
1 parent 2a5d942 commit 8c419d8
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
2 changes: 1 addition & 1 deletion tidb-binlog/driver/reader/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (ks *KafkaSeeker) seekOffset(topic string, partition int32, start int64, en
}

if endTS <= ts {
return sarama.OffsetNewest, nil
return end + 1, nil
}

return end, nil
Expand Down
2 changes: 1 addition & 1 deletion tidb-binlog/driver/reader/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (to *testOffsetSuite) TestOffset(c *C) {
10: testPoss[20],
15: testPoss[20],
20: testPoss[30],
35: sarama.OffsetNewest,
35: testPoss[30] + 1,
}
for ts, offset := range testCases {
offsetFounds, err := sk.Seek(topic, ts, []int32{0})
Expand Down

0 comments on commit 8c419d8

Please sign in to comment.