From b1f5192dcf59166d26150ca7a91b9cae92724830 Mon Sep 17 00:00:00 2001 From: darkmatterpool <113581282+darkmatterpool@users.noreply.github.com> Date: Mon, 21 Nov 2022 15:36:23 +0200 Subject: [PATCH] feat: bankingcircle connector (#45) * feat: bankingcircle connector support * feat: banking circle connector Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> * fix: lint & test issues Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> --- internal/app/api/module.go | 2 + .../pkg/connectors/bankingcircle/client.go | 236 ++++++++++++++++++ .../pkg/connectors/bankingcircle/config.go | 28 +++ .../pkg/connectors/bankingcircle/errors.go | 20 ++ .../pkg/connectors/bankingcircle/loader.go | 28 +++ .../bankingcircle/task_fetch_payments.go | 75 ++++++ .../connectors/bankingcircle/task_resolve.go | 40 +++ swagger.yml | 21 ++ 8 files changed, 450 insertions(+) create mode 100644 internal/pkg/connectors/bankingcircle/client.go create mode 100644 internal/pkg/connectors/bankingcircle/config.go create mode 100644 internal/pkg/connectors/bankingcircle/errors.go create mode 100644 internal/pkg/connectors/bankingcircle/loader.go create mode 100644 internal/pkg/connectors/bankingcircle/task_fetch_payments.go create mode 100644 internal/pkg/connectors/bankingcircle/task_resolve.go diff --git a/internal/app/api/module.go b/internal/app/api/module.go index ac693397..cc78a61e 100644 --- a/internal/app/api/module.go +++ b/internal/app/api/module.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/numary/payments/internal/pkg/connectors/bankingcircle" "github.com/numary/payments/internal/pkg/connectors/currencycloud" "github.com/gorilla/mux" @@ -68,6 +69,7 @@ func HTTPModule() fx.Option { addConnector[stripe.Config, stripe.TaskDescriptor](stripe.NewLoader()), addConnector[wise.Config, wise.TaskDescriptor](wise.NewLoader()), addConnector[currencycloud.Config, currencycloud.TaskDescriptor](currencycloud.NewLoader()), + addConnector[bankingcircle.Config, bankingcircle.TaskDescriptor](bankingcircle.NewLoader()), ) } diff --git a/internal/pkg/connectors/bankingcircle/client.go b/internal/pkg/connectors/bankingcircle/client.go new file mode 100644 index 00000000..a934d8f3 --- /dev/null +++ b/internal/pkg/connectors/bankingcircle/client.go @@ -0,0 +1,236 @@ +package bankingcircle + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/numary/go-libs/sharedlogging" +) + +type client struct { + httpClient *http.Client + + username string + password string + + endpoint string + authorizationEndpoint string + + logger sharedlogging.Logger + + accessToken string + accessTokenExpiresAt time.Time +} + +func newClient(username, password, endpoint, authorizationEndpoint string, logger sharedlogging.Logger) (*client, error) { + c := &client{ + httpClient: &http.Client{Timeout: 10 * time.Second}, + + username: username, + password: password, + endpoint: endpoint, + authorizationEndpoint: authorizationEndpoint, + + logger: logger, + } + + if err := c.login(); err != nil { + return nil, err + } + + return c, nil +} + +func (c *client) login() error { + req, err := http.NewRequest(http.MethodGet, c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody) + if err != nil { + return fmt.Errorf("failed to create login request: %w", err) + } + + req.SetBasicAuth(c.username, c.password) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to login: %w", err) + } + + defer func() { + err = resp.Body.Close() + if err != nil { + c.logger.Error(err) + } + }() + + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read login response body: %w", err) + } + + //nolint:tagliatelle // allow for client-side structures + type response struct { + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + } + + var res response + + if err = json.Unmarshal(responseBody, &res); err != nil { + return fmt.Errorf("failed to unmarshal login response: %w", err) + } + + c.accessToken = res.AccessToken + c.accessTokenExpiresAt = time.Now().Add(time.Duration(res.ExpiresIn) * time.Second) + + return nil +} + +func (c *client) ensureAccessTokenIsValid() error { + if c.accessTokenExpiresAt.After(time.Now()) { + return nil + } + + return c.login() +} + +//nolint:tagliatelle // allow for client-side structures +type payment struct { + PaymentID string `json:"paymentId"` + TransactionReference string `json:"transactionReference"` + ConcurrencyToken string `json:"concurrencyToken"` + Classification string `json:"classification"` + Status string `json:"status"` + Errors interface{} `json:"errors"` + LastChangedTimestamp time.Time `json:"lastChangedTimestamp"` + DebtorInformation struct { + PaymentBulkID interface{} `json:"paymentBulkId"` + AccountID string `json:"accountId"` + Account struct { + Account string `json:"account"` + FinancialInstitution string `json:"financialInstitution"` + Country string `json:"country"` + } `json:"account"` + VibanID interface{} `json:"vibanId"` + Viban struct { + Account string `json:"account"` + FinancialInstitution string `json:"financialInstitution"` + Country string `json:"country"` + } `json:"viban"` + InstructedDate interface{} `json:"instructedDate"` + DebitAmount struct { + Currency string `json:"currency"` + Amount float64 `json:"amount"` + } `json:"debitAmount"` + DebitValueDate time.Time `json:"debitValueDate"` + FxRate interface{} `json:"fxRate"` + Instruction interface{} `json:"instruction"` + } `json:"debtorInformation"` + Transfer struct { + DebtorAccount interface{} `json:"debtorAccount"` + DebtorName interface{} `json:"debtorName"` + DebtorAddress interface{} `json:"debtorAddress"` + Amount struct { + Currency string `json:"currency"` + Amount float64 `json:"amount"` + } `json:"amount"` + ValueDate interface{} `json:"valueDate"` + ChargeBearer interface{} `json:"chargeBearer"` + RemittanceInformation interface{} `json:"remittanceInformation"` + CreditorAccount interface{} `json:"creditorAccount"` + CreditorName interface{} `json:"creditorName"` + CreditorAddress interface{} `json:"creditorAddress"` + } `json:"transfer"` + CreditorInformation struct { + AccountID string `json:"accountId"` + Account struct { + Account string `json:"account"` + FinancialInstitution string `json:"financialInstitution"` + Country string `json:"country"` + } `json:"account"` + VibanID interface{} `json:"vibanId"` + Viban struct { + Account string `json:"account"` + FinancialInstitution string `json:"financialInstitution"` + Country string `json:"country"` + } `json:"viban"` + CreditAmount struct { + Currency string `json:"currency"` + Amount float64 `json:"amount"` + } `json:"creditAmount"` + CreditValueDate time.Time `json:"creditValueDate"` + FxRate interface{} `json:"fxRate"` + } `json:"creditorInformation"` +} + +func (c *client) getAllPayments() ([]*payment, error) { + var payments []*payment + + for page := 0; ; page++ { + pagedPayments, err := c.getPayments(page) + if err != nil { + return nil, err + } + + if len(pagedPayments) == 0 { + break + } + + payments = append(payments, pagedPayments...) + } + + return payments, nil +} + +func (c *client) getPayments(page int) ([]*payment, error) { + if err := c.ensureAccessTokenIsValid(); err != nil { + return nil, err + } + + req, err := http.NewRequest(http.MethodGet, c.endpoint+"/api/v1/payments/singles", http.NoBody) + if err != nil { + return nil, fmt.Errorf("failed to create login request: %w", err) + } + + q := req.URL.Query() + q.Add("PageSize", "5000") + q.Add("PageNumber", fmt.Sprint(page)) + + req.URL.RawQuery = q.Encode() + + req.Header.Set("Authorization", "Bearer "+c.accessToken) + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to login: %w", err) + } + + defer func() { + err = resp.Body.Close() + if err != nil { + c.logger.Error(err) + } + }() + + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read login response body: %w", err) + } + + type response struct { + Result []*payment `json:"result"` + PageInfo struct { + CurrentPage int `json:"currentPage"` + PageSize int `json:"pageSize"` + } `json:"pageInfo"` + } + + var res response + + if err = json.Unmarshal(responseBody, &res); err != nil { + return nil, fmt.Errorf("failed to unmarshal login response: %w", err) + } + + return res.Result, nil +} diff --git a/internal/pkg/connectors/bankingcircle/config.go b/internal/pkg/connectors/bankingcircle/config.go new file mode 100644 index 00000000..06a23457 --- /dev/null +++ b/internal/pkg/connectors/bankingcircle/config.go @@ -0,0 +1,28 @@ +package bankingcircle + +type Config struct { + Username string `json:"username" yaml:"username" bson:"username"` + Password string `json:"password" yaml:"password" bson:"password"` + Endpoint string `json:"endpoint" yaml:"endpoint" bson:"endpoint"` + AuthorizationEndpoint string `json:"authorizationEndpoint" yaml:"authorizationEndpoint" bson:"authorizationEndpoint"` +} + +func (c Config) Validate() error { + if c.Username == "" { + return ErrMissingUsername + } + + if c.Password == "" { + return ErrMissingPassword + } + + if c.Endpoint == "" { + return ErrMissingEndpoint + } + + if c.AuthorizationEndpoint == "" { + return ErrMissingAuthorizationEndpoint + } + + return nil +} diff --git a/internal/pkg/connectors/bankingcircle/errors.go b/internal/pkg/connectors/bankingcircle/errors.go new file mode 100644 index 00000000..7a7c18ac --- /dev/null +++ b/internal/pkg/connectors/bankingcircle/errors.go @@ -0,0 +1,20 @@ +package bankingcircle + +import "github.com/pkg/errors" + +var ( + // ErrMissingTask is returned when the task is missing. + ErrMissingTask = errors.New("task is not implemented") + + // ErrMissingUsername is returned when the username is missing. + ErrMissingUsername = errors.New("missing username from config") + + // ErrMissingPassword is returned when the password is missing. + ErrMissingPassword = errors.New("missing password from config") + + // ErrMissingEndpoint is returned when the endpoint is missing. + ErrMissingEndpoint = errors.New("missing endpoint from config") + + // ErrMissingAuthorizationEndpoint is returned when the authorization endpoint is missing. + ErrMissingAuthorizationEndpoint = errors.New("missing authorization endpoint from config") +) diff --git a/internal/pkg/connectors/bankingcircle/loader.go b/internal/pkg/connectors/bankingcircle/loader.go new file mode 100644 index 00000000..7db2a7e5 --- /dev/null +++ b/internal/pkg/connectors/bankingcircle/loader.go @@ -0,0 +1,28 @@ +package bankingcircle + +import ( + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/payments/internal/pkg/integration" + "github.com/numary/payments/internal/pkg/task" +) + +const connectorName = "bankingcircle" + +// NewLoader creates a new loader. +func NewLoader() integration.Loader[Config, TaskDescriptor] { + loader := integration.NewLoaderBuilder[Config, TaskDescriptor](connectorName). + WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { + return integration.NewConnectorBuilder[TaskDescriptor](). + WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error { + return ctx.Scheduler(). + Schedule(TaskDescriptor{ + Name: "Fetch payments from source", + Key: taskNameFetchPayments, + }, false) + }). + WithResolve(resolveTasks(logger, config)). + Build() + }).Build() + + return loader +} diff --git a/internal/pkg/connectors/bankingcircle/task_fetch_payments.go b/internal/pkg/connectors/bankingcircle/task_fetch_payments.go new file mode 100644 index 00000000..27f6956b --- /dev/null +++ b/internal/pkg/connectors/bankingcircle/task_fetch_payments.go @@ -0,0 +1,75 @@ +package bankingcircle + +import ( + "context" + + "github.com/numary/payments/internal/pkg/ingestion" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" + + "github.com/numary/go-libs/sharedlogging" +) + +func taskFetchPayments(logger sharedlogging.Logger, client *client) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler[TaskDescriptor], + ingester ingestion.Ingester, + ) error { + paymentsList, err := client.getAllPayments() + if err != nil { + return err + } + + batch := ingestion.PaymentBatch{} + + for _, paymentEl := range paymentsList { + logger.Info(paymentEl) + + batchElement := ingestion.PaymentBatchElement{ + Referenced: payments.Referenced{ + Reference: paymentEl.TransactionReference, + Type: matchPaymentType(paymentEl.Classification), + }, + Payment: &payments.Data{ + Status: matchPaymentStatus(paymentEl.Status), + Scheme: payments.SchemeOther, + InitialAmount: int64(paymentEl.Transfer.Amount.Amount * 100), + Asset: paymentEl.Transfer.Amount.Currency + "/2", + Raw: paymentEl, + }, + } + + batch = append(batch, batchElement) + } + + return ingester.IngestPayments(ctx, batch, struct{}{}) + } +} + +func matchPaymentStatus(paymentStatus string) payments.Status { + switch paymentStatus { + case "Processed": + return payments.StatusSucceeded + // On MissingFunding - the payment is still in progress. + // If there will be funds available within 10 days - the payment will be processed. + // Otherwise - it will be cancelled. + case "PendingProcessing", "MissingFunding": + return payments.StatusPending + case "Rejected", "Cancelled", "Reversed", "Returned": + return payments.StatusFailed + } + + return payments.TypeOther +} + +func matchPaymentType(paymentType string) string { + switch paymentType { + case "Incoming": + return payments.TypePayIn + case "Outgoing": + return payments.TypePayout + } + + return payments.TypeOther +} diff --git a/internal/pkg/connectors/bankingcircle/task_resolve.go b/internal/pkg/connectors/bankingcircle/task_resolve.go new file mode 100644 index 00000000..c805fa1f --- /dev/null +++ b/internal/pkg/connectors/bankingcircle/task_resolve.go @@ -0,0 +1,40 @@ +package bankingcircle + +import ( + "fmt" + + "github.com/numary/payments/internal/pkg/task" + + "github.com/numary/go-libs/sharedlogging" +) + +const ( + taskNameFetchPayments = "fetch-payments" +) + +// TaskDescriptor is the definition of a task. +type TaskDescriptor struct { + Name string `json:"name" yaml:"name" bson:"name"` + Key string `json:"key" yaml:"key" bson:"key"` +} + +func resolveTasks(logger sharedlogging.Logger, config Config) func(taskDefinition TaskDescriptor) task.Task { + bankingCircleClient, err := newClient(config.Username, config.Password, config.Endpoint, config.AuthorizationEndpoint, logger) + if err != nil { + logger.Error(err) + + return nil + } + + return func(taskDefinition TaskDescriptor) task.Task { + switch taskDefinition.Key { + case taskNameFetchPayments: + return taskFetchPayments(logger, bankingCircleClient) + } + + // This should never happen. + return func() error { + return fmt.Errorf("key '%s': %w", taskDefinition.Key, ErrMissingTask) + } + } +} diff --git a/swagger.yml b/swagger.yml index f6399dad..bc6221ae 100644 --- a/swagger.yml +++ b/swagger.yml @@ -227,6 +227,7 @@ components: - $ref: '#/components/schemas/WiseConfig' - $ref: '#/components/schemas/ModulrConfig' - $ref: '#/components/schemas/CurrencyCloudConfig' + - $ref: '#/components/schemas/BankingCircleConfig' ConnectorTask: oneOf: - $ref: '#/components/schemas/StripeTask' @@ -309,6 +310,26 @@ components: endpoint: type: string example: 'XXX' + BankingCircleConfig: + type: object + required: + - username + - password + - endpoint + - authorizationEndpoint + properties: + username: + type: string + example: 'XXX' + password: + type: string + example: 'XXX' + endpoint: + type: string + example: 'XXX' + authorizationEndpoint: + type: string + example: 'XXX' CurrencyCloudConfig: type: object required: