Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support reading from last message from stream with xread #2725

Merged
merged 3 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/RedisFeatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ namespace StackExchange.Redis
v6_0_6 = new Version(6, 0, 6),
v6_2_0 = new Version(6, 2, 0),
v7_0_0_rc1 = new Version(6, 9, 240), // 7.0 RC1 is version 6.9.240
v7_2_0_rc1 = new Version(7, 1, 240); // 7.2 RC1 is version 7.1.240
v7_2_0_rc1 = new Version(7, 1, 240), // 7.2 RC1 is version 7.1.240
v7_4_0_rc1 = new Version(7, 4, 240); // 7.4 RC1 is version 7.4.240

private readonly Version version;

Expand Down
61 changes: 61 additions & 0 deletions tests/StackExchange.Redis.Tests/StreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,24 @@ public void StreamReadEmptyStreams()
Assert.Equal(0, len2);
}

[Fact]
public void StreamReadLastMessage()
{
using var conn = Create(require: RedisFeatures.v7_4_0_rc1);
var db = conn.GetDatabase();
var key1 = Me();

// Read the entire stream from the beginning.
db.StreamRead(key1, "0-0");
db.StreamAdd(key1, "field2", "value2");
db.StreamAdd(key1, "fieldLast", "valueLast");
var entries = db.StreamRead(key1, "+");

Assert.NotNull(entries);
Assert.True(entries.Length > 0);
Assert.Equal(new[] { new NameValueEntry("fieldLast", "valueLast") }, entries[0].Values);
}

[Fact]
public void StreamReadExpectedExceptionInvalidCountMultipleStream()
{
Expand Down Expand Up @@ -1590,6 +1608,49 @@ public void StreamReadMultipleStreams()
Assert.Equal(id4, streams[1].Entries[1].Id);
}

[Fact]
public void StreamReadMultipleStreamsLastMessage()
{
using var conn = Create(require: RedisFeatures.v7_4_0_rc1);

var db = conn.GetDatabase();
var key1 = Me() + "a";
var key2 = Me() + "b";

var id1 = db.StreamAdd(key1, "field1", "value1");
var id2 = db.StreamAdd(key1, "field2", "value2");
var id3 = db.StreamAdd(key2, "field3", "value3");
var id4 = db.StreamAdd(key2, "field4", "value4");

var streamList = new[] { new StreamPosition(key1, "0-0"), new StreamPosition(key2, "0-0") };
db.StreamRead(streamList);

var streams = db.StreamRead(streamList);

db.StreamAdd(key1, "field5", "value5");
db.StreamAdd(key1, "field6", "value6");
db.StreamAdd(key2, "field7", "value7");
db.StreamAdd(key2, "field8", "value8");

streamList = new[] { new StreamPosition(key1, "+"), new StreamPosition(key2, "+") };

streams = db.StreamRead(streamList);

Assert.NotNull(streams);
Assert.True(streams.Length == 2);

var stream1 = streams.Where(e => e.Key == key1).First();
Assert.NotNull(stream1.Entries);
Assert.True(stream1.Entries.Length > 0);
Assert.Equal(new[] { new NameValueEntry("field6", "value6") }, stream1.Entries[0].Values);

var stream2 = streams.Where(e => e.Key == key2).First();
Assert.NotNull(stream2.Entries);
Assert.True(stream2.Entries.Length > 0);
Assert.Equal(new[] { new NameValueEntry("field8", "value8") }, stream2.Entries[0].Values);
}


[Fact]
public void StreamReadMultipleStreamsWithCount()
{
Expand Down
Loading