Skip to content

Commit

Permalink
fix: Fix errors caused by client library upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
palmere-google committed Jul 30, 2021
1 parent 56ce037 commit 874f811
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private static SubscriberServiceClient newSubscriberServiceClient(
RoutingMetadata.of(path, partition),
settingsBuilder);
return SubscriberServiceClient.create(
addDefaultSettings(path.location().region(), settingsBuilder));
addDefaultSettings(path.location().extractRegion(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand All @@ -127,7 +127,9 @@ AdminClient getAdminClient() {
return adminClientSupplier().get();
}
return AdminClient.create(
AdminClientSettings.newBuilder().setRegion(subscriptionPath().location().region()).build());
AdminClientSettings.newBuilder()
.setRegion(subscriptionPath().location().extractRegion())
.build());
}

CursorClient getCursorClient() {
Expand All @@ -136,7 +138,7 @@ CursorClient getCursorClient() {
}
return CursorClient.create(
CursorClientSettings.newBuilder()
.setRegion(subscriptionPath().location().region())
.setRegion(subscriptionPath().location().extractRegion())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private static PublisherServiceClient newServiceClient(TopicPath topic, Partitio
settingsBuilder);
try {
return PublisherServiceClient.create(
addDefaultSettings(topic.location().region(), settingsBuilder));
addDefaultSettings(topic.location().extractRegion(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand All @@ -69,7 +69,7 @@ private static Publisher<MessageMetadata> newPublisher(PublisherOptions options)
.setServiceClient(newServiceClient(options.topicPath(), partition))
.setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
.build())
.setAdminClient(getAdminClient(options.topicPath().location().region()))
.setAdminClient(getAdminClient(options.topicPath().location().extractRegion()))
.build()
.instantiate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public void failed(State s, Throwable t) {
@VisibleForTesting
public synchronized void set(T options, Publisher<MessageMetadata> toCache) {
livePublishers.put(options, toCache);
toCache.addListener(
new Listener() {
@Override
public void failed(State s, Throwable t) {
evict(options);
}
},
SystemExecutors.getAlarmExecutor());
}

@Override
Expand Down

0 comments on commit 874f811

Please sign in to comment.