Skip to content

Commit

Permalink
All MQTT connections have usernames and passwords
Browse files Browse the repository at this point in the history
Previously, only connections coming from Relays had usernames and
passwords associated with them; any connection from Cog itself had
neither. We would distinguish between them by examining which host the
connection was coming from; if it was 127.0.0.1, we treated it like it
was a Cog connection, and like a Relay connection (which needed to be
authenticated) otherwise.

This would cause Cog to crash if a Relay process was running on the same
host as Cog, and if it tried to connect prior to being registered with
Cog.

Now, _all_ connections have a username and password associated with
them. All Cog-initiated connections have a username of `COG_INTERNAL`,
and use a randomly-generated password that is generated at Cog startup,
allowing us to properly distinguish between Cog connections and Relay
connections.

Fixes #1034
  • Loading branch information
christophermaier committed Oct 17, 2016
1 parent 70ccc40 commit 8827428
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 34 deletions.
13 changes: 10 additions & 3 deletions lib/carrier/messaging/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Carrier.Messaging.Connection do
Interface for the message bus on which commands communicate.
"""

@internal_mq_username Cog.Util.Misc.internal_mq_username
@default_connect_timeout 5000 # 5 seconds
@default_log_level :error

Expand Down Expand Up @@ -50,7 +51,10 @@ defmodule Carrier.Messaging.Connection do
def connect(opts) do
connect_timeout = Keyword.get(opts, :connect_timeout, @default_connect_timeout)

opts = add_system_config(opts)
opts = opts
|> add_connect_config
|> add_internal_credentials

{:ok, conn} = :emqttc.start_link(opts)

# `emqttc:start_link/1` returns a message bus client process, but it
Expand Down Expand Up @@ -163,9 +167,12 @@ defmodule Carrier.Messaging.Connection do
:emqttc.sync_publish(conn, topic, encoded, :qos1)
end

defp add_system_config(opts) do
########################################################################

defp add_internal_credentials(opts) do
opts
|> add_connect_config
|> Keyword.put(:username, @internal_mq_username)
|> Keyword.put(:password, Application.fetch_env!(:cog, :message_queue_password))
end

defp add_connect_config(opts) do
Expand Down
9 changes: 9 additions & 0 deletions lib/cog.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ defmodule Cog do
end

def start(_type, _args) do
Application.put_env(:cog, :message_queue_password, gen_password(64))

maybe_display_unenforcing_warning()
children = [worker(Cog.BusDriver, [], shutdown: 1000),
worker(Cog.Repo, []),
Expand Down Expand Up @@ -102,4 +104,11 @@ defmodule Cog do
end
end

# Generate a random string `length` characters long
defp gen_password(length) do
:crypto.strong_rand_bytes(length)
|> Base.encode64
|> binary_part(0, length)
end

end
59 changes: 28 additions & 31 deletions lib/cog/bus_enforcer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,43 @@ defmodule Cog.BusEnforcer do

Record.defrecord :mqtt_client, Record.extract(:mqtt_client, from_lib: "emqttd/include/emqttd.hrl")

@internal_mq_username Cog.Util.Misc.internal_mq_username

def connect_allowed?(client, password) do
case mqtt_client(client, :peername) do
{{127, 0, 0, 1}, _} ->
username = mqtt_client(client, :username)
internal_password = Application.fetch_env!(:cog, :message_queue_password)

case {username, password} do
{@internal_mq_username, ^internal_password} ->
true
{:undefined, _} ->
false
{_, :undefined} ->
false
_ ->
validate_creds(client, password)
case Repo.one(Queries.Relay.for_id(username)) do
nil ->
addr = format_address(mqtt_client(client, :peername))
Logger.info("Denied connect attempt for unknown client #{username} from #{addr}")
false
relay ->
if Passwords.matches?(password, relay.token_digest) do
Logger.info("Allowed connection for Relay #{username}")
true
else
Logger.info("Denied connection for Relay #{username} (bad token)")
false
end
end
end
end

def subscription_allowed?(client, topic) do
case mqtt_client(client, :username) do
@internal_mq_username ->
true
:undefined ->
case mqtt_client(client, :peername) do
{{127, 0, 0, 1}, _} ->
true
_ ->
false
end
false
username ->
case Repo.exists?(Relay, username) do
false ->
Expand All @@ -45,28 +64,6 @@ defmodule Cog.BusEnforcer do
end
end

defp validate_creds(client, password) do
username = mqtt_client(client, :username)
addr = format_address(mqtt_client(client, :peername))
if username == :undefined or password == :undefined do
false
else
case Repo.one(Queries.Relay.for_id(username)) do
nil ->
Logger.info("Denied connect attempt for unknown client #{username} from #{addr}")
false
relay ->
if Passwords.matches?(password, relay.token_digest) do
Logger.info("Allowed connection for Relay #{username}")
true
else
Logger.info("Denied connection for Relay #{username} (bad token)")
false
end
end
end
end

defp format_address({addr, _}) do
addr = Tuple.to_list(addr)
if length(addr) == 4 do
Expand Down
4 changes: 4 additions & 0 deletions lib/cog/util/misc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ defmodule Cog.Util.Misc do
@doc "The name of the admin group."
def admin_group, do: "cog-admin"

@doc "Username that all Cog-initiated message queue connections use"
def internal_mq_username,
do: "COG_INTERNAL"

########################################################################
# Adapter Resolution Functions

Expand Down

0 comments on commit 8827428

Please sign in to comment.