From a7d3a0f524ba685eeac89e5db84787b1529a27cf Mon Sep 17 00:00:00 2001 From: liujianping Date: Thu, 9 May 2019 18:55:52 +0800 Subject: [PATCH] refactor for gocyclo & golint --- cmd/jobs.go | 76 +++++++++++++++++++ cmd/main.go | 191 ++++++++++++++++++++--------------------------- cmd/root.go | 12 ++- config/config.go | 15 ---- exec/reporter.go | 10 +++ 5 files changed, 178 insertions(+), 126 deletions(-) create mode 100644 cmd/jobs.go diff --git a/cmd/jobs.go b/cmd/jobs.go new file mode 100644 index 0000000..ad800e3 --- /dev/null +++ b/cmd/jobs.go @@ -0,0 +1,76 @@ +package cmd + +import ( + "context" + "sort" + + "github.com/liujianping/job/config" + "github.com/liujianping/job/exec" + "github.com/x-mod/routine" +) + +//JOBs type +type JOBs struct { + jds []*config.JD + report *exec.Reporter +} + +//NewJOBs new JOBs +func NewJOBs(jds []*config.JD, report *exec.Reporter) *JOBs { + return &JOBs{ + jds: jds, + report: report, + } +} + +//Len of JOBs +func (jobs JOBs) Len() int { + return len(jobs.jds) +} + +//Less Cmp of JOBs +func (jobs JOBs) Less(i, j int) bool { + return jobs.jds[i].Order.Weight < jobs.jds[j].Order.Weight +} + +//Swap of JOBs +func (jobs JOBs) Swap(i, j int) { + jobs.jds[i], jobs.jds[j] = jobs.jds[j], jobs.jds[i] +} + +//Sort of JOBs +func (jobs JOBs) Sort() { + sort.Sort(jobs) +} + +//Execute impl executor +func (jobs JOBs) Execute(ctx context.Context) error { + jmap := make(map[string]chan error, jobs.Len()) + var tail chan error + for _, jd := range jobs.jds { + if len(jd.Order.Precondition) == 0 { + job := exec.NewJob(jd, jobs.report) + ch := routine.Go(ctx, job) + jmap[job.String()] = ch + tail = ch + if jd.Order.Wait { + <-ch + } + } + } + for _, jd := range jobs.jds { + for _, pre := range jd.Order.Precondition { + if ch, ok := jmap[pre]; ok { + <-ch + } + job := exec.NewJob(jd, jobs.report) + ch := routine.Go(ctx, job) + jmap[job.String()] = ch + tail = ch + if jd.Order.Wait { + <-ch + } + } + } + return <-tail +} diff --git a/cmd/main.go b/cmd/main.go index f75eb47..3a48f61 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -4,54 +4,98 @@ import ( "context" "fmt" "os" - "runtime" - "sort" "github.com/x-mod/httpclient" - "gopkg.in/yaml.v2" - "github.com/liujianping/job/config" "github.com/liujianping/job/exec" - "github.com/liujianping/job/build" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/x-mod/errors" "github.com/x-mod/routine" + "gopkg.in/yaml.v2" ) -func CheckErr(err error) { - if err != nil { - fmt.Println("job failed: ", errors.CauseFrom(err)) - fmt.Println() - } -} - -func Main(cmd *cobra.Command, args []string) { - if viper.GetBool("version") { - build.Print() - os.Exit(0) - } - - if len(viper.GetString("config")) == 0 && len(args) == 0 { - cmd.Help() - os.Exit(0) - } +var ( + needReport = false + httpConnections = 0 +) +func prepareJDs(configPath string, command string) ([]*config.JD, error) { jds := []*config.JD{} - if len(viper.GetString("config")) > 0 { - cfs, err := config.ParseJDs(viper.GetString("config")) + if len(configPath) > 0 { + cfs, err := config.ParseJDs(configPath) if err != nil { - fmt.Println("job failed:", err) - os.Exit(1) + return nil, err + } jds = cfs } else { jd := config.CommandJD() - jd.Command.Shell.Name = args[0] + jd.Command.Shell.Name = command jds = append(jds, jd) } + return jds, nil +} + +func optionJDs(jds []*config.JD, options []config.Option, report bool) { + for _, jd := range jds { + for _, opt := range options { + opt(jd) + if report { + jd.Report = true + } + if jd.Report { + needReport = true + } + if jd.Command.HTTP != nil { + httpConnections = httpConnections + jd.Concurrent + } + } + } +} + +func output(jds []*config.JD) { + for i, jd := range jds { + bt, err := yaml.Marshal(map[string]*config.JD{ + "Job": jd, + }) + exitForErr(err) + if i > 0 { + fmt.Println("---") + } + fmt.Print(string(bt)) + } +} + +func exitForErr(err error) { + if err != nil { + fmt.Println("job failed:", err) + os.Exit(errors.ValueFrom(err)) + } +} + +func withVerbose(ctx context.Context) context.Context { + if viper.GetBool("verbose") { + log := logrus.New() + log.SetLevel(logrus.TraceLevel) + return routine.WithLogger(ctx, log) + } + return ctx +} + +func withTransport(ctx context.Context) context.Context { + if httpConnections > 0 { + return exec.WithTransport(ctx, httpclient.NewHTTPTransport(httpclient.MaxIdleConnections(httpConnections))) + } + return ctx +} + +//Main func +func Main(cmd *cobra.Command, args []string) { + jds, err := prepareJDs(viper.GetString("config"), args[0]) + exitForErr(err) options := []config.Option{} options = append(options, config.Name(viper.GetString("name"))) @@ -67,52 +111,22 @@ func Main(cmd *cobra.Command, args []string) { options = append(options, config.CommandArgs(args[1:]...)) } options = append(options, config.CommandStdoutDiscard(viper.GetBool("cmd-stdout-discard"))) - options = append(options, config.Guarantee(viper.GetBool("guarantee"))) options = append(options, config.Crontab(viper.GetString("crontab"))) options = append(options, config.RepeatTimes(viper.GetInt("repeat-times"))) options = append(options, config.RepeatInterval(viper.GetDuration("repeat-interval"))) options = append(options, config.Timeout(viper.GetDuration("timeout"))) options = append(options, config.Concurrent(viper.GetInt("concurrent"))) - - needReport := false - httpShared := 0 - for _, jd := range jds { - for _, opt := range options { - opt(jd) - if viper.GetBool("report") { - jd.Report = true - } - if jd.Report { - needReport = true - } - if jd.Command.HTTP != nil { - httpShared = httpShared + jd.Concurrent - } - } - } - - sort.Sort(config.JDs(jds)) + optionJDs(jds, options, viper.GetBool("report")) //output if viper.GetBool("output") { - for i, jd := range jds { - bt, err := yaml.Marshal(map[string]*config.JD{ - "Job": jd, - }) - if err != nil { - fmt.Println("job failed:", err) - os.Exit(errors.ValueFrom(err)) - } - if i > 0 { - fmt.Println("---") - } - fmt.Print(string(bt)) - } + output(jds) os.Exit(0) } - + //reporter var reporter *exec.Reporter + //main options mainOptions := []routine.Opt{routine.Interrupts(routine.DefaultCancelInterruptors...)} if needReport { n := viper.GetInt("repeat-times") * viper.GetInt("concurrent") @@ -128,53 +142,10 @@ func Main(cmd *cobra.Command, args []string) { }) mainOptions = append(mainOptions, routine.Prepare(prepare), routine.Cleanup(cleanup)) } - - ctx := context.TODO() - if viper.GetBool("verbose") { - log := logrus.New() - log.SetLevel(logrus.TraceLevel) - ctx = routine.WithLogger(ctx, log) - } - if httpShared > 0 { - ctx = exec.WithTransport(ctx, httpclient.NewHTTPTransport(httpclient.MaxIdleConnections(httpShared))) - } - runtime.GOMAXPROCS(runtime.NumCPU()) - - err := routine.Main( - ctx, - routine.ExecutorFunc(func(ctx context.Context) error { - jobs := make(map[string]chan error, len(jds)) - var tail chan error - for _, jd := range jds { - if len(jd.Order.Precondition) == 0 { - job := exec.NewJob(jd, reporter) - ch := routine.Go(ctx, job) - jobs[job.String()] = ch - tail = ch - if jd.Order.Wait { - <-ch - } - } - } - for _, jd := range jds { - for _, pre := range jd.Order.Precondition { - if ch, ok := jobs[pre]; ok { - <-ch - } - job := exec.NewJob(jd, reporter) - ch := routine.Go(ctx, job) - jobs[job.String()] = ch - tail = ch - if jd.Order.Wait { - <-ch - } - } - } - return <-tail - }), - mainOptions...) - if err != nil { - fmt.Println("job failed:", err) - } - os.Exit(errors.ValueFrom(err)) + jobs := NewJOBs(jds, reporter) + jobs.Sort() + exitForErr(routine.Main( + withVerbose(withTransport(context.TODO())), + jobs, + mainOptions...)) } diff --git a/cmd/root.go b/cmd/root.go index b2e651e..ca353f2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -18,6 +18,7 @@ import ( "os" "time" + "github.com/liujianping/job/build" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -41,6 +42,15 @@ var rootCmd = &cobra.Command{ (job output) $: job -n 10 -i 500ms -T 3s -o -- echo hello (job config) $: job -f /path/to/job.yaml`, Run: func(cmd *cobra.Command, args []string) { + if viper.GetBool("version") { + build.Print() + os.Exit(0) + } + + if len(viper.GetString("config")) == 0 && len(args) == 0 { + cmd.Help() + os.Exit(0) + } Main(cmd, args) }, } @@ -57,7 +67,7 @@ func init() { rootCmd.Flags().StringP("config", "f", "", "job config file path") rootCmd.Flags().StringP("name", "N", "", "job name definition") metadata = rootCmd.Flags().StringToStringP("metadata", "M", map[string]string{}, "job metadata definition") - envs = rootCmd.Flags().StringToStringP("cmd-env", "e", map[string]string{}, "job command enviromental variables") + envs = rootCmd.Flags().StringToStringP("cmd-env", "e", map[string]string{}, "job command environmental variables") rootCmd.Flags().IntP("cmd-retry", "r", 0, "job command retry times when failed") rootCmd.Flags().DurationP("cmd-timeout", "t", 0, "job command timeout duration") rootCmd.Flags().BoolP("cmd-stdout-discard", "d", false, "job command stdout discard ?") diff --git a/config/config.go b/config/config.go index 78b6b90..64113c8 100644 --- a/config/config.go +++ b/config/config.go @@ -253,18 +253,3 @@ func Metadata(key string, val interface{}) Option { } } } - -//JDs type -type JDs []*JD - -func (js JDs) Len() int { - return len(js) -} - -func (js JDs) Less(i, j int) bool { - return js[i].Order.Weight < js[j].Order.Weight -} - -func (js JDs) Swap(i, j int) { - js[i], js[j] = js[j], js[i] -} diff --git a/exec/reporter.go b/exec/reporter.go index 21f4296..7741b2b 100644 --- a/exec/reporter.go +++ b/exec/reporter.go @@ -12,6 +12,7 @@ import ( const maxNum = 1000000 +//ReportData struct type ReportData struct { AvgTotal float64 Fastest float64 @@ -40,11 +41,13 @@ type ReportData struct { Histogram []Bucket } +//LatencyDistribution struct type LatencyDistribution struct { Percentage int Latency float64 } +//Bucket struct type Bucket struct { Mark float64 Count int @@ -55,6 +58,7 @@ const ( barChar = "■" ) +//Reporter struct type Reporter struct { upTime time.Time results chan *routine.Result @@ -78,6 +82,7 @@ type Reporter struct { numRes int64 } +//NewReporter new func NewReporter(n int) *Reporter { cap := min(n, maxNum) return &Reporter{ @@ -89,10 +94,12 @@ func NewReporter(n int) *Reporter { } } +//Report chan func (r *Reporter) Report() chan *routine.Result { return r.results } +//Execute impl executor func (r *Reporter) Execute(ctx context.Context) error { // Loop will continue until channel is closed for res := range r.results { @@ -123,10 +130,13 @@ func min(a, b int) int { } return b } + +//Stop reporter func (r *Reporter) Stop() { close(r.results) } +//Finalize end func (r *Reporter) Finalize() { r.total = time.Since(r.upTime) r.ops = float64(r.numRes) / r.total.Seconds()