diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
index 6fde6f4425df..48de7002d54f 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
@@ -31,7 +31,7 @@
  *
  * @see <a href="https://cloud.google.com/pubsub/">Google Cloud Pub/Sub</a>
  */
-public interface PubSub extends Service<PubSubOptions> {
+public interface PubSub extends AutoCloseable, Service<PubSubOptions> {
 
   /**
    * Class for specifying options for listing topics and subscriptions.
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
index 19b2e5a35fec..bd69103b9819 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
@@ -278,4 +278,9 @@ public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, Ti
       Iterable<String> ackIds) {
     return null;
   }
+
+  @Override
+  public void close() throws Exception {
+    rpc.close();
+  }
 }
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
index d4f00fd7cf37..6ce27e3de56d 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
@@ -19,6 +19,8 @@
 import com.google.api.gax.core.RetrySettings;
 import com.google.api.gax.grpc.ApiCallSettings;
 import com.google.api.gax.grpc.ApiException;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.AuthCredentials;
 import com.google.cloud.RetryParams;
 import com.google.cloud.pubsub.PubSubException;
 import com.google.cloud.pubsub.PubSubOptions;
@@ -27,8 +29,10 @@
 import com.google.cloud.pubsub.spi.v1.SubscriberApi;
 import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
 import com.google.common.base.Function;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.Empty;
 import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.DeleteSubscriptionRequest;
@@ -50,48 +54,78 @@
 import com.google.pubsub.v1.Subscription;
 import com.google.pubsub.v1.Topic;
 
+import io.grpc.ManagedChannel;
+import io.grpc.Status.Code;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
 import org.joda.time.Duration;
 
 import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.Future;
-
-import autovalue.shaded.com.google.common.common.collect.Sets;
-import io.grpc.Status.Code;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 public class DefaultPubSubRpc implements PubSubRpc {
 
   private final PublisherApi publisherApi;
   private final SubscriberApi subscriberApi;
+  private final ScheduledExecutorService executor =
+      MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(8));
 
   public DefaultPubSubRpc(PubSubOptions options) throws IOException {
     try {
-      // Provide (and use a common thread-pool).
-      // This depends on https://github.com/googleapis/gax-java/issues/73
-      PublisherSettings.Builder pbuilder =
-          PublisherSettings.defaultBuilder()
-              .provideChannelWith(options.authCredentials().credentials())
-              .applyToAllApiMethods(apiCallSettings(options));
-      publisherApi = PublisherApi.create(pbuilder.build());
-      SubscriberSettings.Builder sBuilder =
-          SubscriberSettings.defaultBuilder()
-              .provideChannelWith(options.authCredentials().credentials())
-              .applyToAllApiMethods(apiCallSettings(options));
-      subscriberApi = SubscriberApi.create(sBuilder.build());
+      PublisherSettings.Builder pubBuilder =
+          PublisherSettings.defaultBuilder().provideExecutorWith(executor, false);
+      SubscriberSettings.Builder subBuilder =
+          SubscriberSettings.defaultBuilder().provideExecutorWith(executor, false);
+      // todo(mziccard): PublisherSettings should support null/absent credentials for testing
+      if (options.host().contains("localhost")
+          || options.authCredentials().equals(AuthCredentials.noAuth())) {
+        ManagedChannel channel = NettyChannelBuilder.forTarget(options.host())
+            .negotiationType(NegotiationType.PLAINTEXT)
+            .build();
+        pubBuilder.provideChannelWith(channel, true);
+        subBuilder.provideChannelWith(channel, true);
+      } else {
+        GoogleCredentials credentials = options.authCredentials().credentials();
+        pubBuilder.provideChannelWith(
+            credentials.createScoped(PublisherSettings.DEFAULT_SERVICE_SCOPES));
+        subBuilder.provideChannelWith(
+            credentials.createScoped(SubscriberSettings.DEFAULT_SERVICE_SCOPES));
+      }
+      pubBuilder.applyToAllApiMethods(apiCallSettings(options));
+      subBuilder.applyToAllApiMethods(apiCallSettings(options));
+      publisherApi = PublisherApi.create(pubBuilder.build());
+      subscriberApi = SubscriberApi.create(subBuilder.build());
     } catch (Exception ex) {
       throw new IOException(ex);
     }
   }
 
+  private static long translateTimeout(long timeout) {
+    if (timeout < 0) {
+      return 20000;
+    } else if (timeout == 0) {
+      return Long.MAX_VALUE;
+    }
+    return timeout;
+  }
+
   private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
     // TODO: specify timeout these settings:
     // retryParams.retryMaxAttempts(), retryParams.retryMinAttempts()
     RetryParams retryParams = options.retryParams();
+    long connectTimeout = translateTimeout(options.connectTimeout());
+    long readTimeout = translateTimeout(options.readTimeout());
+    long maxTimeout = connectTimeout == Long.MAX_VALUE || readTimeout == Long.MAX_VALUE
+        ? Long.MAX_VALUE : connectTimeout + readTimeout;
     final RetrySettings.Builder builder = RetrySettings.newBuilder()
         .setTotalTimeout(Duration.millis(retryParams.totalRetryPeriodMillis()))
-        .setInitialRpcTimeout(Duration.millis(options.connectTimeout()))
+        .setInitialRpcTimeout(Duration.millis(connectTimeout))
         .setRpcTimeoutMultiplier(1.5)
-        .setMaxRpcTimeout(Duration.millis(options.connectTimeout() + options.readTimeout()))
+        .setMaxRpcTimeout(Duration.millis(maxTimeout))
         .setInitialRetryDelay(Duration.millis(retryParams.initialRetryDelayMillis()))
         .setRetryDelayMultiplier(retryParams.retryDelayBackoffFactor())
         .setMaxRetryDelay(Duration.millis(retryParams.maxRetryDelayMillis()));
@@ -117,7 +151,7 @@ public V apply(ApiException exception) {
 
   @Override
   public Future<Topic> create(Topic topic) {
-    // TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings
+    // TODO: it would be nice if we can get the idempotent information from the ApiCallSettings
     // or from the exception
     return translate(publisherApi.createTopicCallable().futureCall(topic), true);
   }
@@ -149,7 +183,6 @@ public Future<ListTopicSubscriptionsResponse> list(ListTopicSubscriptionsRequest
 
   @Override
   public Future<Empty> delete(DeleteTopicRequest request) {
-    // TODO: check if null is not going to work for Empty
     return translate(publisherApi.deleteTopicCallable().futureCall(request), true,
         Code.NOT_FOUND.value());
   }
@@ -195,4 +228,16 @@ public Future<PullResponse> pull(PullRequest request) {
   public Future<Empty> modify(ModifyPushConfigRequest request) {
     return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false);
   }
+
+  @Override
+  public ScheduledExecutorService executor() {
+    return executor;
+  }
+
+  @Override
+  public void close() throws Exception {
+    subscriberApi.close();
+    publisherApi.close();
+    executor.shutdown();
+  }
 }
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java
index 8474ba042234..5af166a9f1ee 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java
@@ -38,8 +38,9 @@
 import com.google.pubsub.v1.Topic;
 
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 
-public interface PubSubRpc {
+public interface PubSubRpc extends AutoCloseable {
 
   // in all cases root cause of ExecutionException is PubSubException
   Future<Topic> create(Topic topic);
@@ -69,4 +70,6 @@ public interface PubSubRpc {
   Future<PullResponse> pull(PullRequest request);
 
   Future<Empty> modify(ModifyPushConfigRequest request);
+
+  ScheduledExecutorService executor();
 }