Skip to content

Commit

Permalink
Merge pull request #375 from gatewayd-io/refactor-getters
Browse files Browse the repository at this point in the history
Refactor getter functions
  • Loading branch information
mostafa authored Nov 19, 2023
2 parents ffb1442 + 91b3fe1 commit c01a05d
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 322 deletions.
169 changes: 128 additions & 41 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,20 +210,33 @@ var runCmd = &cobra.Command{
// Create and initialize loggers from the config.
for name, cfg := range conf.Global.Loggers {
loggers[name] = logging.NewLogger(runCtx, logging.LoggerConfig{
Output: cfg.GetOutput(),
Level: cfg.GetLevel(),
TimeFormat: cfg.GetTimeFormat(),
ConsoleTimeFormat: cfg.GetConsoleTimeFormat(),
NoColor: cfg.NoColor,
FileName: cfg.FileName,
MaxSize: cfg.MaxSize,
MaxBackups: cfg.MaxBackups,
MaxAge: cfg.MaxAge,
Compress: cfg.Compress,
LocalTime: cfg.LocalTime,
SyslogPriority: cfg.GetSyslogPriority(),
RSyslogNetwork: cfg.RSyslogNetwork,
RSyslogAddress: cfg.RSyslogAddress,
Output: cfg.GetOutput(),
Level: config.If[zerolog.Level](
config.Exists[string, zerolog.Level](config.LogLevels, cfg.Level),
config.LogLevels[cfg.Level],
config.LogLevels[config.DefaultLogLevel],
),
TimeFormat: config.If[string](
config.Exists[string, string](config.TimeFormats, cfg.TimeFormat),
config.TimeFormats[cfg.TimeFormat],
config.DefaultTimeFormat,
),
ConsoleTimeFormat: config.If[string](
config.Exists[string, string](
config.ConsoleTimeFormats, cfg.ConsoleTimeFormat),
config.ConsoleTimeFormats[cfg.ConsoleTimeFormat],
config.DefaultConsoleTimeFormat,
),
NoColor: cfg.NoColor,
FileName: cfg.FileName,
MaxSize: cfg.MaxSize,
MaxBackups: cfg.MaxBackups,
MaxAge: cfg.MaxAge,
Compress: cfg.Compress,
LocalTime: cfg.LocalTime,
SyslogPriority: cfg.GetSyslogPriority(),
RSyslogNetwork: cfg.RSyslogNetwork,
RSyslogAddress: cfg.RSyslogAddress,
})
}

Expand All @@ -239,10 +252,26 @@ var runCmd = &cobra.Command{
// The plugins are loaded and hooks registered before the configuration is loaded.
pluginRegistry = plugin.NewRegistry(
runCtx,
conf.Plugin.GetPluginCompatibilityPolicy(),
conf.Plugin.GetVerificationPolicy(),
conf.Plugin.GetAcceptancePolicy(),
conf.Plugin.GetTerminationPolicy(),
config.If[config.CompatibilityPolicy](
config.Exists[string, config.CompatibilityPolicy](
config.CompatibilityPolicies, conf.Plugin.CompatibilityPolicy),
config.CompatibilityPolicies[conf.Plugin.CompatibilityPolicy],
config.DefaultCompatibilityPolicy),
config.If[config.VerificationPolicy](
config.Exists[string, config.VerificationPolicy](
config.VerificationPolicies, conf.Plugin.VerificationPolicy),
config.VerificationPolicies[conf.Plugin.VerificationPolicy],
config.DefaultVerificationPolicy),
config.If[config.AcceptancePolicy](
config.Exists[string, config.AcceptancePolicy](
config.AcceptancePolicies, conf.Plugin.AcceptancePolicy),
config.AcceptancePolicies[conf.Plugin.AcceptancePolicy],
config.DefaultAcceptancePolicy),
config.If[config.TerminationPolicy](
config.Exists[string, config.TerminationPolicy](
config.TerminationPolicies, conf.Plugin.TerminationPolicy),
config.TerminationPolicies[conf.Plugin.TerminationPolicy],
config.DefaultTerminationPolicy),
logger,
devMode,
)
Expand Down Expand Up @@ -413,14 +442,20 @@ var runCmd = &cobra.Command{
handler = mergedMetricsHandler(handler)
}

readHeaderTimeout := config.If[time.Duration](
metricsConfig.ReadHeaderTimeout > 0,
metricsConfig.ReadHeaderTimeout,
config.DefaultReadHeaderTimeout,
)

// Check if the metrics server is already running before registering the handler.
if _, err = http.Get(address); err != nil { //nolint:gosec
// The timeout handler limits the nested handlers from running for too long.
mux.Handle(
metricsConfig.Path,
http.TimeoutHandler(
gziphandler.GzipHandler(handler),
metricsConfig.GetTimeout(),
readHeaderTimeout,
"The request timed out while fetching the metrics",
),
)
Expand All @@ -430,19 +465,24 @@ var runCmd = &cobra.Command{
}

// Create a new metrics server.
timeout := config.If[time.Duration](
metricsConfig.Timeout > 0,
metricsConfig.Timeout,
config.DefaultMetricsServerTimeout,
)
metricsServer = &http.Server{
Addr: metricsConfig.Address,
Handler: mux,
ReadHeaderTimeout: metricsConfig.GetReadHeaderTimeout(),
ReadTimeout: metricsConfig.GetTimeout(),
WriteTimeout: metricsConfig.GetTimeout(),
IdleTimeout: metricsConfig.GetTimeout(),
ReadHeaderTimeout: readHeaderTimeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
IdleTimeout: timeout,
}

logger.Info().Fields(map[string]interface{}{
"address": address,
"timeout": metricsConfig.GetTimeout().String(),
"readHeaderTimeout": metricsConfig.GetReadHeaderTimeout().String(),
"timeout": timeout.String(),
"readHeaderTimeout": readHeaderTimeout.String(),
}).Msg("Metrics are exposed")

if metricsConfig.CertFile != "" && metricsConfig.KeyFile != "" {
Expand Down Expand Up @@ -499,11 +539,22 @@ var runCmd = &cobra.Command{
// Create and initialize pools of connections.
for name, cfg := range conf.Global.Pools {
logger := loggers[name]
pools[name] = pool.NewPool(runCtx, cfg.GetSize())
// Check if the pool size is greater than zero.
currentPoolSize := config.If[int](
cfg.Size > 0,
// Check if the pool size is greater than the minimum pool size.
config.If[int](
cfg.Size > config.MinimumPoolSize,
cfg.Size,
config.MinimumPoolSize,
),
config.DefaultPoolSize,
)
pools[name] = pool.NewPool(runCtx, currentPoolSize)

span.AddEvent("Create pool", trace.WithAttributes(
attribute.String("name", name),
attribute.Int("size", cfg.GetSize()),
attribute.Int("size", currentPoolSize),
))

// Get client config from the config file.
Expand All @@ -517,21 +568,49 @@ var runCmd = &cobra.Command{
}

// Fill the missing and zero values with the default ones.
clients[name].TCPKeepAlivePeriod = clients[name].GetTCPKeepAlivePeriod()
clients[name].ReceiveDeadline = clients[name].GetReceiveDeadline()
clients[name].ReceiveTimeout = clients[name].GetReceiveTimeout()
clients[name].SendDeadline = clients[name].GetSendDeadline()
clients[name].ReceiveChunkSize = clients[name].GetReceiveChunkSize()
clients[name].DialTimeout = clients[name].GetDialTimeout()
clients[name].TCPKeepAlivePeriod = config.If[time.Duration](
clients[name].TCPKeepAlivePeriod > 0,
clients[name].TCPKeepAlivePeriod,
config.DefaultTCPKeepAlivePeriod,
)
clients[name].ReceiveDeadline = config.If[time.Duration](
clients[name].ReceiveDeadline > 0,
clients[name].ReceiveDeadline,
config.DefaultReceiveDeadline,
)
clients[name].ReceiveTimeout = config.If[time.Duration](
clients[name].ReceiveTimeout > 0,
clients[name].ReceiveTimeout,
config.DefaultReceiveTimeout,
)
clients[name].SendDeadline = config.If[time.Duration](
clients[name].SendDeadline > 0,
clients[name].SendDeadline,
config.DefaultSendDeadline,
)
clients[name].ReceiveChunkSize = config.If[int](
clients[name].ReceiveChunkSize > 0,
clients[name].ReceiveChunkSize,
config.DefaultChunkSize,
)
clients[name].DialTimeout = config.If[time.Duration](
clients[name].DialTimeout > 0,
clients[name].DialTimeout,
config.DefaultDialTimeout,
)

// Add clients to the pool.
for i := 0; i < cfg.GetSize(); i++ {
for i := 0; i < currentPoolSize; i++ {
clientConfig := clients[name]
client := network.NewClient(
runCtx, clientConfig, logger,
network.NewRetry(
clientConfig.Retries,
clientConfig.GetBackoff(),
config.If[time.Duration](
clientConfig.Backoff > 0,
clientConfig.Backoff,
config.DefaultBackoff,
),
clientConfig.BackoffMultiplier,
clientConfig.DisableBackoffCaps,
loggers[name],
Expand All @@ -553,7 +632,7 @@ var runCmd = &cobra.Command{
attribute.String("localAddress", client.LocalAddr()),
attribute.String("remoteAddress", client.RemoteAddr()),
attribute.Int("retries", clientConfig.Retries),
attribute.String("backoff", clientConfig.GetBackoff().String()),
attribute.String("backoff", client.Retry().Backoff.String()),
attribute.Float64("backoffMultiplier", clientConfig.BackoffMultiplier),
attribute.Bool("disableBackoffCaps", clientConfig.DisableBackoffCaps),
)
Expand Down Expand Up @@ -583,7 +662,7 @@ var runCmd = &cobra.Command{
"localAddress": client.LocalAddr(),
"remoteAddress": client.RemoteAddr(),
"retries": clientConfig.Retries,
"backoff": clientConfig.GetBackoff().String(),
"backoff": client.Retry().Backoff.String(),
"backoffMultiplier": clientConfig.BackoffMultiplier,
"disableBackoffCaps": clientConfig.DisableBackoffCaps,
}
Expand Down Expand Up @@ -611,7 +690,7 @@ var runCmd = &cobra.Command{
"count": strconv.Itoa(pools[name].Size()),
}).Msg("There are clients available in the pool")

if pools[name].Size() != cfg.GetSize() {
if pools[name].Size() != currentPoolSize {
logger.Error().Msg(
"The pool size is incorrect, either because " +
"the clients cannot connect due to no network connectivity " +
Expand All @@ -626,7 +705,7 @@ var runCmd = &cobra.Command{

_, err = pluginRegistry.Run(
pluginTimeoutCtx,
map[string]interface{}{"name": name, "size": cfg.GetSize()},
map[string]interface{}{"name": name, "size": currentPoolSize},
v1.HookName_HOOK_NAME_ON_NEW_POOL)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewPool hooks")
Expand All @@ -642,7 +721,11 @@ var runCmd = &cobra.Command{
logger := loggers[name]
clientConfig := clients[name]
// Fill the missing and zero value with the default one.
cfg.HealthCheckPeriod = cfg.GetHealthCheckPeriod()
cfg.HealthCheckPeriod = config.If[time.Duration](
cfg.HealthCheckPeriod > 0,
cfg.HealthCheckPeriod,
config.DefaultHealthCheckPeriod,
)

proxies[name] = network.NewProxy(
runCtx,
Expand Down Expand Up @@ -689,7 +772,11 @@ var runCmd = &cobra.Command{
runCtx,
cfg.Network,
cfg.Address,
cfg.GetTickInterval(),
config.If[time.Duration](
cfg.TickInterval > 0,
cfg.TickInterval,
config.DefaultTickInterval,
),
network.Option{
// Can be used to send keepalive messages to the client.
EnableTicker: cfg.EnableTicker,
Expand Down
6 changes: 6 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,10 @@ const (
DefaultHTTPAPIAddress = "localhost:18080"
DefaultGRPCAPINetwork = "tcp"
DefaultGRPCAPIAddress = "localhost:19090"

// Policies.
DefaultCompatibilityPolicy = Strict
DefaultVerificationPolicy = PassDown
DefaultAcceptancePolicy = Accept
DefaultTerminationPolicy = Stop
)
Loading

0 comments on commit c01a05d

Please sign in to comment.