Skip to content

Commit

Permalink
add http endpoint behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaeng72 committed Jan 24, 2025
1 parent 1063bb2 commit 837136a
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 13 deletions.
21 changes: 19 additions & 2 deletions ingest-app/src/cmr/ingest/api/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
[cmr.transmit.search :as search]
[cmr.transmit.urs :as urs])
(:import
[java.util UUID]))
[java.util UUID])
(:import
(org.apache.commons.validator.routines UrlValidator)))

(def ^:private CMR_PROVIDER
"CMR provider-id, used by collection subscription."
Expand Down Expand Up @@ -77,6 +79,20 @@
query-params)]
(search-concept-refs-with-sub-params context search-params subscription-type)))

(defn- validate-subscription-endpoint
  "Validates the delivery endpoint of a subscription concept.

  Only concepts whose :Method is \"ingest\" are checked; for any other method the
  :EndPoint is ignored. An ingest endpoint must be a string that is either an SQS ARN
  (begins with arn:aws:sqs:) or a well-formed http/https URL.

  Returns nil when the endpoint is acceptable or is not checked. Throws a :bad-request
  service error otherwise, including when the endpoint is missing."
  [subscription-concept]
  (let [{method :Method endpoint :EndPoint} subscription-concept]
    (when (= method "ingest")
      ;; Restrict the validator to http/https: the no-arg UrlValidator also accepts ftp,
      ;; which contradicts the HTTP/S contract stated in the error message below.
      (let [url-validator (UrlValidator. ^"[Ljava.lang.String;" (into-array String ["http" "https"]))]
        ;; The string? guard keeps a missing endpoint away from re-matches, which throws a
        ;; NullPointerException on nil instead of the intended service error.
        (when-not (and (string? endpoint)
                       (or (some? (re-matches #"arn:aws:sqs:.*" endpoint))
                           (.isValid url-validator endpoint)))
          (errors/throw-service-error
           :bad-request
           "Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL.
If it is a URL, make sure to give the full URL path like so: https://www.google.com."))))))

(defn- check-subscription-limit
"Given the configuration for subscription limit, this validates that the user has no more than
the limit before we allow more subscriptions to be ingested by that user."
Expand Down Expand Up @@ -272,7 +288,7 @@

(defn- body->subscription
"Returns the subscription concept for the given request body, etc.
This is the raw subscritpion that is ready for metadata validation,
This is the raw subscription that is ready for metadata validation,
but still needs some sanitization to be saved to database."
[native-id body content-type headers]
(let [sub-concept (api-core/body->concept!
Expand Down Expand Up @@ -319,6 +335,7 @@
(api-core/verify-provider-exists context provider-id))
(validate-user-id context subscriber-id)
(validate-query context parsed)
(validate-subscription-endpoint parsed)
(let [parsed-metadata (assoc parsed :SubscriberId subscriber-id)]
{:concept (assoc sub-concept
:metadata (json/generate-string parsed-metadata)
Expand Down
36 changes: 33 additions & 3 deletions ingest-app/test/cmr/ingest/api/subscriptions_test.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
(ns cmr.ingest.api.subscriptions-test
(:require
[clojure.string :as string]
[clojure.test :refer :all]
[cmr.ingest.api.subscriptions :as subscriptions]))
[clojure.string :as string]
[clojure.test :refer :all]
[cmr.common.util :as util]
[cmr.ingest.api.subscriptions :as subscriptions]))

(deftest generate-native-id-test
(let [parsed {:Name "the_beginning"
Expand All @@ -14,3 +15,32 @@

(testing "name is used as the prefix"
(is (string/starts-with? native-id "the_beginning")))))

(deftest validate-subscription-endpoint-test
  ;; validate-subscription-endpoint is private, so it is reached through its var.
  (let [validate-endpoint #'cmr.ingest.api.subscriptions/validate-subscription-endpoint]
    (testing "validate subscription endpoint str -- expected valid"
      (util/are3 [subscription-concept]
        (is (nil? (validate-endpoint subscription-concept)))

        "given method is search -- endpoint ignored"
        {:EndPoint "blahblah", :Method "search"}

        ;; The endpoint is really omitted here, so this case differs from the one above.
        "given method is search and endpoint not given -- endpoint ignored"
        {:Method "search"}

        "given method is ingest and sqs arn is valid"
        {:EndPoint "arn:aws:sqs:us-east-1:000000000:Test-Queue", :Method "ingest"}

        "given method is ingest and url is valid"
        {:EndPoint "https://testwebsite.com", :Method "ingest"}))

    (testing "validate subscription endpoint str -- expected invalid"
      (util/are3 [subscription-concept]
        (is (thrown? Exception (validate-endpoint subscription-concept)))

        "given method is ingest and sqs arn is invalid"
        {:EndPoint "iaminvalidendpoint", :Method "ingest"}

        ;; Uses the real :EndPoint key so the empty string is what gets validated.
        "given method is ingest and endpoint is empty is invalid"
        {:EndPoint "", :Method "ingest"}

        "given method is ingest and endpoint is not given is invalid"
        {:Method "ingest"}))))
55 changes: 47 additions & 8 deletions metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
"Business logic for subscription processing."
(:require
[cheshire.core :as json]
[clj-http.client :as client]
[cmr.common.log :refer [debug info]]
[cmr.common.services.errors :as errors]
[cmr.message-queue.topic.topic-protocol :as topic-protocol]
[cmr.metadata-db.config :as mdb-config]
[cmr.metadata-db.services.search-service :as mdb-search]
[cmr.metadata-db.services.subscription-cache :as subscription-cache]
[cmr.transmit.config :as t-config]))
[cmr.transmit.config :as t-config])
(:import
(org.apache.commons.validator.routines UrlValidator)))

(def subscriptions-enabled?
"Checks to see if ingest subscriptions are enabled."
Expand Down Expand Up @@ -95,7 +99,7 @@
(subscription-cache/remove-value context coll-concept-id))))

;;
;; The functions below are for subscribing and unsubscribing and endpoint to the topic.
;; The functions below are for subscribing and unsubscribing
;;

(defn add-delete-subscription
Expand All @@ -110,31 +114,66 @@
(change-subscription context concept-edn)
concept-edn))))

(defn- is-valid-sqs-arn
  "Checks whether the given endpoint looks like an SQS ARN, i.e. is a string that
  begins with arn:aws:sqs: (anything may follow the prefix).
  Returns true or false; a nil or non-string endpoint yields false."
  [endpoint]
  ;; re-matches throws a NullPointerException on nil, so guard before matching.
  (and (string? endpoint)
       (some? (re-matches #"arn:aws:sqs:.*" endpoint))))

(defn- is-valid-subscription-endpoint-url
  "Checks whether the subscription endpoint destination is a well-formed http or https
  URL, including the scheme (e.g. https://www.google.com). Returns true or false."
  [endpoint]
  ;; Only http/https are allowed because the endpoint is later sent an HTTP POST; the
  ;; no-arg UrlValidator would also accept ftp URLs.
  (let [http-url-validator (UrlValidator. ^"[Ljava.lang.String;" (into-array String ["http" "https"]))]
    (.isValid http-url-validator endpoint)))

(defn- send-sub-to-url-dest
  "POSTs the subscription concept, serialized as JSON, to the given URL.
  Returns nil when the destination answers with a 2xx status. Throws an Exception
  carrying the destination and the response for any other status."
  [subscription-concept dest-endpoint]
  (let [{:keys [status] :as response} (client/post dest-endpoint
                                                   {:body (json/generate-string subscription-concept)
                                                    :content-type "application/json"
                                                    ;; Inspect the status ourselves rather than letting clj-http throw.
                                                    :throw-exceptions false})]
    ;; Any 2xx reply means the receiver accepted the message; webhook receivers commonly
    ;; answer 201/202/204, which must not be reported as failures.
    (when-not (and (integer? status) (<= 200 status 299))
      (throw (Exception. (format "Failed to send subscription message to url %s. Response was: %s" dest-endpoint response))))))

(defn add-subscription
"Registers an ingest subscription with its delivery endpoint.
Does nothing unless the concept converts to EDN and is an ingest subscription.
The :EndPoint found in the concept metadata decides what happens:
- an SQS ARN is subscribed to the external SNS topic taken from the system context;
- a valid URL has the concept sent to it through send-sub-to-url-dest;
- anything else raises a :bad-request service error naming the endpoint.
NOTE(review): no cache update is visible in this function -- confirm whether the
cache is populated elsewhere."
[context concept]
(when-let [concept-edn (convert-concept-to-edn concept)]
(when (ingest-subscription-concept? concept-edn)
(let [topic (get-in context [:system :sns :external])]
(topic-protocol/subscribe topic concept-edn)))))
(let [endpoint (:EndPoint (:metadata concept-edn))]
(cond
(is-valid-sqs-arn endpoint) (let [topic (get-in context [:system :sns :external])]
(topic-protocol/subscribe topic concept-edn))
(is-valid-subscription-endpoint-url endpoint) (send-sub-to-url-dest concept-edn endpoint)
:else (errors/throw-service-error :bad-request
(format "Endpoint given for subscription was neither a valid sqs arn or a valid URL.
Invalid endpoint received was: %s" endpoint))
)))))

(defn delete-subscription
"Removes an ingest subscription from its delivery endpoint.
The concept is first passed through add-delete-subscription; nothing further happens
unless that returns the concept as EDN and it is an ingest subscription.
The :EndPoint found in the concept metadata decides what happens:
- an SQS ARN is unsubscribed from the external SNS topic, identified by the concept's
:concept-id and the :aws-arn stored in its :extra-fields;
- a valid URL has the concept sent to it through send-sub-to-url-dest;
- anything else raises a :bad-request service error naming the endpoint."
[context concept]
(when-let [concept-edn (add-delete-subscription context concept)]
(when (ingest-subscription-concept? concept-edn)
(let [topic (get-in context [:system :sns :external])]
(topic-protocol/unsubscribe topic {:concept-id (:concept-id concept-edn)
:subscription-arn (get-in concept-edn [:extra-fields :aws-arn])})))))
(let [endpoint (:EndPoint (:metadata concept-edn))]
(cond
(is-valid-sqs-arn endpoint) (let [topic (get-in context [:system :sns :external])]
(topic-protocol/unsubscribe topic {:concept-id (:concept-id concept-edn)
:subscription-arn (get-in concept-edn [:extra-fields :aws-arn])}))
(is-valid-subscription-endpoint-url endpoint) (send-sub-to-url-dest concept-edn endpoint)
:else (errors/throw-service-error :bad-request
(format "Endpoint given for subscription was neither a valid sqs arn or a valid URL.
Invalid endpoint received was: %s" endpoint)))
))))

;;
;; The functions below are for refreshing the subscription cache if needed.
;;

(defn create-subscription-cache-contents-for-refresh
"Go through all of the subscriptions and find the ones that are
"Go through all the subscriptions and find the ones that are
ingest subscriptions. Create the mode values for each collection-concept-id
and put those into a map. The end result looks like:
{Collection concept id 1: [\"New\" \"Update\"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,3 +682,56 @@
(is (= "G12345-PROV1" (:concept-id real-message)))
(is (= '(:concept-id :granule-ur :producer-granule-id :location) (keys real-message)))
(is (some? (queue/delete-messages sqs-client internal-queue-url messages))))))))))

(deftest is-sqs-arn-test
  ;; is-valid-sqs-arn is private, so the test reaches it through its var.
  (let [sqs-arn-check #'cmr.metadata-db.services.subscriptions/is-valid-sqs-arn]
    (testing "sqs endpoint validation"
      (are3 [expected endpoint]
        (is (= expected (sqs-arn-check endpoint)))

        "valid sqs endpoint"
        true
        "arn:aws:sqs:us-west-1:123456789:Test-Queue"

        "valid sqs endpoint with any string after sqs: "
        true
        "arn:aws:sqs:anything after this is valid"

        "invalid sqs endpoint"
        false
        "some string"

        "invalid sqs endpoint - because partial"
        false
        "arn:aws:sns:blah blah"))))

(deftest is-valid-url-test
  ;; is-valid-subscription-endpoint-url is private, so the test reaches it through its var.
  (let [url-check #'cmr.metadata-db.services.subscriptions/is-valid-subscription-endpoint-url]
    (testing "url string validation"
      (are3 [expected endpoint]
        (is (= expected (url-check endpoint)))

        "valid url -- with https prefix"
        true
        "https://www.google.com"

        "invalid url - no https prefix"
        false
        "www.google.com"

        "invalid url -- no www prefix"
        false
        "google.com"

        "valid url -- with http prefix"
        true
        "http://www.google.com"

        "invalid url - non-existent domain"
        false
        "hello.blach"

        "invalid url - some string"
        false
        "this is just some string"))))

0 comments on commit 837136a

Please sign in to comment.