Skip to content

Commit

Permalink
translate exceptions and add more info regarding pull
Browse files Browse the repository at this point in the history
  • Loading branch information
aozarov authored and mziccard committed Apr 27, 2016
1 parent fb6562e commit 72b8533
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 74 deletions.
5 changes: 5 additions & 0 deletions gcloud-java-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,10 @@
<artifactId>protobuf-java</artifactId>
<version>3.0.0-beta-1</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>0.0.11</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.gax.grpc.ApiException;
import com.google.common.base.MoreObjects;

import java.io.IOException;
Expand Down Expand Up @@ -143,6 +144,16 @@ public BaseServiceException(int code, String message, String reason, boolean ide
this.debugInfo = null;
}

public BaseServiceException(ApiException apiException, boolean idempotent) {
super(apiException.getMessage(), apiException);
this.code = apiException.getStatusCode().value();
this.reason = apiException.getStatusCode().name();
this.idempotent = idempotent;
this.retryable = apiException.isRetryable();
this.location = null;
this.debugInfo = null;
}

protected Set<Error> retryableErrors() {
return Collections.emptySet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ final class PullOption implements Serializable {
private final Object value;

enum Option {
RETURN_IMMEDIATELY, MAX_MESSAGES
MAX_MESSAGES
}

private PullOption(Option option, Object value) {
Expand All @@ -90,10 +90,6 @@ Object value() {
return value;
}

public static PullOption returnImmediately() {
return new PullOption(Option.RETURN_IMMEDIATELY, true);
}

public static PullOption maxMessages(int maxMessages) {
return new PullOption(Option.MAX_MESSAGES, maxMessages);
}
Expand All @@ -108,11 +104,38 @@ interface MessageProcessor {
}

/**
* An interface to control asynchronous pulling.
* An interface to control message consumer settings.
*/
interface MessageConsumer extends AutoCloseable {

void start();
final class PullOption implements Serializable {

private final Option option;
private final Object value;

enum Option {
MAX_CONCURRENT_CALLBACKS
}

private PullOption(Option option, Object value) {
this.option = option;
this.value = value;
}

Option option() {
return option;
}

Object value() {
return value;
}

public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
return new PullOption(Option.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
}
}

void start(MessageConsumer.PullOption... options);

void stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,31 @@

package com.google.cloud.pubsub;

import com.google.api.gax.grpc.ApiException;
import com.google.cloud.BaseServiceException;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.RetryHelper.RetryInterruptedException;
import com.google.common.collect.ImmutableSet;

import java.io.IOException;
import java.util.Set;

/**
* Pub/Sub service exception.
*
* @see <a href="https://cloud.google.com/pubsub/error-codes">Google Cloud Pub/Sub error codes</a>
*/
public class PubSubException extends BaseServiceException {
public final class PubSubException extends BaseServiceException {

private static final long serialVersionUID = 6434989638600001226L;
private static final Set<Error> RETRYABLE_ERRORS = ImmutableSet.of(
new Error(499, null),
new Error(503, null),
new Error(429, null),
new Error(500, null),
new Error(504, null));

public PubSubException(int code, String message) {
super(code, message, null, true);

public PubSubException(IOException ex, boolean idempotent) {
super(ex, idempotent);
}

@Override
protected Set<Error> retryableErrors() {
return RETRYABLE_ERRORS;
public PubSubException(ApiException apiException, boolean idempotent) {
super(apiException, idempotent);
}

/**
* Translate RetryHelperException to the ResourceManagerException that caused the error. This
* method will always throw an exception.
*
* @throws PubSubException when {@code ex} was caused by a {@code
* ResourceManagerException}
* @throws RetryInterruptedException when {@code ex} is a {@code RetryInterruptedException}
*/
static PubSubException translateAndThrow(RetryHelperException ex) {
BaseServiceException.translateAndPropagateIfPossible(ex);
throw new PubSubException(UNKNOWN_CODE, ex.getMessage());
@Override
protected Set<Error> retryableErrors() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ private static <V> V get(Future<V> future) {
try {
return Uninterruptibles.getUninterruptibly(future);
} catch (ExecutionException ex) {
// TODO: we should propagate PubSubException
throw Throwables.propagate(ex.getCause());
}
}
Expand Down Expand Up @@ -193,16 +192,22 @@ public Future<AsyncPage<Subscription>> listSubscriptionsAsync(String topic,

@Override
public Iterator<ReceivedMessage> pull(String subscription, PullOption... options) {
// this should set return_immediately to true
return null;
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options) {
// though this method can set return_immediately to false (as future can be canceled) I
// suggest to keep it false so sync could delegate to asyc and use the same options
// this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
// messages
return null;
}

@Override
public MessageConsumer pullAsync(String subscription, MessageProcessor callback) {
// this method should use the VTKIT thread-pool (maybe getting it should be part of the spi)
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.google.cloud.pubsub;

import static com.google.cloud.BaseServiceException.UNKNOWN_CODE;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.spi.DefaultPubSubRpc;
import com.google.cloud.pubsub.spi.PubSubRpc;
Expand Down Expand Up @@ -51,17 +49,14 @@ public static PubSubOptions defaultInstance() {
}

public static class DefaultPubSubRpcFactory implements PubSubRpcFactory {
private static final PubSubRpcFactory INSTANCE =
new DefaultPubSubRpcFactory();
private static final PubSubRpcFactory INSTANCE = new DefaultPubSubRpcFactory();

@Override
public PubSubRpc create(PubSubOptions options) {
try {
return new DefaultPubSubRpc(options);
} catch (IOException e) {
PubSubException exception = new PubSubException(UNKNOWN_CODE, e.getMessage());
exception.initCause(e);
throw exception;
throw new PubSubException(e, true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
import com.google.api.gax.core.ConnectionSettings;
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiException;
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.PubSubException;
import com.google.cloud.pubsub.PubSubOptions;
import com.google.cloud.pubsub.spi.v1.PublisherApi;
import com.google.cloud.pubsub.spi.v1.PublisherSettings;
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
Expand All @@ -51,22 +56,24 @@
import java.io.IOException;
import java.util.concurrent.Future;

import io.grpc.Status.Code;

public class DefaultPubSubRpc implements PubSubRpc {

private final PubSubOptions options;
private final PublisherApi publisherApi;
private final SubscriberApi subscriberApi;

public DefaultPubSubRpc(PubSubOptions options) throws IOException {
this.options = options;
try {
// Provide (and use a common thread-pool).
// This depends on https://github.com/googleapis/gax-java/issues/73
PublisherSettings.Builder pbuilder = PublisherSettings.defaultInstance().toBuilder();
pbuilder.provideChannelWith(ConnectionSettings.builder()
pbuilder.provideChannelWith(ConnectionSettings.newBuilder()
.provideCredentialsWith(options.authCredentials().credentials()).build());
pbuilder.applyToAllApiMethods(apiCallSettings(options));
publisherApi = PublisherApi.create(pbuilder.build());
SubscriberSettings.Builder sBuilder = SubscriberSettings.defaultInstance().toBuilder();
sBuilder.provideChannelWith(ConnectionSettings.builder()
sBuilder.provideChannelWith(ConnectionSettings.newBuilder()
.provideCredentialsWith(options.authCredentials().credentials()).build());
sBuilder.applyToAllApiMethods(apiCallSettings(options));
subscriberApi = SubscriberApi.create(sBuilder.build());
Expand All @@ -76,7 +83,7 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
}

private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
// TODO: figure out how to specify timeout these settings
// TODO: specify timeout these settings:
// retryParams.retryMaxAttempts(), retryParams.retryMinAttempts()
RetryParams retryParams = options.retryParams();
final RetrySettings.Builder builder = RetrySettings.newBuilder()
Expand All @@ -87,91 +94,97 @@ private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
.setInitialRetryDelay(Duration.millis(retryParams.initialRetryDelayMillis()))
.setRetryDelayMultiplier(retryParams.retryDelayBackoffFactor())
.setMaxRetryDelay(Duration.millis(retryParams.maxRetryDelayMillis()));
// TODO: this needs to be replaced with something like ApiCallSettings.of(null, retrySettings)
// once the gax supports it
return new ApiCallSettings.Builder() {

@Override
public RetrySettings.Builder getRetrySettingsBuilder() {
return builder;
}
return ApiCallSettings.newBuilder().setRetrySettingsBuilder(builder);
}

private static <V> Future<V> translate(ListenableFuture<V> from, final boolean idempotent,
final int... returnNullOn) {
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
@Override
public ApiCallSettings build() {
return null;
public V apply(ApiException exception) {
throw new PubSubException(exception, idempotent);
}
};
});
}

@Override
public Future<Topic> create(Topic topic) {
// TODO: understand what the exception that could be thrown
// and how to get either retriable and/or the service error codes
return publisherApi.createTopicCallable().futureCall(topic);
// TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings
// or from the exception
return translate(publisherApi.createTopicCallable().futureCall(topic), true);
}

@Override
public Future<PublishResponse> publish(PublishRequest request) {
return publisherApi.publishCallable().futureCall(request);
return translate(publisherApi.publishCallable().futureCall(request), false);
}

@Override
public Future<Topic> get(GetTopicRequest request) {
return publisherApi.getTopicCallable().futureCall(request);
return translate(publisherApi.getTopicCallable().futureCall(request), true,
Code.NOT_FOUND.value());
}

@Override
public Future<ListTopicsResponse> list(ListTopicsRequest request) {
return publisherApi.listTopicsCallable().futureCall(request);
// we should consider using gax PageAccessor once
// https://github.com/googleapis/gax-java/issues/74 is fixed
// Though it is a cleaner SPI without it, but PageAccessor is an interface
// and if it saves code we should not easily dismiss it.
return translate(publisherApi.listTopicsCallable().futureCall(request), true);
}

@Override
public Future<ListTopicSubscriptionsResponse> list(ListTopicSubscriptionsRequest request) {
return publisherApi.listTopicSubscriptionsCallable().futureCall(request);
return translate(publisherApi.listTopicSubscriptionsCallable().futureCall(request), true);
}

@Override
public Future<Empty> delete(DeleteTopicRequest request) {
return publisherApi.deleteTopicCallable().futureCall(request);
// TODO: check if null is not going to work for Empty
return translate(publisherApi.deleteTopicCallable().futureCall(request), true,
Code.NOT_FOUND.value());
}

@Override
public Future<Subscription> create(Subscription subscription) {
return subscriberApi.createSubscriptionCallable().futureCall(subscription);
return translate(subscriberApi.createSubscriptionCallable().futureCall(subscription), false);
}

@Override
public Future<Subscription> get(GetSubscriptionRequest request) {
return subscriberApi.getSubscriptionCallable().futureCall(request);
return translate(subscriberApi.getSubscriptionCallable().futureCall(request), true,
Code.NOT_FOUND.value());
}

@Override
public Future<ListSubscriptionsResponse> list(ListSubscriptionsRequest request) {
return subscriberApi.listSubscriptionsCallable().futureCall(request);
return translate(subscriberApi.listSubscriptionsCallable().futureCall(request), true);
}

@Override
public Future<Empty> delete(DeleteSubscriptionRequest request) {
return subscriberApi.deleteSubscriptionCallable().futureCall(request);
return translate(subscriberApi.deleteSubscriptionCallable().futureCall(request), true,
Code.NOT_FOUND.value());
}

@Override
public Future<Empty> modify(ModifyAckDeadlineRequest request) {
return subscriberApi.modifyAckDeadlineCallable().futureCall(request);
return translate(subscriberApi.modifyAckDeadlineCallable().futureCall(request), false);
}

@Override
public Future<Empty> acknowledge(AcknowledgeRequest request) {
return subscriberApi.acknowledgeCallable().futureCall(request);
return translate(subscriberApi.acknowledgeCallable().futureCall(request), false);
}

@Override
public Future<PullResponse> pull(PullRequest request) {
return subscriberApi.pullCallable().futureCall(request);
return translate(subscriberApi.pullCallable().futureCall(request), false);
}

@Override
public Future<Empty> modify(ModifyPushConfigRequest request) {
return subscriberApi.modifyPushConfigCallable().futureCall(request);
return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false);
}
}
Loading

0 comments on commit 72b8533

Please sign in to comment.