diff --git a/charts/devices-api/values-prod.yaml b/charts/devices-api/values-prod.yaml index 59c248d75..4b171a4bc 100644 --- a/charts/devices-api/values-prod.yaml +++ b/charts/devices-api/values-prod.yaml @@ -42,6 +42,11 @@ env: AFTERMARKET_DEVICE_CONTRACT_ADDRESS: '0x9c94c395cbcbde662235e0a9d3bb87ad708561ba' DIMO_CONTRACT_APIURL: https://contracts-api.dimo.zone/ NATS_URL: nats-prod:4222 + NATS_STREAM_NAME: DD_VALUATION_TASKS + NATS_VALUATION_SUBJECT: dd_valuation_tasks + NATS_OFFER_SUBJECT: dd_offer_tasks + NATS_DURABLE_CONSUMER: dd-valuation-task-consumer + NATS_ACK_TIMEOUT: 2m SYNTHETIC_DEVICES_ENABLED: true SYNTHETIC_WALLET_GRPC_ADDR: synthetic-wallet-instance-prod:8086 DEVICE_DATA_GRPC_ADDR: device-data-api-prod:8086 diff --git a/charts/devices-api/values.yaml b/charts/devices-api/values.yaml index f16a44587..b1cbc3e30 100644 --- a/charts/devices-api/values.yaml +++ b/charts/devices-api/values.yaml @@ -83,6 +83,7 @@ env: NATS_URL: nats-dev:4222 NATS_STREAM_NAME: DD_VALUATION_TASKS NATS_VALUATION_SUBJECT: dd_valuation_tasks + NATS_OFFER_SUBJECT: dd_offer_tasks NATS_DURABLE_CONSUMER: dd-valuation-task-consumer NATS_ACK_TIMEOUT: 2m SYNTHETIC_DEVICES_ENABLED: true diff --git a/cmd/devices-api/api.go b/cmd/devices-api/api.go index 30e8fa8b9..61725c159 100644 --- a/cmd/devices-api/api.go +++ b/cmd/devices-api/api.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/base64" - "github.com/DIMO-Network/devices-api/internal/services/fingerprint" "math/big" "net" "os" @@ -12,6 +11,8 @@ import ( "syscall" "time" + "github.com/DIMO-Network/devices-api/internal/services/fingerprint" + "github.com/DIMO-Network/devices-api/internal/middleware" "github.com/DIMO-Network/devices-api/internal/rpc" diff --git a/internal/config/settings.go b/internal/config/settings.go index e9b44daae..db71819da 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -71,6 +71,7 @@ type Settings struct { NATSURL string `yaml:"NATS_URL"` NATSStreamName string `yaml:"NATS_STREAM_NAME"` NATSValuationSubject string `yaml:"NATS_VALUATION_SUBJECT"` + NATSOfferSubject string `yaml:"NATS_OFFER_SUBJECT"` NATSAckTimeout string `yaml:"NATS_ACK_TIMEOUT"` NATSDurableConsumer string `yaml:"NATS_DURABLE_CONSUMER"` ValuationsAPIGRPCAddr string `yaml:"VALUATIONS_GRPC_ADDR"` diff --git a/internal/controllers/user_devices_controller.go b/internal/controllers/user_devices_controller.go index c06dc5011..da4ff414d 100644 --- a/internal/controllers/user_devices_controller.go +++ b/internal/controllers/user_devices_controller.go @@ -616,22 +616,12 @@ func (udc *UserDevicesController) RegisterDeviceForUserFromVIN(c *fiber.Ctx) err // request valuation if udc.Settings.IsProduction() { - message := services.ValuationDecodeCommand{ - VIN: vin, - UserDeviceID: udFull.ID, - } - messageBytes, err := json.Marshal(message) - - if err != nil { - localLog.Err(err).Msg("Failed to marshal message.") - } else { - pubAck, err := udc.NATSSvc.JetStream.Publish(udc.NATSSvc.JetStreamSubject, messageBytes) - if err != nil { - localLog.Err(err).Msg("failed to publish to NATS") - } else { - localLog.Info().Str("user_device_id", udFull.ID).Msgf("published valuation request to NATS with Ack: %+v", pubAck) - } + tokenID := int64(0) + if udFull.NFT != nil { + tokenID = udFull.NFT.TokenID.Int64() } + udc.requestValuation(vin, udFull.ID, tokenID) + udc.requestInstantOffer(udFull.ID, tokenID) } return c.Status(fiber.StatusCreated).JSON(fiber.Map{ @@ -639,6 +629,49 @@ func (udc *UserDevicesController) RegisterDeviceForUserFromVIN(c *fiber.Ctx) err }) } +func (udc *UserDevicesController) requestValuation(vin string, userDeviceID string, tokenID int64) { + message := services.ValuationDecodeCommand{ + VIN: vin, + UserDeviceID: userDeviceID, + TokenID: tokenID, + } + messageBytes, err := json.Marshal(message) + + if err != nil { + udc.log.Err(err).Msg("Failed to marshal message.") + return + } + + pubAck, err := udc.NATSSvc.JetStream.Publish(udc.NATSSvc.ValuationSubject, messageBytes) + if err != nil { + udc.log.Err(err).Msg("failed to publish to NATS") + return + } + + udc.log.Info().Str("user_device_id", userDeviceID).Msgf("published valuation request to NATS with Ack: %+v", pubAck) +} + +func (udc *UserDevicesController) requestInstantOffer(userDeviceID string, tokenID int64) { + message := services.OfferRequest{ + UserDeviceID: userDeviceID, + TokenID: tokenID, + } + messageBytes, err := json.Marshal(message) + + if err != nil { + udc.log.Err(err).Msg("Failed to marshal message.") + return + } + + pubAck, err := udc.NATSSvc.JetStream.Publish(udc.NATSSvc.OfferSubject, messageBytes) + if err != nil { + udc.log.Err(err).Msg("failed to publish to NATS") + return + } + + udc.log.Info().Str("user_device_id", userDeviceID).Msgf("published instant offer request to NATS with Ack: %+v", pubAck) +} + // RegisterDeviceForUserFromSmartcar godoc // @Description adds a device to a user by decoding VIN from Smartcar. If cannot decode returns 424 or 500 if error. // @Description If the user device already exists from a different integration, for the same user, this will return a 200 with the full user device object @@ -786,26 +819,12 @@ func (udc *UserDevicesController) RegisterDeviceForUserFromSmartcar(c *fiber.Ctx } if udc.Settings.IsProduction() { - - message := services.ValuationDecodeCommand{ - VIN: vin, - UserDeviceID: udFull.ID, + tokenID := int64(0) + if udFull.NFT != nil { + tokenID = udFull.NFT.TokenID.Int64() } - - messageBytes, err := json.Marshal(message) - - if err != nil { - localLog.Err(err).Msg("Failed to marshal message.") - } else { - pubAck, err := udc.NATSSvc.JetStream.Publish(udc.NATSSvc.JetStreamSubject, messageBytes) - - if err != nil { - localLog.Err(err).Msg("Failed to publish to NATS.") - } else { - localLog.Info().Str("vin", vin).Str("user_id", userID).Str("user_device_id", udFull.ID).Msgf("Published valuation request to NATS with Ack: %+v", pubAck) - } - } - + udc.requestValuation(vin, udFull.ID, tokenID) + udc.requestInstantOffer(udFull.ID, tokenID) } return c.Status(fiber.StatusCreated).JSON(fiber.Map{ diff --git a/internal/controllers/user_devices_controller_test.go b/internal/controllers/user_devices_controller_test.go index cbf92bebd..bfd9be3de 100644 --- a/internal/controllers/user_devices_controller_test.go +++ b/internal/controllers/user_devices_controller_test.go @@ -373,7 +373,7 @@ func (s *UserDevicesControllerTestSuite) TestPostUserDeviceFromVIN() { assert.Equal(s.T(), "6", *regUserResp.Metadata.CANProtocol) assert.EqualValues(s.T(), "ICE", *regUserResp.Metadata.PowertrainType) - msg, responseError := s.natsService.JetStream.GetLastMsg(natsStreamName, s.natsService.JetStreamSubject) + msg, responseError := s.natsService.JetStream.GetLastMsg(natsStreamName, s.natsService.ValuationSubject) assert.NoError(s.T(), responseError, "expected no error from nats") vinResult := gjson.GetBytes(msg.Data, "vin") assert.Equal(s.T(), vinny, vinResult.Str) @@ -447,7 +447,7 @@ func (s *UserDevicesControllerTestSuite) TestPostUserDeviceFromVIN_SameUser_Dupl assert.Equal(s.T(), integration.Type, regUserResp.DeviceDefinition.CompatibleIntegrations[0].Type) assert.Equal(s.T(), integration.Id, regUserResp.DeviceDefinition.CompatibleIntegrations[0].ID) - msg, responseError := s.natsService.JetStream.GetLastMsg(natsStreamName, s.natsService.JetStreamSubject) + msg, responseError := s.natsService.JetStream.GetLastMsg(natsStreamName, s.natsService.ValuationSubject) assert.NoError(s.T(), responseError, "expected no error from nats") vinResult := gjson.GetBytes(msg.Data, "vin") assert.Equal(s.T(), vinny, vinResult.Str) diff --git a/internal/controllers/user_integrations_auth_controller.go b/internal/controllers/user_integrations_auth_controller.go index a868c9997..4910ca419 100644 --- a/internal/controllers/user_integrations_auth_controller.go +++ b/internal/controllers/user_integrations_auth_controller.go @@ -4,6 +4,9 @@ import ( "context" "encoding/json" "fmt" + "strconv" + "time" + "github.com/DIMO-Network/devices-api/internal/config" "github.com/DIMO-Network/devices-api/internal/constants" "github.com/DIMO-Network/devices-api/internal/controllers/helpers" @@ -14,8 +17,6 @@ import ( "github.com/DIMO-Network/shared/redis" "github.com/gofiber/fiber/v2" "github.com/rs/zerolog" - "strconv" - "time" ) type UserIntegrationAuthController struct { diff --git a/internal/controllers/user_integrations_auth_controller_test.go b/internal/controllers/user_integrations_auth_controller_test.go index 5261725fa..336efa4a3 100644 --- a/internal/controllers/user_integrations_auth_controller_test.go +++ b/internal/controllers/user_integrations_auth_controller_test.go @@ -4,6 +4,10 @@ import ( "context" "encoding/json" "fmt" + "io" + "testing" + "time" + ddgrpc "github.com/DIMO-Network/device-definitions-api/pkg/grpc" "github.com/DIMO-Network/devices-api/internal/config" "github.com/DIMO-Network/devices-api/internal/constants" @@ -20,9 +24,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go" "go.uber.org/mock/gomock" - "io" - "testing" - "time" ) type UserIntegrationAuthControllerTestSuite struct { diff --git a/internal/controllers/user_integrations_controller.go b/internal/controllers/user_integrations_controller.go index 3b14283c3..3a3040d13 100644 --- a/internal/controllers/user_integrations_controller.go +++ b/internal/controllers/user_integrations_controller.go @@ -1899,25 +1899,12 @@ func (udc *UserDevicesController) registerDeviceTesla(c *fiber.Ctx, logger *zero } if udc.Settings.IsProduction() { - message := services.ValuationDecodeCommand{ - VIN: v.VIN, - UserDeviceID: userDeviceID, + tokenID := int64(0) + if ud.R != nil && ud.R.VehicleNFT != nil { + tokenID, _ = ud.R.VehicleNFT.TokenID.Int64() } - - messageBytes, err := json.Marshal(message) - - if err != nil { - udc.log.Err(err).Msg("Failed to marshal valuation decode command.") - } else { - pubAck, err := udc.NATSSvc.JetStream.Publish(udc.NATSSvc.JetStreamSubject, messageBytes) - - if err != nil { - udc.log.Err(err).Msg("Failed to publish valuation decode command for Tesla Device.") - } else { - udc.log.Info().Str("vin", v.VIN).Msgf("Published valuation decode command with sequence %d.", pubAck.Sequence) - } - } - + udc.requestValuation(v.VIN, userDeviceID, tokenID) + udc.requestInstantOffer(userDeviceID, tokenID) } if err := udc.teslaTaskService.StartPoll(v, &integration); err != nil { diff --git a/internal/services/mocks/nats_service_mock.go b/internal/services/mocks/nats_service_mock.go index 8618e8ae4..be3e9fb50 100644 --- a/internal/services/mocks/nats_service_mock.go +++ b/internal/services/mocks/nats_service_mock.go @@ -76,7 +76,7 @@ func NewMockNATSService(streamName string) (*services.NATSService, *server.Serve natsSvc := &services.NATSService{ JetStream: js, JetStreamName: streamName, - JetStreamSubject: "test-subject", + ValuationSubject: "test-subject", AckTimeout: to, DurableConsumer: "test-durable-consumer", } diff --git a/internal/services/models.go b/internal/services/models.go index 93b2e10f5..b646d4630 100644 --- a/internal/services/models.go +++ b/internal/services/models.go @@ -185,4 +185,10 @@ type AutoPiCommandJob struct { type ValuationDecodeCommand struct { VIN string `json:"vin"` UserDeviceID string `json:"userDeviceId"` + TokenID int64 `json:"tokenId"` +} + +type OfferRequest struct { + UserDeviceID string `json:"user_device_id"` + TokenID int64 `json:"token_id"` } diff --git a/internal/services/nats_service.go b/internal/services/nats_service.go index 922cdc756..f856f3e00 100644 --- a/internal/services/nats_service.go +++ b/internal/services/nats_service.go @@ -12,7 +12,8 @@ type NATSService struct { log *zerolog.Logger JetStream nats.JetStreamContext JetStreamName string - JetStreamSubject string + ValuationSubject string + OfferSubject string AckTimeout time.Duration DurableConsumer string } @@ -37,7 +38,8 @@ func NewNATSService(settings *config.Settings, log *zerolog.Logger) (*NATSServic log: log, JetStream: js, JetStreamName: settings.NATSStreamName, - JetStreamSubject: settings.NATSValuationSubject, + ValuationSubject: settings.NATSValuationSubject, + OfferSubject: settings.NATSOfferSubject, AckTimeout: to, DurableConsumer: settings.NATSDurableConsumer}