diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitBreathingRateRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitBreathingRateRoute.java index b355ff19..1ed5bbc5 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitBreathingRateRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitBreathingRateRoute.java @@ -23,13 +23,11 @@ import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest; import org.radarbase.connect.rest.fitbit.user.User; import org.radarbase.connect.rest.fitbit.user.UserRepository; -import org.radarbase.connect.rest.fitbit.util.DateRange; -import java.time.ZonedDateTime; import java.util.stream.Stream; -import static java.time.ZoneOffset.UTC; import static java.time.temporal.ChronoUnit.SECONDS; +import java.time.Duration; public class FitbitBreathingRateRoute extends FitbitPollingRoute { private final FitbitBreathingRateAvroConverter converter; @@ -46,16 +44,19 @@ protected String getUrlFormat(String baseUrl) { } protected Stream createRequests(User user) { - ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND) - .atZone(UTC) - .truncatedTo(SECONDS); - ZonedDateTime now = ZonedDateTime.now(UTC); - return Stream.of(newRequest(user, new DateRange(startDate, now), - user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now))); + return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) + .map(dateRange -> newRequest(user, dateRange, + user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()))); + } + + /** Limit range to 30 days as documented here: https://dev.fitbit.com/build/reference/web-api/intraday/get-br-intraday-by-interval/ */ + @Override + Duration getDateRangeInterval() { + return THIRTY_DAYS; } @Override public FitbitBreathingRateAvroConverter converter() { return converter; } -} +} \ No newline at end of file diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateVariabilityRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateVariabilityRoute.java index 1106a342..2b5958b9 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateVariabilityRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateVariabilityRoute.java @@ -18,19 +18,17 @@ package org.radarbase.connect.rest.fitbit.route; import io.confluent.connect.avro.AvroData; -import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayHeartRateAvroConverter; import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayHeartRateVariabilityAvroConverter; import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator; import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest; import org.radarbase.connect.rest.fitbit.user.User; import org.radarbase.connect.rest.fitbit.user.UserRepository; -import org.radarbase.connect.rest.fitbit.util.DateRange; -import java.time.ZonedDateTime; import java.util.stream.Stream; -import static java.time.ZoneOffset.UTC; import static java.time.temporal.ChronoUnit.SECONDS; +import java.time.Duration; + public class FitbitIntradayHeartRateVariabilityRoute extends FitbitPollingRoute { private final FitbitIntradayHeartRateVariabilityAvroConverter converter; @@ -47,16 +45,19 @@ protected String getUrlFormat(String baseUrl) { } protected Stream createRequests(User user) { - ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND) - .atZone(UTC) - .truncatedTo(SECONDS); - ZonedDateTime now = ZonedDateTime.now(UTC); - return Stream.of(newRequest(user, new DateRange(startDate, now), - user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now))); + return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) + .map(dateRange -> newRequest(user, dateRange, + user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()))); + } + + /** Limit range to 30 days as documented here: https://dev.fitbit.com/build/reference/web-api/intraday/get-hrv-intraday-by-interval/ */ + @Override + Duration getDateRangeInterval() { + return THIRTY_DAYS; } @Override public FitbitIntradayHeartRateVariabilityAvroConverter converter() { return converter; } -} +} \ No newline at end of file diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java index 5f9905c8..53e17b97 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java @@ -95,6 +95,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute { protected static final Duration LOOKBACK_TIME = Duration.ofDays(1); // 1 day protected static final long HISTORICAL_TIME_DAYS = 14L; protected static final Duration ONE_DAY = DAYS.getDuration(); + protected static final Duration THIRTY_DAYS = Duration.ofDays(30); protected static final Duration ONE_NANO = NANOS.getDuration(); protected static final TemporalAmount ONE_SECOND = SECONDS.getDuration(); protected static final TemporalAmount ONE_MINUTE = MINUTES.getDuration(); @@ -326,7 +327,7 @@ private TemporalAmount getEndDateThreshold() { } /** - * Generate one date per day, using UTC time zone. The first date will have the time from the + * Generate one date per day (or specified rangeInterval), using UTC time zone. The first date will have the time from the * given startDate. Following time stamps will start at 00:00. This will not up to the date of * {@link #getLookbackTime()} (exclusive). */ @@ -345,12 +346,12 @@ Stream startDateGenerator(Instant startDate) { return Stream.empty(); } } else { - long numElements = DAYS.between(startDate, lookBack); + Duration rangeInterval = getDateRangeInterval(); Stream elements = Stream - .iterate(dateTime, t -> t.plus(ONE_DAY).truncatedTo(DAYS)) - .limit(numElements) - .map(s -> new DateRange(s, s.plus(ONE_DAY).truncatedTo(DAYS).minus(ONE_NANO))); + .iterate(dateTime, t -> t.plus(rangeInterval).truncatedTo(DAYS)) + .takeWhile(u -> u.isBefore(lookBackDateStart)) + .map(s -> new DateRange(s, s.plus(rangeInterval).truncatedTo(DAYS).minus(ONE_NANO))); // we're polling at exactly 00:00, should not poll the last date if (lookBackDateStart.equals(lookBackDate)) { @@ -362,4 +363,8 @@ Stream startDateGenerator(Instant startDate) { } } } + + Duration getDateRangeInterval() { + return ONE_DAY; + } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitSkinTemperatureRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitSkinTemperatureRoute.java index 2fc921e1..31362c94 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitSkinTemperatureRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitSkinTemperatureRoute.java @@ -23,13 +23,11 @@ import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest; import org.radarbase.connect.rest.fitbit.user.User; import org.radarbase.connect.rest.fitbit.user.UserRepository; -import org.radarbase.connect.rest.fitbit.util.DateRange; -import java.time.ZonedDateTime; import java.util.stream.Stream; -import static java.time.ZoneOffset.UTC; import static java.time.temporal.ChronoUnit.SECONDS; +import java.time.Duration; public class FitbitSkinTemperatureRoute extends FitbitPollingRoute { private final FitbitSkinTemperatureAvroConverter converter; @@ -46,16 +44,19 @@ protected String getUrlFormat(String baseUrl) { } protected Stream createRequests(User user) { - ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND) - .atZone(UTC) - .truncatedTo(SECONDS); - ZonedDateTime now = ZonedDateTime.now(UTC); - return Stream.of(newRequest(user, new DateRange(startDate, now), - user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now))); + return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) + .map(dateRange -> newRequest(user, dateRange, + user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()))); + } + + /** Limit range to 30 days as documented here: https://dev.fitbit.com/build/reference/web-api/temperature/get-temperature-skin-summary-by-interval */ + @Override + Duration getDateRangeInterval() { + return THIRTY_DAYS; } @Override public FitbitSkinTemperatureAvroConverter converter() { return converter; } -} +} \ No newline at end of file