From 2e4bad9bcb36bc355ca12c3fc241ef4928c546e8 Mon Sep 17 00:00:00 2001 From: "Doyin.O" <111298305+0xdev22@users.noreply.github.com> Date: Tue, 30 Apr 2024 21:52:53 -0600 Subject: [PATCH] Register device for tesla telemetry (#315) * Move vehice command to smartcar and tesla client, add telemetry/subscribe to tesla commands * Fix lint issues * Create endpoint for fetching all commands available to an integration * Create endpoint for fetching all commands available to an integration * Create endpoint for fetching all commands available to an integration * Add tesla virtual token status to integration status endpoint * Add tesla virtual token status to integration status endpoint * Fix failing test * Function and endpoint to register device for tesla telemetry * Add open telemetry config to environment * Fix issues from PR review * Fix issues from PR review * Fix issues from PR review * Fix issues from PR review * Fix issues from PR review * Fix issues from PR review * Change name from virtual-token to virtual-key --------- Co-authored-by: Dylan Moreland <79415431+elffjs@users.noreply.github.com> --- charts/devices-api/values.yaml | 3 + cmd/devices-api/api.go | 1 + internal/config/settings.go | 3 + .../user_integrations_controller.go | 103 +++++++++++ .../user_integrations_controller_test.go | 167 +++++++++++++++++- .../mocks/tesla_fleet_api_service_mock.go | 14 ++ internal/services/tesla_fleet_api_service.go | 106 +++++++++++ .../services/tesla_fleet_api_service_test.go | 129 ++++++++++++++ settings.sample.yaml | 6 +- 9 files changed, 528 insertions(+), 4 deletions(-) create mode 100644 internal/services/tesla_fleet_api_service_test.go diff --git a/charts/devices-api/values.yaml b/charts/devices-api/values.yaml index 75a0b72aa..c8f350e79 100644 --- a/charts/devices-api/values.yaml +++ b/charts/devices-api/values.yaml @@ -90,6 +90,9 @@ env: TESLA_TOKEN_URL: https://auth.tesla.com/oauth2/v3/token TESLA_FLEET_URL: https://fleet-api.prd.%s.vn.cloud.tesla.com META_TRANSACTION_PROCESSOR_GRPC_ADDR: meta-transaction-processor-dev:8086 + TESLA_TELEMETRY_HOST_NAME: ingest-tesla.dev.dimo.zone + TESLA_TELEMETRY_PORT: 443 + TESLA_TELEMETRY_CA_CERTIFICATE: -----BEGIN CERTIFICATE-----\nMIIBvDCCAWKgAwIBAgIRAL6QCUcK/8jy48V7ElERABowCgYIKoZIzj0EAwIwIzEh\nMB8GA1UEAxMYRElNTyBDQSBEZXZlbG9wbWVudCBSb290MCAXDTIyMDQyMzExMTEw\nM1oYDzIwNzIwNDEwMTExMTAzWjAyMTAwLgYDVQQDEydESU1PIENBIERldmVsb3Bt\nZW50IFNlcnZlciBJbnRlcm1lZGlhdGUwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNC\nAAQMyh6plUM3p4KWWfK0CqWXr1B9NWk53+c9ps8OpgZZIyXjxiw1EHxrpcqU7C9e\nhw+6JfmvTqqi3F4ES8K+Tt/mo2YwZDAOBgNVHQ8BAf8EBAMCAQYwEgYDVR0TAQH/\nBAgwBgEB/wIBADAdBgNVHQ4EFgQU+7zrfioO4bjNpD9KiG8fbTcIq8kwHwYDVR0j\nBBgwFoAUeMfSSqt+S65xQF82yRnjr+J5XC8wCgYIKoZIzj0EAwIDSAAwRQIhAK3s\nWtlk+d0fnkii091dTZGt+dtzEbM4HuizaG6mO5zPAiApi03qU/hdsAxXwlbhufH/\n5HuUiCLgBK8vPvL2YdMaKQ==\n-----END CERTIFICATE-----\n-----BEGIN CERTIFICATE-----\nMIIBrTCCAVKgAwIBAgIQEgthFz9Ww3+VaErBc3nDFjAKBggqhkjOPQQDAjAjMSEw\nHwYDVQQDExhESU1PIENBIERldmVsb3BtZW50IFJvb3QwIBcNMjIwNDIzMTExMTAz\nWhgPMjEyMjAzMzAxMTExMDNaMCMxITAfBgNVBAMTGERJTU8gQ0EgRGV2ZWxvcG1l\nbnQgUm9vdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABBuxEb6jTyfrUwI8RiBV\nKCQWqTAeLdHPj60Qk7HeMeaEcGjzF799xgpl6/8iNKaHN/w+705cdxp5pRswbUtu\nizWjZjBkMA4GA1UdDwEB/wQEAwIBBjASBgNVHRMBAf8ECDAGAQH/AgEBMB0GA1Ud\nDgQWBBR4x9JKq35LrnFAXzbJGeOv4nlcLzAfBgNVHSMEGDAWgBR4x9JKq35LrnFA\nXzbJGeOv4nlcLzAKBggqhkjOPQQDAgNJADBGAiEAlslTE9mX+VjPSYLKEsy48Rzh\nOUCdaWovmF+28PyAi4wCIQDXRKpYK+VMFyUR1GJVoV3gWezQcJmFswuWq+7M+XPb\nGQ==\n-----END CERTIFICATE----- service: type: ClusterIP ports: diff --git a/cmd/devices-api/api.go b/cmd/devices-api/api.go index 2234f97ec..7df7c3c5f 100644 --- a/cmd/devices-api/api.go +++ b/cmd/devices-api/api.go @@ -322,6 +322,7 @@ func startWebAPI(logger zerolog.Logger, settings *config.Settings, pdb db.Store, udOwner.Post("/integrations/:integrationID/commands/trunk/open", userDeviceController.OpenTrunk) udOwner.Post("/integrations/:integrationID/commands/frunk/open", userDeviceController.OpenFrunk) udOwner.Get("/integrations/:integrationID/commands/:requestID", userDeviceController.GetCommandRequestStatus) + udOwner.Post("/integrations/:integrationID/commands/telemetry/subscribe", userDeviceController.TelemetrySubscribe) udOwner.Post("/commands/opt-in", userDeviceController.DeviceOptIn) diff --git a/internal/config/settings.go b/internal/config/settings.go index 905c7aac3..2fb9f00b7 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -88,6 +88,9 @@ type Settings struct { TeslaClientSecret string `yaml:"TESLA_CLIENT_SECRET"` TeslaTokenURL string `yaml:"TESLA_TOKEN_URL"` TeslaFleetURL string `yaml:"TESLA_FLEET_URL"` + TeslaTelemetryHostName string `yaml:"TESLA_TELEMETRY_HOST_NAME"` + TeslaTelemetryPort int `yaml:"TESLA_TELEMETRY_PORT"` + TeslaTelemetryCACertificate string `yaml:"TESLA_TELEMETRY_CA_CERTIFICATE"` } func (s *Settings) IsProduction() bool { diff --git a/internal/controllers/user_integrations_controller.go b/internal/controllers/user_integrations_controller.go index fcdd347d3..c59f03f36 100644 --- a/internal/controllers/user_integrations_controller.go +++ b/internal/controllers/user_integrations_controller.go @@ -539,6 +539,109 @@ func (udc *UserDevicesController) OpenFrunk(c *fiber.Ctx) error { return udc.handleEnqueueCommand(c, constants.FrunkOpen) } +// TelemetrySubscribe godoc +// @Summary Subscribe vehicle for Telemetry Data +// @Description Subscribe vehicle for Telemetry Data. Currently, this only works for Teslas connected through Tesla. +// @ID telemetry-subscribe +// @Tags device,integration,command +// @Success 200 {object} +// @Produce json +// @Param userDeviceID path string true "Device ID" +// @Param integrationID path string true "Integration ID" +// @Router /user/devices/{userDeviceID}/integrations/{integrationID}/commands/telemetry/subscribe [post] +func (udc *UserDevicesController) TelemetrySubscribe(c *fiber.Ctx) error { + userDeviceID := c.Params("userDeviceID") + integrationID := c.Params("integrationID") + + logger := helpers.GetLogger(c, udc.log).With(). + Str("IntegrationID", integrationID). + Str("Name", "Telemetry/Subscribe"). + Logger() + + logger.Info().Msg("Received command request.") + + device, err := models.UserDevices( + models.UserDeviceWhere.ID.EQ(userDeviceID), + ).One(c.Context(), udc.DBS().Reader) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return fiber.NewError(fiber.StatusNotFound, "Device not found.") + } + logger.Err(err).Msg("Failed to search for device.") + return opaqueInternalError + } + + udai, err := models.UserDeviceAPIIntegrations( + models.UserDeviceAPIIntegrationWhere.UserDeviceID.EQ(userDeviceID), + models.UserDeviceAPIIntegrationWhere.IntegrationID.EQ(integrationID), + ).One(c.Context(), udc.DBS().Reader) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return fiber.NewError(fiber.StatusNotFound, "Integration not found for this device.") + } + logger.Err(err).Msg("Failed to search for device integration record.") + return opaqueInternalError + } + + if udai.Status != models.UserDeviceAPIIntegrationStatusActive { + return fiber.NewError(fiber.StatusConflict, "Integration is not active for this device.") + } + + md := new(services.UserDeviceAPIIntegrationsMetadata) + if err := udai.Metadata.Unmarshal(md); err != nil { + logger.Err(err).Msg("Couldn't parse metadata JSON.") + return opaqueInternalError + } + + if md.TeslaRegion == "" || md.Commands == nil { + return fiber.NewError(fiber.StatusBadRequest, "No commands config for integration and device") + } + + if len(md.Commands.Capable) != 0 && !slices.Contains(md.Commands.Capable, constants.TelemetrySubscribe) { + return fiber.NewError(fiber.StatusBadRequest, "Telemetry command not available for device and integration combination") + } + + // Is telemetry already enabled, return early + if ok := slices.Contains(md.Commands.Enabled, constants.TelemetrySubscribe); ok { + return c.SendStatus(fiber.StatusOK) + } + + integration, err := udc.DeviceDefSvc.GetIntegrationByID(c.Context(), udai.IntegrationID) + if err != nil { + return shared.GrpcErrorToFiber(err, "deviceDefSvc error getting integration id: "+udai.IntegrationID) + } + + switch integration.Vendor { + case constants.TeslaVendor: + if err := udc.teslaFleetAPISvc.SubscribeForTelemetryData(c.Context(), + udai.AccessToken.String, + md.TeslaRegion, + device.VinIdentifier.String, + ); err != nil { + logger.Error().Err(err).Msg("error registering for telemetry") + return fiber.NewError(fiber.StatusFailedDependency, "could not register device for tesla telemetry: ", err.Error()) + } + + newEnabledCmd := append(md.Commands.Enabled, constants.TelemetrySubscribe) + md.Commands.Enabled = newEnabledCmd + newMeta, err := json.Marshal(md) + if err != nil { + return fiber.NewError(fiber.StatusInternalServerError, "could not save command state", err.Error()) + } + udai.Metadata = null.JSONFrom(newMeta) + _, err = udai.Update(c.Context(), udc.DBS().Writer, boil.Whitelist(models.UserDeviceAPIIntegrationColumns.Metadata)) + if err != nil { + return fiber.NewError(fiber.StatusInternalServerError, "could not save command state", err.Error()) + } + default: + return fiber.NewError(fiber.StatusBadRequest, "Integration not supported for this command") + } + + logger.Info().Msg("Successfully subscribed to telemetry") + + return c.SendStatus(fiber.StatusOK) +} + // GetAutoPiUnitInfo godoc // @Description gets the information about the aftermarket device by the hw serial // @Tags integrations diff --git a/internal/controllers/user_integrations_controller_test.go b/internal/controllers/user_integrations_controller_test.go index c7e5741fb..f621a8f67 100644 --- a/internal/controllers/user_integrations_controller_test.go +++ b/internal/controllers/user_integrations_controller_test.go @@ -11,9 +11,6 @@ import ( "testing" "time" - "github.com/nats-io/nats-server/v2/server" - "github.com/rs/zerolog" - "github.com/DIMO-Network/shared/redis/mocks" "github.com/ericlagergren/decimal" "github.com/ethereum/go-ethereum/common" @@ -22,6 +19,8 @@ import ( signer "github.com/ethereum/go-ethereum/signer/core/apitypes" "github.com/go-redis/redis/v8" "github.com/google/uuid" + "github.com/nats-io/nats-server/v2/server" + "github.com/rs/zerolog" pbuser "github.com/DIMO-Network/shared/api/users" "github.com/DIMO-Network/shared/db" @@ -126,6 +125,10 @@ func (s *UserIntegrationsControllerTestSuite) SetupSuite() { app.Post("/user2/devices/:userDeviceID/integrations/:integrationID", test.AuthInjectorTestHandler(testUser2), c.RegisterDeviceIntegration) app.Get("/user/devices/:userDeviceID/integrations/:integrationID", test.AuthInjectorTestHandler(testUserID), c.GetUserDeviceIntegration) + app.Post("/user/devices/:userDeviceID/integrations/:integrationID/commands/telemetry/subscribe", + test.AuthInjectorTestHandler(testUserID), + c.TelemetrySubscribe, + ) s.app = app } @@ -1224,3 +1227,161 @@ func (s *UserIntegrationsControllerTestSuite) TestGetUserDeviceIntegration_Refre s.Assert().Equal(encRefreshTk, newAPIInt.RefreshToken.String) s.Assert().Equal(encAccessTk, newAPIInt.AccessToken.String) } + +func (s *UserIntegrationsControllerTestSuite) TestTelemetrySubscribe() { + integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0) + dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration) + ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb) + + accessTk := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" + refreshTk := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.UWfqdcCvyzObpI2gaIGcx2r7CcDjlQ0IzGyk8N0_vqw" + extID := "SomeID" + expectedExpiry := time.Now().Add(10 * time.Minute) + region := "na" + + mtd, err := json.Marshal(services.UserDeviceAPIIntegrationsMetadata{ + TeslaRegion: region, + Commands: &services.UserDeviceAPIIntegrationsMetadataCommands{ + Enabled: []string{}, + Capable: []string{constants.TelemetrySubscribe}, + }, + }) + s.Require().NoError(err) + + apIntd := models.UserDeviceAPIIntegration{ + UserDeviceID: ud.ID, + IntegrationID: integration.Id, + Status: models.UserDeviceAPIIntegrationStatusActive, + AccessToken: null.StringFrom(accessTk), + AccessExpiresAt: null.TimeFrom(expectedExpiry), + RefreshToken: null.StringFrom(refreshTk), + ExternalID: null.StringFrom(extID), + Metadata: null.JSONFrom(mtd), + } + err = apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) + s.Require().NoError(err) + + s.deviceDefSvc.EXPECT().GetIntegrationByID(gomock.Any(), integration.Id).Return(integration, nil) + s.teslaFleetAPISvc.EXPECT().SubscribeForTelemetryData(gomock.Any(), accessTk, region, ud.VinIdentifier.String).Return(nil) + + request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "") + res, err := s.app.Test(request, 60*1000) + s.Assert().NoError(err) + + s.Assert().True(res.StatusCode == fiber.StatusOK) + + udai, err := models.UserDeviceAPIIntegrations( + models.UserDeviceAPIIntegrationWhere.IntegrationID.EQ(integration.Id), + models.UserDeviceAPIIntegrationWhere.UserDeviceID.EQ(ud.ID), + ).One(s.ctx, s.pdb.DBS().Reader) + s.Require().NoError(err) + + md := new(services.UserDeviceAPIIntegrationsMetadata) + err = udai.Metadata.Unmarshal(md) + s.Require().NoError(err) + + s.T().Log(md.Commands.Enabled, "-0------") + s.Assert().Equal(md.Commands.Enabled, []string{constants.TelemetrySubscribe}) +} + +func (s *UserIntegrationsControllerTestSuite) Test_NoUserDevice_TelemetrySubscribe() { + request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", "mockUserDeviceID", "mockIntID"), "") + res, err := s.app.Test(request, 60*1000) + s.Assert().NoError(err) + + s.Assert().True(res.StatusCode == fiber.StatusNotFound) +} + +func (s *UserIntegrationsControllerTestSuite) Test_InactiveIntegration_TelemetrySubscribe() { + integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0) + dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration) + ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb) + + apIntd := models.UserDeviceAPIIntegration{ + UserDeviceID: ud.ID, + IntegrationID: integration.Id, + Status: models.DeviceCommandRequestStatusPending, + } + err := apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) + s.Require().NoError(err) + + request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "") + res, err := s.app.Test(request, 60*1000) + s.Assert().NoError(err) + + s.Assert().True(res.StatusCode == fiber.StatusConflict) +} + +func (s *UserIntegrationsControllerTestSuite) Test_MissingRegionAndCapable_TelemetrySubscribe() { + integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0) + dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration) + ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb) + + apIntd := models.UserDeviceAPIIntegration{ + UserDeviceID: ud.ID, + IntegrationID: integration.Id, + Status: models.UserDeviceAPIIntegrationStatusActive, + } + err := apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) + s.Require().NoError(err) + + request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "") + res, err := s.app.Test(request, 60*1000) + s.Assert().NoError(err) + + s.Assert().True(res.StatusCode == fiber.StatusBadRequest) +} + +func (s *UserIntegrationsControllerTestSuite) Test_TelemetrySubscribe_AlreadyEnabled() { + integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0) + dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration) + ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb) + + mtd, err := json.Marshal(services.UserDeviceAPIIntegrationsMetadata{ + TeslaRegion: "na", + Commands: &services.UserDeviceAPIIntegrationsMetadataCommands{ + Enabled: []string{constants.TelemetrySubscribe}, + Capable: []string{constants.TelemetrySubscribe}, + }, + }) + s.Require().NoError(err) + apIntd := models.UserDeviceAPIIntegration{ + UserDeviceID: ud.ID, + IntegrationID: integration.Id, + Status: models.UserDeviceAPIIntegrationStatusActive, + Metadata: null.JSONFrom(mtd), + } + err = apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) + s.Require().NoError(err) + + request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "") + res, err := s.app.Test(request, 60*1000) + s.Assert().NoError(err) + + s.Assert().True(res.StatusCode == fiber.StatusOK) +} + +func (s *UserIntegrationsControllerTestSuite) Test_TelemetrySubscribe_NotCapable() { + integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0) + dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration) + ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb) + + mtd, err := json.Marshal(services.UserDeviceAPIIntegrationsMetadata{ + TeslaRegion: "na", + }) + s.Require().NoError(err) + apIntd := models.UserDeviceAPIIntegration{ + UserDeviceID: ud.ID, + IntegrationID: integration.Id, + Status: models.UserDeviceAPIIntegrationStatusActive, + Metadata: null.JSONFrom(mtd), + } + err = apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) + s.Require().NoError(err) + + request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "") + res, err := s.app.Test(request, 60*1000) + s.Assert().NoError(err) + + s.Assert().True(res.StatusCode == fiber.StatusBadRequest) +} diff --git a/internal/services/mocks/tesla_fleet_api_service_mock.go b/internal/services/mocks/tesla_fleet_api_service_mock.go index 9cb462354..cfe9a5548 100644 --- a/internal/services/mocks/tesla_fleet_api_service_mock.go +++ b/internal/services/mocks/tesla_fleet_api_service_mock.go @@ -114,6 +114,20 @@ func (mr *MockTeslaFleetAPIServiceMockRecorder) RefreshToken(ctx, refreshToken a return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshToken", reflect.TypeOf((*MockTeslaFleetAPIService)(nil).RefreshToken), ctx, refreshToken) } +// SubscribeForTelemetryData mocks base method. +func (m *MockTeslaFleetAPIService) SubscribeForTelemetryData(ctx context.Context, token, region, vin string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeForTelemetryData", ctx, token, region, vin) + ret0, _ := ret[0].(error) + return ret0 +} + +// SubscribeForTelemetryData indicates an expected call of SubscribeForTelemetryData. +func (mr *MockTeslaFleetAPIServiceMockRecorder) SubscribeForTelemetryData(ctx, token, region, vin any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeForTelemetryData", reflect.TypeOf((*MockTeslaFleetAPIService)(nil).SubscribeForTelemetryData), ctx, token, region, vin) +} + // VirtualKeyConnectionStatus mocks base method. func (m *MockTeslaFleetAPIService) VirtualKeyConnectionStatus(ctx context.Context, token, region, vin string) (bool, error) { m.ctrl.T.Helper() diff --git a/internal/services/tesla_fleet_api_service.go b/internal/services/tesla_fleet_api_service.go index 46dccdc5d..84abc9992 100644 --- a/internal/services/tesla_fleet_api_service.go +++ b/internal/services/tesla_fleet_api_service.go @@ -28,6 +28,7 @@ type TeslaFleetAPIService interface { GetAvailableCommands() *UserDeviceAPIIntegrationsMetadataCommands VirtualKeyConnectionStatus(ctx context.Context, token, region, vin string) (bool, error) RefreshToken(ctx context.Context, refreshToken string) (*TeslaAuthCodeResponse, error) + SubscribeForTelemetryData(ctx context.Context, token, region, vin string) error } var teslaScopes = []string{"openid", "offline_access", "user_data", "vehicle_device_data", "vehicle_cmds", "vehicle_charging_cmds", "energy_device_data", "energy_device_data", "energy_cmds"} @@ -64,6 +65,41 @@ type VirtualKeyConnectionStatus struct { KeyPairedVins []string `json:"key_paired_vins"` } +type SubscribeForTelemetryDataRequest struct { + Vins []string `json:"vins"` + Config TelemetryConfigRequest `json:"config"` +} + +type Interval struct { + IntervalSeconds int `json:"interval_seconds"` +} + +type TelemetryFields map[string]Interval + +type TelemetryConfigRequest struct { + HostName string `json:"hostName"` + PublicCACertificate string `json:"ca"` + Fields TelemetryFields `json:"fields"` + AlertTypes []string `json:"alert_types,omitempty"` + Expiration int64 `json:"exp"` + Port int `json:"port"` +} + +type SkippedVehicles struct { + MissingKey []string `json:"missing_key"` + UnsupportedHardware []string `json:"unsupported_hardware"` + UnsupportedFirmware []string `json:"unsupported_firmware"` +} + +type SubscribeForTelemetryDataResponse struct { + UpdatedVehicles int `json:"updated_vehicles"` + SkippedVehicles SkippedVehicles `json:"skipped_vehicles"` +} + +type SubscribeForTelemetryDataResponseWrapper struct { + Response SubscribeForTelemetryDataResponse `json:"response"` +} + type teslaFleetAPIService struct { Settings *config.Settings HTTPClient *http.Client @@ -241,6 +277,76 @@ func (t *teslaFleetAPIService) RefreshToken(ctx context.Context, refreshToken st return tokResp, nil } +var fields = TelemetryFields{ + "ChargeState": {IntervalSeconds: 300}, + "Location": {IntervalSeconds: 10}, + "OriginLocation": {IntervalSeconds: 300}, + "DestinationLocation": {IntervalSeconds: 300}, + "DestinationName": {IntervalSeconds: 300}, + "EnergyRemaining": {IntervalSeconds: 300}, + "VehicleSpeed": {IntervalSeconds: 60}, + "Odometer": {IntervalSeconds: 300}, + "EstBatteryRange": {IntervalSeconds: 300}, + "Soc": {IntervalSeconds: 300}, + "BatteryLevel": {IntervalSeconds: 60}, +} + +func (t *teslaFleetAPIService) SubscribeForTelemetryData(ctx context.Context, token, region, vin string) error { + baseURL := fmt.Sprintf(t.Settings.TeslaFleetURL, region) + u, err := url.JoinPath(baseURL, "/api/1/vehicles/fleet_telemetry_config") + if err != nil { + return err + } + + r := SubscribeForTelemetryDataRequest{ + Vins: []string{vin}, + Config: TelemetryConfigRequest{ + HostName: t.Settings.TeslaTelemetryHostName, + PublicCACertificate: t.Settings.TeslaTelemetryCACertificate, + Port: t.Settings.TeslaTelemetryPort, + Fields: fields, + AlertTypes: []string{"service"}, + }, + } + + b, err := json.Marshal(r) + if err != nil { + return err + } + + req := strings.NewReader(string(b)) + + resp, err := t.performRequest(ctx, u, token, http.MethodPost, req) + if err != nil { + return err + } + + defer resp.Body.Close() + + subResp := SubscribeForTelemetryDataResponseWrapper{} + if err := json.NewDecoder(resp.Body).Decode(&subResp); err != nil { + return err + } + + if subResp.Response.UpdatedVehicles == 1 { + return nil + } + + if slices.Contains(subResp.Response.SkippedVehicles.MissingKey, vin) { + return fmt.Errorf("vehicle has not approved virtual token connection") + } + + if slices.Contains(subResp.Response.SkippedVehicles.UnsupportedHardware, vin) { + return fmt.Errorf("vehicle hardware not supported") + } + + if slices.Contains(subResp.Response.SkippedVehicles.UnsupportedFirmware, vin) { + return fmt.Errorf("vehicle firmware not supported") + } + + return nil +} + // performRequest a helper function for making http requests, it adds a timeout context and parses error response func (t *teslaFleetAPIService) performRequest(ctx context.Context, url, token, method string, body *strings.Reader) (*http.Response, error) { ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*10) diff --git a/internal/services/tesla_fleet_api_service_test.go b/internal/services/tesla_fleet_api_service_test.go new file mode 100644 index 000000000..2f94f2d1c --- /dev/null +++ b/internal/services/tesla_fleet_api_service_test.go @@ -0,0 +1,129 @@ +package services + +import ( + "context" + "fmt" + "net/http" + "testing" + + "github.com/jarcoal/httpmock" + "github.com/stretchr/testify/suite" + + "github.com/DIMO-Network/devices-api/internal/config" + "github.com/DIMO-Network/devices-api/internal/test" +) + +const mockTeslaFleetBaeURL = "https://fleet-mock-api.%s.tesla.com" + +type TeslaFleetAPIServiceTestSuite struct { + suite.Suite + ctx context.Context + SUT TeslaFleetAPIService + settings *config.Settings +} + +func (t *TeslaFleetAPIServiceTestSuite) SetupSuite() { + t.ctx = context.Background() + logger := test.Logger() + t.settings = &config.Settings{TeslaFleetURL: mockTeslaFleetBaeURL, TeslaTelemetryCACertificate: "Ca-Cert", TeslaTelemetryPort: 443, TeslaTelemetryHostName: "tel.dimo.com"} + + t.SUT = NewTeslaFleetAPIService(t.settings, logger) +} + +func TestTeslaFleetAPIServiceTestSuite(t *testing.T) { + suite.Run(t, new(TeslaFleetAPIServiceTestSuite)) +} + +func (t *TeslaFleetAPIServiceTestSuite) TestSubscribeForTelemetryData() { + httpmock.Activate() + defer httpmock.DeactivateAndReset() + + token := "someToken" + region := "mockRegion" + vin := "RandomVin" + + baseURL := fmt.Sprintf(mockTeslaFleetBaeURL, region) + u := fmt.Sprintf("%s/api/1/vehicles/fleet_telemetry_config", baseURL) + + respBody := SubscribeForTelemetryDataResponseWrapper{ + SubscribeForTelemetryDataResponse{ + UpdatedVehicles: 1, + SkippedVehicles: SkippedVehicles{}, + }, + } + + jsonResp, err := httpmock.NewJsonResponder(http.StatusOK, respBody) + t.Require().NoError(err) + httpmock.RegisterResponder(http.MethodPost, u, jsonResp) + + err = t.SUT.SubscribeForTelemetryData(t.ctx, token, region, vin) + + t.Require().NoError(err) +} + +func (t *TeslaFleetAPIServiceTestSuite) TestSubscribeForTelemetryData_Errror_Cases() { + httpmock.Activate() + defer httpmock.DeactivateAndReset() + + vin := "RandomVin" + tests := []struct { + response SubscribeForTelemetryDataResponseWrapper + expectedError string + }{ + { + response: SubscribeForTelemetryDataResponseWrapper{ + SubscribeForTelemetryDataResponse{ + UpdatedVehicles: 0, + SkippedVehicles: SkippedVehicles{ + MissingKey: []string{vin}, + UnsupportedHardware: nil, + UnsupportedFirmware: nil, + }, + }, + }, + expectedError: "vehicle has not approved virtual token connection", + }, + { + response: SubscribeForTelemetryDataResponseWrapper{ + SubscribeForTelemetryDataResponse{ + UpdatedVehicles: 0, + SkippedVehicles: SkippedVehicles{ + MissingKey: nil, + UnsupportedHardware: []string{vin}, + UnsupportedFirmware: nil, + }, + }, + }, + expectedError: "vehicle hardware not supported", + }, + { + response: SubscribeForTelemetryDataResponseWrapper{ + SubscribeForTelemetryDataResponse{ + UpdatedVehicles: 0, + SkippedVehicles: SkippedVehicles{ + MissingKey: nil, + UnsupportedHardware: nil, + UnsupportedFirmware: []string{vin}, + }, + }, + }, + expectedError: "vehicle firmware not supported", + }, + } + + for _, tst := range tests { + token := "someToken" + region := "mockRegion" + + baseURL := fmt.Sprintf(mockTeslaFleetBaeURL, region) + u := fmt.Sprintf("%s/api/1/vehicles/fleet_telemetry_config", baseURL) + + responder, err := httpmock.NewJsonResponder(http.StatusOK, tst.response) + t.Require().NoError(err) + httpmock.RegisterResponder(http.MethodPost, u, responder) + + err = t.SUT.SubscribeForTelemetryData(t.ctx, token, region, vin) + + t.Require().EqualError(err, tst.expectedError) + } +} diff --git a/settings.sample.yaml b/settings.sample.yaml index 0427f1cb5..6a6d8fd49 100644 --- a/settings.sample.yaml +++ b/settings.sample.yaml @@ -75,4 +75,8 @@ SYNTHETIC_FINGERPRINT_CONSUMER_GROUP: consumer.synthetic.fingerprint TESLA_CLIENT_ID: TESLA_CLIENT_SECRET: TESLA_TOKEN_URL: -TESLA_FLEET_URL: \ No newline at end of file +TESLA_FLEET_URL: + +TESLA_TELEMETRY_HOST_NAME: +TESLA_TELEMETRY_PORT: +TESLA_TELEMETRY_CA_CERTIFICATE: \ No newline at end of file