Rename master to aggregator #847

Merged 1 commit on Aug 23, 2019

51 changes: 33 additions & 18 deletions cmd/sonobuoy/app/master.go
@@ -17,6 +17,7 @@ limitations under the License.
package app

import (
"fmt"
"os"

"github.com/heptio/sonobuoy/pkg/config"
@@ -27,47 +28,61 @@ import (
"github.com/spf13/cobra"
)

var noExit bool
var kubecfg Kubeconfig
type aggregatorInput struct {
noExit bool
kubecfg Kubeconfig
}
Contributor Author: Just a single type/variable instead of two globals. I then create a single instance of this variable below in NewCmdAggregator, which returns a closure around that instance.
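For illustration only (not part of this diff): a minimal, self-contained sketch of the pattern described above, with a per-command input struct captured by the command's Run closure. The command and flag names here are hypothetical.

package example

import "github.com/spf13/cobra"

// exampleInput groups what would otherwise be package-level globals.
type exampleInput struct {
    noExit bool
}

// newCmdExample creates one input instance per command; the Run closure
// captures that instance, so no shared package state is needed.
func newCmdExample() *cobra.Command {
    input := exampleInput{}
    cmd := &cobra.Command{
        Use: "example",
        Run: func(cmd *cobra.Command, args []string) {
            _ = input.noExit // read the captured instance here
        },
    }
    cmd.PersistentFlags().BoolVar(&input.noExit, "no-exit", false, "block instead of exiting")
    return cmd
}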

func NewCmdMaster() *cobra.Command {
// NewCmdAggregator returns the command that runs Sonobuoy as an aggregator. It will
// load the config, launch plugins, gather results, and query the cluster for data.
func NewCmdAggregator() *cobra.Command {
input := aggregatorInput{}
cmd := &cobra.Command{
Use: "master",
Short: "Runs the master/aggregator component (for internal use)",
Long: "Sonobuoy is an introspective kubernetes component that generates reports on cluster conformance, configuration, and more",
Run: runMaster,
Use: "aggregator",
Short: "Runs the aggregator component (for internal use)",
Long: "Sonobuoy is an introspective kubernetes component that generates reports on cluster conformance, configuration, and more",
Run: func(cmd *cobra.Command, args []string) {
Contributor Author: Didn't want to change this to RunE, but will do that with a bunch of commands at the same time eventually. We should be writing our code in a way that allows us to bubble up errors to the main Execute method so we can centralize error handling rather than having logging/os.Exit all over the place. (A minimal sketch of that pattern follows this file's diff.)
if err := runAggregator(input); err != nil {
errlog.LogError(err)
os.Exit(1)
}
},
Hidden: true,
Args: cobra.ExactArgs(0),

// Original command but no longer used. Kept for backward compatibility.
Aliases: []string{"master"},
}
cmd.PersistentFlags().BoolVar(
&noExit, "no-exit", false,
&input.noExit, "no-exit", false,
"Use this if you want sonobuoy to block and not exit. Useful when you want to explicitly grab results.tar.gz",
)
AddKubeconfigFlag(&kubecfg, cmd.Flags())
AddKubeconfigFlag(&input.kubecfg, cmd.Flags())
return cmd
}

func runMaster(cmd *cobra.Command, args []string) {

func runAggregator(input aggregatorInput) error {
cfg, err := config.LoadConfig()
if err != nil {
errlog.LogError(errors.Wrap(err, "error loading sonobuoy configuration"))
os.Exit(1)
return errors.Wrap(err, "error loading sonobuoy configuration")
}

kcfg, err := kubecfg.Get()
kcfg, err := input.kubecfg.Get()
if err != nil {
errlog.LogError(err)
os.Exit(1)
return errors.Wrap(err, "getting kubeconfig")
}

// Run Discovery (gather API data, run plugins)
errcount := discovery.Run(kcfg, cfg)

if noExit {
if input.noExit {
logrus.Info("no-exit was specified, sonobuoy is now blocking")
select {}
}

os.Exit(errcount)
if errcount > 0 {
return fmt.Errorf("%v errors encountered during execution", errcount)
}

return nil
}
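Regarding the reviewer note above about eventually switching to RunE: below is a minimal sketch (not part of this PR) of how errors could bubble up to a single Execute call so logging and os.Exit live in one place. It uses fmt for reporting to stay self-contained; the command name and error are hypothetical.

package example

import (
    "errors"
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

// newCmdWithRunE reports failures through RunE instead of logging and exiting inline.
func newCmdWithRunE() *cobra.Command {
    return &cobra.Command{
        Use:           "example",
        SilenceErrors: true, // let the caller decide how to report errors
        SilenceUsage:  true,
        RunE: func(cmd *cobra.Command, args []string) error {
            return errors.New("something went wrong")
        },
    }
}

// executeExample is the single place that reports errors and sets the exit code.
func executeExample() {
    if err := newCmdWithRunE().Execute(); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}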
2 changes: 1 addition & 1 deletion cmd/sonobuoy/app/root.go
@@ -35,7 +35,7 @@ func NewSonobuoyCommand() *cobra.Command {

cmds.ResetFlags()

cmds.AddCommand(NewCmdMaster())
cmds.AddCommand(NewCmdAggregator())
cmds.AddCommand(NewCmdDelete())
cmds.AddCommand(NewCmdE2E())

22 changes: 11 additions & 11 deletions cmd/sonobuoy/app/worker.go
@@ -35,12 +35,13 @@ import (
"github.com/spf13/cobra"
)

// NewCmdWorker is the cobra command that acts as the entrypoint for Sonobuoy when running
// as a sidecar with a plugin. It will wait for a 'done' file then transmit the results to the
// aggregator pod.
func NewCmdWorker() *cobra.Command {

var workerCmd = &cobra.Command{
Use: "worker",
Short: "Gather and send data to the sonobuoy master instance (for internal use)",
Run: runGatherHelp,
Short: "Gather and send data to the sonobuoy aggregator instance (for internal use)",
Hidden: true,
Args: cobra.ExactArgs(0),
}
@@ -65,10 +66,6 @@ var singleNodeCmd = &cobra.Command{
Args: cobra.ExactArgs(0),
}

func runGatherHelp(cmd *cobra.Command, args []string) {
cmd.Help()
}
Contributor Author: No need to add this; if you don't set a Run or RunE, it will show usage messages already.
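A small sketch (not from this PR) of the cobra behavior the comment relies on: a command with subcommands but no Run/RunE falls back to printing its usage/help text when invoked on its own. The command names are hypothetical.

package example

import "github.com/spf13/cobra"

// newCmdParent defines no Run/RunE, so running it without a subcommand
// prints the usage message automatically.
func newCmdParent() *cobra.Command {
    parent := &cobra.Command{
        Use:   "parent",
        Short: "Dispatches to subcommands only",
    }
    parent.AddCommand(&cobra.Command{
        Use:   "child",
        Short: "A subcommand that does the actual work",
        Run:   func(cmd *cobra.Command, args []string) { /* ... */ },
    })
    return parent
}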

// sigHandler returns a channel that will receive a message after the timeout
// elapses after a SIGTERM is received.
func sigHandler(timeout time.Duration) <-chan struct{} {
@@ -138,12 +135,15 @@ func runGather(global bool) error {
return errors.Wrap(err, "getting HTTP client")
}

// A single-node results URL looks like:
// http://sonobuoy-master:8080/api/v1/results/by-node/node1/systemd_logs
url := cfg.MasterURL + "/" + cfg.NodeName + "/" + cfg.ResultType
url := ""
if global {
// http://sonobuoy-master:8080/api/v1/results/global/systemd_logs
// A global results URL looks like:
// http://sonobuoy-aggregator:8080/api/v1/results/global/systemd_logs
url = cfg.MasterURL + "/" + cfg.ResultType
} else {
// A single-node results URL looks like:
// http://sonobuoy-aggregator:8080/api/v1/results/by-node/node1/systemd_logs
url = cfg.MasterURL + "/" + cfg.NodeName + "/" + cfg.ResultType
}

err = worker.GatherResults(cfg.ResultsDir+"/done", url, client, sigHandler(plugin.GracefulShutdownPeriod*time.Second))
1 change: 0 additions & 1 deletion main.go
@@ -22,7 +22,6 @@ import (
"github.com/heptio/sonobuoy/cmd/sonobuoy/app"
)

// main entry point of the program
func main() {
err := app.NewSonobuoyCommand().Execute()
if err != nil {
6 changes: 3 additions & 3 deletions pkg/client/retrieve.go
@@ -38,7 +38,7 @@ var tarCommand = []string{
"/usr/bin/env",
"bash",
"-c",
fmt.Sprintf("tar cf - %s/*.tar.gz", config.MasterResultsPath),
fmt.Sprintf("tar cf - %s/*.tar.gz", config.AggregatorResultsPath),
}

// RetrieveResults copies results from a sonobuoy run into a Reader in tar format.
@@ -71,9 +71,9 @@ func (c *SonobuoyClient) RetrieveResults(cfg *RetrieveConfig) (io.Reader, <-chan
Name(podName).
Namespace(cfg.Namespace).
SubResource("exec").
Param("container", config.MasterContainerName)
Param("container", config.AggregatorContainerName)
req.VersionedParams(&corev1.PodExecOptions{
Container: config.MasterContainerName,
Container: config.AggregatorContainerName,
Command: tarCommand,
Stdin: false,
Stdout: true,
16 changes: 8 additions & 8 deletions pkg/config/config.go
@@ -28,7 +28,7 @@ import (
)

const (
// DefaultNamespace is the namespace where the master and plugin workers will run (but not necessarily the pods created by the plugin workers).
// DefaultNamespace is the namespace where the aggregator and plugin workers will run (but not necessarily the pods created by the plugin workers).
DefaultNamespace = "heptio-sonobuoy"

// DefaultKubeConformanceImageURL is the URL of the docker image to run for the kube conformance tests.
@@ -44,12 +44,12 @@ const (
DefaultAggregationServerBindAddress = "0.0.0.0"
// DefaultAggregationServerTimeoutSeconds is the default amount of time the aggregation server will wait for all plugins to complete.
DefaultAggregationServerTimeoutSeconds = 10800 // 180 min
// MasterPodName is the name of the main pod that runs plugins and collects results.
MasterPodName = "sonobuoy"
// MasterContainerName is the name of the main container in the master pod.
MasterContainerName = "kube-sonobuoy"
// MasterResultsPath is the location in the main container of the master pod where results will be archived.
MasterResultsPath = "/tmp/sonobuoy"
// AggregatorPodName is the name of the main pod that runs plugins and collects results.
AggregatorPodName = "sonobuoy"
// AggregatorContainerName is the name of the main container in the aggregator pod.
AggregatorContainerName = "kube-sonobuoy"
// AggregatorResultsPath is the location in the main container of the aggregator pod where results will be archived.
AggregatorResultsPath = "/tmp/sonobuoy"
// DefaultSonobuoyPullPolicy is the default pull policy used in the Sonobuoy config.
DefaultSonobuoyPullPolicy = "IfNotPresent"
// DefaultQueryQPS is the number of queries per second Sonobuoy will make when gathering data.
@@ -332,7 +332,7 @@ func New() *Config {
cfgUuid, _ := uuid.NewV4()
cfg.UUID = cfgUuid.String()
cfg.Description = "DEFAULT"
cfg.ResultsDir = MasterResultsPath
cfg.ResultsDir = AggregatorResultsPath
cfg.Version = buildinfo.Version

cfg.Filters.Namespaces = ".*"
2 changes: 1 addition & 1 deletion pkg/plugin/aggregation/aggregator.go
@@ -248,7 +248,7 @@ func (a *Aggregator) HandleHTTPResult(result *plugin.Result, w http.ResponseWrit
// Since most plugins submit over HTTP, this method is currently only used to
// consume an error stream from each plugin's Monitor() function.
//
// If we support plugins that are just simple commands that the sonobuoy master
// If we support plugins that are just simple commands that the Sonobuoy aggregator
// runs, those plugins can submit results through the same channel.
func (a *Aggregator) IngestResults(ctx context.Context, resultsCh <-chan *plugin.Result) {
for {
10 changes: 5 additions & 5 deletions pkg/plugin/driver/daemonset/daemonset.go
@@ -42,7 +42,7 @@ const (
)

// Plugin is a plugin driver that dispatches containers to each node,
// expecting each pod to report to the master.
// expecting each pod to report to the aggregator.
type Plugin struct {
driver.Base
}
@@ -51,7 +51,7 @@ type Plugin struct {
var _ plugin.Interface = &Plugin{}

// NewPlugin creates a new DaemonSet plugin from the given Plugin Definition
// and sonobuoy master address.
// and sonobuoy aggregator address.
func NewPlugin(dfn manifest.Manifest, namespace, sonobuoyImage, imagePullPolicy, imagePullSecrets string, customAnnotations map[string]string) *Plugin {
return &Plugin{
driver.Base{
@@ -81,7 +81,7 @@ func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult {
return ret
}

func getMasterAddress(hostname string) string {
func getAggregatorAddress(hostname string) string {
return fmt.Sprintf("https://%s/api/v1/results/by-node", hostname)
}

@@ -107,7 +107,7 @@ func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificat
Labels: labels,
Annotations: annotations,
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
Contributor Author: Just a gofmt change; I have my editor set to use gofmt -s, which simplifies more than the default/minimum. (A small before/after illustration follows this file's diff.)
{
APIVersion: "v1",
Kind: "Pod",
Name: ownerPod.GetName(),
@@ -160,7 +160,7 @@

// Run dispatches worker pods according to the DaemonSet's configuration.
func (p *Plugin) Run(kubeclient kubernetes.Interface, hostname string, cert *tls.Certificate, ownerPod *v1.Pod) error {
daemonSet := p.createDaemonSetDefinition(getMasterAddress(hostname), cert, ownerPod)
daemonSet := p.createDaemonSetDefinition(getAggregatorAddress(hostname), cert, ownerPod)

secret, err := p.MakeTLSSecret(cert)
if err != nil {
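To illustrate the gofmt -s note above (not part of the diff): -s elides the repeated element type inside composite literals, which is exactly the change shown in this hunk. Field values here are placeholders.

package example

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// ownerRefsVerbose is the pre-gofmt -s form: the element type is spelled
// out for every entry in the slice literal.
var ownerRefsVerbose = []metav1.OwnerReference{
    metav1.OwnerReference{APIVersion: "v1", Kind: "Pod", Name: "sonobuoy"},
}

// ownerRefsSimplified is the gofmt -s form: the element type is inferred
// from the slice type, so the inner metav1.OwnerReference is dropped.
var ownerRefsSimplified = []metav1.OwnerReference{
    {APIVersion: "v1", Kind: "Pod", Name: "sonobuoy"},
}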
8 changes: 4 additions & 4 deletions pkg/plugin/driver/job/job.go
@@ -50,7 +50,7 @@ type Plugin struct {
var _ plugin.Interface = &Plugin{}

// NewPlugin creates a new DaemonSet plugin from the given Plugin Definition
// and sonobuoy master address.
// and sonobuoy aggregator address.
func NewPlugin(dfn manifest.Manifest, namespace, sonobuoyImage, imagePullPolicy, imagePullSecrets string, customAnnotations map[string]string) *Plugin {
return &Plugin{
driver.Base{
@@ -74,7 +74,7 @@ func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult {
}
}

func getMasterAddress(hostname string) string {
func getAggregatorAddress(hostname string) string {
return fmt.Sprintf("https://%s/api/v1/results/%v", hostname, plugin.GlobalResult)
}

@@ -100,7 +100,7 @@ func (p *Plugin) createPodDefinition(hostname string, cert *tls.Certificate, own
Labels: labels,
Annotations: annotations,
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: ownerPod.GetName(),
@@ -144,7 +144,7 @@ func (p *Plugin) createPodDefinition(hostname string, cert *tls.Certificate, own

// Run dispatches worker pods according to the Job's configuration.
func (p *Plugin) Run(kubeclient kubernetes.Interface, hostname string, cert *tls.Certificate, ownerPod *v1.Pod) error {
job := p.createPodDefinition(getMasterAddress(hostname), cert, ownerPod)
job := p.createPodDefinition(getAggregatorAddress(hostname), cert, ownerPod)

secret, err := p.MakeTLSSecret(cert)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/plugin/interface.go
@@ -116,7 +116,7 @@ type AggregationConfig struct {

// WorkerConfig is the file given to the sonobuoy worker to configure it to phone home.
type WorkerConfig struct {
// MasterURL is the URL we talk to for submitting results
// MasterURL is the URL we talk to the aggregator pod on for submitting results
Contributor Author: Did not change this since it is part of an exported struct/config. (A short illustration of the compatibility concern follows this file's diff.)

MasterURL string `json:"masterurl,omitempty" mapstructure:"masterurl"`
// NodeName is the node name we should call ourselves when sending results
NodeName string `json:"nodename,omitempty" mapstructure:"nodename"`
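Expanding on the note above about the exported struct (an illustration, not part of this PR): renaming the exported Go field would break code that uses it, and changing the json/mapstructure tags would break existing serialized worker configs, even though the pod is now called an aggregator. The config value below is hypothetical.

package main

import (
    "encoding/json"
    "fmt"
)

// workerConfig mirrors the relevant fields of the exported WorkerConfig type.
type workerConfig struct {
    MasterURL string `json:"masterurl,omitempty"`
    NodeName  string `json:"nodename,omitempty"`
}

func main() {
    // Existing configs keep using the "masterurl" key.
    raw := []byte(`{"masterurl": "https://sonobuoy-aggregator:8080/api/v1/results/global", "nodename": "node1"}`)

    var cfg workerConfig
    if err := json.Unmarshal(raw, &cfg); err != nil {
        panic(err)
    }
    // Renaming the field or changing its json tag would silently drop this value.
    fmt.Println(cfg.MasterURL, cfg.NodeName)
}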
2 changes: 1 addition & 1 deletion pkg/worker/doc.go
@@ -15,6 +15,6 @@ limitations under the License.
*/

// Package worker is responsible for the logic behind submitting results data
// back to a sonobuoy master. This is intended for plugins to leverage, to
// back to a Sonobuoy aggregator. This is intended for plugins to leverage, to
// avoid uploading data manually.
package worker
8 changes: 4 additions & 4 deletions pkg/worker/request.go
@@ -79,15 +79,15 @@ func DoRequest(url string, client *http.Client, callback func() (io.Reader, stri
err = fmt.Errorf("unexpected status code %d", resp.StatusCode)
}
if err != nil {
errlog.LogError(errors.Wrapf(err, "could not send error message to master URL (%s)", url))
errlog.LogError(errors.Wrapf(err, "could not send error message to aggregator URL (%s)", url))
}

return errors.WithStack(err)
}

req, err := http.NewRequest(http.MethodPut, url, input)
if err != nil {
return errors.Wrapf(err, "error constructing master request to %v", url)
return errors.Wrapf(err, "error constructing aggregator request to %v", url)
}
req.Header.Add("content-type", mimeType)
if len(filename) > 0 {
@@ -98,11 +98,11 @@

resp, err := pesterClient.Do(req)
if err != nil {
return errors.Wrapf(err, "error encountered dialing master at %v", url)
return errors.Wrapf(err, "error encountered dialing aggregator at %v", url)
}
if resp.StatusCode != http.StatusOK {
// TODO: retry logic for something like a 429 or otherwise
return errors.Errorf("got a %v response when dialing master to %v", resp.StatusCode, url)
return errors.Errorf("got a %v response when dialing aggregator to %v", resp.StatusCode, url)
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/worker/worker.go
@@ -38,7 +38,7 @@ func init() {
//
// 1. Output data will be placed into an agreed upon results directory.
// 2. The Job will wait for a done file
// 3. The done file contains a single string of the results to be sent to the master
// 3. The done file contains a single string of the results to be sent to the aggregator
func GatherResults(waitfile string, url string, client *http.Client, stopc <-chan struct{}) error {
logrus.WithField("waitfile", waitfile).Info("Waiting for waitfile")
ticker := time.Tick(1 * time.Second)
2 changes: 1 addition & 1 deletion site/docs/v0.13.0/enhancements/v2-worker-master.md
@@ -17,7 +17,7 @@

## How it works now

Sonobuoy uses a worker-master model, where a master delegates tasks to worker
Contributor Author: This one line from some old docs had gotten updated; I don't really care about the older ones. If it's no different to anyone else, I'll just leave it rather than trying to fix up other sections in this (or other older docs).

Contributor (@zubron, Aug 23, 2019): Are these files even linked to from the site? I can't find any references to them. :-/

Contributor Author: The enhancements aren't linked, but they do actually exist if you manually write the URL. They were really for internal discussion/documentation back then; I made the decision that I didn't really want/need them to be displayed in our online docs site, though.

Sonobuoy uses a worker-master model, where an aggregator delegates tasks to worker
pods. When those pods have finished, they need to report the results of their
work back to the master. Presently this is done over an ill-defined, ad hoc
REST-ish client/server model embedded in the server.