diff --git a/go.work.sum b/go.work.sum index f02171c..600cae3 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1,5 +1,6 @@ cel.dev/expr v0.15.0 h1:O1jzfJCQBfL5BFoYktaxwIhuttaQPsVWerH9/EEKx0w= cel.dev/expr v0.15.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg= +cel.dev/expr v0.16.1/go.mod h1:AsGA5zb3WruAEQeQng1RZdGEXmBj0jvMWh6l5SnNuC8= cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys= cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk= cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y= @@ -169,6 +170,7 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +cloud.google.com/go/compute/metadata v0.5.0/go.mod h1:aHnloV2TPI38yx4s9+wAZhHykWvVCfu7hQbF+9CWoiY= cloud.google.com/go/contactcenterinsights v1.10.0 h1:YR2aPedGVQPpFBZXJnPkqRj8M//8veIZZH5ZvICoXnI= cloud.google.com/go/contactcenterinsights v1.11.1 h1:dEfCjtdYjS3n8/1HEKbJaOL31l3dEs3q9aeaNsyrJBc= cloud.google.com/go/contactcenterinsights v1.11.1/go.mod h1:FeNP3Kg8iteKM80lMwSk3zZZKVxr+PGnAId6soKuXwE= @@ -753,12 +755,14 @@ github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipa github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9 h1:uDmaGzcdjhF4i/plgjmEsriH11Y0o7RKapEf/LDaM3w= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ= @@ -778,11 +782,13 @@ github.com/envoyproxy/go-control-plane v0.11.1 h1:wSUXTlLfiAQRWs2F+p+EKOY9rUyis1 github.com/envoyproxy/go-control-plane v0.11.1/go.mod h1:uhMcXKCQMEJHiAb0w+YGefQLaTEw+YhGluxZkrTmD0g= github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= +github.com/envoyproxy/go-control-plane v0.13.0/go.mod h1:GRaKG3dwvFoTg4nj7aXdZnvMg4d7nvT/wl9WgVXn3Q8= github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/go-fonts/dejavu v0.1.0 h1:JSajPXURYqpr+Cu8U9bt8K+XcACIHWqWrvWCKyeFmVQ= @@ -809,9 +815,11 @@ github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= @@ -950,6 +958,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/sftp v1.13.1 h1:I2qBYMChEhIjOgazfJmV3/mZM256btk6wkCDRmW7JYs= github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= @@ -970,6 +979,7 @@ github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245 h1:K1Xf3bKttbF+koVGaX5xngRIZ5bVjbmPnaxE/dR08uY= @@ -988,10 +998,12 @@ github.com/urfave/cli/v2 v2.23.0 h1:pkly7gKIeYv3olPAeNajNpLjeJrmTPYCoZWaV+2VfvE= github.com/urfave/cli/v2 v2.23.0/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= +github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ= github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc= github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= github.com/yuin/goldmark v1.3.5 h1:dPmz1Snjq0kmkz159iL7S6WzdahUTHnHB5M56WFVifs= github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE= @@ -1062,6 +1074,7 @@ golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= @@ -1076,6 +1089,7 @@ golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA= golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1099,6 +1113,7 @@ golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= @@ -1118,6 +1133,7 @@ golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +golang.org/x/tools v0.27.0/go.mod h1:sUi0ZgbwW9ZPAq26Ekut+weQPR5eIM6GQLQ1Yjm1H0Q= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= diff --git a/natsjobs/driver.go b/natsjobs/driver.go index 4f33bf1..ad4f1ad 100644 --- a/natsjobs/driver.go +++ b/natsjobs/driver.go @@ -256,10 +256,10 @@ func (c *Driver) Push(ctx context.Context, job jobs.Message) error { return errors.E(op, err) } - _, err = c.jetstream.PublishMsg(ctx, &nats.Msg{ - Data: data, - Subject: c.subject, - }) + msg := nats.NewMsg(c.subject) + msg.Data = data + msg.Header = j.headers + _, err = c.jetstream.PublishMsg(ctx, msg) if err != nil { return errors.E(op, err) } diff --git a/natsjobs/listener.go b/natsjobs/listener.go index f2b4d9b..3f9c7d3 100644 --- a/natsjobs/listener.go +++ b/natsjobs/listener.go @@ -76,7 +76,7 @@ func (c *Driver) listenerStart() { //nolint:gocognit } item := &Item{} - c.unpack(m.Data(), item) + c.unpack(m.Data(), m.Headers(), item) ctx := c.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers)) ctx, span := c.tracer.Tracer(tracerName).Start(ctx, "nats_listener") diff --git a/natsjobs/unpack.go b/natsjobs/unpack.go index bde2786..0fd3d21 100644 --- a/natsjobs/unpack.go +++ b/natsjobs/unpack.go @@ -11,14 +11,15 @@ const ( auto string = "deduced_by_rr" ) -func (c *Driver) unpack(data []byte, item *Item) { +func (c *Driver) unpack(data []byte, headers map[string][]string, item *Item) { err := json.Unmarshal(data, item) + item.headers = headers if err != nil { *item = Item{ Job: auto, Ident: uuid.NewString(), Payload: data, - headers: make(map[string][]string, 2), + headers: headers, Options: &Options{ Priority: (*c.pipeline.Load()).Priority(), Pipeline: (*c.pipeline.Load()).Name(), diff --git a/tests/configs/.rr-nats-headers.yaml b/tests/configs/.rr-nats-headers.yaml new file mode 100644 index 0000000..6c41b51 --- /dev/null +++ b/tests/configs/.rr-nats-headers.yaml @@ -0,0 +1,36 @@ +version: '3' + +rpc: + listen: tcp://127.0.0.1:6464 + +server: + command: "php php_test_files/jobs/jobs_ok_headers.php" + relay: "pipes" + relay_timeout: "20s" + +nats: + addr: "nats://127.0.0.1:4222" + +logs: + level: debug + mode: development + +jobs: + num_pollers: 1 + pool: + num_workers: 2 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: nats + config: + prefetch: 100 + subject: "testheaders.*" + stream: "headers-test" + delete_after_ack: true + deliver_new: true + priority: 1 + + consume: [ "test-1" ] \ No newline at end of file diff --git a/tests/env/docker-compose-nats.yaml b/tests/env/docker-compose-nats.yaml index 613424c..dfbe3d6 100644 --- a/tests/env/docker-compose-nats.yaml +++ b/tests/env/docker-compose-nats.yaml @@ -1,5 +1,3 @@ -version: "3.8" - services: nats: image: nats:latest diff --git a/tests/jobs_nats_test.go b/tests/jobs_nats_test.go index c2f1dcb..4b18881 100644 --- a/tests/jobs_nats_test.go +++ b/tests/jobs_nats_test.go @@ -39,6 +39,93 @@ import ( "go.uber.org/zap" ) +func TestNATSHeaders(t *testing.T) { + cont := endure.New(slog.LevelDebug) + + cfg := &config.Plugin{ + Version: "v2024.2.0", + Path: "configs/.rr-nats-headers.yaml", + Prefix: "rr", + } + + l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel) + err := cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + l, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &natsPlugin.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + t.Run("PushPipeline", helpers.PushToPipe("test-1", false, "127.0.0.1:6464")) + time.Sleep(time.Second) + + t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6464", "test-1")) + stopCh <- struct{}{} + wg.Wait() + + require.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len()) + require.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len()) + require.Equal(t, 1, oLogger.FilterMessageSnippet("job was processed successfully").Len()) + require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len()) + + t.Cleanup(func() { + errc := helpers.CleanupNats("nats://127.0.0.1:4222", "headers-test") + if errc != nil { + t.Log(errc) + } + }) +} func TestNATSInit(t *testing.T) { cont := endure.New(slog.LevelDebug) diff --git a/tests/php_test_files/jobs/jobs_create_memory.php b/tests/php_test_files/jobs/jobs_create_memory.php index 3b56d69..7c4bc11 100644 --- a/tests/php_test_files/jobs/jobs_create_memory.php +++ b/tests/php_test_files/jobs/jobs_create_memory.php @@ -43,5 +43,5 @@ $consumer = new Spiral\RoadRunner\Jobs\Consumer(); while ($task = $consumer->waitTask()) { - $task->complete(); + $task->ack(); } diff --git a/tests/php_test_files/jobs/jobs_err.php b/tests/php_test_files/jobs/jobs_err.php index e93f8f4..3b0ceb8 100644 --- a/tests/php_test_files/jobs/jobs_err.php +++ b/tests/php_test_files/jobs/jobs_err.php @@ -21,9 +21,9 @@ $total_attempts = (int)$task->getHeaderLine("attempts") + 1; if ($total_attempts > 3) { - $task->complete(); + $task->ack(); } else { - $task->withHeader("attempts",$total_attempts)->withDelay(5)->fail("failed", true); + $task->withHeader("attempts", $total_attempts)->withDelay(5)->nack("failed", true); } } catch (\Throwable $e) { $task->error((string)$e); diff --git a/tests/php_test_files/jobs/jobs_ok.php b/tests/php_test_files/jobs/jobs_ok.php index f6cdc47..714209a 100644 --- a/tests/php_test_files/jobs/jobs_ok.php +++ b/tests/php_test_files/jobs/jobs_ok.php @@ -17,7 +17,7 @@ while ($task = $consumer->waitTask()) { try { - $task->complete(); + $task->ack(); } catch (\Throwable $e) { $task->error((string)$e); } diff --git a/tests/php_test_files/jobs/jobs_ok_headers.php b/tests/php_test_files/jobs/jobs_ok_headers.php new file mode 100644 index 0000000..44a1a0b --- /dev/null +++ b/tests/php_test_files/jobs/jobs_ok_headers.php @@ -0,0 +1,32 @@ +waitTask()) { + try { + $h = $task->getHeader('test')[0] ?? 'undefined'; + if ("test2" !== $h) { + throw new RuntimeException(sprintf( + "Expected header '%s', got '%s'", + "test2", + $h + )); + } + $task->ack(); + } catch (\Throwable $e) { + $task->error((string)$e); + } +} diff --git a/tests/php_test_files/jobs/jobs_ok_pq.php b/tests/php_test_files/jobs/jobs_ok_pq.php index 3fdd95b..c170a1e 100644 --- a/tests/php_test_files/jobs/jobs_ok_pq.php +++ b/tests/php_test_files/jobs/jobs_ok_pq.php @@ -18,7 +18,7 @@ while ($task = $consumer->waitTask()) { try { sleep(15); - $task->complete(); + $task->ack(); } catch (\Throwable $e) { $task->error((string)$e); } diff --git a/tests/php_test_files/jobs/jobs_ok_queue_name_exist.php b/tests/php_test_files/jobs/jobs_ok_queue_name_exist.php index e80f6d3..6c70185 100644 --- a/tests/php_test_files/jobs/jobs_ok_queue_name_exist.php +++ b/tests/php_test_files/jobs/jobs_ok_queue_name_exist.php @@ -21,7 +21,7 @@ throw new RuntimeException('Queue name was not found'); } - $task->complete(); + $task->ack(); } catch (\Throwable $e) { $task->error((string)$e); } diff --git a/tests/php_test_files/jobs/jobs_ok_sleep1.php b/tests/php_test_files/jobs/jobs_ok_sleep1.php index 4733a77..12e158a 100644 --- a/tests/php_test_files/jobs/jobs_ok_sleep1.php +++ b/tests/php_test_files/jobs/jobs_ok_sleep1.php @@ -18,7 +18,7 @@ while ($task = $consumer->waitTask()) { try { sleep(1); - $task->complete(); + $task->ack(); } catch (\Throwable $e) { $task->error((string)$e); } diff --git a/tests/php_test_files/jobs/jobs_ok_slow.php b/tests/php_test_files/jobs/jobs_ok_slow.php index e2970c7..a1a009d 100644 --- a/tests/php_test_files/jobs/jobs_ok_slow.php +++ b/tests/php_test_files/jobs/jobs_ok_slow.php @@ -18,7 +18,7 @@ while ($task = $consumer->waitTask()) { try { sleep(60); - $task->complete(); + $task->ack(); } catch (\Throwable $e) { $task->error((string)$e); } diff --git a/tests/php_test_files/jobs/jobs_ok_slow_rand.php b/tests/php_test_files/jobs/jobs_ok_slow_rand.php index 37c4c0e..6e49834 100644 --- a/tests/php_test_files/jobs/jobs_ok_slow_rand.php +++ b/tests/php_test_files/jobs/jobs_ok_slow_rand.php @@ -22,7 +22,7 @@ sleep(60); } - $task->complete(); + $task->ack(); } catch (\Throwable $e) { $task->error((string)$e); } diff --git a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php index 14cda3d..d6feb52 100644 --- a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php +++ b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php @@ -28,8 +28,8 @@ )); } - $task->complete(); + $task->ack(); } catch (\Throwable $e) { - $task->fail((string)$e); + $task->nack((string)$e); } }