Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EDU-3801: Update Money Transfer tutorial code to support Temporal Cloud #35

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions src/main/java/moneytransferapp/ClientProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package moneytransferapp;

import io.grpc.Metadata;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.stub.MetadataUtils;
import io.temporal.authorization.AuthorizationGrpcMetadataProvider;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.serviceclient.SimpleSslContextBuilder;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import javax.net.ssl.SSLException;

public class ClientProvider {

/**
* @return a Workflow Client that is configured to use mTLS, API Key, or no
* authentication, based on the whether / how environment variables are set.
* @throws java.io.IOException if there is a problem with configuration
*/
public static WorkflowClient getClient() throws IOException {
// Specify the address of your Temporal Service (format: hostname:port)
String hostPort = System.getenv("TEMPORAL_ADDRESS");
if (hostPort == null) {
hostPort = "localhost:7233";
}

// Specify the Namespace to use for requests from this Client
String namespace = System.getenv("TEMPORAL_NAMESPACE");
if (namespace == null) {
namespace = "default";
}

// Specify the path to the certificate and private key paths if using mTLS auth
String clientCertFile = System.getenv("TEMPORAL_TLS_CERT");
String clientCertPrivateKey = System.getenv("TEMPORAL_TLS_KEY");

// If using API Key authentication, specify the API Key value
String apiKey = System.getenv("TEMPORAL_API_KEY");

// Default to no authentication
WorkflowClient client = getDefaultClient(hostPort, namespace);
if (clientCertFile != null && clientCertPrivateKey != null) {
client = getMtlsClient(hostPort, namespace, clientCertFile, clientCertPrivateKey);
} else if (apiKey != null) {
client = getApiKeyClient(hostPort, namespace, apiKey);
}

return client;
}

/**
* @param hostPort Temporal Service address (format as hostname:port)
* @param namespace Namespace that this Client should use for requests
* @return a Workflow Client that is not configured to use either mTLS or
* API Key authentication (this is intended for use with a local Temporal
* Service started from the <code>temporal</code> CLI)
*/
public static WorkflowClient getDefaultClient(String hostPort, String namespace) {
WorkflowServiceStubsOptions stubsOptions = WorkflowServiceStubsOptions.newBuilder()
.setTarget(hostPort)
.build();

WorkflowServiceStubs service = WorkflowServiceStubs.newServiceStubs(stubsOptions);

WorkflowClientOptions options = WorkflowClientOptions.newBuilder()
.setNamespace(namespace)
.build();

return WorkflowClient.newInstance(service, options);
}

/**
* @param hostPort Temporal Service address (format as hostname:port)
* @param namespace Namespace that this Client should use for requests
* @param tlsCertificatePath Path to the TLS certificate file
* @param tlsPrivateKeyPath Path to the TLS private key file
* @return a Workflow Client configured to use API Key authentication
* @throws java.io.FileNotFoundException if there is a problem loading TLS
* key or certificate
* @throws javax.net.ssl.SSLException if there is a problem with
* establishing a TLS connection
*/
public static WorkflowClient getMtlsClient(String hostPort, String namespace,
String tlsCertificatePath, String tlsPrivateKeyPath) throws FileNotFoundException, SSLException {

InputStream clientCertInputStream = new FileInputStream(tlsCertificatePath);
InputStream clientKeyInputStream = new FileInputStream(tlsPrivateKeyPath);
SslContext sslContext = SimpleSslContextBuilder.forPKCS8(clientCertInputStream, clientKeyInputStream).build();

// Specify the IP address, port, and SSL Context for the Service Stubs options
WorkflowServiceStubsOptions stubsOptions = WorkflowServiceStubsOptions.newBuilder()
.setSslContext(sslContext)
.setTarget(hostPort)
.build();

WorkflowServiceStubs service = WorkflowServiceStubs.newServiceStubs(stubsOptions);

WorkflowClientOptions options = WorkflowClientOptions.newBuilder()
.setNamespace(namespace)
.build();

return WorkflowClient.newInstance(service, options);
}

/**
* @param hostPort Temporal Service address (format as hostname:port)
* @param namespace Namespace that this Client should use for requests
* @param apiKey Value of the API key to use for authentication
* @return a Workflow Client configured to use API Key authentication
* @throws javax.net.ssl.SSLException if there is a problem with
* establishing a TLS connection
*/
public static WorkflowClient getApiKeyClient(String hostPort, String namespace,
String apiKey) throws SSLException {

Metadata.Key<String> TEMPORAL_NAMESPACE_HEADER_KEY
= Metadata.Key.of("temporal-namespace", Metadata.ASCII_STRING_MARSHALLER);
Metadata metadata = new Metadata();
metadata.put(TEMPORAL_NAMESPACE_HEADER_KEY, namespace);

WorkflowServiceStubsOptions.Builder stubOptions
= WorkflowServiceStubsOptions.newBuilder()
.setChannelInitializer(
(channel) -> {
channel.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata));
})
.addGrpcMetadataProvider(
new AuthorizationGrpcMetadataProvider(() -> "Bearer " + apiKey))
.setTarget(hostPort);
stubOptions.setSslContext(SimpleSslContextBuilder.noKeyOrCertChain().setUseInsecureTrustManager(false).build());

WorkflowServiceStubs service = WorkflowServiceStubs.newServiceStubs(stubOptions.build());

WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder().setNamespace(namespace).build();
return WorkflowClient.newInstance(service, clientOptions);
}
}
25 changes: 5 additions & 20 deletions src/main/java/moneytransferapp/MoneyTransferWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,25 @@
package moneytransferapp;

import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.io.IOException;

public class MoneyTransferWorker {

public static void main(String[] args) {
// Create a stub that accesses a Temporal Service on the local development machine
WorkflowServiceStubs serviceStub = WorkflowServiceStubs.newLocalServiceStubs();
public static void main(String[] args) throws IOException {

// The Worker uses the Client to communicate with the Temporal Service
WorkflowClient client = WorkflowClient.newInstance(serviceStub);
// Call a utility method that will return a Temporal Client that is
// configured based on the presence of environment variables
WorkflowClient client = ClientProvider.getClient();

// A WorkerFactory creates Workers
WorkerFactory factory = WorkerFactory.newInstance(client);

// A Worker listens to one Task Queue.
// This Worker processes both Workflows and Activities
Worker worker = factory.newWorker(Shared.MONEY_TRANSFER_TASK_QUEUE);

// Register a Workflow implementation with this Worker
// The implementation must be known at runtime to dispatch Workflow tasks
// Workflows are stateful so a type is needed to create instances.
worker.registerWorkflowImplementationTypes(MoneyTransferWorkflowImpl.class);

// Register Activity implementation(s) with this Worker.
// The implementation must be known at runtime to dispatch Activity tasks
// Activities are stateless and thread safe so a shared instance is used.
worker.registerActivitiesImplementations(new AccountActivityImpl());

System.out.println("Worker is running and actively polling the Task Queue.");
System.out.println("To quit, use ^C to interrupt.");

// Start all registered Workers. The Workers will start polling the Task Queue.
factory.start();
}
}
Expand Down
20 changes: 6 additions & 14 deletions src/main/java/moneytransferapp/TransferApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;

import java.security.SecureRandom;
import java.time.Instant;
import java.util.UUID;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -30,27 +28,21 @@ public static String randomAccountIdentifier() {
}

public static void main(String[] args) throws Exception {
// Call a utility method that will return a Temporal Client that is
// configured based on the presence of environment variables
WorkflowClient client = ClientProvider.getClient();

// In the Java SDK, a stub represents an element that participates in
// Temporal orchestration and communicates using gRPC.

// A WorkflowServiceStubs communicates with the Temporal front-end service.
WorkflowServiceStubs serviceStub = WorkflowServiceStubs.newLocalServiceStubs();

// A WorkflowClient wraps the stub.
// It can be used to start, signal, query, cancel, and terminate Workflows.
WorkflowClient client = WorkflowClient.newInstance(serviceStub);


// Workflow options configure Workflow stubs.
// A WorkflowId prevents duplicate instances, which are removed.
WorkflowOptions options = WorkflowOptions.newBuilder()
WorkflowOptions workflowOptions = WorkflowOptions.newBuilder()
.setTaskQueue(Shared.MONEY_TRANSFER_TASK_QUEUE)
.setWorkflowId("money-transfer-workflow")
.build();

// WorkflowStubs enable calls to methods as if the Workflow object is local
// but actually perform a gRPC call to the Temporal Service.
MoneyTransferWorkflow workflow = client.newWorkflowStub(MoneyTransferWorkflow.class, options);
MoneyTransferWorkflow workflow = client.newWorkflowStub(MoneyTransferWorkflow.class, workflowOptions);

// Configure the details for this money transfer request
String referenceId = UUID.randomUUID().toString().substring(0, 18);
Expand Down