Skip to content

Commit

Permalink
remove deprecated encoded connector
Browse files Browse the repository at this point in the history
  • Loading branch information
taigrr committed Sep 10, 2024
1 parent ece2cb4 commit 6ec2aa2
Show file tree
Hide file tree
Showing 15 changed files with 75 additions and 61 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ coverage.out
local/

dist/
cmd/farmer/farmer
cmd/grlx/grlx
cmd/sprout/sprout
2 changes: 1 addition & 1 deletion api/handlers/cook.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Cook(w http.ResponseWriter, r *http.Request) {
}

jid := cook.GenerateJobID()
sub, err := ec.Conn.SubscribeSync(fmt.Sprintf("grlx.farmer.cook.trigger.%s", jid))
sub, err := conn.SubscribeSync(fmt.Sprintf("grlx.farmer.cook.trigger.%s", jid))
if err != nil {
log.Errorf("error subscribing to NATS: %v", err)
return
Expand Down
6 changes: 3 additions & 3 deletions api/handlers/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package handlers

import nats "github.com/nats-io/nats.go"

var ec *nats.EncodedConn
var conn *nats.Conn

func RegisterEC(n *nats.EncodedConn) {
ec = n
func RegisterNatsConn(n *nats.Conn) {
conn = n
}
16 changes: 6 additions & 10 deletions cmd/farmer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,11 @@ func ConnectFarmer() {
log.Errorf("Got an error on Subscribe: %+v\n", err)
}

ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Errorf("Got an error on NewEncodedConn: %+v\n", err)
}
test.RegisterEC(ec)
cmd.RegisterEC(ec)
cook.RegisterEC(ec)
jobs.RegisterEC(ec)
handlers.RegisterEC(ec)
defer ec.Close()
test.RegisterNatsConn(nc)
cmd.RegisterNatsConn(nc)
cook.RegisterNatsConn(nc)
jobs.RegisterNatsConn(nc)
handlers.RegisterNatsConn(nc)
defer nc.Close()
select {}
}
12 changes: 5 additions & 7 deletions cmd/grlx/cmd/cook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

var (
async bool
async bool
cookTimeout int
)

Expand Down Expand Up @@ -58,15 +58,11 @@ var cmdCook = &cobra.Command{
if err != nil {
log.Fatal(err)
}
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Fatal(err)
}
finished := make(chan struct{}, 1)
completions := make(chan types.SproutStepCompletion)
topic := fmt.Sprintf("grlx.cook.*.%s", jid)
completionSteps := make(map[string][]types.StepCompletion)
sub, err := ec.Subscribe(topic, func(msg *nats.Msg) {
sub, err := nc.Subscribe(topic, func(msg *nats.Msg) {
var step types.StepCompletion
err := json.Unmarshal(msg.Data, &step)
if err != nil {
Expand Down Expand Up @@ -116,7 +112,9 @@ var cmdCook = &cobra.Command{
log.Fatal(err)
}
// TODO convert this to a request and get back the list of targeted sprouts
ec.Publish(fmt.Sprintf("grlx.farmer.cook.trigger.%s", jid), types.TriggerMsg{JID: jid})
triggerMsg := types.TriggerMsg{JID: jid}
b, _ := json.Marshal(triggerMsg)
nc.Publish(fmt.Sprintf("grlx.farmer.cook.trigger.%s", jid), b)
localTimeout := time.After(time.Duration(cookTimeout) * time.Second)
dripTimeout := time.After(120 * time.Second)
concurrent := 0
Expand Down
8 changes: 2 additions & 6 deletions cmd/grlx/cmd/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ var tailCmd = &cobra.Command{
if err != nil {
log.Fatal(err)
}
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Fatal(err)
}
sub, err := ec.Subscribe("grlx.>", func(msg *nats.Msg) {
sub, err := nc.Subscribe("grlx.>", func(msg *nats.Msg) {
printTex.Lock()
fmt.Println(msg.Subject)
fmt.Println(string(msg.Data))
Expand All @@ -34,7 +30,7 @@ var tailCmd = &cobra.Command{
if err != nil {
log.Fatal(err)
}
sub2, err := ec.Subscribe("_INBOX.>", func(msg *nats.Msg) {
sub2, err := nc.Subscribe("_INBOX.>", func(msg *nats.Msg) {
printTex.Lock()
fmt.Println(msg.Subject)
fmt.Println(string(msg.Data))
Expand Down
11 changes: 5 additions & 6 deletions cmd/sprout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,13 @@ func ConnectSprout() {
// //TODO: handle error
// panic(err)
// }
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
test.RegisterEC(ec)
cmd.RegisterEC(ec)
cook.RegisterEC(ec)
err = natsInit(ec)
test.RegisterNatsConn(nc)
cmd.RegisterNatsConn(nc)
cook.RegisterNatsConn(nc)
err = natsInit(nc)
if err != nil {
log.Panicf("Error with natsInit: %v", err)
}
defer ec.Close()
defer nc.Close()
select {}
}
5 changes: 3 additions & 2 deletions cmd/sprout/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func init() {
pki.SetupPKISprout()
}

func natsInit(nc *nats.EncodedConn) error {
func natsInit(nc *nats.Conn) error {
log.Debugf("Announcing on Farmer...")
startup := types.Startup{}
startup.Version.Arch = runtime.GOARCH
Expand All @@ -30,7 +30,8 @@ func natsInit(nc *nats.EncodedConn) error {
startup.Version.Tag = Tag
startup.SproutID = sproutID
startupEvent := "grlx.sprouts.announce." + sproutID
err := nc.Publish(startupEvent, startup)
b, _ := json.Marshal(startup)
err := nc.Publish(startupEvent, b)
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion cook/farmercook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cook

import (
"encoding/json"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -90,9 +91,14 @@ func SendCookEvent(sproutID string, recipeID types.RecipeName, JID string) error
JobID: JID,
Steps: validSteps,
}
b, _ := json.Marshal(rEnvelope)
log.Noticef("cooking sprout %s: %s", sproutID, JID)
var ack types.Ack
err = ec.Request("grlx.sprouts."+sproutID+".cook", rEnvelope, &ack, 30*time.Second)
msg, err := conn.Request("grlx.sprouts."+sproutID+".cook", b, 30*time.Second)
if err != nil {
return err
}
err = json.Unmarshal(msg.Data, &ack)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions cook/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
"github.com/gogrlx/grlx/types"
)

var ec *nats.EncodedConn
var conn *nats.Conn

func RegisterEC(n *nats.EncodedConn) {
ec = n
func RegisterNatsConn(n *nats.Conn) {
conn = n
}

func makeRecipeSteps(recipes map[string]interface{}) ([]*types.Step, error) {
Expand Down
22 changes: 13 additions & 9 deletions cook/sproutcook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cook

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -57,7 +58,9 @@ func CookRecipeEnvelope(envelope types.RecipeEnvelope) error {
select {
// each time a step completes, check if any other steps can be started
case completion := <-completionChan:
ec.Publish("grlx.cook."+pki.GetSproutID()+"."+envelope.JobID, completion)
b, _ := json.Marshal(completion)

conn.Publish("grlx.cook."+pki.GetSproutID()+"."+envelope.JobID, b)
log.Infof("Step %s completed with status %v", completion.ID, completion)
wg.Done()
// TODO also collect the results of the step and store them into a log folder by JID
Expand Down Expand Up @@ -143,14 +146,15 @@ func CookRecipeEnvelope(envelope types.RecipeEnvelope) error {
}
// All steps are done, so context will be cancelled and we'll exit
case <-ctx.Done():
ec.Publish("grlx.cook."+pki.GetSproutID()+"."+envelope.JobID,
types.StepCompletion{
ID: types.StepID(fmt.Sprintf("completed-%s", envelope.JobID)),
CompletionStatus: types.StepCompleted,
ChangesMade: false,
Changes: nil,
},
)
completion := types.StepCompletion{
ID: types.StepID(fmt.Sprintf("completed-%s", envelope.JobID)),
CompletionStatus: types.StepCompleted,
ChangesMade: false,
Changes: nil,
}
b, _ := json.Marshal(completion)

conn.Publish("grlx.cook."+pki.GetSproutID()+"."+envelope.JobID, b)
log.Info("All steps completed")
return nil
// TODO add a timeout case
Expand Down
14 changes: 10 additions & 4 deletions ingredients/cmd/interactive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
Expand All @@ -21,18 +22,23 @@ import (
"github.com/gogrlx/grlx/types"
)

var ec *nats.EncodedConn
var nc *nats.Conn

func RegisterEC(encodedConn *nats.EncodedConn) {
ec = encodedConn
func RegisterNatsConn(conn *nats.Conn) {
nc = conn
}

var envMutex sync.Mutex

func FRun(target types.KeyManager, cmdRun types.CmdRun) (types.CmdRun, error) {
topic := "grlx.sprouts." + target.SproutID + ".cmd.run"
var results types.CmdRun
err := ec.Request(topic, cmdRun, &results, time.Second*15+cmdRun.Timeout)
b, _ := json.Marshal(cmdRun)
msg, err := nc.Request(topic, b, time.Second*15+cmdRun.Timeout)
if err != nil {
return results, err
}
err = json.Unmarshal(msg.Data, &results)
return results, err
}

Expand Down
7 changes: 6 additions & 1 deletion ingredients/test/ping.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package test

import (
"encoding/json"
"time"

"github.com/gogrlx/grlx/types"
Expand All @@ -12,7 +13,11 @@ func FPing(target types.KeyManager, ping types.PingPong) (types.PingPong, error)
ping.Ping = true
ping.Pong = false
var pong types.PingPong
err := ec.Request(topic, ping, &pong, time.Second*15)
b, _ := json.Marshal(ping)
msg, err := nc.Request(topic, b, time.Second*15)
if err != nil {
err = json.Unmarshal(msg.Data, &pong)
}
return pong, err
}

Expand Down
6 changes: 3 additions & 3 deletions ingredients/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package test

import nats "github.com/nats-io/nats.go"

var ec *nats.EncodedConn
var nc *nats.Conn

func RegisterEC(n *nats.EncodedConn) {
ec = n
func RegisterNatsConn(n *nats.Conn) {
nc = n
}
10 changes: 5 additions & 5 deletions jobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ import (

// Job represents a job

var ec *nats.EncodedConn
var nc *nats.Conn

func RegisterEC(conn *nats.EncodedConn) {
ec = conn
_, err := ec.Subscribe("grlx.cook.*.*", logJobs)
func RegisterNatsConn(conn *nats.Conn) {
nc = conn
_, err := nc.Subscribe("grlx.cook.*.*", logJobs)
if err != nil {
log.Error(err)
}
_, err = ec.Subscribe("grlx.sprouts.*.cook", logJobCreation)
_, err = nc.Subscribe("grlx.sprouts.*.cook", logJobCreation)
if err != nil {
log.Error(err)
}
Expand Down

0 comments on commit 6ec2aa2

Please sign in to comment.