Skip to content

Commit

Permalink
misc: Produce sync to karafka
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-pochet committed Feb 11, 2025
1 parent f4dd4e0 commit afb1bd9
Show file tree
Hide file tree
Showing 12 changed files with 418 additions and 345 deletions.
39 changes: 18 additions & 21 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ AllCops:
NewCops: disable
DisplayStyleGuide: true
Exclude:
- "bin/**/*"
- "db/schema.rb"
- "db/*_schema.rb"
- "storage/**/*"
- "tmp/**/*"
- "coverage/**/*"
- "log/**/*"
- 'bin/**/*'
- 'db/schema.rb'
- 'db/*_schema.rb'
- 'storage/**/*'
- 'tmp/**/*'
- 'coverage/**/*'
- 'log/**/*'

# TODO: Enable when we have time to fix all the offenses
Style/StringLiterals:
Expand All @@ -36,30 +36,27 @@ Style/FrozenStringLiteralComment:
Enabled: true
SafeAutoCorrect: true

Style/QuotedSymbols:
Enabled: false

Performance/CaseWhenSplat:
Enabled: true

Rails/InverseOf:
Description: "Checks for associations where the inverse cannot be determined automatically."
Description: 'Checks for associations where the inverse cannot be determined automatically.'
Enabled: false

Rails/HttpStatus:
Description: "Enforces use of symbolic or numeric value to describe HTTP status."
Description: 'Enforces use of symbolic or numeric value to describe HTTP status.'
Enabled: false

Rails/HasManyOrHasOneDependent:
Description: 'Forces a "dependent" options for has_one and has_many rails relations.'
Enabled: false

RSpec/ExampleLength:
Description: "Checks for long examples."
Description: 'Checks for long examples.'
Enabled: false

RSpec/MultipleExpectations:
Description: "Checks if examples contain too many expect calls."
Description: 'Checks if examples contain too many expect calls.'
Enabled: false

RSpec/MultipleMemoizedHelpers:
Expand Down Expand Up @@ -87,10 +84,10 @@ GraphQL/ExtractType:

GraphQL/ExtractInputType:
Exclude:
- "app/graphql/mutations/applied_coupons/create.rb"
- "app/graphql/mutations/credit_notes/create.rb"
- "app/graphql/mutations/invites/accept.rb"
- "app/graphql/mutations/plans/create.rb"
- "app/graphql/mutations/plans/update.rb"
- "app/graphql/mutations/register_user.rb"
- "app/graphql/mutations/wallet_transactions/create.rb"
- 'app/graphql/mutations/applied_coupons/create.rb'
- 'app/graphql/mutations/credit_notes/create.rb'
- 'app/graphql/mutations/invites/accept.rb'
- 'app/graphql/mutations/plans/create.rb'
- 'app/graphql/mutations/plans/update.rb'
- 'app/graphql/mutations/register_user.rb'
- 'app/graphql/mutations/wallet_transactions/create.rb'
36 changes: 1 addition & 35 deletions app/controllers/api/v1/events_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@
module Api
module V1
class EventsController < Api::BaseController
ACTIONS_WITH_CACHED_API_KEY = %i[
create
high_usage
batch
estimate_instant_fees
batch_estimate_instant_fees
].freeze
ACTIONS_WITH_CACHED_API_KEY = %i[create batch estimate_instant_fees batch_estimate_instant_fees].freeze

def create
result = ::Events::CreateService.call(
Expand All @@ -31,20 +25,6 @@ def create
end
end

def high_usage
result = ::Events::HighUsageCreateService.call(
organization: current_organization,
params: create_params,
timestamp: Time.current.to_f
)

if result.success?
render(json: {event: {transaction_id: result.transaction_id}})
else
render_error_response(result)
end
end

def batch
result = ::Events::CreateBatchService.call(
organization: current_organization,
Expand All @@ -66,20 +46,6 @@ def batch
end
end

def batch_high_usage
result = ::Events::HighUsageBatchCreateService.call(
organization: current_organization,
params: batch_params[:events],
timestamp: Time.current.to_f
)

if result.success?
render(json: {events: result.transactions})
else
render_error_response(result)
end
end

def show
event_scope = current_organization.clickhouse_events_store? ? Clickhouse::EventsRaw : Event
event = event_scope.find_by(
Expand Down
79 changes: 79 additions & 0 deletions app/controllers/api/v2/events_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# frozen_string_literal: true

module Api
module V2
class EventsController < Api::BaseController
# NOTE: This controller is experimental, and might change in the future
# Do not rely on it for production purpose for now

def create
result = ::Events::HighUsageCreateService.call(
organization: current_organization,
params: create_params,
timestamp: Time.current.to_f
)

if result.success?
render(json: {event: {transaction_id: result.transaction_id}})
else
render_error_response(result)
end
end

def batch
result = ::Events::HighUsageBatchCreateService.call(
organization: current_organization,
params: batch_params[:events],
timestamp: Time.current.to_f
)

if result.success?
render(json: {events: result.transactions})
else
render_error_response(result)
end
end

private

def create_params
params
.require(:event)
.permit(
:transaction_id,
:code,
:timestamp,
:external_subscription_id,
:precise_total_amount_cents,
properties: {}
)
end

def batch_params
params
.permit(
events: [
:transaction_id,
:code,
:timestamp,
:external_subscription_id,
:precise_total_amount_cents,
properties: {} # rubocop:disable Style/HashAsLastArrayItem
]
).to_h.deep_symbolize_keys
end

def track_api_key_usage?
action_name&.to_sym != :create
end

def resource_name
"event"
end

def cached_api_key?
true
end
end
end
end
14 changes: 7 additions & 7 deletions app/services/events/create_batch_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ def initialize(organization:, events_params:, timestamp:, metadata:)

def call
if events_params.blank?
return result.single_validation_failure!(error_code: "no_events", field: :events)
return result.single_validation_failure!(error_code: 'no_events', field: :events)
end

if events_params.count > MAX_LENGTH
return result.single_validation_failure!(error_code: "too_many_events", field: :events)
return result.single_validation_failure!(error_code: 'too_many_events', field: :events)
end

validate_events
Expand Down Expand Up @@ -56,7 +56,7 @@ def validate_events
result.events.push(event)
result.errors[index] = event.errors.messages unless event.valid?
rescue ArgumentError
result.errors = result.errors.merge({index => {timestamp: ["invalid_format"]}})
result.errors = result.errors.merge({index => {timestamp: ['invalid_format']}})
end
end

Expand All @@ -72,11 +72,11 @@ def post_validate_events
end

def produce_kafka_event(event)
return if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank?
return if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank?
return if ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'].blank?
return if ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'].blank?

Karafka.producer.produce_async(
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
topic: ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'],
key: "#{organization.id}-#{event.external_subscription_id}",
payload: {
organization_id: organization.id,
Expand All @@ -88,7 +88,7 @@ def produce_kafka_event(event)
properties: event.properties,
ingested_at: Time.zone.now.iso8601[...-1],
precise_total_amount_cents: event.precise_total_amount_cents.present? ? event.precise_total_amount_cents.to_s : "0.0",
source: "http_ruby"
source: 'http_ruby'
}.to_json
)
end
Expand Down
12 changes: 6 additions & 6 deletions app/services/events/create_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ def call
rescue ActiveRecord::RecordInvalid => e
result.record_validation_failure!(record: e.record)
rescue ActiveRecord::RecordNotUnique
result.single_validation_failure!(field: :transaction_id, error_code: "value_already_exist")
result.single_validation_failure!(field: :transaction_id, error_code: 'value_already_exist')
rescue ArgumentError
result.single_validation_failure!(field: :timestamp, error_code: "invalid_format")
result.single_validation_failure!(field: :timestamp, error_code: 'invalid_format')
end

private

attr_reader :organization, :params, :timestamp, :metadata

def produce_kafka_event(event)
return if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank?
return if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank?
return if ENV['LAGO_KAFKA_BOOTSTRAP_SERVERS'].blank?
return if ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'].blank?

Karafka.producer.produce_async(
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
topic: ENV['LAGO_KAFKA_RAW_EVENTS_TOPIC'],
key: "#{organization.id}-#{event.external_subscription_id}",
payload: {
organization_id: organization.id,
Expand All @@ -62,7 +62,7 @@ def produce_kafka_event(event)
precise_total_amount_cents: event.precise_total_amount_cents.present? ? event.precise_total_amount_cents.to_s : "0.0",
properties: event.properties,
ingested_at: Time.zone.now.iso8601[...-1],
source: "http_ruby"
source: 'http_ruby'
}.to_json
)
end
Expand Down
44 changes: 24 additions & 20 deletions app/services/events/high_usage_batch_create_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,38 @@ def call
return result.single_validation_failure!(error_code: "too_many_events", field: :events)
end

result.transactions = params.map { process_event(_1) }
process_events

result.transactions = params.map { {transaction_id: _1[:transaction_id]} }
result
end

private

attr_reader :organization, :params, :timestamp

def process_event(event_params)
Karafka.producer.produce_async(
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
key: "#{organization.id}-#{event_params[:external_subscription_id]}",
payload: {
organization_id: organization.id,
external_subscription_id: event_params[:external_subscription_id],
transaction_id: event_params[:transaction_id],
timestamp: parsed_timestamp(event_params[:timestamp]),
code: event_params[:code],
# NOTE: Default value to 0.0 is required for clickhouse parsing
precise_total_amount_cents: precise_total_amount_cents(event_params[:precise_total_amount_cents]),
properties: event_params[:properties] || {},
# NOTE: Removes trailing 'Z' to allow clickhouse parsing
ingested_at: Time.current.iso8601[...-1],
source: "http_ruby_high_usage"
}.to_json
)
def process_events
payloads = params.map do |event_params|
{
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
key: "#{organization.id}-#{event_params[:external_subscription_id]}",
payload: {
organization_id: organization.id,
external_subscription_id: event_params[:external_subscription_id],
transaction_id: event_params[:transaction_id],
timestamp: parsed_timestamp(event_params[:timestamp]),
code: event_params[:code],
# NOTE: Default value to 0.0 is required for clickhouse parsing
precise_total_amount_cents: precise_total_amount_cents(event_params[:precise_total_amount_cents]),
properties: event_params[:properties] || {},
# NOTE: Removes trailing 'Z' to allow clickhouse parsing
ingested_at: Time.current.iso8601[...-1],
source: "http_ruby_high_usage"
}.to_json
}
end

{transaction_id: event_params[:transaction_id]}
Karafka.producer.produce_many_sync(payloads)
end

def precise_total_amount_cents(precise_total_amount_cents)
Expand Down
2 changes: 1 addition & 1 deletion app/services/events/high_usage_create_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def call
return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_BOOTSTRAP_SERVERS"].blank?
return result.not_allowed_failure!(code: "missing_configuration") if ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"].blank?

Karafka.producer.produce_async(
Karafka.producer.produce_sync(
topic: ENV["LAGO_KAFKA_RAW_EVENTS_TOPIC"],
key: "#{organization.id}-#{params[:external_subscription_id]}",
payload: {
Expand Down
Loading

0 comments on commit afb1bd9

Please sign in to comment.