Skip to content

Commit

Permalink
fix(storage): de-duplicate payments when updating db (#286)
Browse files Browse the repository at this point in the history
Sometimes, we can have two refunds for the same payment
inside the same batch, and postgres does not support it
and return an error 'command cannot affect row a second
time'
  • Loading branch information
paul-nicolas authored Jan 24, 2025
1 parent 4b942f6 commit 1bb1c5b
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 10 deletions.
1 change: 0 additions & 1 deletion internal/connectors/engine/activities/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func (a Activities) temporalPluginPollingError(ctx context.Context, err error, p
}

func (a Activities) temporalPluginErrorCheck(ctx context.Context, err error, isPeriodic bool) error {

switch {
// Do not retry the following errors
case errors.Is(err, plugins.ErrNotImplemented):
Expand Down
36 changes: 27 additions & 9 deletions internal/storage/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ type paymentAdjustment struct {
func (s *store) PaymentsUpsert(ctx context.Context, payments []models.Payment) error {
paymentsToInsert := make([]payment, 0, len(payments))
adjustmentsToInsert := make([]paymentAdjustment, 0)
paymentsRefundedSeen := make(map[models.PaymentID]int)
paymentsRefunded := make([]payment, 0)
paymentsInitialAmountToAdjustSeen := make(map[models.PaymentID]int)
paymentsInitialAmountToAdjust := make([]payment, 0)
paymentsCapturedSeen := make(map[models.PaymentID]int)
paymentsCaptured := make([]payment, 0)
for _, p := range payments {
paymentsToInsert = append(paymentsToInsert, fromPaymentModels(p))
Expand All @@ -76,17 +79,32 @@ func (s *store) PaymentsUpsert(ctx context.Context, payments []models.Payment) e
adjustmentsToInsert = append(adjustmentsToInsert, fromPaymentAdjustmentModels(a))
switch a.Status {
case models.PAYMENT_STATUS_AMOUNT_ADJUSTEMENT:
res := fromPaymentModels(p)
res.InitialAmount = a.Amount
paymentsInitialAmountToAdjust = append(paymentsInitialAmountToAdjust, res)
if i, ok := paymentsInitialAmountToAdjustSeen[p.ID]; ok {
paymentsInitialAmountToAdjust[i].InitialAmount = a.Amount
} else {
res := fromPaymentModels(p)
res.InitialAmount = a.Amount
paymentsInitialAmountToAdjust = append(paymentsInitialAmountToAdjust, res)
paymentsInitialAmountToAdjustSeen[p.ID] = len(paymentsInitialAmountToAdjust) - 1
}
case models.PAYMENT_STATUS_REFUNDED:
res := fromPaymentModels(p)
res.Amount = a.Amount
paymentsRefunded = append(paymentsRefunded, res)
if i, ok := paymentsRefundedSeen[p.ID]; ok {
paymentsRefunded[i].Amount.Add(paymentsRefunded[i].Amount, a.Amount)
} else {
res := fromPaymentModels(p)
res.Amount = a.Amount
paymentsRefunded = append(paymentsRefunded, res)
paymentsRefundedSeen[p.ID] = len(paymentsRefunded) - 1
}
case models.PAYMENT_STATUS_CAPTURE, models.PAYMENT_STATUS_REFUND_REVERSED:
res := fromPaymentModels(p)
res.Amount = a.Amount
paymentsCaptured = append(paymentsCaptured, res)
if i, ok := paymentsCapturedSeen[p.ID]; ok {
paymentsCaptured[i].Amount.Add(paymentsCaptured[i].Amount, a.Amount)
} else {
res := fromPaymentModels(p)
res.Amount = a.Amount
paymentsCaptured = append(paymentsCaptured, res)
paymentsCapturedSeen[p.ID] = len(paymentsCaptured) - 1
}
}
}
}
Expand Down
118 changes: 118 additions & 0 deletions internal/storage/payments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,108 @@ var (
}
)

func defaultPaymentsRefunded() []models.Payment {
defaultAccounts := defaultAccounts()
return []models.Payment{
{
ID: pID1,
ConnectorID: defaultConnector.ID,
Reference: "test1",
CreatedAt: now.Add(-60 * time.Minute).UTC().Time,
Type: models.PAYMENT_TYPE_TRANSFER,
InitialAmount: big.NewInt(100),
Amount: big.NewInt(100),
Asset: "USD/2",
Scheme: models.PAYMENT_SCHEME_OTHER,
SourceAccountID: &defaultAccounts[0].ID,
DestinationAccountID: &defaultAccounts[1].ID,
Metadata: map[string]string{
"key1": "value1",
},
Adjustments: []models.PaymentAdjustment{
{
ID: models.PaymentAdjustmentID{
PaymentID: pID1,
Reference: "test1",
CreatedAt: now.Add(-60 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_SUCCEEDED,
},
Reference: "test1",
CreatedAt: now.Add(-60 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_SUCCEEDED,
Amount: big.NewInt(100),
Asset: pointer.For("USD/2"),
Raw: []byte(`{}`),
},
},
},
{
ID: pID1,
ConnectorID: defaultConnector.ID,
Reference: "test1",
CreatedAt: now.Add(-59 * time.Minute).UTC().Time,
Type: models.PAYMENT_TYPE_TRANSFER,
InitialAmount: big.NewInt(100),
Amount: big.NewInt(100),
Asset: "USD/2",
Scheme: models.PAYMENT_SCHEME_OTHER,
SourceAccountID: &defaultAccounts[0].ID,
DestinationAccountID: &defaultAccounts[1].ID,
Metadata: map[string]string{
"key1": "value1",
},
Adjustments: []models.PaymentAdjustment{
{
ID: models.PaymentAdjustmentID{
PaymentID: pID1,
Reference: "test1",
CreatedAt: now.Add(-59 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_REFUNDED,
},
Reference: "test1",
CreatedAt: now.Add(-59 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_REFUNDED,
Amount: big.NewInt(10),
Asset: pointer.For("USD/2"),
Raw: []byte(`{}`),
},
},
},
{
ID: pID1,
ConnectorID: defaultConnector.ID,
Reference: "test1",
CreatedAt: now.Add(-58 * time.Minute).UTC().Time,
Type: models.PAYMENT_TYPE_TRANSFER,
InitialAmount: big.NewInt(100),
Amount: big.NewInt(100),
Asset: "USD/2",
Scheme: models.PAYMENT_SCHEME_OTHER,
SourceAccountID: &defaultAccounts[0].ID,
DestinationAccountID: &defaultAccounts[1].ID,
Metadata: map[string]string{
"key1": "value1",
},
Adjustments: []models.PaymentAdjustment{
{
ID: models.PaymentAdjustmentID{
PaymentID: pID1,
Reference: "test1",
CreatedAt: now.Add(-58 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_REFUNDED,
},
Reference: "test1",
CreatedAt: now.Add(-58 * time.Minute).UTC().Time,
Status: models.PAYMENT_STATUS_REFUNDED,
Amount: big.NewInt(10),
Asset: pointer.For("USD/2"),
Raw: []byte(`{}`),
},
},
},
}
}

func defaultPayments() []models.Payment {
defaultAccounts := defaultAccounts()
return []models.Payment{
Expand Down Expand Up @@ -472,6 +574,22 @@ func TestPaymentsUpsert(t *testing.T) {
})
}

func TestPaymentsUpsertRefunded(t *testing.T) {
t.Parallel()

ctx := logging.TestingContext()
store := newStore(t)

upsertConnector(t, ctx, store, defaultConnector)
upsertAccounts(t, ctx, store, defaultAccounts())
upsertPayments(t, ctx, store, defaultPaymentsRefunded())

actual, err := store.PaymentsGet(ctx, pID1)
require.NoError(t, err)
// two refunds in the same batch, should be 100 - 10 - 10 = 80
require.Equal(t, big.NewInt(80), actual.Amount)
}

func TestPaymentsUpdateMetadata(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 1bb1c5b

Please sign in to comment.