Skip to content

Commit

Permalink
Register device for tesla telemetry (#315)
Browse files Browse the repository at this point in the history
* Move vehice command to smartcar and tesla client, add telemetry/subscribe to tesla commands

* Fix lint issues

* Create endpoint for fetching all commands available to an integration

* Create endpoint for fetching all commands available to an integration

* Create endpoint for fetching all commands available to an integration

* Add tesla virtual token status to integration status endpoint

* Add tesla virtual token status to integration status endpoint

* Fix failing test

* Function and endpoint to register device for tesla telemetry

* Add open telemetry config to environment

* Fix issues from PR review

* Fix issues from PR review

* Fix issues from PR review

* Fix issues from PR review

* Fix issues from PR review

* Fix issues from PR review

* Change name from virtual-token to virtual-key

---------

Co-authored-by: Dylan Moreland <[email protected]>
  • Loading branch information
0xdev22 and elffjs authored May 1, 2024
1 parent 89fc42f commit 2e4bad9
Show file tree
Hide file tree
Showing 9 changed files with 528 additions and 4 deletions.
3 changes: 3 additions & 0 deletions charts/devices-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ env:
TESLA_TOKEN_URL: https://auth.tesla.com/oauth2/v3/token
TESLA_FLEET_URL: https://fleet-api.prd.%s.vn.cloud.tesla.com
META_TRANSACTION_PROCESSOR_GRPC_ADDR: meta-transaction-processor-dev:8086
TESLA_TELEMETRY_HOST_NAME: ingest-tesla.dev.dimo.zone
TESLA_TELEMETRY_PORT: 443
TESLA_TELEMETRY_CA_CERTIFICATE: -----BEGIN CERTIFICATE-----\nMIIBvDCCAWKgAwIBAgIRAL6QCUcK/8jy48V7ElERABowCgYIKoZIzj0EAwIwIzEh\nMB8GA1UEAxMYRElNTyBDQSBEZXZlbG9wbWVudCBSb290MCAXDTIyMDQyMzExMTEw\nM1oYDzIwNzIwNDEwMTExMTAzWjAyMTAwLgYDVQQDEydESU1PIENBIERldmVsb3Bt\nZW50IFNlcnZlciBJbnRlcm1lZGlhdGUwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNC\nAAQMyh6plUM3p4KWWfK0CqWXr1B9NWk53+c9ps8OpgZZIyXjxiw1EHxrpcqU7C9e\nhw+6JfmvTqqi3F4ES8K+Tt/mo2YwZDAOBgNVHQ8BAf8EBAMCAQYwEgYDVR0TAQH/\nBAgwBgEB/wIBADAdBgNVHQ4EFgQU+7zrfioO4bjNpD9KiG8fbTcIq8kwHwYDVR0j\nBBgwFoAUeMfSSqt+S65xQF82yRnjr+J5XC8wCgYIKoZIzj0EAwIDSAAwRQIhAK3s\nWtlk+d0fnkii091dTZGt+dtzEbM4HuizaG6mO5zPAiApi03qU/hdsAxXwlbhufH/\n5HuUiCLgBK8vPvL2YdMaKQ==\n-----END CERTIFICATE-----\n-----BEGIN CERTIFICATE-----\nMIIBrTCCAVKgAwIBAgIQEgthFz9Ww3+VaErBc3nDFjAKBggqhkjOPQQDAjAjMSEw\nHwYDVQQDExhESU1PIENBIERldmVsb3BtZW50IFJvb3QwIBcNMjIwNDIzMTExMTAz\nWhgPMjEyMjAzMzAxMTExMDNaMCMxITAfBgNVBAMTGERJTU8gQ0EgRGV2ZWxvcG1l\nbnQgUm9vdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABBuxEb6jTyfrUwI8RiBV\nKCQWqTAeLdHPj60Qk7HeMeaEcGjzF799xgpl6/8iNKaHN/w+705cdxp5pRswbUtu\nizWjZjBkMA4GA1UdDwEB/wQEAwIBBjASBgNVHRMBAf8ECDAGAQH/AgEBMB0GA1Ud\nDgQWBBR4x9JKq35LrnFAXzbJGeOv4nlcLzAfBgNVHSMEGDAWgBR4x9JKq35LrnFA\nXzbJGeOv4nlcLzAKBggqhkjOPQQDAgNJADBGAiEAlslTE9mX+VjPSYLKEsy48Rzh\nOUCdaWovmF+28PyAi4wCIQDXRKpYK+VMFyUR1GJVoV3gWezQcJmFswuWq+7M+XPb\nGQ==\n-----END CERTIFICATE-----
service:
type: ClusterIP
ports:
Expand Down
1 change: 1 addition & 0 deletions cmd/devices-api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func startWebAPI(logger zerolog.Logger, settings *config.Settings, pdb db.Store,
udOwner.Post("/integrations/:integrationID/commands/trunk/open", userDeviceController.OpenTrunk)
udOwner.Post("/integrations/:integrationID/commands/frunk/open", userDeviceController.OpenFrunk)
udOwner.Get("/integrations/:integrationID/commands/:requestID", userDeviceController.GetCommandRequestStatus)
udOwner.Post("/integrations/:integrationID/commands/telemetry/subscribe", userDeviceController.TelemetrySubscribe)

udOwner.Post("/commands/opt-in", userDeviceController.DeviceOptIn)

Expand Down
3 changes: 3 additions & 0 deletions internal/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Settings struct {
TeslaClientSecret string `yaml:"TESLA_CLIENT_SECRET"`
TeslaTokenURL string `yaml:"TESLA_TOKEN_URL"`
TeslaFleetURL string `yaml:"TESLA_FLEET_URL"`
TeslaTelemetryHostName string `yaml:"TESLA_TELEMETRY_HOST_NAME"`
TeslaTelemetryPort int `yaml:"TESLA_TELEMETRY_PORT"`
TeslaTelemetryCACertificate string `yaml:"TESLA_TELEMETRY_CA_CERTIFICATE"`
}

func (s *Settings) IsProduction() bool {
Expand Down
103 changes: 103 additions & 0 deletions internal/controllers/user_integrations_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,109 @@ func (udc *UserDevicesController) OpenFrunk(c *fiber.Ctx) error {
return udc.handleEnqueueCommand(c, constants.FrunkOpen)
}

// TelemetrySubscribe godoc
// @Summary Subscribe vehicle for Telemetry Data
// @Description Subscribe vehicle for Telemetry Data. Currently, this only works for Teslas connected through Tesla.
// @ID telemetry-subscribe
// @Tags device,integration,command
// @Success 200 {object}
// @Produce json
// @Param userDeviceID path string true "Device ID"
// @Param integrationID path string true "Integration ID"
// @Router /user/devices/{userDeviceID}/integrations/{integrationID}/commands/telemetry/subscribe [post]
func (udc *UserDevicesController) TelemetrySubscribe(c *fiber.Ctx) error {
userDeviceID := c.Params("userDeviceID")
integrationID := c.Params("integrationID")

logger := helpers.GetLogger(c, udc.log).With().
Str("IntegrationID", integrationID).
Str("Name", "Telemetry/Subscribe").
Logger()

logger.Info().Msg("Received command request.")

device, err := models.UserDevices(
models.UserDeviceWhere.ID.EQ(userDeviceID),
).One(c.Context(), udc.DBS().Reader)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fiber.NewError(fiber.StatusNotFound, "Device not found.")
}
logger.Err(err).Msg("Failed to search for device.")
return opaqueInternalError
}

udai, err := models.UserDeviceAPIIntegrations(
models.UserDeviceAPIIntegrationWhere.UserDeviceID.EQ(userDeviceID),
models.UserDeviceAPIIntegrationWhere.IntegrationID.EQ(integrationID),
).One(c.Context(), udc.DBS().Reader)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fiber.NewError(fiber.StatusNotFound, "Integration not found for this device.")
}
logger.Err(err).Msg("Failed to search for device integration record.")
return opaqueInternalError
}

if udai.Status != models.UserDeviceAPIIntegrationStatusActive {
return fiber.NewError(fiber.StatusConflict, "Integration is not active for this device.")
}

md := new(services.UserDeviceAPIIntegrationsMetadata)
if err := udai.Metadata.Unmarshal(md); err != nil {
logger.Err(err).Msg("Couldn't parse metadata JSON.")
return opaqueInternalError
}

if md.TeslaRegion == "" || md.Commands == nil {
return fiber.NewError(fiber.StatusBadRequest, "No commands config for integration and device")
}

if len(md.Commands.Capable) != 0 && !slices.Contains(md.Commands.Capable, constants.TelemetrySubscribe) {
return fiber.NewError(fiber.StatusBadRequest, "Telemetry command not available for device and integration combination")
}

// Is telemetry already enabled, return early
if ok := slices.Contains(md.Commands.Enabled, constants.TelemetrySubscribe); ok {
return c.SendStatus(fiber.StatusOK)
}

integration, err := udc.DeviceDefSvc.GetIntegrationByID(c.Context(), udai.IntegrationID)
if err != nil {
return shared.GrpcErrorToFiber(err, "deviceDefSvc error getting integration id: "+udai.IntegrationID)
}

switch integration.Vendor {
case constants.TeslaVendor:
if err := udc.teslaFleetAPISvc.SubscribeForTelemetryData(c.Context(),
udai.AccessToken.String,
md.TeslaRegion,
device.VinIdentifier.String,
); err != nil {
logger.Error().Err(err).Msg("error registering for telemetry")
return fiber.NewError(fiber.StatusFailedDependency, "could not register device for tesla telemetry: ", err.Error())
}

newEnabledCmd := append(md.Commands.Enabled, constants.TelemetrySubscribe)
md.Commands.Enabled = newEnabledCmd
newMeta, err := json.Marshal(md)
if err != nil {
return fiber.NewError(fiber.StatusInternalServerError, "could not save command state", err.Error())
}
udai.Metadata = null.JSONFrom(newMeta)
_, err = udai.Update(c.Context(), udc.DBS().Writer, boil.Whitelist(models.UserDeviceAPIIntegrationColumns.Metadata))
if err != nil {
return fiber.NewError(fiber.StatusInternalServerError, "could not save command state", err.Error())
}
default:
return fiber.NewError(fiber.StatusBadRequest, "Integration not supported for this command")
}

logger.Info().Msg("Successfully subscribed to telemetry")

return c.SendStatus(fiber.StatusOK)
}

// GetAutoPiUnitInfo godoc
// @Description gets the information about the aftermarket device by the hw serial
// @Tags integrations
Expand Down
167 changes: 164 additions & 3 deletions internal/controllers/user_integrations_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/rs/zerolog"

"github.com/DIMO-Network/shared/redis/mocks"
"github.com/ericlagergren/decimal"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,6 +19,8 @@ import (
signer "github.com/ethereum/go-ethereum/signer/core/apitypes"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/nats-io/nats-server/v2/server"
"github.com/rs/zerolog"

pbuser "github.com/DIMO-Network/shared/api/users"
"github.com/DIMO-Network/shared/db"
Expand Down Expand Up @@ -126,6 +125,10 @@ func (s *UserIntegrationsControllerTestSuite) SetupSuite() {

app.Post("/user2/devices/:userDeviceID/integrations/:integrationID", test.AuthInjectorTestHandler(testUser2), c.RegisterDeviceIntegration)
app.Get("/user/devices/:userDeviceID/integrations/:integrationID", test.AuthInjectorTestHandler(testUserID), c.GetUserDeviceIntegration)
app.Post("/user/devices/:userDeviceID/integrations/:integrationID/commands/telemetry/subscribe",
test.AuthInjectorTestHandler(testUserID),
c.TelemetrySubscribe,
)

s.app = app
}
Expand Down Expand Up @@ -1224,3 +1227,161 @@ func (s *UserIntegrationsControllerTestSuite) TestGetUserDeviceIntegration_Refre
s.Assert().Equal(encRefreshTk, newAPIInt.RefreshToken.String)
s.Assert().Equal(encAccessTk, newAPIInt.AccessToken.String)
}

func (s *UserIntegrationsControllerTestSuite) TestTelemetrySubscribe() {
integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0)
dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration)
ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb)

accessTk := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
refreshTk := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.UWfqdcCvyzObpI2gaIGcx2r7CcDjlQ0IzGyk8N0_vqw"
extID := "SomeID"
expectedExpiry := time.Now().Add(10 * time.Minute)
region := "na"

mtd, err := json.Marshal(services.UserDeviceAPIIntegrationsMetadata{
TeslaRegion: region,
Commands: &services.UserDeviceAPIIntegrationsMetadataCommands{
Enabled: []string{},
Capable: []string{constants.TelemetrySubscribe},
},
})
s.Require().NoError(err)

apIntd := models.UserDeviceAPIIntegration{
UserDeviceID: ud.ID,
IntegrationID: integration.Id,
Status: models.UserDeviceAPIIntegrationStatusActive,
AccessToken: null.StringFrom(accessTk),
AccessExpiresAt: null.TimeFrom(expectedExpiry),
RefreshToken: null.StringFrom(refreshTk),
ExternalID: null.StringFrom(extID),
Metadata: null.JSONFrom(mtd),
}
err = apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.Require().NoError(err)

s.deviceDefSvc.EXPECT().GetIntegrationByID(gomock.Any(), integration.Id).Return(integration, nil)
s.teslaFleetAPISvc.EXPECT().SubscribeForTelemetryData(gomock.Any(), accessTk, region, ud.VinIdentifier.String).Return(nil)

request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "")
res, err := s.app.Test(request, 60*1000)
s.Assert().NoError(err)

s.Assert().True(res.StatusCode == fiber.StatusOK)

udai, err := models.UserDeviceAPIIntegrations(
models.UserDeviceAPIIntegrationWhere.IntegrationID.EQ(integration.Id),
models.UserDeviceAPIIntegrationWhere.UserDeviceID.EQ(ud.ID),
).One(s.ctx, s.pdb.DBS().Reader)
s.Require().NoError(err)

md := new(services.UserDeviceAPIIntegrationsMetadata)
err = udai.Metadata.Unmarshal(md)
s.Require().NoError(err)

s.T().Log(md.Commands.Enabled, "-0------")
s.Assert().Equal(md.Commands.Enabled, []string{constants.TelemetrySubscribe})
}

func (s *UserIntegrationsControllerTestSuite) Test_NoUserDevice_TelemetrySubscribe() {
request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", "mockUserDeviceID", "mockIntID"), "")
res, err := s.app.Test(request, 60*1000)
s.Assert().NoError(err)

s.Assert().True(res.StatusCode == fiber.StatusNotFound)
}

func (s *UserIntegrationsControllerTestSuite) Test_InactiveIntegration_TelemetrySubscribe() {
integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0)
dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration)
ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb)

apIntd := models.UserDeviceAPIIntegration{
UserDeviceID: ud.ID,
IntegrationID: integration.Id,
Status: models.DeviceCommandRequestStatusPending,
}
err := apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.Require().NoError(err)

request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "")
res, err := s.app.Test(request, 60*1000)
s.Assert().NoError(err)

s.Assert().True(res.StatusCode == fiber.StatusConflict)
}

func (s *UserIntegrationsControllerTestSuite) Test_MissingRegionAndCapable_TelemetrySubscribe() {
integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0)
dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration)
ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb)

apIntd := models.UserDeviceAPIIntegration{
UserDeviceID: ud.ID,
IntegrationID: integration.Id,
Status: models.UserDeviceAPIIntegrationStatusActive,
}
err := apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.Require().NoError(err)

request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "")
res, err := s.app.Test(request, 60*1000)
s.Assert().NoError(err)

s.Assert().True(res.StatusCode == fiber.StatusBadRequest)
}

func (s *UserIntegrationsControllerTestSuite) Test_TelemetrySubscribe_AlreadyEnabled() {
integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0)
dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration)
ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb)

mtd, err := json.Marshal(services.UserDeviceAPIIntegrationsMetadata{
TeslaRegion: "na",
Commands: &services.UserDeviceAPIIntegrationsMetadataCommands{
Enabled: []string{constants.TelemetrySubscribe},
Capable: []string{constants.TelemetrySubscribe},
},
})
s.Require().NoError(err)
apIntd := models.UserDeviceAPIIntegration{
UserDeviceID: ud.ID,
IntegrationID: integration.Id,
Status: models.UserDeviceAPIIntegrationStatusActive,
Metadata: null.JSONFrom(mtd),
}
err = apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.Require().NoError(err)

request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "")
res, err := s.app.Test(request, 60*1000)
s.Assert().NoError(err)

s.Assert().True(res.StatusCode == fiber.StatusOK)
}

func (s *UserIntegrationsControllerTestSuite) Test_TelemetrySubscribe_NotCapable() {
integration := test.BuildIntegrationGRPC(constants.TeslaVendor, 10, 0)
dd := test.BuildDeviceDefinitionGRPC(ksuid.New().String(), "Tesla", "Model S", 2012, integration)
ud := test.SetupCreateUserDevice(s.T(), testUserID, dd[0].DeviceDefinitionId, nil, "5YJSA1CN0CFP02439", s.pdb)

mtd, err := json.Marshal(services.UserDeviceAPIIntegrationsMetadata{
TeslaRegion: "na",
})
s.Require().NoError(err)
apIntd := models.UserDeviceAPIIntegration{
UserDeviceID: ud.ID,
IntegrationID: integration.Id,
Status: models.UserDeviceAPIIntegrationStatusActive,
Metadata: null.JSONFrom(mtd),
}
err = apIntd.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.Require().NoError(err)

request := test.BuildRequest(http.MethodPost, fmt.Sprintf("/user/devices/%s/integrations/%s/commands/telemetry/subscribe", ud.ID, integration.Id), "")
res, err := s.app.Test(request, 60*1000)
s.Assert().NoError(err)

s.Assert().True(res.StatusCode == fiber.StatusBadRequest)
}
14 changes: 14 additions & 0 deletions internal/services/mocks/tesla_fleet_api_service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2e4bad9

Please sign in to comment.