Skip to content

Commit

Permalink
⬆️ Bump InfluxDB code to v2
Browse files Browse the repository at this point in the history
Co-authored-by: Gudsfile <[email protected]>
  • Loading branch information
yannbertrand and Gudsfile committed Oct 28, 2024
1 parent 1d19379 commit 5aea91c
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 47 deletions.
6 changes: 5 additions & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
APPLE_WATCH_INFLUX_DATABASE="apple_watch"
APPLE_WATCH_INFLUX_BUCKET="apple_watch"
APPLE_WATCH_INFLUX_ORG="my_org"
APPLE_WATCH_INFLUX_USERNAME="my_user"
APPLE_WATCH_INFLUX_PASSWORD="my_password"
APPLE_WATCH_INFLUX_TOKEN="my_token"
APPLE_WATCH_INFLUX_MEASUREMENT="workouts"
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ services:
container_name: apple_watch_grafana
ports:
- "3000:3000"
environment:
- APPLE_WATCH_INFLUX_BUCKET=$APPLE_WATCH_INFLUX_BUCKET
- APPLE_WATCH_INFLUX_TOKEN=$APPLE_WATCH_INFLUX_TOKEN
links:
- influxdb
volumes:
Expand Down
5 changes: 4 additions & 1 deletion grafana/provisioning/datasources/datasource.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ datasources:
access: proxy
orgId: 1
url: http://apple_watch_influxdb:8086
database: apple_watch
database: $APPLE_WATCH_INFLUX_BUCKET
jsonData:
httpHeaderName1: 'Authorization'
timeInterval: 1d
secureJsonData:
httpHeaderValue1: 'Token $APPLE_WATCH_INFLUX_TOKEN'
isDefault: true
editable: true
options:
Expand Down
9 changes: 8 additions & 1 deletion lib/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@ const appleWatchWorkoutParser = require('./parser')
const writeInfluxPoints = require('./influx')

module.exports.pushArchiveDataToInfluxDB = (archivePath) => {
console.log('Exploring your data. It will take some time.')

fs.createReadStream(archivePath)
.pipe(unzipper.ParseOne('export.xml'))
.on('entry', () => console.log('Exploring your data. It will take some time, please wait...'))
.on('entry', () => console.log('Please wait...\n'))
.pipe(appleWatchWorkoutParser(new Handler()))
.pipe(writeInfluxPoints())
.on('finish', () =>
console.log(
"All done! You're ready to open http://localhost:3000/d/apple-watch-workouts/year-dashboard?orgId=1\n"
)
)
}
2 changes: 1 addition & 1 deletion lib/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = class Handler {

onEnd() {
console.log(
`Found ${this._workoutsIds.size} workouts and ${this._nbOfDuplicateWorkouts} duplicates`
`\nFound ${this._workoutsIds.size} workouts and ${this._nbOfDuplicateWorkouts} duplicates`
)
}

Expand Down
117 changes: 74 additions & 43 deletions lib/influx.js
Original file line number Diff line number Diff line change
@@ -1,67 +1,98 @@
const Influx = require('influx')
const { InfluxDB, Point } = require('@influxdata/influxdb-client')
const { SetupAPI } = require('@influxdata/influxdb-client-apis')
const { hostname } = require('node:os')
const { Writable } = require('node:stream')

let influx
let influxWriteApi
;(async () => {
influxWriteApi = await loadInfluxInstance()
})()

const stream = new Writable({
async write(chunk, encoding, callback) {
try {
await loadInfluxInstance()

const currentWorkout = JSON.parse(chunk.toString())

const influxPoints = [currentWorkout].map((workout) => ({
measurement: process.env.APPLE_WATCH_INFLUX_MEASUREMENT,
tags: { type: workout.type, date: workout.startDate.substring(0, 10) },
fields: {
duration: workout.duration,
totalDistance: workout.totalDistance,
totalEnergyBurned: workout.totalEnergyBurned,
sourceName: workout.sourceName,
sourceVersion: workout.sourceVersion,
},
timestamp: new Date(workout.startDate),
}))
const influxPoints = [currentWorkout].map((workout) =>
new Point(process.env.APPLE_WATCH_INFLUX_MEASUREMENT)
.tag('type', workout.type)
.tag('date', workout.startDate.substring(0, 10))
.tag('sourceName', workout.sourceName)
.tag('sourceVersion', workout.sourceVersion)
.floatField('duration', workout.duration)
.floatField('totalDistance', workout.totalDistance)
.floatField('totalEnergyBurned', workout.totalEnergyBurned)
.timestamp(new Date(workout.startDate))
)

await influx.writePoints(influxPoints)
influxWriteApi.writePoints(influxPoints)
} catch (error) {
console.error(error)
console.log(`InfluxDB writing error: ${error.message}`)
}

callback()
},
})

stream.on('close', () => {
closeInfluxInstance(influxWriteApi)
})

module.exports = () => {
return stream
}

const loadInfluxInstance = async () => {
if (influx !== undefined) {
return
}
async function loadInfluxInstance() {
const url = 'http://localhost:8086'
const org = process.env.APPLE_WATCH_INFLUX_ORG
const bucket = process.env.APPLE_WATCH_INFLUX_BUCKET
const token = process.env.APPLE_WATCH_INFLUX_TOKEN
const username = process.env.APPLE_WATCH_INFLUX_USERNAME
const password = process.env.APPLE_WATCH_INFLUX_PASSWORD

const influxInstance = new Influx.InfluxDB({
host: 'localhost',
database: process.env.APPLE_WATCH_INFLUX_DATABASE,
schema: [
{
measurement: process.env.APPLE_WATCH_INFLUX_MEASUREMENT,
tags: ['type', 'date'],
fields: {
duration: Influx.FieldType.FLOAT,
totalDistance: Influx.FieldType.FLOAT,
totalEnergyBurned: Influx.FieldType.FLOAT,
sourceName: Influx.FieldType.STRING,
sourceVersion: Influx.FieldType.STRING,
},
},
],
})
const influx = new InfluxDB({ url, token })
const setupApi = new SetupAPI(influx)

const databasesNames = await influxInstance.getDatabaseNames()
if (!databasesNames.includes(process.env.APPLE_WATCH_INFLUX_DATABASE)) {
await influxInstance.createDatabase(process.env.APPLE_WATCH_INFLUX_DATABASE)
try {
const { allowed } = await setupApi.getSetup()
if (allowed) {
await setupApi.postSetup({
body: {
org,
bucket,
username,
password,
token,
},
})
console.log(`InfluxDB '${url}' is now onboarded.`)
} else {
console.debug(`InfluxDB '${url}' is ready.`)
}
} catch (error) {
console.error(error)
console.log('\nInfluxDB setup ERROR')
}

influx = influxInstance
const influxWriteApi = influx.getWriteApi(org, bucket, 's')
influxWriteApi.useDefaultTags({ location: hostname() })

return influxWriteApi
}

async function closeInfluxInstance(influxWriteApi) {
// WriteApi always buffer data into batches to optimize data transfer to InfluxDB server.
// writeApi.flush() can be called to flush the buffered data. The data is always written
// asynchronously, Moreover, a failed write (caused by a temporary networking or server failure)
// is retried automatically.
//
// close() flushes the remaining buffered data and then cancels pending retries.

try {
await influxWriteApi.close()
console.debug('InfluxDB connection closed successfully.')
} catch (error) {
console.error(error)
console.log('\nInfluxDB connection closing ERROR')
}
}
17 changes: 17 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"test": "jest"
},
"dependencies": {
"@influxdata/influxdb-client": "^1.35.0",
"@influxdata/influxdb-client-apis": "^1.35.0",
"influx": "5.9.3",
"sax": "1.4.1",
"unzipper": "0.12.3",
Expand Down

0 comments on commit 5aea91c

Please sign in to comment.