Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a delay parameter to the job scheduler #61

Merged
merged 12 commits into from
Sep 29, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,26 @@ public class CronSchedule implements Schedule {
private String expression;
private ExecutionTime executionTime;
private Clock clock;
private long scheduleDelay;

public CronSchedule(String expression, ZoneId timezone) {
this.expression = expression;
this.timezone = timezone;
this.executionTime = ExecutionTime.forCron(cronParser.parse(this.expression));
clock = Clock.system(timezone);
this.scheduleDelay = 0;
}

public CronSchedule(String expression, ZoneId timezone, long scheduleDelay) {
this(expression, timezone);
this.scheduleDelay = scheduleDelay;
}

public CronSchedule(StreamInput input) throws IOException {
timezone = input.readZoneId();
expression = input.readString();
Long delayIn = input.readOptionalLong();
scheduleDelay = delayIn == null ? 0 : delayIn;
executionTime = ExecutionTime.forCron(cronParser.parse(expression));
clock = Clock.system(timezone);
}
Expand All @@ -96,19 +105,24 @@ String getCronExpression() {
return this.expression;
}

public long getDelay() { return this.scheduleDelay; }

public void setDelay(long delay) { this.scheduleDelay = delay; }

@Override
public Instant getNextExecutionTime(Instant time) {
Instant baseTime = time == null ? this.clock.instant() : time;

ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(baseTime, this.timezone);
// the executionTime object doesn't use the delay, need to remove the delay before and then add it back after
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(baseTime.minusMillis(this.scheduleDelay), this.timezone);
ZonedDateTime nextExecutionTime = this.executionTime.nextExecution(zonedDateTime).orElse(null);

return nextExecutionTime == null ? null : nextExecutionTime.toInstant();
return nextExecutionTime == null ? null : nextExecutionTime.toInstant().plusMillis(this.scheduleDelay);
}

@Override
public Duration nextTimeToExecute() {
Instant now = this.clock.instant();
Instant now = this.clock.instant().minusMillis(this.scheduleDelay);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, this.timezone);
Optional<Duration> timeToNextExecution = this.executionTime.timeToNextExecution(zonedDateTime);
return timeToNextExecution.orElse(null);
Expand All @@ -121,15 +135,15 @@ public Tuple<Instant, Instant> getPeriodStartingAt(Instant startTime) {
realStartTime = startTime;
} else {
Instant now = this.clock.instant();
Optional<ZonedDateTime> lastExecutionTime = this.executionTime.lastExecution(ZonedDateTime.ofInstant(now, this.timezone));
Optional<ZonedDateTime> lastExecutionTime = this.executionTime.lastExecution(ZonedDateTime.ofInstant(now.minusMillis(this.scheduleDelay), this.timezone));
if (!lastExecutionTime.isPresent()) {
return new Tuple<>(now, now);
}
realStartTime = lastExecutionTime.get().toInstant();
realStartTime = lastExecutionTime.get().toInstant().plusMillis(this.scheduleDelay);
}
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(realStartTime, this.timezone);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(realStartTime.minusMillis(this.scheduleDelay), this.timezone);
ZonedDateTime newEndTime = executionTime.nextExecution(zonedDateTime).orElse(null);
return new Tuple<>(realStartTime, newEndTime == null ? null : newEndTime.toInstant());
return new Tuple<>(realStartTime, newEndTime == null ? null : newEndTime.toInstant().plusMillis(this.scheduleDelay));
}

@Override
Expand All @@ -138,7 +152,7 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
return true;
}

Instant now = this.clock.instant();
Instant now = this.clock.instant().minusMillis(this.scheduleDelay);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, timezone);
Optional<ZonedDateTime> expectedExecutionTime = this.executionTime.lastExecution(zonedDateTime);

Expand All @@ -147,7 +161,7 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
}
ZonedDateTime actualExecutionTime = ZonedDateTime.ofInstant(lastExecutionTime, timezone);

return ChronoUnit.SECONDS.between(expectedExecutionTime.get(), actualExecutionTime) == 0L;
return ChronoUnit.SECONDS.between(expectedExecutionTime.get().plus(scheduleDelay, ChronoUnit.MILLIS), actualExecutionTime) == 0L;
}

@Override
Expand All @@ -156,6 +170,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.startObject(CRON_FIELD)
.field(EXPRESSION_FIELD, this.expression)
.field(TIMEZONE_FIELD, this.timezone.getId())
.field(DELAY_FIELD, Long.valueOf(this.scheduleDelay))
.endObject()
.endObject();
return builder;
Expand All @@ -172,17 +187,19 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
CronSchedule cronSchedule = (CronSchedule) o;
return timezone.equals(cronSchedule.timezone) &&
expression.equals(cronSchedule.expression);
expression.equals(cronSchedule.expression) &&
scheduleDelay == cronSchedule.scheduleDelay;
}

@Override
public int hashCode() {
return Objects.hash(timezone, expression);
return Objects.hash(timezone, expression, scheduleDelay);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeZoneId(timezone);
out.writeString(expression);
out.writeOptionalLong(scheduleDelay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,36 +66,49 @@ public class IntervalSchedule implements Schedule {
SUPPORTED_UNITS = Collections.unmodifiableSet(set);
}

private Instant startTime;
private Instant initialStartTime;
private Instant startTimeWithDelay;
private int interval;
private ChronoUnit unit;
private transient long intervalInMillis;
private Clock clock;
private long scheduleDelay;

public IntervalSchedule(Instant startTime, int interval, ChronoUnit unit) {
if (!SUPPORTED_UNITS.contains(unit)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Interval unit %s is not supported, expects %s",
unit, SUPPORTED_UNITS));
}
this.startTime = startTime;
this.initialStartTime = startTime;
this.startTimeWithDelay = startTime;
this.interval = interval;
this.unit = unit;
this.intervalInMillis = Duration.of(interval, this.unit).toMillis();
this.clock = Clock.system(ZoneId.systemDefault());
this.scheduleDelay = 0;
}

public IntervalSchedule(Instant startTime, int interval, ChronoUnit unit, long scheduleDelay) {
this(startTime, interval, unit);
this.startTimeWithDelay = startTime.plusMillis(scheduleDelay);
this.scheduleDelay = scheduleDelay;
}

public IntervalSchedule(StreamInput input) throws IOException {
startTime = input.readInstant();
initialStartTime = input.readInstant();
interval = input.readInt();
unit = input.readEnum(ChronoUnit.class);
Long delayIn = input.readOptionalLong();
scheduleDelay = delayIn == null ? 0 : delayIn;
startTimeWithDelay = initialStartTime.plusMillis(scheduleDelay);
intervalInMillis = Duration.of(interval, unit).toMillis();
clock = Clock.system(ZoneId.systemDefault());
}

@VisibleForTesting
Instant getStartTime() {
return this.startTime;
return this.startTimeWithDelay;
}

@VisibleForTesting
Expand All @@ -107,22 +120,36 @@ public ChronoUnit getUnit() {
return this.unit;
}

public long getDelay() { return this.scheduleDelay; }

public void setDelay(long delay) {
this.scheduleDelay = delay;
this.startTimeWithDelay = initialStartTime.plusMillis(scheduleDelay);
}

@Override
public Instant getNextExecutionTime(Instant time) {
Instant baseTime = time == null ? this.clock.instant() : time;
long delta = (baseTime.toEpochMilli() - this.startTime.toEpochMilli()) % this.intervalInMillis;
long remaining = this.intervalInMillis - delta;

return baseTime.plus(remaining, ChronoUnit.MILLIS);
long delta = (baseTime.toEpochMilli() - this.startTimeWithDelay.toEpochMilli());
if (delta >= 0) {
long remaining = this.intervalInMillis - (delta % this.intervalInMillis);
return baseTime.plus(remaining, ChronoUnit.MILLIS);
} else {
return this.startTimeWithDelay;
}
}

@Override
public Duration nextTimeToExecute() {
long enabledTimeEpochMillis = this.startTime.toEpochMilli();
long enabledTimeEpochMillis = this.startTimeWithDelay.toEpochMilli();
Instant currentTime = this.clock.instant();
long delta = currentTime.toEpochMilli() - enabledTimeEpochMillis;
long remainingScheduleTime = intervalInMillis - (delta % intervalInMillis);
return Duration.of(remainingScheduleTime, ChronoUnit.MILLIS);
if (delta >= 0) {
long remainingScheduleTime = intervalInMillis - (delta % intervalInMillis);
return Duration.of(remainingScheduleTime, ChronoUnit.MILLIS);
} else {
return Duration.ofMillis(enabledTimeEpochMillis - currentTime.toEpochMilli());
}
Comment on lines +138 to +143

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment on Ternary operators! Simplification of code increases its readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also a big fan of ternary operators! But I try to only use them in simple cases, and I really try to not include anything that takes much thought. I agree with you on the other cases but I will leave this one as an if/else for that reason

}

@Override
Expand All @@ -139,7 +166,7 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
return true;
}

long enabledTimeEpochMillis = this.startTime.toEpochMilli();
long enabledTimeEpochMillis = this.startTimeWithDelay.toEpochMilli();
Instant now = this.clock.instant();
long expectedMillisSinceLastExecution = (now.toEpochMilli() - enabledTimeEpochMillis) % this.intervalInMillis;
if (expectedMillisSinceLastExecution < 1000) {
Expand All @@ -155,9 +182,10 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.startObject(INTERVAL_FIELD)
.field(START_TIME_FIELD, this.startTime.toEpochMilli())
.field(START_TIME_FIELD, this.initialStartTime.toEpochMilli())
.field(PERIOD_FIELD, this.interval)
.field(UNIT_FIELD, this.unit)
.field(DELAY_FIELD, Long.valueOf(this.scheduleDelay))
.endObject()
.endObject();
return builder;
Expand All @@ -178,21 +206,23 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IntervalSchedule intervalSchedule = (IntervalSchedule) o;
return startTime.equals(intervalSchedule.startTime) &&
return initialStartTime.equals(intervalSchedule.initialStartTime) &&
interval == intervalSchedule.interval &&
unit == intervalSchedule.unit &&
intervalInMillis == intervalSchedule.intervalInMillis;
intervalInMillis == intervalSchedule.intervalInMillis &&
scheduleDelay == intervalSchedule.scheduleDelay;
}

@Override
public int hashCode() {
return Objects.hash(startTime, interval, unit, intervalInMillis);
return Objects.hash(initialStartTime, interval, unit, intervalInMillis, scheduleDelay);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInstant(startTime);
out.writeInstant(initialStartTime);
out.writeInt(interval);
out.writeEnum(unit);
out.writeOptionalLong(scheduleDelay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@
import java.time.Instant;

public interface Schedule extends Writeable, ToXContentObject {
static final String DELAY_FIELD = "schedule_delay";

/**
* Gets next job execution time of give time parameter.
* Gets next job execution time of given time parameter.
*
* @param time base time point
* @return next exection time since time parameter.
* @return next execution time since time parameter.
*/
Instant getNextExecutionTime(Instant time);

Expand All @@ -65,4 +66,7 @@ public interface Schedule extends Writeable, ToXContentObject {
* @return true if the job executes on time, otherwise false.
*/
Boolean runningOnTime(Instant lastExecutionTime);

long getDelay();
void setDelay(long delay);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static Schedule parse(XContentParser parser) throws IOException {
case CronSchedule.CRON_FIELD:
String expression = null;
ZoneId timezone = null;
long cronDelay = 0;
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String cronField = parser.currentName();
parser.nextToken();
Expand All @@ -57,6 +58,8 @@ public static Schedule parse(XContentParser parser) throws IOException {
break;
case CronSchedule.TIMEZONE_FIELD: timezone = ZoneId.of(parser.text());
break;
case Schedule.DELAY_FIELD: cronDelay = parser.longValue();
break;
default:
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Unknown cron field %s", cronField));
Expand All @@ -65,11 +68,12 @@ public static Schedule parse(XContentParser parser) throws IOException {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(),
parser);
parser.nextToken();
return new CronSchedule(expression, timezone);
return new CronSchedule(expression, timezone, cronDelay);
case IntervalSchedule.INTERVAL_FIELD:
Instant startTime = null;
int period = 0;
ChronoUnit unit = null;
long intervalDelay = 0;
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String intervalField = parser.currentName();
parser.nextToken();
Expand All @@ -83,6 +87,9 @@ public static Schedule parse(XContentParser parser) throws IOException {
case IntervalSchedule.UNIT_FIELD:
unit = ChronoUnit.valueOf(parser.text().toUpperCase(Locale.ROOT));
break;
case Schedule.DELAY_FIELD:
intervalDelay = parser.longValue();
break;
default:
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Unknown interval field %s", intervalField));
Expand All @@ -91,7 +98,7 @@ public static Schedule parse(XContentParser parser) throws IOException {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(),
parser);
parser.nextToken();
return new IntervalSchedule(startTime, period, unit);
return new IntervalSchedule(startTime, period, unit, intervalDelay);
default:
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Unknown schedule type %s", fieldName));
Expand Down
Loading