Skip to content

Commit

Permalink
Support Lettuce 6 (#2589)
Browse files Browse the repository at this point in the history
* Add support for Lettuce 6

* Finish

* Remove unnecessary null check
  • Loading branch information
Anuraag Agrawal authored Mar 18, 2021
1 parent 6ea1e8d commit 2fd933b
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ muzzle {
pass {
group = "io.lettuce"
module = "lettuce-core"
versions = "[5.1.0.RELEASE,6.0.0.RELEASE)"
versions = "[5.1.0.RELEASE,)"
assertInverse = true
}
}
Expand All @@ -20,6 +20,8 @@ dependencies {
// Only 5.2+ will have command arguments in the db.statement tag.
testLibrary group: 'io.lettuce', name: 'lettuce-core', version: '5.2.0.RELEASE'
testInstrumentation project(':instrumentation:reactor-3.1:javaagent')
}

latestDepTestLibrary group: 'io.lettuce', name: 'lettuce-core', version: '5.+'
test {
systemProperty "testLatestDeps", testLatestDeps
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ dependencies {

implementation project(':instrumentation:lettuce:lettuce-common:library')

latestDepTestLibrary group: 'io.lettuce', name: 'lettuce-core', version: '5.+'

testImplementation project(':instrumentation:lettuce:lettuce-5.1:testing')
testImplementation project(':instrumentation:reactor-3.1:library')
}

test {
systemProperty "testLatestDeps", testLatestDeps
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import static io.opentelemetry.instrumentation.lettuce.common.LettuceArgSplitter.splitArgs;

import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.protocol.CompleteableCommand;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.tracing.TraceContext;
import io.lettuce.core.tracing.TraceContextProvider;
import io.lettuce.core.tracing.Tracer;
Expand Down Expand Up @@ -52,7 +55,7 @@ public boolean isEnabled() {
return true;
}

// Added in lettuce 5.2
// Added in lettuce 5.2, ignored in 6.0+
// @Override
public boolean includeCommandArgsInSpanTags() {
return true;
Expand Down Expand Up @@ -196,6 +199,45 @@ public synchronized Tracer.Span remoteEndpoint(Endpoint endpoint) {
return this;
}

// Added and called in 6.0+
// @Override
public synchronized Tracer.Span start(RedisCommand<?, ?, ?> command) {
start();

Span span = this.span;
if (span == null) {
throw new IllegalStateException("Span started but null, this is a programming error.");
}
span.updateName(command.getType().name());

if (command.getArgs() != null) {
args = command.getArgs().toCommandString();
}

if (command instanceof CompleteableCommand) {
CompleteableCommand<?> completeableCommand = (CompleteableCommand<?>) command;
completeableCommand.onComplete(
(o, throwable) -> {
if (throwable != null) {
span.recordException(throwable);
}

CommandOutput<?, ?, ?> output = command.getOutput();
if (output != null) {
String error = output.getError();
if (error != null) {
span.setStatus(StatusCode.ERROR, error);
}
}

finish(span);
});
}

return this;
}

// Not called by Lettuce in 6.0+ (though we call it ourselves above).
@Override
public synchronized Tracer.Span start() {
span = spanBuilder.startSpan();
Expand Down Expand Up @@ -260,12 +302,16 @@ public synchronized Tracer.Span error(Throwable throwable) {
@Override
public synchronized void finish() {
if (span != null) {
if (name != null) {
String statement = RedisCommandSanitizer.sanitize(name, splitArgs(args));
span.setAttribute(SemanticAttributes.DB_STATEMENT, statement);
}
span.end();
finish(span);
}
}

private void finish(Span span) {
if (name != null) {
String statement = RedisCommandSanitizer.sanitize(name, splitArgs(args));
span.setAttribute(SemanticAttributes.DB_STATEMENT, statement);
}
span.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package io.opentelemetry.instrumentation.lettuce.v5_1

import static io.opentelemetry.api.trace.SpanKind.CLIENT

import io.lettuce.core.ClientOptions
import io.lettuce.core.ConnectionFuture
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisFuture
Expand All @@ -32,8 +31,6 @@ import spock.util.concurrent.AsyncConditions
abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecification {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()

abstract RedisClient createClient(String uri)

Expand Down Expand Up @@ -86,7 +83,7 @@ abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecificati

println "Using redis: $redisServer.args"
redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)

connection = redisClient.connect()
asyncCommands = connection.async()
Expand All @@ -106,11 +103,11 @@ abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecificati
def "connect using get on ConnectionFuture"() {
setup:
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)

when:
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, port, 3, TimeUnit.SECONDS))
RedisURI.create("redis://${HOST}:${port}?timeout=3s"))
StatefulConnection connection = connectionFuture.get()

then:
Expand All @@ -125,11 +122,11 @@ abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecificati
def "connect exception inside the connection future"() {
setup:
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
testConnectionClient.setOptions(CLIENT_OPTIONS)
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)

when:
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, incorrectPort, 3, TimeUnit.SECONDS))
RedisURI.create("redis://${HOST}:${incorrectPort}?timeout=3s"))
StatefulConnection connection = connectionFuture.get()

then:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package io.opentelemetry.instrumentation.lettuce.v5_1
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace

import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.reactive.RedisReactiveCommands
Expand All @@ -24,8 +23,6 @@ import spock.util.concurrent.AsyncConditions
abstract class AbstractLettuceReactiveClientTest extends InstrumentationSpecification {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()

abstract RedisClient createClient(String uri)

Expand Down Expand Up @@ -60,7 +57,7 @@ abstract class AbstractLettuceReactiveClientTest extends InstrumentationSpecific

println "Using redis: $redisServer.args"
redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)

connection = redisClient.connect()
reactiveCommands = connection.reactive()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package io.opentelemetry.instrumentation.lettuce.v5_1

import static io.opentelemetry.api.trace.SpanKind.CLIENT

import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
Expand All @@ -18,8 +17,6 @@ import spock.lang.Shared
abstract class AbstractLettuceSyncClientAuthTest extends InstrumentationSpecification {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()

abstract RedisClient createClient(String uri)

Expand Down Expand Up @@ -55,7 +52,7 @@ abstract class AbstractLettuceSyncClientAuthTest extends InstrumentationSpecific

def setup() {
redisClient = createClient(embeddedDbUri)
redisClient.setOptions(CLIENT_OPTIONS)
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
redisServer.start()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ package io.opentelemetry.instrumentation.lettuce.v5_1
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static java.nio.charset.StandardCharsets.UTF_8

import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisConnectionException
import io.lettuce.core.RedisException
import io.lettuce.core.ScriptOutputType
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.sync.RedisCommands
Expand All @@ -23,8 +23,6 @@ import spock.lang.Shared
abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecification {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()

abstract RedisClient createClient(String uri)

Expand Down Expand Up @@ -76,6 +74,7 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio

def setup() {
redisClient = createClient(embeddedDbUri)
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)

redisServer.start()
connection = redisClient.connect()
Expand All @@ -96,7 +95,7 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
def "connect"() {
setup:
RedisClient testConnectionClient = createClient(embeddedDbUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)

when:
StatefulConnection connection = testConnectionClient.connect()
Expand All @@ -112,7 +111,7 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
def "connect exception"() {
setup:
RedisClient testConnectionClient = createClient(dbUriNonExistent)
testConnectionClient.setOptions(CLIENT_OPTIONS)
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)

when:
testConnectionClient.connect()
Expand Down Expand Up @@ -157,7 +156,7 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
def "set command localhost"() {
setup:
RedisClient testConnectionClient = createClient(embeddedDbLocalhostUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
StatefulConnection connection = testConnectionClient.connect()
String res = connection.sync().set("TESTSETKEY", "TESTSETVAL")

Expand Down Expand Up @@ -451,16 +450,72 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
syncCommands.debugSegfault()
expect:
// lettuce tracing does not trace debug
assertTraces(0) {}
assertTraces(1) {
trace(0, 1) {
span(0) {
name "DEBUG"
// Disconnect not an actual error even though an exception is recorded.
errored false
attributes {
"${SemanticAttributes.NET_TRANSPORT.key}" "IP.TCP"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key}" port
"${SemanticAttributes.DB_CONNECTION_STRING.key}" "redis://127.0.0.1:$port"
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_STATEMENT.key}" "DEBUG SEGFAULT"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
if (Boolean.getBoolean("testLatestDeps")) {
// Seems to only be recorded with Lettuce 6+
errorEvent(RedisException, "Connection disconnected", 2)
}
}
}
}
}

def "shutdown command (returns void) produces no span"() {
setup:
syncCommands.shutdown(false)
expect:
// lettuce tracing does not trace shutdown
assertTraces(0) {}
assertTraces(1) {
trace(0, 1) {
span(0) {
name "SHUTDOWN"
if (Boolean.getBoolean("testLatestDeps")) {
// Seems to only be treated as an error with Lettuce 6+
errored true
}
attributes {
"${SemanticAttributes.NET_TRANSPORT.key}" "IP.TCP"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key}" port
"${SemanticAttributes.DB_CONNECTION_STRING.key}" "redis://127.0.0.1:$port"
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_STATEMENT.key}" "SHUTDOWN NOSAVE"
if (!Boolean.getBoolean("testLatestDeps")) {
// Lettuce adds this tag before 6.0
// TODO(anuraaga): Filter this out?
"error" "Connection disconnected"
}
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
if (Boolean.getBoolean("testLatestDeps")) {
errorEvent(RedisException, "Connection disconnected", 2)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.lettuce.v5_1

import groovy.transform.PackageScope
import io.lettuce.core.ClientOptions

@PackageScope
final class LettuceTestUtil {

static final ClientOptions CLIENT_OPTIONS

static {
def options = ClientOptions.builder()
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
.autoReconnect(false)
if (Boolean.getBoolean("testLatestDeps")) {
// Force RESP2 on 6+ for consistency in tests
options
.pingBeforeActivateConnection(false)
.protocolVersion(Class.forName("io.lettuce.core.protocol.ProtocolVersion").getField("RESP2").get(null))
}
CLIENT_OPTIONS = options.build()
}

private LettuceTestUtil() {}
}

0 comments on commit 2fd933b

Please sign in to comment.