forked from startreedata/pinot-client-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnectionFactory.go
88 lines (80 loc) · 2.23 KB
/
connectionFactory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package pinot
import (
"fmt"
"net/http"
"strings"
)
// defaultZkSessionTimeoutSec is the Zookeeper session timeout, in seconds,
// that NewFromZookeeper puts into the ZookeeperConfig it builds.
const defaultZkSessionTimeoutSec = 60
// NewFromBrokerList creates a new Pinot connection from a pre-configured list
// of Pinot brokers.
//
// Only ClientConfig.BrokerList is populated; the config is then passed to
// NewWithConfig, whose connection and error are returned unchanged.
func NewFromBrokerList(brokerList []string) (*Connection, error) {
	return NewWithConfig(&ClientConfig{BrokerList: brokerList})
}
// NewFromZookeeper creates a new Pinot connection configured through Pinot's
// Zookeeper.
//
// zkPath is stored as the Zookeeper path list, while zkPathPrefix and
// pinotCluster are joined with "/" to form the path prefix. The session
// timeout is set to defaultZkSessionTimeoutSec. The resulting config is passed
// to NewWithConfig, whose connection and error are returned unchanged.
func NewFromZookeeper(zkPath []string, zkPathPrefix string, pinotCluster string) (*Connection, error) {
	clusterPath := strings.Join([]string{zkPathPrefix, pinotCluster}, "/")
	zkConfig := &ZookeeperConfig{
		ZookeeperPath:     zkPath,
		PathPrefix:        clusterPath,
		SessionTimeoutSec: defaultZkSessionTimeoutSec,
	}
	return NewWithConfig(&ClientConfig{ZkConfig: zkConfig})
}
// NewFromController creates a new Pinot connection that periodically fetches
// the available brokers via the Controller API.
//
// controllerAddress is stored in a ControllerConfig, which is the only part of
// the ClientConfig that gets populated before it is passed to NewWithConfig.
func NewFromController(controllerAddress string) (*Connection, error) {
	controllerConfig := &ControllerConfig{ControllerAddress: controllerAddress}
	return NewWithConfig(&ClientConfig{ControllerConfig: controllerConfig})
}
// NewWithConfig creates a new Pinot connection from the given client config.
//
// The broker selector is picked from whichever broker source is configured.
// When several are set, the precedence is ControllerConfig first, then a
// non-empty BrokerList, then ZkConfig. The chosen selector is initialized
// before the connection is returned.
//
// When config.HTTPTimeout is non-zero it is applied to a private copy of
// http.DefaultClient, which is shared by the query transport and the
// controller-based selector. The process-wide http.DefaultClient itself is
// never modified.
//
// An error is returned when config is nil or when none of Zookeeper, broker
// list or controller is specified.
func NewWithConfig(config *ClientConfig) (*Connection, error) {
	// Guard against a nil config instead of panicking on the first field access.
	if config == nil {
		return nil, fmt.Errorf("client config must not be nil")
	}

	// http.DefaultClient is shared by the whole process, so setting Timeout on
	// it directly would leak this connection's timeout into unrelated code.
	// Work on a shallow copy whenever a timeout is requested.
	httpClient := http.DefaultClient
	if config.HTTPTimeout != 0 {
		clientCopy := *http.DefaultClient
		clientCopy.Timeout = config.HTTPTimeout
		httpClient = &clientCopy
	}

	conn := &Connection{
		transport: &jsonAsyncHTTPClientTransport{
			client: httpClient,
			header: config.ExtraHTTPHeader,
		},
	}

	// Case order encodes the precedence when more than one source is set.
	switch {
	case config.ControllerConfig != nil:
		conn.brokerSelector = &controllerBasedSelector{
			config: config.ControllerConfig,
			client: httpClient,
		}
	case len(config.BrokerList) > 0:
		conn.brokerSelector = &simpleBrokerSelector{
			brokerList: config.BrokerList,
		}
	case config.ZkConfig != nil:
		conn.brokerSelector = &dynamicBrokerSelector{
			zkConfig: config.ZkConfig,
		}
	default:
		return nil, fmt.Errorf(
			"please specify at least one of Pinot Zookeeper, Pinot Broker or Pinot Controller to connect",
		)
	}

	// NOTE(review): any result of init() is not inspected here — confirm its
	// signature and whether a failure should be surfaced to the caller.
	conn.brokerSelector.init()
	return conn, nil
}