Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add plumbing for future JSON schema changes #140

Merged
merged 3 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.15.0
v0.16.0
4 changes: 2 additions & 2 deletions api/get_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type GetAlertRequest struct {
}

type GetAlertResponse struct {
Success bool `json:"success"`
Webhook *models.Webhook `json:"webhook"`
Success bool `json:"success"`
Webhook *models.WebhookV1 `json:"webhook"`
}

func GetAlertUsecase(
Expand Down
4 changes: 2 additions & 2 deletions api/get_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type GetPipelineRequest struct {
}

type GetPipelineResponse struct {
Success bool `json:"success"`
Flow *models.FlowGraph `json:"flow"`
Success bool `json:"success"`
Flow *models.FlowGraphV1 `json:"flow"`
}

func GetPipelineUsecase(
Expand Down
4 changes: 2 additions & 2 deletions api/save_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

type SaveAlertRequest struct {
Alert string `path:"alert" minLength:"1"`
Webhook models.Webhook `json:"webhook"`
Alert string `path:"alert" minLength:"1"`
Webhook models.WebhookV1 `json:"webhook"`
}

type SaveAlertResponse struct {
Expand Down
4 changes: 2 additions & 2 deletions api/save_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

type SavePipelineRequest struct {
Pipeline string `path:"pipeline" minLength:"1"`
Flow models.FlowGraph `json:"flow"`
Pipeline string `path:"pipeline" minLength:"1"`
Flow models.FlowGraphV1 `json:"flow"`
}

type SavePipelineResponse struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/engines/pipelines/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Build(ctx context.Context, configStorage *config.Storage, name string) (*Pi

var (
pipelineNodes = make(map[string]Node)
flowNodesByID = make(map[string]*models.FlowNode)
flowNodesByID = make(map[string]*models.FlowNodeV1)
sourceNodeTypes = make(map[string]string)
entrypointNodes = make(map[string]Node)
)
Expand Down
15 changes: 8 additions & 7 deletions internal/models/flow.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package models

type FlowGraph struct {
Nodes []*FlowNode `json:"nodes"`
Edges []*FlowEdge `json:"edges"`
type FlowGraphV1 struct {
Version int `json:"version"`
Nodes []*FlowNodeV1 `json:"nodes"`
Edges []*FlowEdgeV1 `json:"edges"`
}

type FlowNode struct {
type FlowNodeV1 struct {
ID string `json:"id"`
Type string `json:"type"`
Position FlowPosition `json:"position"`
Position FlowPositionV1 `json:"position"`
Data map[string]string `json:"data"`
}

type FlowEdge struct {
type FlowEdgeV1 struct {
ID string `json:"id"`
Source string `json:"source"`
SourceHandle string `json:"sourceHandle"`
Target string `json:"target"`
}

type FlowPosition struct {
type FlowPositionV1 struct {
X float64 `json:"x"`
Y float64 `json:"y"`
}
32 changes: 32 additions & 0 deletions internal/models/flow_convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package models

import (
"fmt"

"encoding/json"
)

func ConvertFlowGraph(content []byte) (*FlowGraphV1, bool, error) {
var data map[string]interface{}
if err := json.Unmarshal(content, &data); err != nil {
return nil, false, fmt.Errorf("failed to unmarshal flow: %w", err)
}

version, ok := data["version"].(float64)
if !ok || version == 0 {
version = 1
}

switch int(version) {
case 1:
objV1 := &FlowGraphV1{Version: 1}
if err := json.Unmarshal(content, objV1); err != nil {
return nil, false, fmt.Errorf("failed to unmarshal flow: %w", err)
}

return objV1, false, nil

default:
return nil, false, fmt.Errorf("unsupported flow version: %d", int(version))
}
}
5 changes: 3 additions & 2 deletions internal/models/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"net/http"
)

type Webhook struct {
type WebhookV1 struct {
Version int `json:"version"`
Url string `json:"url"`
Headers map[string]string `json:"headers"`
}

func (w *Webhook) Call(ctx context.Context, record *LogRecord) error {
func (w *WebhookV1) Call(ctx context.Context, record *LogRecord) error {
payload, err := json.Marshal(record)
if err != nil {
return fmt.Errorf("failed to marshal log record: %w", err)
Expand Down
32 changes: 32 additions & 0 deletions internal/models/webhook_convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package models

import (
"fmt"

"encoding/json"
)

func ConvertWebhook(content []byte) (*WebhookV1, bool, error) {
var data map[string]interface{}
if err := json.Unmarshal(content, &data); err != nil {
return nil, false, fmt.Errorf("failed to unmarshal webhook: %w", err)
}

version, ok := data["version"].(float64)
if !ok || version == 0 {
version = 1
}

switch int(version) {
case 1:
objV1 := &WebhookV1{Version: 1}
if err := json.Unmarshal(content, objV1); err != nil {
return nil, false, fmt.Errorf("failed to unmarshal webhook: %w", err)
}

return objV1, false, nil

default:
return nil, false, fmt.Errorf("unsupported webhook version: %d", int(version))
}
}
4 changes: 2 additions & 2 deletions internal/models/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestWebhook_Call(t *testing.T) {
}))
defer testServer.Close()

webhook := &models.Webhook{
webhook := &models.WebhookV1{
Url: testServer.URL,
Headers: map[string]string{
"Foo": "Bar",
Expand All @@ -41,7 +41,7 @@ func TestWebhook_Call_Failure(t *testing.T) {
}))
defer testServer.Close()

webhook := &models.Webhook{
webhook := &models.WebhookV1{
Url: testServer.URL,
Headers: map[string]string{},
}
Expand Down
32 changes: 22 additions & 10 deletions internal/storage/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,27 @@ func (s *Storage) ListPipelines(ctx context.Context) ([]string, error) {
return s.pipelineStore.ListFiles(ctx)
}

func (s *Storage) ReadPipeline(ctx context.Context, name string) (*models.FlowGraph, error) {
func (s *Storage) ReadPipeline(ctx context.Context, name string) (*models.FlowGraphV1, error) {
content, err := s.pipelineStore.ReadFile(ctx, name)
if err != nil {
return nil, err
}

var flowGraph *models.FlowGraph
if err := json.Unmarshal(content, &flowGraph); err != nil {
return nil, fmt.Errorf("failed to unmarshal flow: %w", err)
flowGraph, changed, err := models.ConvertFlowGraph(content)
if err != nil {
return nil, err
}

if changed {
if err := s.WritePipeline(ctx, name, flowGraph); err != nil {
return nil, fmt.Errorf("failed to write updated flow graph: %w", err)
}
}

return flowGraph, nil
}

func (s *Storage) WritePipeline(ctx context.Context, name string, flow *models.FlowGraph) error {
func (s *Storage) WritePipeline(ctx context.Context, name string, flow *models.FlowGraphV1) error {
content, err := json.Marshal(flow)
if err != nil {
return fmt.Errorf("failed to marshal flow: %w", err)
Expand All @@ -192,7 +198,7 @@ func (s *Storage) ListAlerts(ctx context.Context) ([]string, error) {
return s.alertStore.ListFiles(ctx)
}

func (s *Storage) ReadAlert(ctx context.Context, name string) (*models.Webhook, error) {
func (s *Storage) ReadAlert(ctx context.Context, name string) (*models.WebhookV1, error) {
b64content, err := s.alertStore.ReadFile(ctx, name)
if err != nil {
return nil, err
Expand All @@ -204,15 +210,21 @@ func (s *Storage) ReadAlert(ctx context.Context, name string) (*models.Webhook,
return nil, fmt.Errorf("failed to decode webhook %s: %w", name, err)
}

var webhook *models.Webhook
if err := json.Unmarshal(content[:n], &webhook); err != nil {
return nil, fmt.Errorf("failed to unmarshal webhook: %w", err)
webhook, changed, err := models.ConvertWebhook(content[:n])
if err != nil {
return nil, err
}

if changed {
if err := s.WriteAlert(ctx, name, webhook); err != nil {
return nil, fmt.Errorf("failed to write updated webhook: %w", err)
}
}

return webhook, nil
}

func (s *Storage) WriteAlert(ctx context.Context, name string, webhook *models.Webhook) error {
func (s *Storage) WriteAlert(ctx context.Context, name string, webhook *models.WebhookV1) error {
content, err := json.Marshal(webhook)
if err != nil {
return fmt.Errorf("failed to marshal webhook: %w", err)
Expand Down