diff --git a/.rubocop.yml b/.rubocop.yml index ea0b4722b09..7d0e1d3e438 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -20,13 +20,13 @@ AllCops: NewCops: disable DisplayStyleGuide: true Exclude: - - 'bin/**/*' - - 'db/schema.rb' - - 'db/*_schema.rb' - - 'storage/**/*' - - 'tmp/**/*' - - 'coverage/**/*' - - 'log/**/*' + - "bin/**/*" + - "db/schema.rb" + - "db/*_schema.rb" + - "storage/**/*" + - "tmp/**/*" + - "coverage/**/*" + - "log/**/*" # TODO: Enable when we have time to fix all the offenses Style/StringLiterals: @@ -36,15 +36,18 @@ Style/FrozenStringLiteralComment: Enabled: true SafeAutoCorrect: true +Style/QuotedSymbols: + Enabled: false + Performance/CaseWhenSplat: Enabled: true Rails/InverseOf: - Description: 'Checks for associations where the inverse cannot be determined automatically.' + Description: "Checks for associations where the inverse cannot be determined automatically." Enabled: false Rails/HttpStatus: - Description: 'Enforces use of symbolic or numeric value to describe HTTP status.' + Description: "Enforces use of symbolic or numeric value to describe HTTP status." Enabled: false Rails/HasManyOrHasOneDependent: @@ -52,11 +55,11 @@ Rails/HasManyOrHasOneDependent: Enabled: false RSpec/ExampleLength: - Description: 'Checks for long examples.' + Description: "Checks for long examples." Enabled: false RSpec/MultipleExpectations: - Description: 'Checks if examples contain too many expect calls.' + Description: "Checks if examples contain too many expect calls." Enabled: false RSpec/MultipleMemoizedHelpers: @@ -84,10 +87,10 @@ GraphQL/ExtractType: GraphQL/ExtractInputType: Exclude: - - 'app/graphql/mutations/applied_coupons/create.rb' - - 'app/graphql/mutations/credit_notes/create.rb' - - 'app/graphql/mutations/invites/accept.rb' - - 'app/graphql/mutations/plans/create.rb' - - 'app/graphql/mutations/plans/update.rb' - - 'app/graphql/mutations/register_user.rb' - - 'app/graphql/mutations/wallet_transactions/create.rb' + - "app/graphql/mutations/applied_coupons/create.rb" + - "app/graphql/mutations/credit_notes/create.rb" + - "app/graphql/mutations/invites/accept.rb" + - "app/graphql/mutations/plans/create.rb" + - "app/graphql/mutations/plans/update.rb" + - "app/graphql/mutations/register_user.rb" + - "app/graphql/mutations/wallet_transactions/create.rb" diff --git a/app/controllers/api/v1/events_controller.rb b/app/controllers/api/v1/events_controller.rb index cb838c4934c..7e489a17469 100644 --- a/app/controllers/api/v1/events_controller.rb +++ b/app/controllers/api/v1/events_controller.rb @@ -3,7 +3,13 @@ module Api module V1 class EventsController < Api::BaseController - ACTIONS_WITH_CACHED_API_KEY = %i[create batch estimate_instant_fees batch_estimate_instant_fees].freeze + ACTIONS_WITH_CACHED_API_KEY = %i[ + create + high_usage + batch + estimate_instant_fees + batch_estimate_instant_fees + ].freeze def create result = ::Events::CreateService.call( @@ -25,6 +31,20 @@ def create end end + def high_usage + result = ::Events::HighUsageCreateService.call( + organization: current_organization, + params: create_params, + timestamp: Time.current.to_f + ) + + if result.success? + render(json: {event: {transaction_id: result.transaction_id}}) + else + render_error_response(result) + end + end + def batch result = ::Events::CreateBatchService.call( organization: current_organization, @@ -46,6 +66,20 @@ def batch end end + def batch_high_usage + result = ::Events::HighUsageBatchCreateService.call( + organization: current_organization, + params: batch_params[:events], + timestamp: Time.current.to_f + ) + + if result.success? + render(json: {events: result.transactions}) + else + render_error_response(result) + end + end + def show event_scope = current_organization.clickhouse_events_store? ? Clickhouse::EventsRaw : Event event = event_scope.find_by( diff --git a/app/services/events/create_batch_service.rb b/app/services/events/create_batch_service.rb index d4f3aee776b..a49eee6e561 100644 --- a/app/services/events/create_batch_service.rb +++ b/app/services/events/create_batch_service.rb @@ -15,11 +15,11 @@ def initialize(organization:, events_params:, timestamp:, metadata:) def call if events_params.blank? - return result.single_validation_failure!(error_code: 'no_events', field: :events) + return result.single_validation_failure!(error_code: "no_events", field: :events) end if events_params.count > MAX_LENGTH - return result.single_validation_failure!(error_code: 'too_many_events', field: :events) + return result.single_validation_failure!(error_code: "too_many_events", field: :events) end validate_events @@ -56,7 +56,7 @@ def validate_events result.events.push(event) result.errors[index] = event.errors.messages unless event.valid? rescue ArgumentError - result.errors = result.errors.merge({index => {timestamp: ['invalid_format']}}) + result.errors = result.errors.merge({index => {timestamp: ["invalid_format"]}}) end end @@ -72,11 +72,11 @@ def post_validate_events end def produce_kafka_event(event) - return if ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'].blank? - return if ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'].blank? + return if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank? + return if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank? Karafka.producer.produce_async( - topic: ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'], + topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"], key: "#{organization.id}-#{event.external_subscription_id}", payload: { organization_id: organization.id, @@ -88,7 +88,7 @@ def produce_kafka_event(event) properties: event.properties, ingested_at: Time.zone.now.iso8601[...-1], precise_total_amount_cents: event.precise_total_amount_cents.present? ? event.precise_total_amount_cents.to_s : "0.0", - source: 'http_ruby' + source: "http_ruby" }.to_json ) end diff --git a/app/services/events/create_service.rb b/app/services/events/create_service.rb index 24c4e4d7af4..32ed8baa347 100644 --- a/app/services/events/create_service.rb +++ b/app/services/events/create_service.rb @@ -35,9 +35,9 @@ def call rescue ActiveRecord::RecordInvalid => e result.record_validation_failure!(record: e.record) rescue ActiveRecord::RecordNotUnique - result.single_validation_failure!(field: :transaction_id, error_code: 'value_already_exist') + result.single_validation_failure!(field: :transaction_id, error_code: "value_already_exist") rescue ArgumentError - result.single_validation_failure!(field: :timestamp, error_code: 'invalid_format') + result.single_validation_failure!(field: :timestamp, error_code: "invalid_format") end private @@ -45,15 +45,14 @@ def call attr_reader :organization, :params, :timestamp, :metadata def produce_kafka_event(event) - return if ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'].blank? - return if ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'].blank? + return if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank? + return if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank? Karafka.producer.produce_async( - topic: ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'], + topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"], key: "#{organization.id}-#{event.external_subscription_id}", payload: { organization_id: organization.id, - external_customer_id: event.external_customer_id, external_subscription_id: event.external_subscription_id, transaction_id: event.transaction_id, # NOTE: Removes trailing 'Z' to allow clickhouse parsing @@ -63,7 +62,7 @@ def produce_kafka_event(event) precise_total_amount_cents: event.precise_total_amount_cents.present? ? event.precise_total_amount_cents.to_s : "0.0", properties: event.properties, ingested_at: Time.zone.now.iso8601[...-1], - source: 'http_ruby' + source: "http_ruby" }.to_json ) end diff --git a/app/services/events/high_usage_batch_create_service.rb b/app/services/events/high_usage_batch_create_service.rb new file mode 100644 index 00000000000..ebbe4f6ef43 --- /dev/null +++ b/app/services/events/high_usage_batch_create_service.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +module Events + class HighUsageBatchCreateService < BaseService + Result = BaseResult[:transactions] + + MAX_LENGTH = ENV.fetch("LAGO_EVENTS_BATCH_MAX_LENGTH", 100).to_i + + def initialize(organization:, params:, timestamp:) + @organization = organization + @params = params + @timestamp = timestamp + super + end + + def call + return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank? + return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank? + + if params.blank? + return result.single_validation_failure!(error_code: "no_events", field: :events) + end + + if params.count > MAX_LENGTH + return result.single_validation_failure!(error_code: "too_many_events", field: :events) + end + + result.transactions = params.map { process_event(_1) } + result + end + + private + + attr_reader :organization, :params, :timestamp + + def process_event(event_params) + Karafka.producer.produce_async( + topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"], + key: "#{organization.id}-#{event_params[:external_subscription_id]}", + payload: { + organization_id: organization.id, + external_subscription_id: event_params[:external_subscription_id], + transaction_id: event_params[:transaction_id], + timestamp: parsed_timestamp(event_params[:timestamp]), + code: event_params[:code], + # NOTE: Default value to 0.0 is required for clickhouse parsing + precise_total_amount_cents: precise_total_amount_cents(event_params[:precise_total_amount_cents]), + properties: event_params[:properties] || {}, + # NOTE: Removes trailing 'Z' to allow clickhouse parsing + ingested_at: Time.current.iso8601[...-1], + source: "http_ruby_high_usage" + }.to_json + ) + + {transaction_id: event_params[:transaction_id]} + end + + def precise_total_amount_cents(precise_total_amount_cents) + BigDecimal(precise_total_amount_cents.presence || "0.0").to_s + rescue ArgumentError + "0.0" + end + + def parsed_timestamp(event_timestamp) + Time.zone.at(event_timestamp ? Float(event_timestamp) : timestamp).to_i + rescue ArgumentError + timestamp.to_i + end + end +end diff --git a/app/services/events/high_usage_create_service.rb b/app/services/events/high_usage_create_service.rb new file mode 100644 index 00000000000..f0b5c4110d4 --- /dev/null +++ b/app/services/events/high_usage_create_service.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +module Events + class HighUsageCreateService < BaseService + Result = BaseResult[:transaction_id] + + def initialize(organization:, params:, timestamp:) + @organization = organization + @params = params + @timestamp = timestamp + super + end + + def call + return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank? + return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank? + + Karafka.producer.produce_async( + topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"], + key: "#{organization.id}-#{params[:external_subscription_id]}", + payload: { + organization_id: organization.id, + external_subscription_id: params[:external_subscription_id], + transaction_id: params[:transaction_id], + timestamp: params[:timestamp].presence || timestamp.to_i, + code: params[:code], + # NOTE: Default value to 0.0 is required for clickhouse parsing + precise_total_amount_cents:, + properties: params[:properties] || {}, + # NOTE: Removes trailing 'Z' to allow clickhouse parsing + ingested_at: Time.current.iso8601[...-1], + source: "http_ruby_high_usage" + }.to_json + ) + + result.transaction_id = params[:transaction_id] + result + end + + private + + attr_reader :organization, :params, :timestamp + + def precise_total_amount_cents + BigDecimal(params[:precise_total_amount_cents].presence || "0.0").to_s + rescue ArgumentError + "0.0" + end + end +end diff --git a/config/routes.rb b/config/routes.rb index 4ca1aeee0f4..646c2bc0835 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -1,30 +1,30 @@ # frozen_string_literal: true Rails.application.routes.draw do - mount Sidekiq::Web, at: '/sidekiq' if defined? Sidekiq::Web - mount Karafka::Web::App, at: '/karafka' if ENV['KARAFKA_WEB'] - mount GraphiQL::Rails::Engine, at: '/graphiql', graphql_path: '/graphql' if Rails.env.development? + mount Sidekiq::Web, at: "/sidekiq" if defined? Sidekiq::Web + mount Karafka::Web::App, at: "/karafka" if ENV["KARAFKA_WEB"] + mount GraphiQL::Rails::Engine, at: "/graphiql", graphql_path: "/graphql" if Rails.env.development? - post '/graphql', to: 'graphql#execute' + post "/graphql", to: "graphql#execute" # Health Check status - get '/health', to: 'application#health' + get "/health", to: "application#health" namespace :api do namespace :v1 do namespace :analytics do - get :gross_revenue, to: 'gross_revenues#index', as: :gross_revenue - get :invoiced_usage, to: 'invoiced_usages#index', as: :invoiced_usage - get :invoice_collection, to: 'invoice_collections#index', as: :invoice_collection - get :mrr, to: 'mrrs#index', as: :mrr - get :overdue_balance, to: 'overdue_balances#index', as: :overdue_balance + get :gross_revenue, to: "gross_revenues#index", as: :gross_revenue + get :invoiced_usage, to: "invoiced_usages#index", as: :invoiced_usage + get :invoice_collection, to: "invoice_collections#index", as: :invoice_collection + get :mrr, to: "mrrs#index", as: :mrr + get :overdue_balance, to: "overdue_balances#index", as: :overdue_balance end resources :customers, param: :external_id, only: %i[create index show destroy] do get :portal_url - get :current_usage, to: 'customers/usage#current' - get :past_usage, to: 'customers/usage#past' + get :current_usage, to: "customers/usage#current" + get :past_usage, to: "customers/usage#past" post :checkout_url @@ -36,7 +36,7 @@ resources :subscriptions, only: %i[create update show index], param: :external_id do resource :lifetime_usage, only: %i[show update] end - delete '/subscriptions/:external_id', to: 'subscriptions#terminate', as: :terminate + delete "/subscriptions/:external_id", to: "subscriptions#terminate", as: :terminate resources :add_ons, param: :code, code: /.*/ resources :billable_metrics, param: :code, code: /.*/ do @@ -50,6 +50,8 @@ post :estimate, on: :collection end resources :events, only: %i[create show index] do + post :high_usage, on: :collection + post :batch_high_usage, on: :collection post :estimate_fees, on: :collection post :estimate_instant_fees, on: :collection post :batch_estimate_instant_fees, on: :collection @@ -73,14 +75,14 @@ resources :plans, param: :code, code: /.*/ resources :taxes, param: :code, code: /.*/ resources :wallet_transactions, only: %i[create show] - get '/wallets/:id/wallet_transactions', to: 'wallet_transactions#index' + get "/wallets/:id/wallet_transactions", to: "wallet_transactions#index" resources :wallets, only: %i[create update show index] - delete '/wallets/:id', to: 'wallets#terminate' - post '/events/batch', to: 'events#batch' + delete "/wallets/:id", to: "wallets#terminate" + post "/events/batch", to: "events#batch" - get '/organizations', to: 'organizations#show' - put '/organizations', to: 'organizations#update' - get '/organizations/grpc_token', to: 'organizations#grpc_token' + get "/organizations", to: "organizations#show" + put "/organizations", to: "organizations#update" + get "/organizations/grpc_token", to: "organizations#grpc_token" resources :webhook_endpoints, only: %i[create index show destroy update] resources :webhooks, only: %i[] do @@ -91,10 +93,10 @@ end resources :webhooks, only: [] do - post 'stripe/:organization_id', to: 'webhooks#stripe', on: :collection, as: :stripe - post 'cashfree/:organization_id', to: 'webhooks#cashfree', on: :collection, as: :cashfree - post 'gocardless/:organization_id', to: 'webhooks#gocardless', on: :collection, as: :gocardless - post 'adyen/:organization_id', to: 'webhooks#adyen', on: :collection, as: :adyen + post "stripe/:organization_id", to: "webhooks#stripe", on: :collection, as: :stripe + post "cashfree/:organization_id", to: "webhooks#cashfree", on: :collection, as: :cashfree + post "gocardless/:organization_id", to: "webhooks#gocardless", on: :collection, as: :gocardless + post "adyen/:organization_id", to: "webhooks#adyen", on: :collection, as: :adyen end namespace :admin do @@ -107,13 +109,13 @@ if Rails.env.development? namespace :dev_tools do - get '/invoices/:id', to: 'invoices#show' + get "/invoices/:id", to: "invoices#show" end end - match '*unmatched' => 'application#not_found', + match "*unmatched" => "application#not_found", :via => %i[get post put delete patch], :constraints => lambda { |req| - req.path.exclude?('rails/active_storage') + req.path.exclude?("rails/active_storage") } end diff --git a/spec/requests/api/v1/events_controller_spec.rb b/spec/requests/api/v1/events_controller_spec.rb index f4eaddda2ef..7523e417b0d 100644 --- a/spec/requests/api/v1/events_controller_spec.rb +++ b/spec/requests/api/v1/events_controller_spec.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'rails_helper' +require "rails_helper" RSpec.describe Api::V1::EventsController, type: :request do let(:organization) { create(:organization) } @@ -9,9 +9,9 @@ let(:plan) { create(:plan, organization:) } let!(:subscription) { create(:subscription, customer:, organization:, plan:, started_at: 1.month.ago) } - describe 'POST /api/v1/events' do + describe "POST /api/v1/events" do subject do - post_with_token(organization, '/api/v1/events', event: create_params) + post_with_token(organization, "/api/v1/events", event: create_params) end let(:create_params) do @@ -20,23 +20,23 @@ transaction_id: SecureRandom.uuid, external_subscription_id: subscription.external_id, timestamp: Time.current.to_i, - precise_total_amount_cents: '123.45', + precise_total_amount_cents: "123.45", properties: { - foo: 'bar' + foo: "bar" } } end - include_examples 'requires API permission', 'event', 'write' + include_examples "requires API permission", "event", "write" - it 'returns a success' do + it "returns a success" do expect { subject }.to change(Event, :count).by(1) expect(response).to have_http_status(:success) expect(json[:event][:external_subscription_id]).to eq(subscription.external_id) end - context 'with duplicated transaction_id' do + context "with duplicated transaction_id" do let!(:event) { create(:event, organization:, external_subscription_id: subscription.external_id) } let(:create_params) do @@ -45,35 +45,35 @@ transaction_id: event.transaction_id, external_subscription_id: subscription.external_id, timestamp: Time.current.to_i, - precise_total_amount_cents: '123.45', + precise_total_amount_cents: "123.45", properties: { - foo: 'bar' + foo: "bar" } } end - it 'returns a not found response' do + it "returns a not found response" do expect { subject }.not_to change(Event, :count) expect(response).to have_http_status(:unprocessable_entity) end end - context 'when sending wrong format for the timestamp' do + context "when sending wrong format for the timestamp" do let(:create_params) do { code: metric.code, transaction_id: SecureRandom.uuid, external_subscription_id: subscription.external_id, timestamp: Time.current.to_s, - precise_total_amount_cents: '123.45', + precise_total_amount_cents: "123.45", properties: { - foo: 'bar' + foo: "bar" } } end - it 'returns a not found response' do + it "returns a not found response" do expect { subject }.not_to change(Event, :count) expect(response).to have_http_status(:unprocessable_entity) @@ -90,8 +90,8 @@ external_subscription_id: subscription.external_id, timestamp: Time.current.to_i, properties: { - a: '1', - b: '2' + a: "1", + b: "2" } } end @@ -111,7 +111,7 @@ external_subscription_id: subscription.external_id, timestamp: Time.current.to_i, properties: { - a: '1' + a: "1" } } end @@ -125,9 +125,58 @@ end end - describe 'POST /api/v1/events/batch' do + describe "POST /api/v1/events/high_usage" do subject do - post_with_token(organization, '/api/v1/events/batch', events: batch_params) + post_with_token(organization, "/api/v1/events/high_usage", event: create_params) + end + + let(:create_params) do + { + code: metric.code, + transaction_id: SecureRandom.uuid, + external_subscription_id: subscription.external_id, + timestamp: Time.current.to_i, + precise_total_amount_cents: "123.45", + properties: { + foo: "bar" + } + } + end + + include_examples "requires API permission", "event", "write" + + context "without kafka configuration" do + before do + ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"] = nil + ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"] = nil + end + + it "returns a not allowed error" do + subject + + expect(response).to have_http_status(:method_not_allowed) + expect(json[:code]).to eq("missing_configuration") + end + end + + context "with kafka configuration" do + before do + ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"] = "kafka" + ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"] = "raw_events" + end + + it "returns a success" do + subject + + expect(response).to have_http_status(:success) + expect(json[:event][:transaction_id]).to eq(create_params[:transaction_id]) + end + end + end + + describe "POST /api/v1/events/batch" do + subject do + post_with_token(organization, "/api/v1/events/batch", events: batch_params) end let(:batch_params) do @@ -137,24 +186,24 @@ transaction_id: SecureRandom.uuid, external_subscription_id: subscription.external_id, timestamp: Time.current.to_i, - precise_total_amount_cents: '123.45', + precise_total_amount_cents: "123.45", properties: { - foo: 'bar' + foo: "bar" } } ] end - include_examples 'requires API permission', 'event', 'write' + include_examples "requires API permission", "event", "write" - it 'returns a success' do + it "returns a success" do expect { subject }.to change(Event, :count).by(1) expect(response).to have_http_status(:ok) expect(json[:events].first[:external_subscription_id]).to eq(subscription.external_id) end - context 'with invalid timestamp for one event' do + context "with invalid timestamp for one event" do let(:batch_params) do [ { @@ -162,9 +211,9 @@ transaction_id: SecureRandom.uuid, external_subscription_id: subscription.external_id, timestamp: Time.current.to_i, - precise_total_amount_cents: '123.45', + precise_total_amount_cents: "123.45", properties: { - foo: 'bar' + foo: "bar" } }, { @@ -172,19 +221,19 @@ transaction_id: SecureRandom.uuid, external_subscription_id: subscription.external_id, timestamp: Time.current.to_s, - precise_total_amount_cents: '123.45', + precise_total_amount_cents: "123.45", properties: { - foo: 'bar' + foo: "bar" } } ] end - it 'returns an error indicating which event contained which error' do + it "returns an error indicating which event contained which error" do expect { subject }.not_to change(Event, :count) expect(response).to have_http_status(:unprocessable_entity) - expect(json[:error_details]).to eq({'1': {timestamp: ["invalid_format"]}}) + expect(json[:error_details]).to eq({"1": {timestamp: ["invalid_format"]}}) end end @@ -198,8 +247,8 @@ external_subscription_id: subscription.external_id, timestamp: Time.current.to_i, properties: { - a: '1', - b: '2' + a: "1", + b: "2" } } end @@ -219,7 +268,7 @@ external_subscription_id: subscription.external_id, timestamp: Time.current.to_i, properties: { - a: '1' + a: "1" } } end @@ -227,23 +276,74 @@ it "fails with a 422 error" do expect { subject }.not_to change(Event, :count) expect(response).to have_http_status(:unprocessable_entity) - expect(json[:error_details]).to include('0': "expression_evaluation_failed: Variable: b not found") + expect(json[:error_details]).to include("0": "expression_evaluation_failed: Variable: b not found") end end end end - describe 'GET /api/v1/events' do - subject { get_with_token(organization, '/api/v1/events', params) } + describe "POST /api/v1/events/batch_high_usage" do + subject do + post_with_token(organization, "/api/v1/events/batch_high_usage", events: batch_params) + end + + let(:batch_params) do + [ + { + code: metric.code, + transaction_id: SecureRandom.uuid, + external_subscription_id: subscription.external_id, + timestamp: Time.current.to_i, + precise_total_amount_cents: "123.45", + properties: { + foo: "bar" + } + } + ] + end + + include_examples "requires API permission", "event", "write" + + context "without kafka configuration" do + before do + ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"] = nil + ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"] = nil + end + + it "returns a not allowed error" do + subject + + expect(response).to have_http_status(:method_not_allowed) + expect(json[:code]).to eq("missing_configuration") + end + end + + context "with kafka configuration" do + before do + ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"] = "kafka" + ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"] = "raw_events" + end + + it "returns a success" do + subject + + expect(response).to have_http_status(:ok) + expect(json[:events].first[:transaction_id]).to eq(batch_params.first[:transaction_id]) + end + end + end + + describe "GET /api/v1/events" do + subject { get_with_token(organization, "/api/v1/events", params) } let!(:event) { create(:event, timestamp: 5.days.ago.to_date, organization:) } - context 'without params' do + context "without params" do let(:params) { {} } - include_examples 'requires API permission', 'event', 'read' + include_examples "requires API permission", "event", "read" - it 'returns events' do + it "returns events" do subject expect(response).to have_http_status(:ok) @@ -252,12 +352,12 @@ end end - context 'with pagination' do + context "with pagination" do let(:params) { {page: 1, per_page: 1} } before { create(:event, organization:) } - it 'returns events with correct meta data' do + it "returns events with correct meta data" do subject expect(response).to have_http_status(:ok) @@ -271,12 +371,12 @@ end end - context 'with code' do + context "with code" do let(:params) { {code: event.code} } before { create(:event, organization:) } - it 'returns events' do + it "returns events" do subject expect(response).to have_http_status(:ok) @@ -285,12 +385,12 @@ end end - context 'with external subscription id' do + context "with external subscription id" do let(:params) { {external_subscription_id: event.external_subscription_id} } before { create(:event, organization:) } - it 'returns events' do + it "returns events" do subject expect(response).to have_http_status(:ok) @@ -299,7 +399,7 @@ end end - context 'with timestamp' do + context "with timestamp" do let(:params) do {timestamp_from: 2.days.ago.to_date, timestamp_to: Date.tomorrow.to_date} end @@ -308,7 +408,7 @@ before { create(:event, timestamp: 3.days.ago.to_date, organization:) } - it 'returns events with correct timestamp' do + it "returns events with correct timestamp" do subject expect(response).to have_http_status(:ok) @@ -318,15 +418,15 @@ end end - describe 'GET /api/v1/events/:id' do + describe "GET /api/v1/events/:id" do subject { get_with_token(organization, "/api/v1/events/#{transaction_id}") } let(:event) { create(:event, organization_id: organization.id) } let(:transaction_id) { event.transaction_id } - include_examples 'requires API permission', 'event', 'read' + include_examples "requires API permission", "event", "read" - it 'returns an event' do + it "returns an event" do subject expect(response).to have_http_status(:ok) @@ -339,25 +439,25 @@ expect(json[:event][:lago_customer_id]).to eq event.customer_id end - context 'with a non-existing transaction_id' do + context "with a non-existing transaction_id" do let(:transaction_id) { SecureRandom.uuid } - it 'returns not found' do + it "returns not found" do subject expect(response).to have_http_status(:not_found) end end - context 'when event is deleted' do + context "when event is deleted" do before { event.discard! } - it 'returns not found' do + it "returns not found" do subject expect(response).to have_http_status(:not_found) end end - context 'with clickhouse', clickhouse: true do + context "with clickhouse", clickhouse: true do let(:event) do Clickhouse::EventsRaw.create!( transaction_id: SecureRandom.uuid, @@ -371,7 +471,7 @@ before { organization.update!(clickhouse_events_store: true) } - it 'returns an event' do + it "returns an event" do subject expect(response).to have_http_status(:ok) @@ -386,9 +486,9 @@ end end - describe 'POST /api/v1/events/estimate_fees' do + describe "POST /api/v1/events/estimate_fees" do subject do - post_with_token(organization, '/api/v1/events/estimate_fees', event: event_params) + post_with_token(organization, "/api/v1/events/estimate_fees", event: event_params) end let(:charge) { create(:standard_charge, :pay_in_advance, plan:, billable_metric: metric) } @@ -399,9 +499,9 @@ code: metric.code, external_subscription_id: subscription.external_id, transaction_id: SecureRandom.uuid, - precise_total_amount_cents: '123.45', + precise_total_amount_cents: "123.45", properties: { - foo: 'bar' + foo: "bar" } } end @@ -411,9 +511,9 @@ tax end - include_examples 'requires API permission', 'event', 'write' + include_examples "requires API permission", "event", "write" - it 'returns a success' do + it "returns a success" do subject aggregate_failures do @@ -424,35 +524,35 @@ fee = json[:fees].first expect(fee[:lago_id]).to be_nil expect(fee[:lago_group_id]).to be_nil - expect(fee[:item][:type]).to eq('charge') + expect(fee[:item][:type]).to eq("charge") expect(fee[:item][:code]).to eq(metric.code) expect(fee[:item][:name]).to eq(metric.name) expect(fee[:pay_in_advance]).to eq(true) expect(fee[:amount_cents]).to be_an(Integer) - expect(fee[:amount_currency]).to eq('EUR') - expect(fee[:units]).to eq('1.0') + expect(fee[:amount_currency]).to eq("EUR") + expect(fee[:units]).to eq("1.0") expect(fee[:events_count]).to eq(1) end end - context 'with missing customer id' do + context "with missing customer id" do let(:event_params) do { code: metric.code, external_subscription_id: nil, properties: { - foo: 'bar' + foo: "bar" } } end - it 'returns a not found error' do + it "returns a not found error" do subject expect(response).to have_http_status(:not_found) end end - context 'when metric code does not match an pay_in_advance charge' do + context "when metric code does not match an pay_in_advance charge" do let(:charge) { create(:standard_charge, plan:, billable_metric: metric) } let(:event_params) do @@ -460,25 +560,25 @@ code: metric.code, external_subscription_id: subscription.external_id, properties: { - foo: 'bar' + foo: "bar" } } end - it 'returns a validation error' do + it "returns a validation error" do subject expect(response).to have_http_status(:unprocessable_entity) end end end - describe 'POST /api/v1/events/batch_estimate_instant_fees' do + describe "POST /api/v1/events/batch_estimate_instant_fees" do subject do - post_with_token(organization, '/api/v1/events/batch_estimate_instant_fees', events: batch_params) + post_with_token(organization, "/api/v1/events/batch_estimate_instant_fees", events: batch_params) end let(:metric) { create(:sum_billable_metric, organization:) } - let(:charge) { create(:percentage_charge, :pay_in_advance, plan:, billable_metric: metric, properties: {rate: '0.1', fixed_amount: '0'}) } + let(:charge) { create(:percentage_charge, :pay_in_advance, plan:, billable_metric: metric, properties: {rate: "0.1", fixed_amount: "0"}) } let(:event_params) do { @@ -498,9 +598,9 @@ charge end - include_examples 'requires API permission', 'event', 'write' + include_examples "requires API permission", "event", "write" - it 'returns a success' do + it "returns a success" do subject expect(response).to have_http_status(:success) @@ -510,16 +610,16 @@ fee = json[:fees].first expect(fee[:lago_id]).to be_nil expect(fee[:lago_group_id]).to be_nil - expect(fee[:item][:type]).to eq('charge') + expect(fee[:item][:type]).to eq("charge") expect(fee[:item][:code]).to eq(metric.code) expect(fee[:item][:name]).to eq(metric.name) - expect(fee[:amount_cents]).to eq('40.0') - expect(fee[:amount_currency]).to eq('EUR') - expect(fee[:units]).to eq('400.0') + expect(fee[:amount_cents]).to eq("40.0") + expect(fee[:amount_currency]).to eq("EUR") + expect(fee[:units]).to eq("400.0") expect(fee[:events_count]).to eq(1) end - context 'with multiple events' do + context "with multiple events" do let(:event2_params) do { code: metric.code, @@ -534,7 +634,7 @@ let(:batch_params) { [event_params, event2_params] } - it 'returns a success' do + it "returns a success" do subject expect(response).to have_http_status(:success) @@ -545,33 +645,33 @@ expect(fee1[:lago_id]).to be_nil expect(fee1[:lago_group_id]).to be_nil - expect(fee1[:item][:type]).to eq('charge') + expect(fee1[:item][:type]).to eq("charge") expect(fee1[:item][:code]).to eq(metric.code) expect(fee1[:item][:name]).to eq(metric.name) - expect(fee1[:amount_cents]).to eq('40.0') - expect(fee1[:amount_currency]).to eq('EUR') - expect(fee1[:units]).to eq('400.0') + expect(fee1[:amount_cents]).to eq("40.0") + expect(fee1[:amount_currency]).to eq("EUR") + expect(fee1[:units]).to eq("400.0") expect(fee1[:events_count]).to eq(1) expect(fee2[:lago_id]).to be_nil expect(fee2[:lago_group_id]).to be_nil - expect(fee2[:item][:type]).to eq('charge') + expect(fee2[:item][:type]).to eq("charge") expect(fee2[:item][:code]).to eq(metric.code) expect(fee2[:item][:name]).to eq(metric.name) - expect(fee2[:amount_cents]).to eq('30.0') - expect(fee2[:amount_currency]).to eq('EUR') - expect(fee2[:units]).to eq('300.0') + expect(fee2[:amount_cents]).to eq("30.0") + expect(fee2[:amount_currency]).to eq("EUR") + expect(fee2[:units]).to eq("300.0") expect(fee2[:events_count]).to eq(1) end end end - describe 'POST /api/v1/events/estimate_instant_fees' do + describe "POST /api/v1/events/estimate_instant_fees" do subject do - post_with_token(organization, '/api/v1/events/estimate_instant_fees', event: event_params) + post_with_token(organization, "/api/v1/events/estimate_instant_fees", event: event_params) end let(:metric) { create(:sum_billable_metric, organization:) } - let(:charge) { create(:percentage_charge, :pay_in_advance, plan:, billable_metric: metric, properties: {rate: '0.1', fixed_amount: '0'}) } + let(:charge) { create(:percentage_charge, :pay_in_advance, plan:, billable_metric: metric, properties: {rate: "0.1", fixed_amount: "0"}) } let(:event_params) do { @@ -589,9 +689,9 @@ charge end - include_examples 'requires API permission', 'event', 'write' + include_examples "requires API permission", "event", "write" - it 'returns a success' do + it "returns a success" do subject expect(response).to have_http_status(:success) @@ -601,33 +701,33 @@ fee = json[:fees].first expect(fee[:lago_id]).to be_nil expect(fee[:lago_group_id]).to be_nil - expect(fee[:item][:type]).to eq('charge') + expect(fee[:item][:type]).to eq("charge") expect(fee[:item][:code]).to eq(metric.code) expect(fee[:item][:name]).to eq(metric.name) - expect(fee[:amount_cents]).to eq('40.0') - expect(fee[:amount_currency]).to eq('EUR') - expect(fee[:units]).to eq('400.0') + expect(fee[:amount_cents]).to eq("40.0") + expect(fee[:amount_currency]).to eq("EUR") + expect(fee[:units]).to eq("400.0") expect(fee[:events_count]).to eq(1) end - context 'with missing subscription id' do + context "with missing subscription id" do let(:event_params) do { code: metric.code, external_subscription_id: nil, properties: { - foo: 'bar' + foo: "bar" } } end - it 'returns a not found error' do + it "returns a not found error" do subject expect(response).to have_http_status(:not_found) end end - context 'when metric code does not match an percentage charge' do + context "when metric code does not match an percentage charge" do let(:charge) { create(:standard_charge, plan:, billable_metric: metric) } let(:event_params) do @@ -635,12 +735,12 @@ code: metric.code, external_subscription_id: subscription.external_id, properties: { - foo: 'bar' + foo: "bar" } } end - it 'returns a validation error' do + it "returns a validation error" do subject expect(response).to have_http_status(:unprocessable_entity) end diff --git a/spec/services/events/high_usage_batch_create_service_spec.rb b/spec/services/events/high_usage_batch_create_service_spec.rb new file mode 100644 index 00000000000..4cd4d9770e6 --- /dev/null +++ b/spec/services/events/high_usage_batch_create_service_spec.rb @@ -0,0 +1,151 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe Events::HighUsageBatchCreateService, type: :service do + subject(:create_batch_service) do + described_class.new( + organization:, + params: events_params, + timestamp: creation_timestamp + ) + end + + let(:organization) { create(:organization) } + + let(:timestamp) { Time.current.to_f } + let(:code) { "sum_agg" } + let(:metadata) { {} } + let(:creation_timestamp) { Time.current.to_f } + let(:precise_total_amount_cents) { "123.34" } + + let(:events_params) do + events = [] + 100.times do + event = { + external_customer_id: SecureRandom.uuid, + external_subscription_id: SecureRandom.uuid, + code:, + transaction_id: SecureRandom.uuid, + precise_total_amount_cents:, + properties: {foo: "bar"}, + timestamp: + } + + events << event + end + events + end + + describe ".call" do + let(:karafka_producer) { instance_double(WaterDrop::Producer) } + + before do + ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"] = "kafka" + ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"] = "raw_events" + + allow(Karafka).to receive(:producer).and_return(karafka_producer) + allow(karafka_producer).to receive(:produce_async) + end + + after do + ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"] = nil + ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"] = nil + end + + it "produces the event on kafka" do + expect(create_batch_service.call).to be_success + + expect(karafka_producer).to have_received(:produce_async).exactly(100) + end + + context "when no events are provided" do + let(:events_params) { [] } + + it "returns a no_events error" do + result = nil + + aggregate_failures do + expect { result = create_batch_service.call }.not_to change(Event, :count) + + expect(result).not_to be_success + expect(result.error).to be_a(BaseService::ValidationFailure) + expect(result.error.messages.keys).to include(:events) + expect(result.error.messages[:events]).to include("no_events") + end + end + end + + context "when events count is too big" do + before do + events_params.push( + { + external_customer_id: SecureRandom.uuid, + external_subscription_id: SecureRandom.uuid, + code:, + transaction_id: SecureRandom.uuid, + properties: {foo: "bar"}, + timestamp: + } + ) + end + + it "returns a too big error" do + result = nil + + aggregate_failures do + expect { result = create_batch_service.call }.not_to change(Event, :count) + + expect(result).not_to be_success + expect(result.error).to be_a(BaseService::ValidationFailure) + expect(result.error.messages.keys).to include(:events) + expect(result.error.messages[:events]).to include("too_many_events") + end + end + end + + context "when timestamp is not present in the payload" do + let(:timestamp) { nil } + + let(:events_params) do + [ + { + external_customer_id: SecureRandom.uuid, + external_subscription_id: SecureRandom.uuid, + code:, + transaction_id: SecureRandom.uuid, + properties: {foo: "bar"}, + timestamp: + } + ] + end + + it "creates an event by setting the timestamp to the current datetime" do + travel_to(Time.current) do + result = create_batch_service.call + expect(result).to be_success + expect(result.transactions).to eq([{transaction_id: events_params.first[:transaction_id]}]) + + params = events_params.first + + expect(karafka_producer).to have_received(:produce_async) + .with( + topic: "raw_events", + key: "#{organization.id}-#{params[:external_subscription_id]}", + payload: { + organization_id: organization.id, + external_subscription_id: params[:external_subscription_id], + transaction_id: params[:transaction_id], + timestamp: creation_timestamp.to_i, + code: params[:code], + precise_total_amount_cents: "0.0", + properties: {foo: "bar"}, + ingested_at: Time.current.iso8601[...-1], + source: "http_ruby_high_usage" + }.to_json + ) + end + end + end + end +end diff --git a/spec/services/events/high_usage_create_service_spec.rb b/spec/services/events/high_usage_create_service_spec.rb new file mode 100644 index 00000000000..f2b29e14aaa --- /dev/null +++ b/spec/services/events/high_usage_create_service_spec.rb @@ -0,0 +1,156 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe Events::HighUsageCreateService, type: :service do + subject(:create_service) do + described_class.new( + organization:, + params:, + timestamp: creation_timestamp + ) + end + + let(:organization) { create(:organization) } + + let(:code) { "sum_agg" } + let(:external_subscription_id) { SecureRandom.uuid } + let(:timestamp) { Time.current.to_f } + let(:transaction_id) { SecureRandom.uuid } + let(:precise_total_amount_cents) { nil } + + let(:creation_timestamp) { Time.current.to_f } + + let(:params) do + { + external_subscription_id:, + code:, + transaction_id:, + precise_total_amount_cents:, + properties: {foo: "bar"}, + timestamp: + } + end + + describe "#call" do + let(:karafka_producer) { instance_double(WaterDrop::Producer) } + + before do + ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"] = "kafka" + ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"] = "raw_events" + + allow(Karafka).to receive(:producer).and_return(karafka_producer) + allow(karafka_producer).to receive(:produce_async) + end + + it "produces the event on kafka" do + travel_to(Time.current) do + result = create_service.call + expect(result).to be_success + expect(result.transaction_id).to eq(transaction_id) + + expect(karafka_producer).to have_received(:produce_async) + .with( + topic: "raw_events", + key: "#{organization.id}-#{external_subscription_id}", + payload: { + organization_id: organization.id, + external_subscription_id:, + transaction_id:, + timestamp: timestamp, + code:, + precise_total_amount_cents: "0.0", + properties: {foo: "bar"}, + ingested_at: Time.current.iso8601[...-1], + source: "http_ruby_high_usage" + }.to_json + ) + end + end + + context "with a precise_total_amount_cents" do + let(:precise_total_amount_cents) { "123.45" } + + it "produces the event on kafka" do + travel_to(Time.current) do + result = create_service.call + expect(result).to be_success + expect(result.transaction_id).to eq(transaction_id) + + expect(karafka_producer).to have_received(:produce_async) + .with( + topic: "raw_events", + key: "#{organization.id}-#{external_subscription_id}", + payload: { + organization_id: organization.id, + external_subscription_id:, + transaction_id:, + timestamp: timestamp, + code:, + precise_total_amount_cents: "123.45", + properties: {foo: "bar"}, + ingested_at: Time.current.iso8601[...-1], + source: "http_ruby_high_usage" + }.to_json + ) + end + end + + context "when precise_total_amount_cents is not a valid decimal value" do + let(:precise_total_amount_cents) { "asdfa" } + + it "produces the event on kafka" do + travel_to(Time.current) do + result = create_service.call + expect(result).to be_success + expect(result.transaction_id).to eq(transaction_id) + + expect(karafka_producer).to have_received(:produce_async) + .with( + topic: "raw_events", + key: "#{organization.id}-#{external_subscription_id}", + payload: { + organization_id: organization.id, + external_subscription_id:, + transaction_id:, + timestamp: timestamp, + code:, + precise_total_amount_cents: "0.0", + properties: {foo: "bar"}, + ingested_at: Time.current.iso8601[...-1], + source: "http_ruby_high_usage" + }.to_json + ) + end + end + end + end + + context "when timestamp is not present in the payload" do + let(:timestamp) { nil } + + it "produces the event on kafka" do + result = create_service.call + expect(result).to be_success + expect(result.transaction_id).to eq(transaction_id) + + expect(karafka_producer).to have_received(:produce_async) + .with( + topic: "raw_events", + key: "#{organization.id}-#{external_subscription_id}", + payload: { + organization_id: organization.id, + external_subscription_id:, + transaction_id:, + timestamp: creation_timestamp.to_i, + code:, + precise_total_amount_cents: "0.0", + properties: {foo: "bar"}, + ingested_at: Time.current.iso8601[...-1], + source: "http_ruby_high_usage" + }.to_json + ) + end + end + end +end