diff --git a/changelog.d/0-release-notes/elasticmq b/changelog.d/0-release-notes/elasticmq new file mode 100644 index 00000000000..3ce140d0c4f --- /dev/null +++ b/changelog.d/0-release-notes/elasticmq @@ -0,0 +1,8 @@ +* Replace fake-sqs with ElasticMQ + +ElasticMQ is an actively maintained project, fake-sqs hasn't seen a commit since +2018. This is not expected to have any noticeable effect on deployments that +don't have any extra configurations for the SQS queues. If the fake-aws-sqs +chart had configured custom queue names, there are a couple of extra limitations: +- The queue names must only contain alphanumeric characters and hyphens. +- The FIFO queue names must end in `.fifo`. \ No newline at end of file diff --git a/charts/fake-aws-sqs/templates/configmap.yaml b/charts/fake-aws-sqs/templates/configmap.yaml new file mode 100644 index 00000000000..baac9d15bca --- /dev/null +++ b/charts/fake-aws-sqs/templates/configmap.yaml @@ -0,0 +1,60 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ template "fullname" . }} + labels: + app: {{ template "fullname" . }} + chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" +data: + elasticmq.conf: | + include classpath("application.conf") + + # What is the outside visible address of this ElasticMQ node + # Used to create the queue URL (may be different from bind address!) + node-address { + protocol = http + host = localhost + port = {{ .Values.service.httpPort }} + context-path = "" + } + + rest-sqs { + enabled = true + bind-port = {{ .Values.service.httpPort }} + bind-hostname = "0.0.0.0" + # Possible values: relaxed, strict + sqs-limits = strict + } + + rest-stats { + enabled = true + bind-port = 9325 + bind-hostname = "0.0.0.0" + } + + # Should the node-address be generated from the bind port/hostname + # Set this to true e.g. when assigning port automatically by using port 0. 
+ generate-node-address = false + + queues { + {{- range $i, $queueName := .Values.queueNames }} + "{{ $queueName }}" { + {{- if hasSuffix ".fifo" $queueName }} + fifo = true + {{- end }} + } + {{- end }} + } + + messages-storage { + enabled = true + uri = "jdbc:h2:/data/elasticmq.db" + } + + # Region and accountId which will be included in resource ids + aws { + region = eu-west-1 + accountId = 000000000000 + } diff --git a/charts/fake-aws-sqs/templates/deployment.yaml b/charts/fake-aws-sqs/templates/deployment.yaml index 24055e126a1..39848020c90 100644 --- a/charts/fake-aws-sqs/templates/deployment.yaml +++ b/charts/fake-aws-sqs/templates/deployment.yaml @@ -16,6 +16,8 @@ spec: metadata: labels: app: {{ template "fullname" . }} + annotations: + checksum/configmap: {{ include (print .Template.BasePath "/configmap.yaml") . | sha256sum }} spec: containers: - name: fake-aws-sqs @@ -24,45 +26,22 @@ spec: - containerPort: {{ .Values.service.httpPort }} name: http protocol: TCP + command: + - /sbin/tini + - -- + - /opt/elasticmq/bin/elasticmq-native-server + - -Dconfig.file=/config/elasticmq.conf + - -Dlogback.configurationFile=/opt/logback.xml volumeMounts: - name: storage mountPath: /data + - name: config + mountPath: /config resources: {{ toYaml .Values.resources | indent 12 }} - - name: initiate-fake-aws-sqs - image: mesosphere/aws-cli:1.14.5 - command: [/bin/sh] - args: - - -c - - | - exec_until_ready() { - until $1; do echo 'service not ready yet'; sleep 1; done - } - queue_exists() { - # NOTE: we use the '"' to match the queue name more exactly (otherwise there is some overlap) - OUTPUT=$(aws --endpoint-url=http://localhost:{{ $.Values.service.httpPort }} sqs list-queues | grep $1'"' | wc -l) - echo $OUTPUT - } - - echo 'Creating AWS resources' - aws configure set aws_access_key_id dummy - aws configure set aws_secret_access_key dummy - aws configure set region eu-west-1 - - while true - do - # Recreate resources if needed - {{ range $i, $queueName := 
.Values.queueNames }} - QUEUE=$(queue_exists "{{ $queueName }}") - if [ "$QUEUE" == "1" ] - then echo "Queue {{ $queueName }} exists, no need to re-create" - else exec_until_ready "aws --endpoint-url=http://localhost:{{ $.Values.service.httpPort }} sqs create-queue --queue-name {{ $queueName }}" - fi - {{ end }} - - echo 'Sleeping 10' - sleep 10 - done volumes: - emptyDir: {} name: "storage" + - name: config + configMap: + name: {{ template "fullname" . }} diff --git a/charts/fake-aws-sqs/values.yaml b/charts/fake-aws-sqs/values.yaml index 4f46cd50d94..a5161706986 100644 --- a/charts/fake-aws-sqs/values.yaml +++ b/charts/fake-aws-sqs/values.yaml @@ -1,6 +1,6 @@ image: - repository: airdock/fake-sqs - tag: 0.3.1 + repository: softwaremill/elasticmq-native + tag: 1.5.2 # TODO: in a wire-server chart, these queue names should match the ones defined in galley/brig/gundeck (i.e. only be defined once) queueNames: diff --git a/charts/integration/templates/integration-integration.yaml b/charts/integration/templates/integration-integration.yaml index 044b63b3b9a..2fe7718fa5f 100644 --- a/charts/integration/templates/integration-integration.yaml +++ b/charts/integration/templates/integration-integration.yaml @@ -113,7 +113,6 @@ spec: # FUTUREWORK: Do all of this in the integration test binary integration-dynamic-backends-db-schemas.sh --host {{ .Values.config.cassandra.host }} --port {{ .Values.config.cassandra.port }} --replication-factor {{ .Values.config.cassandra.replicationFactor }} integration-dynamic-backends-brig-index.sh --elasticsearch-server http://{{ .Values.config.elasticsearch.host }}:9200 - integration-dynamic-backends-sqs.sh {{ .Values.config.sqsEndpointUrl }} integration-dynamic-backends-ses.sh {{ .Values.config.sesEndpointUrl }} integration-dynamic-backends-s3.sh {{ .Values.config.s3EndpointUrl }} {{- range $name, $dynamicBackend := .Values.config.dynamicBackends }} diff --git a/deploy/dockerephemeral/docker-compose.yaml 
b/deploy/dockerephemeral/docker-compose.yaml index 564141eebc5..b8bdebb4d9a 100644 --- a/deploy/dockerephemeral/docker-compose.yaml +++ b/deploy/dockerephemeral/docker-compose.yaml @@ -34,10 +34,12 @@ services: fake_sqs: container_name: demo_wire_sqs -# image: airdock/fake-sqs:0.3.1 - image: julialongtin/airdock_fakesqs:0.0.9 + image: softwaremill/elasticmq-native:1.5.2 ports: - 127.0.0.1:4568:4568 + - 127.0.0.1:9325:9325 + volumes: + - ./docker/elasticmq.conf:/opt/elasticmq.conf networks: - demo_wire diff --git a/deploy/dockerephemeral/docker/elasticmq.conf b/deploy/dockerephemeral/docker/elasticmq.conf new file mode 100644 index 00000000000..7cd41d7317e --- /dev/null +++ b/deploy/dockerephemeral/docker/elasticmq.conf @@ -0,0 +1,80 @@ +include classpath("application.conf") + +# What is the outside visible address of this ElasticMQ node +# Used to create the queue URL (may be different from bind address!) +node-address { + protocol = http + host = localhost + port = 4568 + context-path = "" +} + +rest-sqs { + enabled = true + bind-port = 4568 + bind-hostname = "0.0.0.0" + # Possible values: relaxed, strict + sqs-limits = strict +} + +rest-stats { + enabled = true + bind-port = 9325 + bind-hostname = "0.0.0.0" +} + +# Should the node-address be generated from the bind port/hostname +# Set this to true e.g. when assigning port automatically by using port 0. 
+generate-node-address = false + +queues { + default-queue-template { + defaultVisibilityTimeout = 1s + } + + fifo-queue-template { + defaultVisibilityTimeout = 1s + fifo = true + } + + integration-brig-events = ${queues.default-queue-template} + integration-brig-events2 = ${queues.default-queue-template} + integration-brig-events3 = ${queues.default-queue-template} + integration-brig-events4 = ${queues.default-queue-template} + integration-brig-events5 = ${queues.default-queue-template} + integration-brig-events-federation-v0 = ${queues.default-queue-template} + + integration-brig-events-internal = ${queues.default-queue-template} + integration-brig-events-internal2 = ${queues.default-queue-template} + integration-brig-events-internal3 = ${queues.default-queue-template} + integration-brig-events-internal4 = ${queues.default-queue-template} + integration-brig-events-internal5 = ${queues.default-queue-template} + integration-brig-events-internal-federation-v0 = ${queues.default-queue-template} + + "integration-user-events.fifo" = ${queues.fifo-queue-template} + "integration-user-events2.fifo" = ${queues.fifo-queue-template} + "integration-user-events3.fifo" = ${queues.fifo-queue-template} + "integration-user-events4.fifo" = ${queues.fifo-queue-template} + "integration-user-events5.fifo" = ${queues.fifo-queue-template} + "integration-user-events-federation-v0.fifo" = ${queues.fifo-queue-template} + + integration-gundeck-events = ${queues.default-queue-template} + integration-gundeck-events2 = ${queues.default-queue-template} + integration-gundeck-events3 = ${queues.default-queue-template} + integration-gundeck-events4 = ${queues.default-queue-template} + integration-gundeck-events5 = ${queues.default-queue-template} + integration-gundeck-events-federation-v0 = ${queues.default-queue-template} + + "integration-team-events.fifo" = ${queues.fifo-queue-template} + "integration-team-events2.fifo" = ${queues.fifo-queue-template} + "integration-team-events3.fifo" = 
${queues.fifo-queue-template} + "integration-team-events4.fifo" = ${queues.fifo-queue-template} + "integration-team-events5.fifo" = ${queues.fifo-queue-template} + "integration-team-events-federation-v0.fifo" = ${queues.fifo-queue-template} +} + +# Region and accountId which will be included in resource ids +aws { + region = eu-west-1 + accountId = 000000000000 +} \ No newline at end of file diff --git a/deploy/dockerephemeral/federation-v0/brig.yaml b/deploy/dockerephemeral/federation-v0/brig.yaml index 693ba492783..06dfefe80e3 100644 --- a/deploy/dockerephemeral/federation-v0/brig.yaml +++ b/deploy/dockerephemeral/federation-v0/brig.yaml @@ -36,7 +36,7 @@ federatorInternal: # You can set up local SQS/Dynamo running e.g. `../../deploy/dockerephemeral/run.sh` aws: - userJournalQueue: integration-user-events.fifo-federation-v0 + userJournalQueue: integration-user-events-federation-v0.fifo # ^ Comment this out if you don't want to journal user events prekeyTable: integration-brig-prekeys-federation-v0 sqsEndpoint: http://fake_sqs:4568 # https://sqs.eu-west-1.amazonaws.com diff --git a/deploy/dockerephemeral/federation-v0/galley.yaml b/deploy/dockerephemeral/federation-v0/galley.yaml index 6879901c48c..ab2644a8ef5 100644 --- a/deploy/dockerephemeral/federation-v0/galley.yaml +++ b/deploy/dockerephemeral/federation-v0/galley.yaml @@ -88,6 +88,6 @@ logLevel: Warn logNetStrings: false journal: # if set, journals; if not set, disables journaling - queueName: integration-team-events.fifo-federation-v0 + queueName: integration-team-events-federation-v0.fifo endpoint: http://demo_wire_sqs:4568 # https://sqs.eu-west-1.amazonaws.com region: eu-west-1 diff --git a/deploy/dockerephemeral/init.sh b/deploy/dockerephemeral/init.sh index f10067319aa..7f11fc7ee0c 100755 --- a/deploy/dockerephemeral/init.sh +++ b/deploy/dockerephemeral/init.sh @@ -22,25 +22,10 @@ for suffix in "" "2" "3" "4" "5" "-federation-v0"; do aws --endpoint-url=http://dynamodb:8000 dynamodb delete-table 
--table-name integration-brig-userkey-blacklist$suffix || true aws --endpoint-url=http://dynamodb:8000 dynamodb delete-table --table-name integration-brig-prekeys$suffix || true - # Create Dynamo/SQS resources + # Create Dynamo resources exec_until_ready "aws --endpoint-url=http://dynamodb:8000 dynamodb create-table --table-name integration-brig-userkey-blacklist$suffix --attribute-definitions AttributeName=key,AttributeType=S --key-schema AttributeName=key,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5" exec_until_ready "aws --endpoint-url=http://dynamodb:8000 dynamodb create-table --table-name integration-brig-prekeys$suffix --attribute-definitions AttributeName=client,AttributeType=S --key-schema AttributeName=client,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-brig-events$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-brig-events$suffix --attributes VisibilityTimeout=1" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-brig-events-internal$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-brig-events-internal$suffix --attributes VisibilityTimeout=1" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-user-events.fifo$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-user-events.fifo$suffix --attributes VisibilityTimeout=1" - - # Gundeck's feedback queue - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-gundeck-events$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url 
http://sqs:4568/integration-gundeck-events$suffix --attributes VisibilityTimeout=1" - - # Galley's team event queue - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-team-events.fifo$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-team-events.fifo$suffix --attributes VisibilityTimeout=1" - # Verify sender's email address (ensure the sender address is in sync with the config in brig) exec_until_ready "aws --endpoint-url=http://ses:4579 ses verify-email-identity --email-address backend-integration$suffix@wire.com" @@ -53,6 +38,13 @@ for suffix in "" "2" "3" "4" "5" "-federation-v0"; do # TODO: Lifecycle configuration for the bucket, if supported. aws --endpoint-url=http://s3:9000 s3api create-bucket --bucket "dummy-bucket$suffix" aws --endpoint-url=http://s3:9000 s3api wait bucket-exists --bucket "dummy-bucket$suffix" + + # Check that SQS resources are created + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-brig-events$suffix" + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-brig-events-internal$suffix" + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-user-events$suffix.fifo" + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-gundeck-events$suffix" + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-team-events$suffix.fifo" done echo 'AWS resources created successfully!' 
diff --git a/hack/helm_vars/fake-aws/values.yaml b/hack/helm_vars/fake-aws/values.yaml index 543867c48cf..57bb23acb05 100644 --- a/hack/helm_vars/fake-aws/values.yaml +++ b/hack/helm_vars/fake-aws/values.yaml @@ -1,3 +1,36 @@ fake-aws-ses: enabled: true sesSender: "backend-integrationk8s@wire.com" + +fake-aws-sqs: + queueNames: + - "integration-brig-events" + - "integration-brig-events-internal" + - "integration-gundeck-events" + - "integration-user-events.fifo" + - "integration-team-events.fifo" + + # No need for the set of queues with "2" as the suffix because the second + # deployment of wire-server runs in a separate namespace (the "-fed2" + # namespace) with its own fake-aws-sqs. + # + # In that namespace these extra queues will be unused, but it's easier to + # create them and not use them than to not create them. It shouldn't create + # any significant performance degradation. + - "integration-brig-events3" + - "integration-brig-events-internal3" + - "integration-gundeck-events3" + - "integration-user-events3.fifo" + - "integration-team-events3.fifo" + + - "integration-brig-events4" + - "integration-brig-events-internal4" + - "integration-gundeck-events4" + - "integration-user-events4.fifo" + - "integration-team-events4.fifo" + + - "integration-brig-events5" + - "integration-brig-events-internal5" + - "integration-gundeck-events5" + - "integration-user-events5.fifo" + - "integration-team-events5.fifo" diff --git a/integration/scripts/integration-dynamic-backends-sqs.sh b/integration/scripts/integration-dynamic-backends-sqs.sh deleted file mode 100755 index d85a3c85f06..00000000000 --- a/integration/scripts/integration-dynamic-backends-sqs.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env bash - -set -eo pipefail - -ENDPOINT_URL=$1 - -for i in $(seq "$INTEGRATION_DYNAMIC_BACKENDS_POOLSIZE"); do - suffix=$((i + 2)) - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name integration-brig-events$suffix - aws --endpoint-url="$ENDPOINT_URL" sqs 
set-queue-attributes --queue-url "$ENDPOINT_URL/integration-brig-events$suffix" --attributes VisibilityTimeout=1 - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name integration-brig-events-internal$suffix - aws --endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-brig-events-internal$suffix" --attributes VisibilityTimeout=1 - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name integration-user-events.fifo$suffix - aws --endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-user-events.fifo$suffix" --attributes VisibilityTimeout=1 - - # Gundeck's feedback queue - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name "integration-gundeck-events$suffix" - aws --endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-gundeck-events$suffix" --attributes VisibilityTimeout=1 - - # Galley's team event queue - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name "integration-team-events.fifo$suffix" - aws --endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-team-events.fifo$suffix" --attributes VisibilityTimeout=1 -done - -echo 'AWS sqs queues created successfully!' 
- diff --git a/integration/test/Testlib/ResourcePool.hs b/integration/test/Testlib/ResourcePool.hs index de35ea2d70e..db685049dab 100644 --- a/integration/test/Testlib/ResourcePool.hs +++ b/integration/test/Testlib/ResourcePool.hs @@ -112,14 +112,14 @@ backendResources dynConfs = berFederatorInternal = Ports.portForDyn (Ports.ServiceInternal FederatorInternal) i, berFederatorExternal = dynConf.federatorExternalPort, berDomain = dynConf.domain, - berAwsUserJournalQueue = "integration-user-events.fifo" <> suffix i, + berAwsUserJournalQueue = "integration-user-events" <> suffix i <> ".fifo", berAwsPrekeyTable = "integration-brig-prekeys" <> suffix i, berAwsS3Bucket = "dummy-bucket" <> suffix i, berAwsQueueName = "integration-gundeck-events" <> suffix i, berBrigInternalEvents = "integration-brig-events-internal" <> suffix i, berEmailSMSSesQueue = "integration-brig-events" <> suffix i, berEmailSMSEmailSender = "backend-integration" <> suffix i <> "@wire.com", - berGalleyJournal = "integration-team-events.fifo" <> suffix i, + berGalleyJournal = "integration-team-events" <> suffix i <> ".fifo", berVHost = dynConf.domain, berNginzSslPort = Ports.portForDyn Ports.NginzSSL i, berNginzHttp2Port = Ports.portForDyn Ports.NginzHttp2 i, @@ -168,14 +168,14 @@ backendB = berFederatorInternal = Ports.port (Ports.ServiceInternal FederatorInternal) BackendB, berFederatorExternal = Ports.port Ports.FederatorExternal BackendB, berDomain = "b.example.com", - berAwsUserJournalQueue = "integration-user-events.fifo2", + berAwsUserJournalQueue = "integration-user-events2.fifo", berAwsPrekeyTable = "integration-brig-prekeys2", berAwsS3Bucket = "dummy-bucket2", berAwsQueueName = "integration-gundeck-events2", berBrigInternalEvents = "integration-brig-events-internal2", berEmailSMSSesQueue = "integration-brig-events2", berEmailSMSEmailSender = "backend-integration2@wire.com", - berGalleyJournal = "integration-team-events.fifo2", + berGalleyJournal = "integration-team-events2.fifo", -- 
FUTUREWORK: set up vhosts in dev/ci for example.com and b.example.com -- in case we want backendA and backendB to federate with a third backend -- (because otherwise both queues will overlap) diff --git a/libs/types-common-aws/src/Util/Test/SQS.hs b/libs/types-common-aws/src/Util/Test/SQS.hs index c94188c2aea..f91b649d896 100644 --- a/libs/types-common-aws/src/Util/Test/SQS.hs +++ b/libs/types-common-aws/src/Util/Test/SQS.hs @@ -5,8 +5,6 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE UndecidableInstances #-} --- Disabling for HasCallStack -{-# OPTIONS_GHC -Wno-redundant-constraints #-} -- This file is part of the Wire Server implementation. -- @@ -37,7 +35,8 @@ import Data.ProtoLens import Data.Text.Encoding qualified as Text import Imports import Safe (headDef) -import UnliftIO (Async, async) +import UnliftIO (Async, async, throwIO) +import UnliftIO.Async qualified as Async import UnliftIO.Resource (MonadResource, ResourceT) import UnliftIO.Timeout (timeout) @@ -54,17 +53,33 @@ data SQSWatcher a = SQSWatcher -- the queue has too many things in it before the tests start. -- Note that the purgeQueue command is not guaranteed to be instant (can take up to 60 seconds) -- Hopefully, the fake-aws implementation used during tests is fast enough. -watchSQSQueue :: Message a => AWS.Env -> Text -> IO (SQSWatcher a) -watchSQSQueue env queueUrl = do +watchSQSQueue :: (Message a) => AWS.Env -> Text -> IO (SQSWatcher a) +watchSQSQueue env queueName = do eventsRef <- newIORef [] - ensureEmpty - process <- async $ recieveLoop eventsRef + + queueUrlRes <- execute env . sendEnv $ SQS.newGetQueueUrl queueName + let queueUrl = view SQS.getQueueUrlResponse_queueUrl queueUrlRes + + ensureEmpty queueUrl + process <- async $ do + -- Every receive request takes ~300ms (on my machine). This puts a limit of + -- ~3 notifications per second. Which makes tests reallly slow. 
SQS scales + -- pretty well with multiple consumers, so we start 5 consumers here to bump + -- the max throughput to about ~15 notifications per second. + loop1 <- async $ receiveLoop queueUrl eventsRef + loop2 <- async $ receiveLoop queueUrl eventsRef + loop3 <- async $ receiveLoop queueUrl eventsRef + loop4 <- async $ receiveLoop queueUrl eventsRef + loop5 <- async $ receiveLoop queueUrl eventsRef + _ <- Async.waitAny [loop1, loop2, loop3, loop4, loop5] + throwIO $ BackgroundThreadNotRunning $ "One of the SQS receive loops finished, all of them are supposed to run forever" + pure $ SQSWatcher process eventsRef where - recieveLoop ref = do + receiveLoop queueUrl ref = do let rcvReq = SQS.newReceiveMessage queueUrl - & set SQS.receiveMessage_waitTimeSeconds (Just 100) + & set SQS.receiveMessage_waitTimeSeconds (Just 10) . set SQS.receiveMessage_maxNumberOfMessages (Just 1) . set SQS.receiveMessage_visibilityTimeout (Just 1) rcvRes <- execute env $ sendEnv rcvReq @@ -74,16 +89,27 @@ watchSQSQueue env queueUrl = do [] -> pure () _ -> atomicModifyIORef ref $ \xs -> (parsedMsgs <> xs, ()) - recieveLoop ref + receiveLoop queueUrl ref + + ensureEmpty :: Text -> IO () + ensureEmpty queueUrl = void $ execute env $ sendEnv (SQS.newPurgeQueue queueUrl) + +data SQSWatcherError = BackgroundThreadNotRunning String + deriving (Show) - ensureEmpty :: IO () - ensureEmpty = void $ execute env $ sendEnv (SQS.newPurgeQueue queueUrl) +instance Exception SQSWatcherError -- | Waits for a message matching a predicate for a given number of seconds. -waitForMessage :: (MonadUnliftIO m, Eq a) => SQSWatcher a -> Int -> (a -> Bool) -> m (Maybe a) +waitForMessage :: forall m a. (MonadUnliftIO m, Eq a, HasCallStack) => SQSWatcher a -> Int -> (a -> Bool) -> m (Maybe a) waitForMessage watcher seconds predicate = timeout (seconds * 1_000_000) poll where + poll :: (HasCallStack) => m a poll = do + -- Check if the background thread is still alive. 
If not fail with a nicer error + Async.poll watcher.watcherProcess >>= \case + Nothing -> pure () + Just (Left err) -> throwIO $ BackgroundThreadNotRunning $ "Thread finished with exception: " <> show err + Just (Right ()) -> throwIO $ BackgroundThreadNotRunning "Thread finished without any exceptions when it was supposed to run forever" matched <- atomicModifyIORef (events watcher) $ \events -> case filter predicate events of [] -> (events, Nothing) @@ -95,7 +121,7 @@ waitForMessage watcher seconds predicate = timeout (seconds * 1_000_000) poll -- an assertion on such a message. assertMessage :: (MonadUnliftIO m, Eq a, HasCallStack) => SQSWatcher a -> String -> (a -> Bool) -> (String -> Maybe a -> m ()) -> m () assertMessage watcher label predicate callback = do - matched <- waitForMessage watcher 10 predicate + matched <- waitForMessage watcher 5 predicate callback label matched ----------------------------------------------------------------------------- diff --git a/nix/wire-server.nix b/nix/wire-server.nix index 82a0211e427..89356ed00c2 100644 --- a/nix/wire-server.nix +++ b/nix/wire-server.nix @@ -223,13 +223,6 @@ let checkPhase = ""; }; - integration-dynamic-backends-sqs = pkgs.writeShellApplication { - name = "integration-dynamic-backends-sqs.sh"; - text = "${builtins.readFile ../integration/scripts/integration-dynamic-backends-sqs.sh}"; - runtimeInputs = [ pkgs.parallel pkgs.awscli2 ]; - checkPhase = ""; - }; - integration-dynamic-backends-ses = pkgs.writeShellApplication { name = "integration-dynamic-backends-ses.sh"; text = "${builtins.readFile ../integration/scripts/integration-dynamic-backends-ses.sh}"; @@ -286,7 +279,6 @@ let pkgs.awscli2 integration-dynamic-backends-db-schemas integration-dynamic-backends-brig-index - integration-dynamic-backends-sqs integration-dynamic-backends-ses integration-dynamic-backends-s3 integration-dynamic-backends-vhosts diff --git a/services/brig/test/integration/Run.hs b/services/brig/test/integration/Run.hs index 
a987096b8c5..7fe1b37bda5 100644 --- a/services/brig/test/integration/Run.hs +++ b/services/brig/test/integration/Run.hs @@ -145,7 +145,7 @@ runTests iConf brigOpts otherArgs = do let fedGalleyClient = FedClient @'Galley mg (galley iConf) emailAWSOpts <- parseEmailAWSOpts awsEnv <- AWS.mkEnv lg awsOpts emailAWSOpts mg - mUserJournalWatcher <- for (view AWS.userJournalQueue awsEnv) $ SQS.watchSQSQueue (view AWS.amazonkaEnv awsEnv) + mUserJournalWatcher <- for (Opts.userJournalQueue awsOpts) $ SQS.watchSQSQueue (view AWS.amazonkaEnv awsEnv) userApi <- User.tests brigOpts fedBrigClient fedGalleyClient mg b c ch g n awsEnv db mUserJournalWatcher providerApi <- Provider.tests localDomain (provider iConf) mg db b c g n searchApis <- Search.tests brigOpts mg g b