Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to a single large stream with multiple consumers #104

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/sse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ func main() {

var streamer stream.Streamer
if cfg.RabbitMQURI() != "" {
log.Info("Starting up a RabbitMQStreamer")
streamer, err = stream.NewRabbitMQStreamer(*rabbitMQStream.NewEnvironmentOptions().SetUri(cfg.RabbitMQURI()), log)
if cfg.RabbitMQStreamName() == "" {
log.Fatal("ETOS_RABBITMQ_STREAM_NAME must be set for the SSE server to work.")
}
log.Infof("Starting up a RabbitMQStreamer with stream name: %s", cfg.RabbitMQStreamName())
streamer, err = stream.NewRabbitMQStreamer(*rabbitMQStream.NewEnvironmentOptions().SetUri(cfg.RabbitMQURI()), log, cfg.RabbitMQStreamName())
if err := streamer.CreateStream(ctx, log, cfg.RabbitMQStreamName()); err != nil {
log.Fatal(err.Error())
}
} else {
log.Warning("RabbitMQURI is not set, defaulting to FileStreamer")
streamer, err = stream.NewFileStreamer(100*time.Millisecond, log)
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/maxcnunes/httpfake v1.2.4
github.com/package-url/packageurl-go v0.1.3
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10
github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0
github.com/sethvargo/go-retry v0.3.0
github.com/sirupsen/logrus v1.9.3
github.com/snowzach/rotatefilehook v0.0.0-20220211133110-53752135082d
Expand Down Expand Up @@ -102,12 +102,12 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
Expand Down
44 changes: 22 additions & 22 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM=
github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -193,10 +193,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU=
github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/package-url/packageurl-go v0.1.3 h1:4juMED3hHiz0set3Vq3KeQ75KD1avthoXLtmE3I0PLs=
github.com/package-url/packageurl-go v0.1.3/go.mod h1:nKAWB8E6uk1MHqiS/lQb9pYBGH2+mdJ2PJc2s50dQY0=
Expand Down Expand Up @@ -230,8 +230,8 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10 h1:1kDn/orisEbfMtxdZwWKpxX9+FahnzoRCuGCLZ66fAc=
github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10/go.mod h1:SdWsW0K5FVo8lIx0lCH17wh7RItXEQb8bfpxVlTVqS8=
github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0 h1:2UWryxhtQmYA3Bx2iajQCre3yQbARiSikpC/8iWbu3k=
github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0/go.mod h1:KDXSNVSqj4QNg6TNMBnQQ/oWHaxLjUI1520j68SyEcY=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
Expand Down Expand Up @@ -328,8 +328,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
Expand All @@ -350,8 +350,8 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -364,8 +364,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -381,15 +381,15 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -400,8 +400,8 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
12 changes: 10 additions & 2 deletions internal/config/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@ import (
type SSEConfig interface {
Config
RabbitMQURI() string
RabbitMQStreamName() string
}

type sseCfg struct {
Config
rabbitmqURI string
rabbitmqURI string
rabbitmqStreamName string
}

// NewSSEConfig creates a sse config interface based on input parameters or environment variables.
func NewSSEConfig() SSEConfig {
var conf sseCfg

flag.StringVar(&conf.rabbitmqURI, "rabbitmquri", os.Getenv("ETOS_RABBITMQ_URI"), "URI to the RabbitMQ ")
flag.StringVar(&conf.rabbitmqURI, "rabbitmquri", os.Getenv("ETOS_RABBITMQ_URI"), "URI to the RabbitMQ")
flag.StringVar(&conf.rabbitmqStreamName, "rabbitmqstreamname", os.Getenv("ETOS_RABBITMQ_STREAM_NAME"), "Stream name to use with RabbitMQ.")
base := load()
conf.Config = base
flag.Parse()
Expand All @@ -45,3 +48,8 @@ func NewSSEConfig() SSEConfig {
func (c *sseCfg) RabbitMQURI() string {
return c.rabbitmqURI
}

// RabbitMQStreamName returns the stream name to use with RabbitMQ.
func (c *sseCfg) RabbitMQStreamName() string {
return c.rabbitmqStreamName
}
12 changes: 6 additions & 6 deletions internal/stream/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ const IgnoreUnfiltered = false
type RabbitMQStreamer struct {
environment *stream.Environment
logger *logrus.Entry
streamName string
}

// NewRabbitMQStreamer creates a new RabbitMQ streamer. Only a single connection should be created.
func NewRabbitMQStreamer(options stream.EnvironmentOptions, logger *logrus.Entry) (Streamer, error) {
func NewRabbitMQStreamer(options stream.EnvironmentOptions, logger *logrus.Entry, streamName string) (Streamer, error) {
env, err := stream.NewEnvironment(&options)
if err != nil {
log.Fatal(err)
}
return &RabbitMQStreamer{environment: env, logger: logger}, err
return &RabbitMQStreamer{environment: env, logger: logger, streamName: streamName}, err
}

// CreateStream creates a new RabbitMQ stream.
Expand All @@ -52,16 +53,15 @@ func (s *RabbitMQStreamer) CreateStream(ctx context.Context, logger *logrus.Entr
// This will create the stream if not already created.
return s.environment.DeclareStream(name,
&stream.StreamOptions{
// TODO: More sane numbers
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
MaxAge: time.Second * 10,
MaxAge: time.Hour * 48,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the configuration parameter instead (test_result_timeout or similar)? I. e. max testrun duration * 2.

},
)
}

// NewStream creates a new stream struct to consume from.
func (s *RabbitMQStreamer) NewStream(ctx context.Context, logger *logrus.Entry, name string) (Stream, error) {
exists, err := s.environment.StreamExists(name)
exists, err := s.environment.StreamExists(s.streamName)
if err != nil {
return nil, err
}
Expand All @@ -73,7 +73,7 @@ func (s *RabbitMQStreamer) NewStream(ctx context.Context, logger *logrus.Entry,
SetConsumerName(name).
SetCRCCheck(false).
SetOffset(stream.OffsetSpecification{}.First())
return &RabbitMQStream{ctx: ctx, logger: logger, streamName: name, environment: s.environment, options: options}, nil
return &RabbitMQStream{ctx: ctx, logger: logger, streamName: s.streamName, environment: s.environment, options: options}, nil
}

// Close the RabbitMQ connection.
Expand Down
1 change: 0 additions & 1 deletion pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func New(data []byte) (Event, error) {
}
e.Data = string(data)
return e, nil

}

// Write an event to a writer object
Expand Down
20 changes: 13 additions & 7 deletions pkg/sse/v2alpha/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ func New(ctx context.Context, cfg config.SSEConfig, log *logrus.Entry, streamer
// LoadRoutes loads all the v2alpha routes.
func (a Application) LoadRoutes(router *httprouter.Router) {
handler := &Handler{a.logger, a.cfg, a.ctx, a.streamer}
router.GET("/v2alpha/selftest/ping", handler.Selftest)
router.GET("/v2alpha/events/:identifier", a.authorizer.Middleware(scope.StreamSSE, handler.GetEvents))
router.POST("/v2alpha/stream/:identifier", a.authorizer.Middleware(scope.DefineSSE, handler.CreateStream))
router.GET("/sse/v2alpha/selftest/ping", handler.Selftest)
router.GET("/sse/v2alpha/events/:identifier", a.authorizer.Middleware(scope.StreamSSE, handler.GetEvents))
router.POST("/sse/v2alpha/stream/:identifier", a.authorizer.Middleware(scope.DefineSSE, handler.CreateStream))
}

// Selftest is a handler to just return 204.
Expand All @@ -102,16 +102,14 @@ type ErrorEvent struct {
}

// subscribe subscribes to stream and gets logs and events from it and writes them to a channel.
func (h Handler) subscribe(ctx context.Context, logger *logrus.Entry, streamer stream.Stream, ch chan<- events.Event, counter int, filter []string) {
func (h Handler) subscribe(ctx context.Context, logger *logrus.Entry, streamer stream.Stream, ch chan<- events.Event, lastID int, filter []string) {
defer close(ch)
var err error

consumeCh := make(chan []byte, 0)

offset := -1
if counter > 1 {
offset = counter
}
counter := 1 // lastID will default to 1 and the first event will be 1

closed, err := streamer.WithChannel(consumeCh).WithOffset(offset).WithFilter(filter).Consume(ctx)
if err != nil {
Expand All @@ -138,6 +136,14 @@ func (h Handler) subscribe(ctx context.Context, logger *logrus.Entry, streamer s
ch <- events.Event{Event: "error", Data: string(b)}
return
case msg := <-consumeCh:
// We have no reliable way of getting a specific offset on the SSE stream so
// we will need to iterate all events until we reach the last known ID.
if counter < lastID {
logger.Infof("Skipping event due to lastID=%d and counter=%d", lastID, counter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Risk of too much logging?

counter++
continue
}

event, err = events.New(msg)
if err != nil {
logger.WithError(err).Error("failed to parse SSE event")
Expand Down
5 changes: 1 addition & 4 deletions python/src/etos_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""ETOS API module."""

import os
from importlib.metadata import PackageNotFoundError, version

Expand All @@ -38,10 +39,6 @@
from .library.providers.register import RegisterProviders
from .main import APP

# The API shall not send logs to RabbitMQ as it
# is too early in the ETOS test run.
os.environ["ETOS_ENABLE_SENDING_LOGS"] = "false"

try:
VERSION = version("etos_api")
except PackageNotFoundError:
Expand Down
12 changes: 7 additions & 5 deletions python/src/etos_api/library/context_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""ETOS API context based logging."""

import logging
from contextvars import ContextVar
from etos_lib.logging.logger import FORMAT_CONFIG
Expand All @@ -32,46 +33,47 @@ class ContextLogging(logging.Logger):
get for each logging method called.
"""

# Default identifier is 'Unknown' which is ignored by the ETOS internal messagebus.
identifier = ContextVar("identifier")

def critical(self, msg, *args, **kwargs):
"""Add identifier to critical calls.

For documentation read :obj:`logging.Logger.critical`
"""
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
return super().critical(msg, *args, **kwargs)

def error(self, msg, *args, **kwargs):
"""Add identifier to error calls.

For documentation read :obj:`logging.Logger.error`
"""
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
return super().error(msg, *args, **kwargs)

def warning(self, msg, *args, **kwargs):
"""Add identifier to warning calls.

For documentation read :obj:`logging.Logger.warning`
"""
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
return super().warning(msg, *args, **kwargs)

def info(self, msg, *args, **kwargs):
"""Add identifier to info calls.

For documentation read :obj:`logging.Logger.info`
"""
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
return super().info(msg, *args, **kwargs)

def debug(self, msg, *args, **kwargs):
"""Add identifier to debug calls.

For documentation read :obj:`logging.Logger.debug`
"""
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
return super().debug(msg, *args, **kwargs)


Expand Down
Loading