Skip to content

Commit

Permalink
feat(event): Add high usage enpoint to API
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-pochet committed Feb 11, 2025
1 parent 815a361 commit f4dd4e0
Show file tree
Hide file tree
Showing 10 changed files with 730 additions and 165 deletions.
39 changes: 21 additions & 18 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ AllCops:
NewCops: disable
DisplayStyleGuide: true
Exclude:
- 'bin/**/*'
- 'db/schema.rb'
- 'db/*_schema.rb'
- 'storage/**/*'
- 'tmp/**/*'
- 'coverage/**/*'
- 'log/**/*'
- "bin/**/*"
- "db/schema.rb"
- "db/*_schema.rb"
- "storage/**/*"
- "tmp/**/*"
- "coverage/**/*"
- "log/**/*"

# TODO: Enable when we have time to fix all the offenses
Style/StringLiterals:
Expand All @@ -36,27 +36,30 @@ Style/FrozenStringLiteralComment:
Enabled: true
SafeAutoCorrect: true

Style/QuotedSymbols:
Enabled: false

Performance/CaseWhenSplat:
Enabled: true

Rails/InverseOf:
Description: 'Checks for associations where the inverse cannot be determined automatically.'
Description: "Checks for associations where the inverse cannot be determined automatically."
Enabled: false

Rails/HttpStatus:
Description: 'Enforces use of symbolic or numeric value to describe HTTP status.'
Description: "Enforces use of symbolic or numeric value to describe HTTP status."
Enabled: false

Rails/HasManyOrHasOneDependent:
Description: 'Forces a "dependent" options for has_one and has_many rails relations.'
Enabled: false

RSpec/ExampleLength:
Description: 'Checks for long examples.'
Description: "Checks for long examples."
Enabled: false

RSpec/MultipleExpectations:
Description: 'Checks if examples contain too many expect calls.'
Description: "Checks if examples contain too many expect calls."
Enabled: false

RSpec/MultipleMemoizedHelpers:
Expand Down Expand Up @@ -84,10 +87,10 @@ GraphQL/ExtractType:

GraphQL/ExtractInputType:
Exclude:
- 'app/graphql/mutations/applied_coupons/create.rb'
- 'app/graphql/mutations/credit_notes/create.rb'
- 'app/graphql/mutations/invites/accept.rb'
- 'app/graphql/mutations/plans/create.rb'
- 'app/graphql/mutations/plans/update.rb'
- 'app/graphql/mutations/register_user.rb'
- 'app/graphql/mutations/wallet_transactions/create.rb'
- "app/graphql/mutations/applied_coupons/create.rb"
- "app/graphql/mutations/credit_notes/create.rb"
- "app/graphql/mutations/invites/accept.rb"
- "app/graphql/mutations/plans/create.rb"
- "app/graphql/mutations/plans/update.rb"
- "app/graphql/mutations/register_user.rb"
- "app/graphql/mutations/wallet_transactions/create.rb"
36 changes: 35 additions & 1 deletion app/controllers/api/v1/events_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
module Api
module V1
class EventsController < Api::BaseController
ACTIONS_WITH_CACHED_API_KEY = %i[create batch estimate_instant_fees batch_estimate_instant_fees].freeze
ACTIONS_WITH_CACHED_API_KEY = %i[
create
high_usage
batch
estimate_instant_fees
batch_estimate_instant_fees
].freeze

def create
result = ::Events::CreateService.call(
Expand All @@ -25,6 +31,20 @@ def create
end
end

def high_usage
result = ::Events::HighUsageCreateService.call(
organization: current_organization,
params: create_params,
timestamp: Time.current.to_f
)

if result.success?
render(json: {event: {transaction_id: result.transaction_id}})
else
render_error_response(result)
end
end

def batch
result = ::Events::CreateBatchService.call(
organization: current_organization,
Expand All @@ -46,6 +66,20 @@ def batch
end
end

def batch_high_usage
result = ::Events::HighUsageBatchCreateService.call(
organization: current_organization,
params: batch_params[:events],
timestamp: Time.current.to_f
)

if result.success?
render(json: {events: result.transactions})
else
render_error_response(result)
end
end

def show
event_scope = current_organization.clickhouse_events_store? ? Clickhouse::EventsRaw : Event
event = event_scope.find_by(
Expand Down
14 changes: 7 additions & 7 deletions app/services/events/create_batch_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ def initialize(organization:, events_params:, timestamp:, metadata:)

def call
if events_params.blank?
return result.single_validation_failure!(error_code: 'no_events', field: :events)
return result.single_validation_failure!(error_code: "no_events", field: :events)
end

if events_params.count > MAX_LENGTH
return result.single_validation_failure!(error_code: 'too_many_events', field: :events)
return result.single_validation_failure!(error_code: "too_many_events", field: :events)
end

validate_events
Expand Down Expand Up @@ -56,7 +56,7 @@ def validate_events
result.events.push(event)
result.errors[index] = event.errors.messages unless event.valid?
rescue ArgumentError
result.errors = result.errors.merge({index => {timestamp: ['invalid_format']}})
result.errors = result.errors.merge({index => {timestamp: ["invalid_format"]}})
end
end

Expand All @@ -72,11 +72,11 @@ def post_validate_events
end

def produce_kafka_event(event)
return if ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'].blank?
return if ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'].blank?
return if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank?
return if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank?

Karafka.producer.produce_async(
topic: ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'],
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
key: "#{organization.id}-#{event.external_subscription_id}",
payload: {
organization_id: organization.id,
Expand All @@ -88,7 +88,7 @@ def produce_kafka_event(event)
properties: event.properties,
ingested_at: Time.zone.now.iso8601[...-1],
precise_total_amount_cents: event.precise_total_amount_cents.present? ? event.precise_total_amount_cents.to_s : "0.0",
source: 'http_ruby'
source: "http_ruby"
}.to_json
)
end
Expand Down
13 changes: 6 additions & 7 deletions app/services/events/create_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,24 @@ def call
rescue ActiveRecord::RecordInvalid => e
result.record_validation_failure!(record: e.record)
rescue ActiveRecord::RecordNotUnique
result.single_validation_failure!(field: :transaction_id, error_code: 'value_already_exist')
result.single_validation_failure!(field: :transaction_id, error_code: "value_already_exist")
rescue ArgumentError
result.single_validation_failure!(field: :timestamp, error_code: 'invalid_format')
result.single_validation_failure!(field: :timestamp, error_code: "invalid_format")
end

private

attr_reader :organization, :params, :timestamp, :metadata

def produce_kafka_event(event)
return if ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'].blank?
return if ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'].blank?
return if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank?
return if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank?

Karafka.producer.produce_async(
topic: ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'],
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
key: "#{organization.id}-#{event.external_subscription_id}",
payload: {
organization_id: organization.id,
external_customer_id: event.external_customer_id,
external_subscription_id: event.external_subscription_id,
transaction_id: event.transaction_id,
# NOTE: Removes trailing 'Z' to allow clickhouse parsing
Expand All @@ -63,7 +62,7 @@ def produce_kafka_event(event)
precise_total_amount_cents: event.precise_total_amount_cents.present? ? event.precise_total_amount_cents.to_s : "0.0",
properties: event.properties,
ingested_at: Time.zone.now.iso8601[...-1],
source: 'http_ruby'
source: "http_ruby"
}.to_json
)
end
Expand Down
70 changes: 70 additions & 0 deletions app/services/events/high_usage_batch_create_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# frozen_string_literal: true

module Events
class HighUsageBatchCreateService < BaseService
Result = BaseResult[:transactions]

MAX_LENGTH = ENV.fetch("LAGO_EVENTS_BATCH_MAX_LENGTH", 100).to_i

def initialize(organization:, params:, timestamp:)
@organization = organization
@params = params
@timestamp = timestamp
super
end

def call
return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank?
return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank?

if params.blank?
return result.single_validation_failure!(error_code: "no_events", field: :events)
end

if params.count > MAX_LENGTH
return result.single_validation_failure!(error_code: "too_many_events", field: :events)
end

result.transactions = params.map { process_event(_1) }
result
end

private

attr_reader :organization, :params, :timestamp

def process_event(event_params)
Karafka.producer.produce_async(
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
key: "#{organization.id}-#{event_params[:external_subscription_id]}",
payload: {
organization_id: organization.id,
external_subscription_id: event_params[:external_subscription_id],
transaction_id: event_params[:transaction_id],
timestamp: parsed_timestamp(event_params[:timestamp]),
code: event_params[:code],
# NOTE: Default value to 0.0 is required for clickhouse parsing
precise_total_amount_cents: precise_total_amount_cents(event_params[:precise_total_amount_cents]),
properties: event_params[:properties] || {},
# NOTE: Removes trailing 'Z' to allow clickhouse parsing
ingested_at: Time.current.iso8601[...-1],
source: "http_ruby_high_usage"
}.to_json
)

{transaction_id: event_params[:transaction_id]}
end

def precise_total_amount_cents(precise_total_amount_cents)
BigDecimal(precise_total_amount_cents.presence || "0.0").to_s
rescue ArgumentError
"0.0"
end

def parsed_timestamp(event_timestamp)
Time.zone.at(event_timestamp ? Float(event_timestamp) : timestamp).to_i
rescue ArgumentError
timestamp.to_i
end
end
end
50 changes: 50 additions & 0 deletions app/services/events/high_usage_create_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# frozen_string_literal: true

module Events
class HighUsageCreateService < BaseService
Result = BaseResult[:transaction_id]

def initialize(organization:, params:, timestamp:)
@organization = organization
@params = params
@timestamp = timestamp
super
end

def call
return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank?
return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank?

Karafka.producer.produce_async(
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
key: "#{organization.id}-#{params[:external_subscription_id]}",
payload: {
organization_id: organization.id,
external_subscription_id: params[:external_subscription_id],
transaction_id: params[:transaction_id],
timestamp: params[:timestamp].presence || timestamp.to_i,
code: params[:code],
# NOTE: Default value to 0.0 is required for clickhouse parsing
precise_total_amount_cents:,
properties: params[:properties] || {},
# NOTE: Removes trailing 'Z' to allow clickhouse parsing
ingested_at: Time.current.iso8601[...-1],
source: "http_ruby_high_usage"
}.to_json
)

result.transaction_id = params[:transaction_id]
result
end

private

attr_reader :organization, :params, :timestamp

def precise_total_amount_cents
BigDecimal(params[:precise_total_amount_cents].presence || "0.0").to_s
rescue ArgumentError
"0.0"
end
end
end
Loading

0 comments on commit f4dd4e0

Please sign in to comment.