diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/.docker/rabbitmq/data/.gitkeep b/MessageQueue/Nodejs-RabbitMQ-Docker/.docker/rabbitmq/data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/.docker/rabbitmq/log/.gitkeep b/MessageQueue/Nodejs-RabbitMQ-Docker/.docker/rabbitmq/log/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/Dockerfile b/MessageQueue/Nodejs-RabbitMQ-Docker/Dockerfile new file mode 100644 index 0000000..bf03a25 --- /dev/null +++ b/MessageQueue/Nodejs-RabbitMQ-Docker/Dockerfile @@ -0,0 +1,17 @@ +FROM node:18-alpine as base +WORKDIR /app +COPY . . +RUN yarn + + +FROM base as development +RUN apk add --no-cache bash +RUN wget -O /bin/wait-for-it.sh https://raw.githubusercontent.com/vishnubob/wait-for-it/master/wait-for-it.sh +RUN chmod +x /bin/wait-for-it.sh + + +FROM base as production +ENV NODE_ENV=production +RUN yarn build +COPY ./dist ./ +CMD ["node", "dist/consumer.js"] diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/docker-compose.yml b/MessageQueue/Nodejs-RabbitMQ-Docker/docker-compose.yml new file mode 100644 index 0000000..643611b --- /dev/null +++ b/MessageQueue/Nodejs-RabbitMQ-Docker/docker-compose.yml @@ -0,0 +1,34 @@ +version: '3.2' +services: + rabbitmq: + image: rabbitmq:3.8-management-alpine + container_name: 'rabbitmq' + ports: + - '5672:5672' + - '15672:15672' + volumes: + - ~/.docker/rabbitmq/data:/var/lib/rabbitmq/ + - ~/.docker/rabbitmq/log:/var/log/rabbitmq + networks: + - rabbitmq_network + + consumer: + build: + context: ./ + dockerfile: Dockerfile + target: development + volumes: + - .:/app + depends_on: + - rabbitmq + command: sh -c '/bin/wait-for-it.sh rabbitmq:5672 --timeout=30 -- node dist/consumer.js' + environment: + NODE_ENV: production + AMQP_URL: amqp://guest:guest@rabbitmq:5672 + networks: + - rabbitmq_network + +networks: + rabbitmq_network: + driver: bridge +# docker-compose exec consumer /bin/bash -c 'for ((i=1;i<=15;i++)); do node publisher.js; done' diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/package.json b/MessageQueue/Nodejs-RabbitMQ-Docker/package.json new file mode 100644 index 0000000..4de2c64 --- /dev/null +++ b/MessageQueue/Nodejs-RabbitMQ-Docker/package.json @@ -0,0 +1,19 @@ +{ + "name": "nodejs-rabbitMQ-docker", + "version": "1.0.0", + "main": "index.js", + "license": "MIT", + "scripts": { + "dev": "nodemon src/index.ts", + "start": "node dist/index.js", + "build": "tsc -w" + }, + "dependencies": { + "amqplib": "^0.10.3" + }, + "devDependencies": { + "@types/amqplib": "^0.10.1", + "nodemon": "^2.0.20", + "typescript": "^4.9.4" + } +} \ No newline at end of file diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/src/consumer.ts b/MessageQueue/Nodejs-RabbitMQ-Docker/src/consumer.ts new file mode 100644 index 0000000..e2bc79e --- /dev/null +++ b/MessageQueue/Nodejs-RabbitMQ-Docker/src/consumer.ts @@ -0,0 +1,39 @@ +import amqplib from 'amqplib'; + +const amqpUrl = process.env.AMQP_URL || 'amqp://localhost:5672'; + +async function processMessage(msg: any) { + console.log('Processing message', msg.content.toString()); + // call API here + await new Promise((resolve) => setTimeout(resolve, 1000)); + console.log('Processed message'); +} + +(async () => { + const connection = await amqplib.connect(amqpUrl, 'heartbeat=60'); + const channel = await connection.createChannel(); + channel.prefetch(10); + const queue = 'user.sign_up_email'; + process.once('SIGINT', async () => { + console.log('Got signint, closing connection'); + await channel.close(); + await connection.close(); + process.exit(0); + }); + + await channel.assertQueue(queue, { durable: true }); + await channel.consume( + queue, + async (msg: any) => { + console.log('Got message'); + await processMessage(msg); + await channel.ack(msg); + }, + { + noAck: false, + exclusive: false, + consumerTag: 'email_consumer', + }, + ); + console.log('Waiting for messages'); +})(); diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/src/publisher.ts b/MessageQueue/Nodejs-RabbitMQ-Docker/src/publisher.ts new file mode 100644 index 0000000..08e076b --- /dev/null +++ b/MessageQueue/Nodejs-RabbitMQ-Docker/src/publisher.ts @@ -0,0 +1,39 @@ +import amqplib from 'amqplib'; + +const amqpUrl = process.env.AMQP_URL || 'amqp://localhost:5672'; + +(async () => { + const connection = await amqplib.connect(amqpUrl, 'heartbeat=60'); + const channel = await connection.createChannel(); + try { + console.log('Publishing'); + + const exchange = 'user.signed_up'; + const queue = 'user.sign_up_email'; + const routingKey = 'sign_up_email'; + + await channel.assertExchange(exchange, 'direct', { durable: true }); + await channel.assertQueue(queue, { durable: true }); + await channel.bindQueue(queue, exchange, routingKey); + + const msg = { + id: Math.floor(Math.random() * 1000), + email: 'user@domail.com', + name: 'firstname lastname', + }; + await channel.publish( + exchange, + routingKey, + Buffer.from(JSON.stringify(msg)), + ); + + console.log('Published'); + } catch (e) { + console.error('Error in publishing message', e); + } finally { + await channel.close(); + await connection.close(); + console.info('Channel and connection closed'); + } + process.exit(0); +})(); diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/tsconfig.json b/MessageQueue/Nodejs-RabbitMQ-Docker/tsconfig.json new file mode 100644 index 0000000..be95eb7 --- /dev/null +++ b/MessageQueue/Nodejs-RabbitMQ-Docker/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "target": "es2016", + "module": "commonjs", + "rootDir": "./", + "outDir": "./dist", + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "strict": true, + "skipLibCheck": true, + "experimentalDecorators": true + } +} diff --git a/MessageQueue/Nodejs-RabbitMQ-Docker/yarn.lock b/MessageQueue/Nodejs-RabbitMQ-Docker/yarn.lock new file mode 100644 index 0000000..1177281 --- /dev/null +++ b/MessageQueue/Nodejs-RabbitMQ-Docker/yarn.lock @@ -0,0 +1,104 @@ +# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY. +# yarn lockfile v1 + + +"@acuminous/bitsyntax@^0.1.2": + version "0.1.2" + resolved "https://registry.yarnpkg.com/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz#e0b31b9ee7ad1e4dd840c34864327c33d9f1f653" + integrity sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ== + dependencies: + buffer-more-ints "~1.0.0" + debug "^4.3.4" + safe-buffer "~5.1.2" + +"@types/amqplib@^0.10.1": + version "0.10.1" + resolved "https://registry.yarnpkg.com/@types/amqplib/-/amqplib-0.10.1.tgz#d43d14027b969e82ceec71cc17bd28e5f5abbc7b" + integrity sha512-j6ANKT79ncUDnAs/+9r9eDujxbeJoTjoVu33gHHcaPfmLQaMhvfbH2GqSe8KUM444epAp1Vl3peVOQfZk3UIqA== + dependencies: + "@types/node" "*" + +"@types/node@*": + version "18.11.18" + resolved "https://registry.yarnpkg.com/@types/node/-/node-18.11.18.tgz#8dfb97f0da23c2293e554c5a50d61ef134d7697f" + integrity sha512-DHQpWGjyQKSHj3ebjFI/wRKcqQcdR+MoFBygntYOZytCqNfkd2ZC4ARDJ2DQqhjH5p85Nnd3jhUJIXrszFX/JA== + +amqplib@^0.10.3: + version "0.10.3" + resolved "https://registry.yarnpkg.com/amqplib/-/amqplib-0.10.3.tgz#e186a2f74521eb55ec54db6d25ae82c29c1f911a" + integrity sha512-UHmuSa7n8vVW/a5HGh2nFPqAEr8+cD4dEZ6u9GjP91nHfr1a54RyAKyra7Sb5NH7NBKOUlyQSMXIp0qAixKexw== + dependencies: + "@acuminous/bitsyntax" "^0.1.2" + buffer-more-ints "~1.0.0" + readable-stream "1.x >=1.1.9" + url-parse "~1.5.10" + +buffer-more-ints@~1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz#ef4f8e2dddbad429ed3828a9c55d44f05c611422" + integrity sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg== + +core-util-is@~1.0.0: + version "1.0.3" + resolved "https://registry.yarnpkg.com/core-util-is/-/core-util-is-1.0.3.tgz#a6042d3634c2b27e9328f837b965fac83808db85" + integrity sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ== + +debug@^4.3.4: + version "4.3.4" + resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.4.tgz#1319f6579357f2338d3337d2cdd4914bb5dcc865" + integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== + dependencies: + ms "2.1.2" + +inherits@~2.0.1: + version "2.0.4" + resolved "https://registry.yarnpkg.com/inherits/-/inherits-2.0.4.tgz#0fa2c64f932917c3433a0ded55363aae37416b7c" + integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ== + +isarray@0.0.1: + version "0.0.1" + resolved "https://registry.yarnpkg.com/isarray/-/isarray-0.0.1.tgz#8a18acfca9a8f4177e09abfc6038939b05d1eedf" + integrity sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ== + +ms@2.1.2: + version "2.1.2" + resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009" + integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== + +querystringify@^2.1.1: + version "2.2.0" + resolved "https://registry.yarnpkg.com/querystringify/-/querystringify-2.2.0.tgz#3345941b4153cb9d082d8eee4cda2016a9aef7f6" + integrity sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ== + +"readable-stream@1.x >=1.1.9": + version "1.1.14" + resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.1.14.tgz#7cf4c54ef648e3813084c636dd2079e166c081d9" + integrity sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ== + dependencies: + core-util-is "~1.0.0" + inherits "~2.0.1" + isarray "0.0.1" + string_decoder "~0.10.x" + +requires-port@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff" + integrity sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ== + +safe-buffer@~5.1.2: + version "5.1.2" + resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d" + integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g== + +string_decoder@~0.10.x: + version "0.10.31" + resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-0.10.31.tgz#62e203bc41766c6c28c9fc84301dab1c5310fa94" + integrity sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ== + +url-parse@~1.5.10: + version "1.5.10" + resolved "https://registry.yarnpkg.com/url-parse/-/url-parse-1.5.10.tgz#9d3c2f736c1d75dd3bd2be507dcc111f1e2ea9c1" + integrity sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ== + dependencies: + querystringify "^2.1.1" + requires-port "^1.0.0"