-
Notifications
You must be signed in to change notification settings - Fork 74
/
Copy pathauth.go
146 lines (126 loc) · 3.27 KB
/
auth.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package kafka
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"io/ioutil"
"log"
"os"
"time"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)
// SASL algorithm names accepted in Credentials.Algorithm.
const (
// None is the default algorithm assigned by unmarshalCredentials; no SASL
// mechanism is attached to the dialer for it.
None = "none"
// Plain selects the SASL PLAIN mechanism (username and password).
Plain = "plain"
// SHA256 selects the SASL SCRAM mechanism using scram.SHA256.
SHA256 = "sha256"
// SHA512 selects the SASL SCRAM mechanism using scram.SHA512.
SHA512 = "sha512"
)
// Credentials holds the authentication settings decoded from the JSON auth
// string: SASL username/password/algorithm plus the files used to build the
// TLS configuration.
type Credentials struct {
// Username and Password are handed to the selected SASL mechanism.
Username string `json:"username"`
Password string `json:"password"`
// Algorithm is compared against None, Plain, SHA256 and SHA512.
Algorithm string `json:"algorithm"`
// ClientCertPem, ClientKeyPem and ServerCaPem are used as file paths (not
// inline PEM data): tlsConfig checks that each exists and loads it from disk.
ClientCertPem string `json:"clientCertPem"`
ClientKeyPem string `json:"clientKeyPem"`
ServerCaPem string `json:"serverCaPem"`
}
// unmarshalCredentials decodes the JSON document in auth into a Credentials
// value. Algorithm is pre-set to None, so it keeps that value when the
// document does not specify one.
//
// The returned pointer is never nil, even for a JSON null document, which
// simply yields the defaults. When decoding fails, the error is returned
// together with whatever was decoded before the failure; callers should
// check err before using the credentials.
func unmarshalCredentials(auth string) (creds *Credentials, err error) {
	creds = &Credentials{
		Algorithm: None,
	}
	// Decode into the struct itself, not into &creds: given a **Credentials,
	// encoding/json handles a JSON null by resetting the inner pointer to nil
	// without reporting an error, and callers would then dereference nil.
	err = json.Unmarshal([]byte(auth), creds)
	return creds, err
}
// getDialerFromCreds builds a kafka-go dialer from creds. The dialer always
// uses a 10 second timeout, dual-stack dialing and whatever TLS configuration
// tlsConfig returns for creds (possibly nil).
//
// A SASL mechanism is attached according to creds.Algorithm: Plain uses the
// PLAIN mechanism, SHA256 and SHA512 use SCRAM with the matching hash, and
// any other value leaves the dialer without a SASL mechanism. If the SCRAM
// mechanism cannot be created, the error is reported and nil is returned.
func getDialerFromCreds(creds *Credentials) (dialer *kafkago.Dialer) {
	dialer = &kafkago.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS:       tlsConfig(creds),
	}

	switch creds.Algorithm {
	case Plain:
		dialer.SASLMechanism = plain.Mechanism{
			Username: creds.Username,
			Password: creds.Password,
		}
	case SHA256, SHA512:
		// Pick the SCRAM hash that corresponds to the requested algorithm.
		hash := scram.SHA256
		if creds.Algorithm == SHA512 {
			hash = scram.SHA512
		}
		saslMechanism, err := scram.Mechanism(hash, creds.Username, creds.Password)
		if err != nil {
			ReportError(err, "authentication failed")
			return nil
		}
		dialer.SASLMechanism = saslMechanism
	}

	return dialer
}
// getDialerFromAuth returns a kafka-go dialer for the given auth string.
//
// An empty auth string yields an unauthenticated dialer with a 10 second
// timeout and dual-stack dialing disabled. Otherwise auth is parsed as JSON
// credentials and passed to getDialerFromCreds. If parsing fails, or no
// dialer could be built from the credentials, the problem is reported and
// nil is returned.
func getDialerFromAuth(auth string) (dialer *kafkago.Dialer) {
	// No credentials supplied: plain dialer, no SASL and no TLS.
	if auth == "" {
		return &kafkago.Dialer{
			Timeout:   10 * time.Second,
			DualStack: false,
		}
	}

	parsed, err := unmarshalCredentials(auth)
	if err != nil {
		ReportError(err, "Unable to unmarshal credentials")
		return nil
	}

	// Build the dialer (SASL mechanism and TLS settings) from the parsed
	// credentials; nil signals that this was not possible.
	authenticated := getDialerFromCreds(parsed)
	if authenticated == nil {
		ReportError(nil, "Dialer cannot authenticate")
		return nil
	}
	return authenticated
}
// fileExists reports whether filename refers to an existing filesystem entry
// that is not a directory. Any os.Stat failure (missing path, empty name,
// permission problem, ...) is treated as "does not exist".
func fileExists(filename string) bool {
	info, err := os.Stat(filename)
	if err != nil {
		return false
	}
	// A directory cannot be loaded as a certificate or key file, so it must
	// not pass the existence check.
	return !info.IsDir()
}
// tlsConfig builds the TLS configuration described by creds.
//
// creds.ClientCertPem, creds.ClientKeyPem and creds.ServerCaPem are treated
// as file paths. If any of them does not pass fileExists, the problem is
// reported through ReportError and nil is returned, so the caller ends up
// without a TLS configuration. If a file exists but cannot be used (the key
// pair does not load, the CA file cannot be read, or it contains no PEM
// certificates) the process is terminated via log.Fatalf.
//
// On success the returned configuration presents the client certificate,
// trusts only the certificates from the CA file and requires TLS 1.2 or
// newer.
func tlsConfig(creds *Credentials) *tls.Config {
	clientCertFile := creds.ClientCertPem
	if !fileExists(clientCertFile) {
		ReportError(nil, "client certificate file not found")
		return nil
	}

	clientKeyFile := creds.ClientKeyPem
	if !fileExists(clientKeyFile) {
		ReportError(nil, "client key file not found")
		return nil
	}

	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		// Include the underlying error so the failure can be diagnosed.
		log.Fatalf("Error creating x509 keypair from client cert file %s and client key file %s, Error: %s", clientCertFile, clientKeyFile, err)
	}

	caCertFile := creds.ServerCaPem
	if !fileExists(caCertFile) {
		ReportError(nil, "CA certificate file not found")
		return nil
	}

	caCert, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		log.Fatalf("Error opening cert file %s, Error: %s", caCertFile, err)
	}

	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when no certificate was parsed. An
	// empty root pool would make every server verification fail later with a
	// misleading error, so treat it like the other unusable-file cases.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		log.Fatalf("Error parsing cert file %s, Error: no valid PEM certificates found", caCertFile)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
		MinVersion:   tls.VersionTLS12,
	}
}