Skip to content

Commit

Permalink
refactor for gocyclo & golint
Browse files Browse the repository at this point in the history
  • Loading branch information
liujianping committed May 9, 2019
1 parent 53bbc88 commit a7d3a0f
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 126 deletions.
76 changes: 76 additions & 0 deletions cmd/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package cmd

import (
"context"
"sort"

"github.com/liujianping/job/config"
"github.com/liujianping/job/exec"
"github.com/x-mod/routine"
)

//JOBs type
type JOBs struct {
jds []*config.JD
report *exec.Reporter
}

//NewJOBs new JOBs
func NewJOBs(jds []*config.JD, report *exec.Reporter) *JOBs {
return &JOBs{
jds: jds,
report: report,
}
}

//Len of JOBs
func (jobs JOBs) Len() int {
return len(jobs.jds)
}

//Less Cmp of JOBs
func (jobs JOBs) Less(i, j int) bool {
return jobs.jds[i].Order.Weight < jobs.jds[j].Order.Weight
}

//Swap of JOBs
func (jobs JOBs) Swap(i, j int) {
jobs.jds[i], jobs.jds[j] = jobs.jds[j], jobs.jds[i]
}

//Sort of JOBs
func (jobs JOBs) Sort() {
sort.Sort(jobs)
}

//Execute impl executor
func (jobs JOBs) Execute(ctx context.Context) error {
jmap := make(map[string]chan error, jobs.Len())
var tail chan error
for _, jd := range jobs.jds {
if len(jd.Order.Precondition) == 0 {
job := exec.NewJob(jd, jobs.report)
ch := routine.Go(ctx, job)
jmap[job.String()] = ch
tail = ch
if jd.Order.Wait {
<-ch
}
}
}
for _, jd := range jobs.jds {
for _, pre := range jd.Order.Precondition {
if ch, ok := jmap[pre]; ok {
<-ch
}
job := exec.NewJob(jd, jobs.report)
ch := routine.Go(ctx, job)
jmap[job.String()] = ch
tail = ch
if jd.Order.Wait {
<-ch
}
}
}
return <-tail
}
191 changes: 81 additions & 110 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,98 @@ import (
"context"
"fmt"
"os"
"runtime"
"sort"

"github.com/x-mod/httpclient"

"gopkg.in/yaml.v2"

"github.com/liujianping/job/config"
"github.com/liujianping/job/exec"
"github.com/liujianping/job/build"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/x-mod/errors"
"github.com/x-mod/routine"
"gopkg.in/yaml.v2"
)

func CheckErr(err error) {
if err != nil {
fmt.Println("job failed: ", errors.CauseFrom(err))
fmt.Println()
}
}

func Main(cmd *cobra.Command, args []string) {
if viper.GetBool("version") {
build.Print()
os.Exit(0)
}

if len(viper.GetString("config")) == 0 && len(args) == 0 {
cmd.Help()
os.Exit(0)
}
var (
needReport = false
httpConnections = 0
)

func prepareJDs(configPath string, command string) ([]*config.JD, error) {
jds := []*config.JD{}
if len(viper.GetString("config")) > 0 {
cfs, err := config.ParseJDs(viper.GetString("config"))
if len(configPath) > 0 {
cfs, err := config.ParseJDs(configPath)
if err != nil {
fmt.Println("job failed:", err)
os.Exit(1)
return nil, err

}
jds = cfs
} else {
jd := config.CommandJD()
jd.Command.Shell.Name = args[0]
jd.Command.Shell.Name = command
jds = append(jds, jd)
}
return jds, nil
}

func optionJDs(jds []*config.JD, options []config.Option, report bool) {
for _, jd := range jds {
for _, opt := range options {
opt(jd)
if report {
jd.Report = true
}
if jd.Report {
needReport = true
}
if jd.Command.HTTP != nil {
httpConnections = httpConnections + jd.Concurrent
}
}
}
}

func output(jds []*config.JD) {
for i, jd := range jds {
bt, err := yaml.Marshal(map[string]*config.JD{
"Job": jd,
})
exitForErr(err)
if i > 0 {
fmt.Println("---")
}
fmt.Print(string(bt))
}
}

func exitForErr(err error) {
if err != nil {
fmt.Println("job failed:", err)
os.Exit(errors.ValueFrom(err))
}
}

func withVerbose(ctx context.Context) context.Context {
if viper.GetBool("verbose") {
log := logrus.New()
log.SetLevel(logrus.TraceLevel)
return routine.WithLogger(ctx, log)
}
return ctx
}

func withTransport(ctx context.Context) context.Context {
if httpConnections > 0 {
return exec.WithTransport(ctx, httpclient.NewHTTPTransport(httpclient.MaxIdleConnections(httpConnections)))
}
return ctx
}

//Main func
func Main(cmd *cobra.Command, args []string) {
jds, err := prepareJDs(viper.GetString("config"), args[0])
exitForErr(err)

options := []config.Option{}
options = append(options, config.Name(viper.GetString("name")))
Expand All @@ -67,52 +111,22 @@ func Main(cmd *cobra.Command, args []string) {
options = append(options, config.CommandArgs(args[1:]...))
}
options = append(options, config.CommandStdoutDiscard(viper.GetBool("cmd-stdout-discard")))

options = append(options, config.Guarantee(viper.GetBool("guarantee")))
options = append(options, config.Crontab(viper.GetString("crontab")))
options = append(options, config.RepeatTimes(viper.GetInt("repeat-times")))
options = append(options, config.RepeatInterval(viper.GetDuration("repeat-interval")))
options = append(options, config.Timeout(viper.GetDuration("timeout")))
options = append(options, config.Concurrent(viper.GetInt("concurrent")))

needReport := false
httpShared := 0
for _, jd := range jds {
for _, opt := range options {
opt(jd)
if viper.GetBool("report") {
jd.Report = true
}
if jd.Report {
needReport = true
}
if jd.Command.HTTP != nil {
httpShared = httpShared + jd.Concurrent
}
}
}

sort.Sort(config.JDs(jds))
optionJDs(jds, options, viper.GetBool("report"))

//output
if viper.GetBool("output") {
for i, jd := range jds {
bt, err := yaml.Marshal(map[string]*config.JD{
"Job": jd,
})
if err != nil {
fmt.Println("job failed:", err)
os.Exit(errors.ValueFrom(err))
}
if i > 0 {
fmt.Println("---")
}
fmt.Print(string(bt))
}
output(jds)
os.Exit(0)
}

//reporter
var reporter *exec.Reporter
//main options
mainOptions := []routine.Opt{routine.Interrupts(routine.DefaultCancelInterruptors...)}
if needReport {
n := viper.GetInt("repeat-times") * viper.GetInt("concurrent")
Expand All @@ -128,53 +142,10 @@ func Main(cmd *cobra.Command, args []string) {
})
mainOptions = append(mainOptions, routine.Prepare(prepare), routine.Cleanup(cleanup))
}

ctx := context.TODO()
if viper.GetBool("verbose") {
log := logrus.New()
log.SetLevel(logrus.TraceLevel)
ctx = routine.WithLogger(ctx, log)
}
if httpShared > 0 {
ctx = exec.WithTransport(ctx, httpclient.NewHTTPTransport(httpclient.MaxIdleConnections(httpShared)))
}
runtime.GOMAXPROCS(runtime.NumCPU())

err := routine.Main(
ctx,
routine.ExecutorFunc(func(ctx context.Context) error {
jobs := make(map[string]chan error, len(jds))
var tail chan error
for _, jd := range jds {
if len(jd.Order.Precondition) == 0 {
job := exec.NewJob(jd, reporter)
ch := routine.Go(ctx, job)
jobs[job.String()] = ch
tail = ch
if jd.Order.Wait {
<-ch
}
}
}
for _, jd := range jds {
for _, pre := range jd.Order.Precondition {
if ch, ok := jobs[pre]; ok {
<-ch
}
job := exec.NewJob(jd, reporter)
ch := routine.Go(ctx, job)
jobs[job.String()] = ch
tail = ch
if jd.Order.Wait {
<-ch
}
}
}
return <-tail
}),
mainOptions...)
if err != nil {
fmt.Println("job failed:", err)
}
os.Exit(errors.ValueFrom(err))
jobs := NewJOBs(jds, reporter)
jobs.Sort()
exitForErr(routine.Main(
withVerbose(withTransport(context.TODO())),
jobs,
mainOptions...))
}
12 changes: 11 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"
"time"

"github.com/liujianping/job/build"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
Expand All @@ -41,6 +42,15 @@ var rootCmd = &cobra.Command{
(job output) $: job -n 10 -i 500ms -T 3s -o -- echo hello
(job config) $: job -f /path/to/job.yaml`,
Run: func(cmd *cobra.Command, args []string) {
if viper.GetBool("version") {
build.Print()
os.Exit(0)
}

if len(viper.GetString("config")) == 0 && len(args) == 0 {
cmd.Help()
os.Exit(0)
}
Main(cmd, args)
},
}
Expand All @@ -57,7 +67,7 @@ func init() {
rootCmd.Flags().StringP("config", "f", "", "job config file path")
rootCmd.Flags().StringP("name", "N", "", "job name definition")
metadata = rootCmd.Flags().StringToStringP("metadata", "M", map[string]string{}, "job metadata definition")
envs = rootCmd.Flags().StringToStringP("cmd-env", "e", map[string]string{}, "job command enviromental variables")
envs = rootCmd.Flags().StringToStringP("cmd-env", "e", map[string]string{}, "job command environmental variables")
rootCmd.Flags().IntP("cmd-retry", "r", 0, "job command retry times when failed")
rootCmd.Flags().DurationP("cmd-timeout", "t", 0, "job command timeout duration")
rootCmd.Flags().BoolP("cmd-stdout-discard", "d", false, "job command stdout discard ?")
Expand Down
15 changes: 0 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,18 +253,3 @@ func Metadata(key string, val interface{}) Option {
}
}
}

//JDs type
type JDs []*JD

func (js JDs) Len() int {
return len(js)
}

func (js JDs) Less(i, j int) bool {
return js[i].Order.Weight < js[j].Order.Weight
}

func (js JDs) Swap(i, j int) {
js[i], js[j] = js[j], js[i]
}
Loading

0 comments on commit a7d3a0f

Please sign in to comment.