Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
TimVosch committed Jan 3, 2025
1 parent a643378 commit 7cf0b09
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
4 changes: 2 additions & 2 deletions services/core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
AUTH_JWKS_URL = env.Could("AUTH_JWKS_URL", "http://oathkeeper:4456/.well-known/jwks.json")
SYS_ARCHIVE_TIME = env.Could("SYS_ARCHIVE_TIME", "30")
MEASUREMENT_BATCH_SIZE = env.CouldInt("MEASUREMENT_BATCH_SIZE", 1024)
MEASUREMENT_COMMIT_INTERVAL = env.CouldInt("MEASUREMENT_COMMIT_INTERVAL", 1)
MEASUREMENT_COMMIT_INTERVAL = env.CouldInt("MEASUREMENT_COMMIT_INTERVAL", 1000)
)

func main() {
Expand Down Expand Up @@ -97,7 +97,7 @@ func Run(cleanup cleanupper.Cleanupper) error {
}
measurementstore := measurementsinfra.NewPSQL(db)
measurementservice := measurements.New(measurementstore, sysArchiveTime, keyClient)
cleanup.Add(measurementservice.StartMeasurementBatchStorer(MEASUREMENT_BATCH_SIZE, time.Duration(MEASUREMENT_COMMIT_INTERVAL)*time.Second))
cleanup.Add(measurementservice.StartMeasurementBatchStorer(MEASUREMENT_BATCH_SIZE, time.Duration(MEASUREMENT_COMMIT_INTERVAL)*time.Millisecond))

processingstore := processinginfra.NewPSQLStore(db)
processingPipelinePublisher := processinginfra.NewPipelineMessagePublisher(amqpConn, AMQP_XCHG_PIPELINE_MESSAGES)
Expand Down
10 changes: 5 additions & 5 deletions services/core/measurements/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func (s *Service) StartMeasurementBatchStorer(batchSize int, interval time.Durat
t := time.NewTicker(interval)

commit := func() {
if len(measurements) > 0 {
log.Printf("Committing %d measurements\n", len(measurements))
if len(measurements) == 0 {
return
}
log.Printf("Committing %d measurements\n", len(measurements))
err := s.store.StoreMeasurements(measurements)
if err != nil {
log.Printf("Error storing measurements: %s\n", err.Error())
Expand Down Expand Up @@ -118,8 +119,7 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
CreatedAt: time.Now(),
}

measurements := make([]Measurement, len(msg.Measurements))
for ix, m := range msg.Measurements {
for _, m := range msg.Measurements {

sensor, err := dev.GetSensorByExternalIDOrFallback(m.SensorExternalID)
if err != nil {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
measurement.MeasurementAltitude = m.Altitude
}

measurements[ix] = measurement
s.measurementBatch <- measurement
}

return nil
Expand Down
5 changes: 5 additions & 0 deletions services/core/measurements/infra/store_psql.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,5 +447,10 @@ func (s *MeasurementStorePSQL) StoreMeasurements(measurements []measurements.Mea
)
}

_, err := q.RunWith(s.db).Exec()
if err != nil {
return fmt.Errorf("could not insert measurements: %w", err)
}

return nil
}

0 comments on commit 7cf0b09

Please sign in to comment.