Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][fn] enable Go function token auth #6

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
8 changes: 7 additions & 1 deletion pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ type Conf struct {
ProcessingGuarantees int32 `json:"processingGuarantees" yaml:"processingGuarantees"`
SecretsMap string `json:"secretsMap" yaml:"secretsMap"`
Runtime int32 `json:"runtime" yaml:"runtime"`
//Deprecated
// Authentication
ClientAuthenticationPlugin string `json:"clientAuthenticationPlugin" yaml:"clientAuthenticationPlugin"`
ClientAuthenticationParameters string `json:"clientAuthenticationParameters" yaml:"clientAuthenticationParameters"`
TLSTrustCertsFilePath string `json:"tlsTrustCertsFilePath" yaml:"tlsTrustCertsFilePath"`
TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection" yaml:"tlsAllowInsecureConnection"`
TLSHostnameVerificationEnable bool `json:"tlsHostnameVerificationEnable" yaml:"tlsHostnameVerificationEnable"`
// Deprecated
AutoACK bool `json:"autoAck" yaml:"autoAck"`
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
//source config
Expand Down
2 changes: 0 additions & 2 deletions pulsar-function-go/examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ go 1.13
require (
github.com/apache/pulsar-client-go v0.8.1
github.com/apache/pulsar/pulsar-function-go v0.0.0
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect
github.com/yahoo/athenz v1.8.55 // indirect
)

replace github.com/apache/pulsar/pulsar-function-go => ../
Expand Down
121 changes: 39 additions & 82 deletions pulsar-function-go/examples/go.sum

Large diffs are not rendered by default.

37 changes: 34 additions & 3 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package pf

import (
"context"
"fmt"
"math"
"strconv"
"strings"
"time"

"github.com/golang/protobuf/ptypes/empty"
Expand Down Expand Up @@ -192,11 +194,40 @@ CLOSE:
return nil
}

const (
authPluginToken = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
authPluginNone = ""
)

func (gi *goInstance) setupClient() error {
client, err := pulsar.NewClient(pulsar.ClientOptions{
ic := gi.context.instanceConf

clientOpts := pulsar.ClientOptions{
URL: ic.pulsarServiceURL,
TLSTrustCertsFilePath: ic.tlsTrustCertsPath,
TLSAllowInsecureConnection: ic.tlsAllowInsecure,
TLSValidateHostname: ic.tlsHostnameVerification,
}

switch ic.authPlugin {
case authPluginToken:
switch {
case strings.HasPrefix(ic.authParams, "file://"):
clientOpts.Authentication = pulsar.NewAuthenticationTokenFromFile(ic.authParams[7:])
case strings.HasPrefix(ic.authParams, "token:"):
clientOpts.Authentication = pulsar.NewAuthenticationToken(ic.authParams[6:])
case ic.authParams == "":
return fmt.Errorf("auth plugin %s given, but authParams is empty", authPluginToken)
default:
return fmt.Errorf(`unknown token format - expecting "file://" or "token:" prefix`)
}
case authPluginNone:
clientOpts.Authentication, _ = pulsar.NewAuthentication("", "") // ret: auth.NewAuthDisabled()
default:
return fmt.Errorf("unknown auth provider: %s", ic.authPlugin)
}

URL: gi.context.instanceConf.pulsarServiceURL,
})
client, err := pulsar.NewClient(clientOpts)
if err != nil {
log.Errorf("create client error:%v", err)
gi.stats.incrTotalSysExceptions(err)
Expand Down
10 changes: 10 additions & 0 deletions pulsar-function-go/pf/instanceConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type instanceConf struct {
killAfterIdle time.Duration
expectedHealthCheckInterval int32
metricsPort int
authPlugin string
authParams string
tlsTrustCertsPath string
tlsAllowInsecure bool
tlsHostnameVerification bool
}

func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
Expand Down Expand Up @@ -107,6 +112,11 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
},
UserConfig: cfg.UserConfig,
},
authPlugin: cfg.ClientAuthenticationPlugin,
authParams: cfg.ClientAuthenticationParameters,
tlsTrustCertsPath: cfg.TLSTrustCertsFilePath,
tlsAllowInsecure: cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
}

if instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public class GoInstanceConfig {
private int processingGuarantees;
private String secretsMap = "";
private String userConfig = "";

private String clientAuthenticationPlugin = "";
private String clientAuthenticationParameters = "";
private String tlsTrustCertsFilePath = "";
private boolean tlsHostnameVerificationEnable = false;
private boolean tlsAllowInsecureConnection = false;

private int runtime;
private boolean autoAck;
private int parallelism;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, Strin
*/

public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
AuthenticationConfig authConfig,
String originalCodeFileName,
String pulsarServiceUrl,
boolean k8sRuntime) throws IOException {
Expand Down Expand Up @@ -187,6 +188,23 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
}

if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())) {
goInstanceConfig.setClientAuthenticationPlugin(authConfig.getClientAuthenticationPlugin());
goInstanceConfig.setClientAuthenticationParameters(authConfig.getClientAuthenticationParameters());
}
goInstanceConfig.setTlsAllowInsecureConnection(
authConfig.isTlsAllowInsecureConnection());
goInstanceConfig.setTlsHostnameVerificationEnable(
authConfig.isTlsHostnameVerificationEnable());
if (isNotBlank(authConfig.getTlsTrustCertsFilePath())){
goInstanceConfig.setTlsTrustCertsFilePath(
authConfig.getTlsTrustCertsFilePath());
}

}

if (instanceConfig.getMaxBufferedTuples() != 0) {
goInstanceConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
}
Expand Down Expand Up @@ -292,7 +310,8 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
final List<String> args = new LinkedList<>();

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime);
return getGoInstanceCmd(instanceConfig, authConfig,
originalCodeFileName, pulsarServiceUrl, k8sRuntime);
}

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
instanceConfig.setPort(1337);
instanceConfig.setMetricsPort(60000);

AuthenticationConfig authConfig = AuthenticationConfig.builder()
.clientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationToken")
.clientAuthenticationParameters("file:///secret/token.jwt")
.tlsTrustCertsFilePath("/secret/ca.cert.pem")
.tlsHostnameVerificationEnable(true)
.tlsAllowInsecureConnection(false)
.build();

JSONObject userConfig = new JSONObject();
userConfig.put("word-of-the-day", "der Weltschmerz");
Expand Down Expand Up @@ -116,7 +123,7 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {

instanceConfig.setFunctionDetails(functionDetails);

List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime);
List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, authConfig,"config", "pulsar://localhost:6650", k8sRuntime);
if (k8sRuntime) {
goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
} else {
Expand Down Expand Up @@ -160,6 +167,11 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
Assert.assertEquals(goInstanceConfig.get("deadLetterTopic"), "go-func-deadletter");
Assert.assertEquals(goInstanceConfig.get("userConfig"), userConfig.toString());
Assert.assertEquals(goInstanceConfig.get("metricsPort"), 60000);
Assert.assertEquals(goInstanceConfig.get("clientAuthenticationPlugin"), "org.apache.pulsar.client.impl.auth.AuthenticationToken");
Assert.assertEquals(goInstanceConfig.get("clientAuthenticationParameters"), "file:///secret/token.jwt");
Assert.assertEquals(goInstanceConfig.get("tlsTrustCertsFilePath"), "/secret/ca.cert.pem");
Assert.assertEquals(goInstanceConfig.get("tlsHostnameVerificationEnable"), true);
Assert.assertEquals(goInstanceConfig.get("tlsAllowInsecureConnection"), false);
}

@DataProvider(name = "k8sRuntime")
Expand Down