Skip to content

Commit

Permalink
support reading from last message from stream with xread (#2725)
Browse files Browse the repository at this point in the history
* support reading from last message from stream with xread

* recover previous formatting

* add redisversion and use it in integration tests
  • Loading branch information
atakavci authored Jun 8, 2024
1 parent 61c13c2 commit 39b992f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/RedisFeatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ namespace StackExchange.Redis
v6_0_6 = new Version(6, 0, 6),
v6_2_0 = new Version(6, 2, 0),
v7_0_0_rc1 = new Version(6, 9, 240), // 7.0 RC1 is version 6.9.240
v7_2_0_rc1 = new Version(7, 1, 240); // 7.2 RC1 is version 7.1.240
v7_2_0_rc1 = new Version(7, 1, 240), // 7.2 RC1 is version 7.1.240
v7_4_0_rc1 = new Version(7, 4, 240); // 7.4 RC1 is version 7.4.240

private readonly Version version;

Expand Down
61 changes: 61 additions & 0 deletions tests/StackExchange.Redis.Tests/StreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,24 @@ public void StreamReadEmptyStreams()
Assert.Equal(0, len2);
}

[Fact]
public void StreamReadLastMessage()
{
using var conn = Create(require: RedisFeatures.v7_4_0_rc1);
var db = conn.GetDatabase();
var key1 = Me();

// Read the entire stream from the beginning.
db.StreamRead(key1, "0-0");
db.StreamAdd(key1, "field2", "value2");
db.StreamAdd(key1, "fieldLast", "valueLast");
var entries = db.StreamRead(key1, "+");

Assert.NotNull(entries);
Assert.True(entries.Length > 0);
Assert.Equal(new[] { new NameValueEntry("fieldLast", "valueLast") }, entries[0].Values);
}

[Fact]
public void StreamReadExpectedExceptionInvalidCountMultipleStream()
{
Expand Down Expand Up @@ -1590,6 +1608,49 @@ public void StreamReadMultipleStreams()
Assert.Equal(id4, streams[1].Entries[1].Id);
}

[Fact]
public void StreamReadMultipleStreamsLastMessage()
{
using var conn = Create(require: RedisFeatures.v7_4_0_rc1);

var db = conn.GetDatabase();
var key1 = Me() + "a";
var key2 = Me() + "b";

var id1 = db.StreamAdd(key1, "field1", "value1");
var id2 = db.StreamAdd(key1, "field2", "value2");
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");

var streamList = new[] { new StreamPosition(key1, "0-0"), new StreamPosition(key2, "0-0") };
db.StreamRead(streamList);

var streams = db.StreamRead(streamList);

db.StreamAdd(key1, "field5", "value5");
db.StreamAdd(key1, "field6", "value6");
db.StreamAdd(key2, "field7", "value7");
db.StreamAdd(key2, "field8", "value8");

streamList = new[] { new StreamPosition(key1, "+"), new StreamPosition(key2, "+") };

streams = db.StreamRead(streamList);

Assert.NotNull(streams);
Assert.True(streams.Length == 2);

var stream1 = streams.Where(e => e.Key == key1).First();
Assert.NotNull(stream1.Entries);
Assert.True(stream1.Entries.Length > 0);
Assert.Equal(new[] { new NameValueEntry("field6", "value6") }, stream1.Entries[0].Values);

var stream2 = streams.Where(e => e.Key == key2).First();
Assert.NotNull(stream2.Entries);
Assert.True(stream2.Entries.Length > 0);
Assert.Equal(new[] { new NameValueEntry("field8", "value8") }, stream2.Entries[0].Values);
}


[Fact]
public void StreamReadMultipleStreamsWithCount()
{
Expand Down

0 comments on commit 39b992f

Please sign in to comment.