From 88274284c2746718c694d712e4d4c5f52f08dee4 Mon Sep 17 00:00:00 2001 From: Christopher Maier Date: Fri, 14 Oct 2016 14:39:02 -0400 Subject: [PATCH] All MQTT connections have usernames and passwords Previously, only connections coming from Relays had usernames and passwords associated with them; any connection from Cog itself had neither. We would distinguish between them by examining which host the connection was coming from; if it was 127.0.0.1, we treated it like it was a Cog connection, and like a Relay connection (which needed to be authenticated) otherwise. This would cause Cog to crash if a Relay process was running on the same host as Cog, and if it tried to connect prior to being registered with Cog. Now, _all_ connections have a username and password associated with them. All Cog-initiated connections have a username of `COG_INTERNAL`, and use a randomly-generated password that is generated at Cog startup, allowing us to properly distinguish between Cog connections and Relay connections. Fixes #1034 --- lib/carrier/messaging/connection.ex | 13 +++++-- lib/cog.ex | 9 +++++ lib/cog/bus_enforcer.ex | 59 ++++++++++++++--------------- lib/cog/util/misc.ex | 4 ++ 4 files changed, 51 insertions(+), 34 deletions(-) diff --git a/lib/carrier/messaging/connection.ex b/lib/carrier/messaging/connection.ex index b4701806..cb873721 100644 --- a/lib/carrier/messaging/connection.ex +++ b/lib/carrier/messaging/connection.ex @@ -13,6 +13,7 @@ defmodule Carrier.Messaging.Connection do Interface for the message bus on which commands communicate. """ + @internal_mq_username Cog.Util.Misc.internal_mq_username @default_connect_timeout 5000 # 5 seconds @default_log_level :error @@ -50,7 +51,10 @@ defmodule Carrier.Messaging.Connection do def connect(opts) do connect_timeout = Keyword.get(opts, :connect_timeout, @default_connect_timeout) - opts = add_system_config(opts) + opts = opts + |> add_connect_config + |> add_internal_credentials + {:ok, conn} = :emqttc.start_link(opts) # `emqttc:start_link/1` returns a message bus client process, but it @@ -163,9 +167,12 @@ defmodule Carrier.Messaging.Connection do :emqttc.sync_publish(conn, topic, encoded, :qos1) end - defp add_system_config(opts) do + ######################################################################## + + defp add_internal_credentials(opts) do opts - |> add_connect_config + |> Keyword.put(:username, @internal_mq_username) + |> Keyword.put(:password, Application.fetch_env!(:cog, :message_queue_password)) end defp add_connect_config(opts) do diff --git a/lib/cog.ex b/lib/cog.ex index 0c26203f..1af42152 100644 --- a/lib/cog.ex +++ b/lib/cog.ex @@ -13,6 +13,8 @@ defmodule Cog do end def start(_type, _args) do + Application.put_env(:cog, :message_queue_password, gen_password(64)) + maybe_display_unenforcing_warning() children = [worker(Cog.BusDriver, [], shutdown: 1000), worker(Cog.Repo, []), @@ -102,4 +104,11 @@ defmodule Cog do end end + # Generate a random string `length` characters long + defp gen_password(length) do + :crypto.strong_rand_bytes(length) + |> Base.encode64 + |> binary_part(0, length) + end + end diff --git a/lib/cog/bus_enforcer.ex b/lib/cog/bus_enforcer.ex index 1636083e..60da6a9d 100644 --- a/lib/cog/bus_enforcer.ex +++ b/lib/cog/bus_enforcer.ex @@ -14,24 +14,43 @@ defmodule Cog.BusEnforcer do Record.defrecord :mqtt_client, Record.extract(:mqtt_client, from_lib: "emqttd/include/emqttd.hrl") + @internal_mq_username Cog.Util.Misc.internal_mq_username + def connect_allowed?(client, password) do - case mqtt_client(client, :peername) do - {{127, 0, 0, 1}, _} -> + username = mqtt_client(client, :username) + internal_password = Application.fetch_env!(:cog, :message_queue_password) + + case {username, password} do + {@internal_mq_username, ^internal_password} -> true + {:undefined, _} -> + false + {_, :undefined} -> + false _ -> - validate_creds(client, password) + case Repo.one(Queries.Relay.for_id(username)) do + nil -> + addr = format_address(mqtt_client(client, :peername)) + Logger.info("Denied connect attempt for unknown client #{username} from #{addr}") + false + relay -> + if Passwords.matches?(password, relay.token_digest) do + Logger.info("Allowed connection for Relay #{username}") + true + else + Logger.info("Denied connection for Relay #{username} (bad token)") + false + end + end end end def subscription_allowed?(client, topic) do case mqtt_client(client, :username) do + @internal_mq_username -> + true :undefined -> - case mqtt_client(client, :peername) do - {{127, 0, 0, 1}, _} -> - true - _ -> - false - end + false username -> case Repo.exists?(Relay, username) do false -> @@ -45,28 +64,6 @@ defmodule Cog.BusEnforcer do end end - defp validate_creds(client, password) do - username = mqtt_client(client, :username) - addr = format_address(mqtt_client(client, :peername)) - if username == :undefined or password == :undefined do - false - else - case Repo.one(Queries.Relay.for_id(username)) do - nil -> - Logger.info("Denied connect attempt for unknown client #{username} from #{addr}") - false - relay -> - if Passwords.matches?(password, relay.token_digest) do - Logger.info("Allowed connection for Relay #{username}") - true - else - Logger.info("Denied connection for Relay #{username} (bad token)") - false - end - end - end - end - defp format_address({addr, _}) do addr = Tuple.to_list(addr) if length(addr) == 4 do diff --git a/lib/cog/util/misc.ex b/lib/cog/util/misc.ex index 2164a202..7bb5ef58 100644 --- a/lib/cog/util/misc.ex +++ b/lib/cog/util/misc.ex @@ -12,6 +12,10 @@ defmodule Cog.Util.Misc do @doc "The name of the admin group." def admin_group, do: "cog-admin" + @doc "Username that all Cog-initiated message queue connections use" + def internal_mq_username, + do: "COG_INTERNAL" + ######################################################################## # Adapter Resolution Functions