Skip to content

Commit 5e5cb60

Browse files
YshayyYshay YaacobiYshay Yaacobi
authored
WIP - V2 alpha (#1)
Co-authored-by: Yshay Yaacobi <[email protected]> Co-authored-by: Yshay Yaacobi <[email protected]>
1 parent 060d2cb commit 5e5cb60

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+2343
-487
lines changed

.dockerignore

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
integration/**/*
2+
Dockerfile
3+
docs/**/*
4+
benchmark/**/*
5+
.vscode

.github/workflows/push.yaml

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
on: push
2+
jobs:
3+
test:
4+
runs-on: ubuntu-latest
5+
steps:
6+
- uses: actions/checkout@v2
7+
- name: install latest buildx
8+
run: mkdir -p ~/.docker/cli-plugins && wget -O ~/.docker/cli-plugins/docker-buildx https://github.com/docker/buildx/releases/download/v0.4.1/buildx-v0.4.1.linux-amd64 && chmod a+x ~/.docker/cli-plugins/docker-buildx
9+
- name: build
10+
env:
11+
DOCKER_BUILDKIT: 1
12+
DOCKER_CLI_EXPERIMENTAL: enabled
13+
working-directory: ./integration/docker
14+
run: docker buildx bake -f ./docker-compose.base.yaml -f ./docker-compose.worker.yaml -f ./docker-compose.producer.yaml --load
15+
- name: test-multi
16+
working-directory: ./integration/tests
17+
run: ./multi.sh
18+
- name: Push to GitHub Packages
19+
uses: docker/build-push-action@v1
20+
env:
21+
DOCKER_BUILDKIT: 1
22+
with:
23+
username: ${{ github.actor }}
24+
password: ${{ secrets.GITHUB_TOKEN }}
25+
registry: docker.pkg.github.com
26+
repository: soluto/dqd/app
27+
tag_with_sha: ${{ github.ref == 'refs/heads/master'}}
28+
tag_with_ref: ${{ github.ref != 'refs/heads/master' || startsWith(github.ref, 'refs/tags/') }}
29+
- name: Push to Dockerhub
30+
uses: docker/build-push-action@v1
31+
env:
32+
DOCKER_BUILDKIT: 1
33+
with:
34+
username: solutodqd
35+
password: ${{ secrets.DOCKERHUB_TOKEN }}
36+
repository: soluto/dqd
37+
tag_with_sha: ${{ github.ref == 'refs/heads/master'}}
38+
tag_with_ref: ${{ github.ref != 'refs/heads/master' || startsWith(github.ref, 'refs/tags/') }}

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
*.out
1313

1414
node_modules
15-
.vscode/launch.json
15+
.vscode/launch.json
16+
cloud-envs

Dockerfile

+25-13
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,30 @@
1-
# ------------- Build ----------------------
2-
FROM golang:1.10.1-stretch as build
1+
# syntax = docker/dockerfile:1.0-experimental
2+
FROM golang:1.14.0-alpine as builder
3+
RUN apk update && apk add --no-cache git ca-certificates && update-ca-certificates
34

4-
ENV CGO_ENABLED=0
5-
ENV GOOS=linux
6-
ENV GOBIN=$GOPATH/bin
7-
WORKDIR /src
5+
ENV UID=10001
6+
RUN adduser \
7+
--disabled-password \
8+
--gecos "" \
9+
--home "/nonexistent" \
10+
--shell "/sbin/nologin" \
11+
--no-create-home \
12+
--uid "${UID}" \
13+
appuser
814

15+
WORKDIR /src
16+
COPY go.mod go.sum ./
17+
RUN --mount=type=cache,target=/root/.cache/go-build go mod download
18+
RUN go mod verify
919
COPY . .
10-
RUN go get .
11-
RUN go build -o main .
1220

13-
# ------------- Release ----------------------
14-
FROM scratch as release
21+
RUN --mount=type=cache,target=/root/.cache/go-build CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /go/bin/dqd
22+
23+
FROM scratch
1524

16-
COPY --from=build /etc/ssl/certs/ /etc/ssl/certs/
17-
COPY --from=build /src/main /
18-
CMD [ "/main" ]
25+
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
26+
COPY --from=builder /go/bin/dqd /dqd
27+
COPY --from=builder /etc/passwd /etc/passwd
28+
COPY --from=builder /etc/group /etc/group
29+
USER appuser:appuser
30+
ENTRYPOINT ["/dqd"]

cmd/dqd.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package cmd
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"strings"
7+
8+
gofigure "github.com/NCAR/go-figure"
9+
"github.com/spf13/pflag"
10+
"github.com/spf13/viper"
11+
)
12+
13+
func ConfigurationError(err error) {
14+
fmt.Println(err)
15+
pflag.Usage()
16+
os.Exit(1)
17+
}
18+
19+
func Load() (*viper.Viper, error) {
20+
v := viper.New()
21+
configFiles := pflag.CommandLine.StringSlice("config", []string{"./dqd.yaml"}, "Load a DQD config file")
22+
configDirs := pflag.CommandLine.StringSlice("configDir", []string{"/etc/dqd", "/dqd/config"}, "Lookup for config files in these folders")
23+
configOverrides := pflag.CommandLine.StringSlice("set", []string{}, "Override specific configuration keys")
24+
pflag.Parse()
25+
v.SetConfigType("yaml")
26+
v.SetEnvPrefix("dqd")
27+
v.AutomaticEnv()
28+
err := gofigure.Parse(v, *configDirs)
29+
if err != nil {
30+
panic(fmt.Errorf("Fatal error config file: %s \n", err))
31+
}
32+
for _, f := range *configFiles {
33+
r, err := os.Open(f)
34+
defer r.Close()
35+
if err == nil {
36+
v.MergeConfig(r)
37+
} else {
38+
if f != "./dqd.yaml" {
39+
return nil, err
40+
}
41+
}
42+
43+
}
44+
45+
for _, override := range *configOverrides {
46+
entry := strings.Split(override, "=")
47+
if len(entry) != 2 {
48+
return nil, fmt.Errorf("invalid set value '%v'", override)
49+
}
50+
v.Set(entry[0], entry[1])
51+
}
52+
53+
return v, nil
54+
}

config/config.go

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package config
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/soluto/dqd/handlers"
7+
"github.com/soluto/dqd/listeners"
8+
"github.com/soluto/dqd/pipe"
9+
"github.com/soluto/dqd/providers/azure"
10+
"github.com/soluto/dqd/providers/servicebus"
11+
"github.com/soluto/dqd/providers/sqs"
12+
"github.com/soluto/dqd/utils"
13+
v1 "github.com/soluto/dqd/v1"
14+
"github.com/spf13/viper"
15+
)
16+
17+
type App struct {
18+
Sources map[string]*v1.Source
19+
Listeners []listeners.Listener
20+
Workers []*pipe.Worker
21+
}
22+
23+
var sourceProviders = map[string]struct {
24+
v1.ConsumerFactory
25+
v1.ProducerFactory
26+
}{
27+
"azure-queue": {
28+
&azure.AzureQueueClientFactory{},
29+
&azure.AzureQueueClientFactory{},
30+
},
31+
"sqs": {
32+
&sqs.SQSClientFactory{},
33+
&sqs.SQSClientFactory{},
34+
},
35+
"service-bus": {
36+
&servicebus.ServiceBusClientFactory{},
37+
&servicebus.ServiceBusClientFactory{},
38+
},
39+
"io": {
40+
&utils.IoSourceFactory{},
41+
&utils.IoSourceFactory{},
42+
},
43+
}
44+
45+
func createSources(v *viper.Viper) map[string]*v1.Source {
46+
sources := map[string]*v1.Source{}
47+
for sourceName, subSource := range utils.ViperSubMap(v, "sources") {
48+
sourceType := subSource.GetString("type")
49+
factory, exist := sourceProviders[sourceType]
50+
if !exist {
51+
panic(fmt.Errorf("FATAL - Unkown source provider:%v", sourceType))
52+
}
53+
sources[sourceName] = v1.NewSource(factory, factory, subSource, sourceName)
54+
}
55+
return sources
56+
}
57+
58+
func getSource(sources map[string]*v1.Source, sourceName string) *v1.Source {
59+
source, exists := sources[sourceName]
60+
if !exists {
61+
panic(fmt.Sprintf("Missing source definition: %v", sourceName))
62+
}
63+
return source
64+
}
65+
66+
func getPipeSources(sources map[string]*v1.Source, v *viper.Viper) (pipeSources []*v1.Source) {
67+
sourcesConfig := v.GetStringSlice("sources")
68+
for _, s := range sourcesConfig {
69+
pipeSources = append(pipeSources, getSource(sources, s))
70+
}
71+
if len(pipeSources) == 0 {
72+
pipeSources = []*v1.Source{getSource(sources, v.GetString("source"))}
73+
}
74+
return
75+
}
76+
77+
func createHandler(v *viper.Viper) handlers.Handler {
78+
if v == nil {
79+
panic("no handler define for pipe, use 'none' handler if it's the desired behavior")
80+
}
81+
if v.Get("none") != nil {
82+
return handlers.None
83+
}
84+
v.SetDefault("http.path", "/")
85+
v.SetDefault("http.host", "localhost")
86+
v.SetDefault("http.port", 80)
87+
88+
httpEndpoint := v.GetString("http.endpoint")
89+
if httpEndpoint == "" {
90+
httpEndpoint = fmt.Sprintf("http://%v:%v%v", v.GetString("http.host"), v.GetString("http.port"), v.GetString("http.path"))
91+
}
92+
return handlers.NewHttpHandler(httpEndpoint)
93+
}
94+
95+
func createWorkers(v *viper.Viper, sources map[string]*v1.Source) []*pipe.Worker {
96+
var wList []*pipe.Worker
97+
pipesConfig := utils.ViperSubMap(v, "pipes")
98+
for name, pipeConfig := range pipesConfig {
99+
pipeConfig.SetDefault("rate.init", 10)
100+
pipeConfig.SetDefault("rate.min", 1)
101+
pipeConfig.SetDefault("rate.window", "30s")
102+
pipeConfig.SetDefault("source", "default")
103+
handler := createHandler(pipeConfig.Sub("handler"))
104+
105+
pipeSources := getPipeSources(sources, pipeConfig)
106+
107+
var opts = []pipe.WorkerOption{}
108+
writeToSource := pipeConfig.GetString("onError.writeTo.source")
109+
if writeToSource != "" {
110+
opts = append(opts, pipe.WithErrorSource(getSource(sources, writeToSource)))
111+
}
112+
113+
if pipeConfig.IsSet("rate.fixed") {
114+
opts = append(opts, pipe.WithFixedRate(pipeConfig.GetInt("rate.fixed")))
115+
} else {
116+
opts = append(opts, pipe.WithDynamicRate(pipeConfig.GetInt("rate.init"),
117+
pipeConfig.GetInt("rate.min"),
118+
pipeConfig.GetDuration("rate.window")))
119+
}
120+
output := pipeConfig.GetString("output")
121+
if output != "" {
122+
opts = append(opts, pipe.WithOutput(getSource(sources, output)))
123+
} else {
124+
opts = append(opts, pipe.WithDynamicRate(pipeConfig.GetInt("rate.init"),
125+
pipeConfig.GetInt("rate.min"),
126+
pipeConfig.GetDuration("rate.window")))
127+
}
128+
129+
wList = append(wList, pipe.NewWorker(
130+
name,
131+
pipeSources,
132+
handler,
133+
opts...,
134+
))
135+
}
136+
return wList
137+
}
138+
139+
func createListeners(v *viper.Viper, sources map[string]*v1.Source) []listeners.Listener {
140+
v.SetDefault("listeners.http.host", "0.0.0.0:9999")
141+
host := v.GetString("listeners.http.host")
142+
listener := listeners.Http(host)
143+
for _, s := range sources {
144+
listener.Add(s, viper.New())
145+
}
146+
return []listeners.Listener{listener}
147+
}
148+
149+
func CreateApp(v *viper.Viper) (_ *App, err error) {
150+
defer func() {
151+
if r := recover(); r != nil {
152+
err = fmt.Errorf("%v", r)
153+
}
154+
}()
155+
156+
err = utils.NormalizeEntityConfig(v, "pipe", "pipes")
157+
if err != nil {
158+
return
159+
}
160+
err = utils.NormalizeEntityConfig(v, "source", "sources")
161+
if err != nil {
162+
return
163+
}
164+
165+
sources := createSources(v)
166+
listeners := createListeners(v, sources)
167+
workers := createWorkers(v, sources)
168+
return &App{
169+
sources,
170+
listeners,
171+
workers,
172+
}, nil
173+
}

docs/examples/azure.yaml

Whitespace-only changes.

docs/index.md

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# DQD
2+

docs/sources/azure.md

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
2+
3+
```yaml
4+
source:
5+
type: azure-queue
6+
7+
# Location
8+
storageAccount: test
9+
queue: dqd
10+
#connection: http://azure:10001/devstoreaccount1 useful for local testing
11+
12+
# Credentials
13+
storageAccountKey: ****
14+
sasToken: ****
15+
16+
# Options
17+
visibilityTimeoutInSeconds: 100 # defaults to 60
18+
maxDequeueCount: 1 # deaults to 5
19+
retryVisiblityTimeoutInSeconds: [10, 500, 600]
20+
```

docs/sources/service-bus.md

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
```yaml
2+
source:
3+
type: service-bus
4+
5+
connectionString: ""
6+
topic: "my-topic"
7+
subscription: "my sub"
8+
9+
prefetchCount: 30 #defaults to 1
10+
```

docs/sources/sqs.md

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
# SQS Source
3+
4+
Credentials are taken by the aws sdk defaukt provider chain:
5+
https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html
6+
7+
```yaml
8+
source:
9+
type: sqs
10+
11+
12+
# Location
13+
url:
14+
region: us-east-1
15+
#endpoint: http://sqs:9324 useful for local testing
16+
17+
# Options
18+
visibilityTimeoutInSeconds: 100 # defaults to 30
19+
```

0 commit comments

Comments
 (0)