Skip to content

Commit

Permalink
fix consumer id (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
ignaciogoldchluk-yolo authored Aug 19, 2024
1 parent 7530848 commit 078c34c
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 37 deletions.
14 changes: 11 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
## [3.1.1] (2024-08-16)
## [3.1.2] (2024-08-16)

### Bug fixes
- Fix missing specification for `ConsumerServer` unique id

### Internal
- Prefix modules with `Coney.` to avoid conflicts

## [3.1.1] (2024-08-16) - BROKEN

### Bug fixes
- Fix missing `:name` when starting `ConsumerServer`

## [3.1.0] (2024-08-14)
## [3.1.0] (2024-08-14) - BROKEN

### Enhancements
- Add `:enabled` config value
- `:adapter` config value is now optional and defaults to `Coney.RabbitConnection`

## [3.0.2] (2024-08-12)
## [3.0.2] (2024-08-12) - BROKEN

### Bug fixes
- Fix incorrect termination order where the connection to RabbitMQ was closed before the channels.
Expand Down
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ config :coney,
timeout: 1000
},
workers: [
FakeConsumer
Coney.FakeConsumer,
Coney.OtherFakeConsumer
],
topology: %{
exchanges: [{:topic, "exchange", durable: false}],
Expand Down
24 changes: 0 additions & 24 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,25 +1 @@
import Config

config :coney,
topology: %{
exchanges: [{:topic, "exchange", durable: false}],
queues: %{
"queue" => %{
options: [
durable: false
],
bindings: [
[exchange: "exchange", options: [routing_key: "queue"]]
]
}
}
},
pool_size: 1,
auto_start: true,
settings: %{
url: "amqp://guest:guest@localhost:5672",
timeout: 1000
},
workers: [
FakeConsumer
]
7 changes: 7 additions & 0 deletions lib/coney/consumer_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ defmodule Coney.ConsumerServer do

require Logger

def child_spec([consumer]) do
%{
id: consumer,
start: {__MODULE__, :start_link, [[consumer]]}
}
end

def start_link([consumer]) do
GenServer.start_link(__MODULE__, [consumer], name: consumer)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Coney.Mixfile do
def project do
[
app: :coney,
version: "3.1.1",
version: "3.1.2",
elixir: ">= 1.12.0",
build_embedded: Mix.env() == :prod,
start_permanent: Mix.env() == :prod,
Expand Down
6 changes: 3 additions & 3 deletions test/lib/coney/connection_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule Coney.ConnectionServerTest do
# Subscribe a channel
assert {:reply, channel_ref, connected_state} =
ConnectionServer.handle_call(
{:subscribe, FakeConsumer},
{:subscribe, Coney.FakeConsumer},
{self(), :erlang.make_ref()},
state
)
Expand Down Expand Up @@ -91,7 +91,7 @@ defmodule Coney.ConnectionServerTest do
# Subscribe a channel
assert {:reply, channel_ref, new_state} =
ConnectionServer.handle_call(
{:subscribe, FakeConsumer},
{:subscribe, Coney.FakeConsumer},
{self(), :erlang.make_ref()},
state
)
Expand All @@ -100,7 +100,7 @@ defmodule Coney.ConnectionServerTest do

pid = self()

assert {^pid, FakeConsumer, _} = Map.get(new_state.channels, channel_ref)
assert {^pid, Coney.FakeConsumer, _} = Map.get(new_state.channels, channel_ref)
end
end
end
8 changes: 4 additions & 4 deletions test/lib/coney/consumer/consumer_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ defmodule ConsumerServerTest do
alias Coney.ConsumerServer

setup do
ref = Coney.ConnectionServer.subscribe(FakeConsumer)
ref = Coney.ConnectionServer.subscribe(Coney.FakeConsumer)

[
args: [FakeConsumer],
state: %{consumer: FakeConsumer, tasks: %{}, chan: ref}
args: [Coney.FakeConsumer],
state: %{consumer: Coney.FakeConsumer, tasks: %{}, chan: ref}
]
end

Expand Down Expand Up @@ -46,7 +46,7 @@ defmodule ConsumerServerTest do

describe "handle_info/2" do
setup do
%{state: %{consumer: FakeConsumer, tasks: Map.new(), chan: :erlang.make_ref()}}
%{state: %{consumer: Coney.FakeConsumer, tasks: Map.new(), chan: :erlang.make_ref()}}
end

test "demonitors a task once it completes successfully", %{state: state} do
Expand Down
2 changes: 1 addition & 1 deletion test/support/fake_consumer.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule FakeConsumer do
defmodule Coney.FakeConsumer do
@behaviour Coney.Consumer

def connection do
Expand Down
28 changes: 28 additions & 0 deletions test/support/other_fake_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule Coney.OtherFakeConsumer do
@behaviour Coney.Consumer

def connection do
%{
prefetch_count: 10,
queue: "queue"
}
end

def parse(payload, _meta) do
payload
end

def process(payload, _meta) do
case payload do
:ok -> :ok
:reject -> :reject
:reply -> {:reply, :data}
:exception -> raise "Exception happen"
_other -> :ok
end
end

def error_happened(_exception, _payload, _meta) do
:ok
end
end

0 comments on commit 078c34c

Please sign in to comment.