Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a delay parameter to the job scheduler #61

Merged
merged 12 commits into from
Sep 29, 2021
Merged
2 changes: 1 addition & 1 deletion .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: actions/checkout@v2
with:
repository: 'opensearch-project/OpenSearch'
ref: '1.x'
ref: '1.1'
path: OpenSearch
- name: Build OpenSearch
working-directory: ./OpenSearch
Expand Down
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Then you will find the built artifact located at `build/distributions` directory
## Install
Once you have built the plugin from source code, run
```bash
opensearch-plugin install file://${PLUGIN_ZIP_FILE_PATH}
opensearch-plugin install file:///path/to/target/releases/opensearch-job-scheduler-<version>.zip

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are your thoughts on adding the example path to target?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this very much, but this follows another plugin: https://github.com/opensearch-project/security-dashboards-plugin/blob/ad6e723d4710b91cbcf1f0ec543b6ec24b976c53/DEVELOPER_GUIDE.md and this change lets the lychee link checker succeed

```
to install the JobScheduler plugin to your OpenSearch.

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ If you discover a potential security issue in this project we ask that you notif

## License

This project is licensed under the [Apache v2.0 License](./LICENSE)
This project is licensed under the [Apache v2.0 License](./LICENSE.txt)

## Copyright

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class CronSchedule implements Schedule {
private String expression;
private ExecutionTime executionTime;
private Clock clock;
private Long scheduleDelay;

public CronSchedule(String expression, ZoneId timezone) {
this.expression = expression;
Expand All @@ -69,9 +70,15 @@ public CronSchedule(String expression, ZoneId timezone) {
clock = Clock.system(timezone);
}

public CronSchedule(String expression, ZoneId timezone, long scheduleDelay) {
this(expression, timezone);
this.scheduleDelay = scheduleDelay;
}

public CronSchedule(StreamInput input) throws IOException {
timezone = input.readZoneId();
expression = input.readString();
scheduleDelay = input.readOptionalLong();
executionTime = ExecutionTime.forCron(cronParser.parse(expression));
clock = Clock.system(timezone);
}
Expand All @@ -86,59 +93,68 @@ void setExecutionTime(ExecutionTime executionTime) {
this.executionTime = executionTime;
}

@VisibleForTesting
ZoneId getTimeZone() {
public ZoneId getTimeZone() {
return this.timezone;
}

@VisibleForTesting
String getCronExpression() {
public String getCronExpression() {
return this.expression;
}

public Long getDelay() { return this.scheduleDelay; }

@Override
public Instant getNextExecutionTime(Instant time) {
Instant baseTime = time == null ? this.clock.instant() : time;

ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(baseTime, this.timezone);
long delay = scheduleDelay == null ? 0 : scheduleDelay;
// The executionTime object doesn't know about the delay, so first subtract the delay from the baseTime in case
// this moves to the previous interval, then add the delay to the returned execution time to get the correct time.
// For example, say it is 10:07 AM with an hourly schedule and a delay of 15 minutes. The next execution time
// should be 10:15 AM, but executionTime.nextExecution( 10:07 AM ) would return the next execution as 11 AM.
// By subtracting the delay first, the ExecutionTime object is given the input time as 9:52 AM, it returns
// 10:00 AM, and after adding the delay, we get the correct next execution time of 10:15 AM.
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(baseTime.minusMillis(delay), this.timezone);
ZonedDateTime nextExecutionTime = this.executionTime.nextExecution(zonedDateTime).orElse(null);

return nextExecutionTime == null ? null : nextExecutionTime.toInstant();
return nextExecutionTime == null ? null : nextExecutionTime.toInstant().plusMillis(delay);
}

@Override
public Duration nextTimeToExecute() {
Instant now = this.clock.instant();
long delay = scheduleDelay == null ? 0 : scheduleDelay;
Instant now = this.clock.instant().minusMillis(delay);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, this.timezone);
Optional<Duration> timeToNextExecution = this.executionTime.timeToNextExecution(zonedDateTime);
return timeToNextExecution.orElse(null);
}

@Override
public Tuple<Instant, Instant> getPeriodStartingAt(Instant startTime) {
long delay = scheduleDelay == null ? 0 : scheduleDelay;
Instant realStartTime;
if (startTime != null) {
realStartTime = startTime;
} else {
Instant now = this.clock.instant();
Optional<ZonedDateTime> lastExecutionTime = this.executionTime.lastExecution(ZonedDateTime.ofInstant(now, this.timezone));
Optional<ZonedDateTime> lastExecutionTime = this.executionTime.lastExecution(ZonedDateTime.ofInstant(now.minusMillis(delay), this.timezone));
if (!lastExecutionTime.isPresent()) {
return new Tuple<>(now, now);
}
realStartTime = lastExecutionTime.get().toInstant();
realStartTime = lastExecutionTime.get().toInstant().plusMillis(delay);
}
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(realStartTime, this.timezone);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(realStartTime.minusMillis(delay), this.timezone);
ZonedDateTime newEndTime = executionTime.nextExecution(zonedDateTime).orElse(null);
return new Tuple<>(realStartTime, newEndTime == null ? null : newEndTime.toInstant());
return new Tuple<>(realStartTime, newEndTime == null ? null : newEndTime.toInstant().plusMillis(delay));
}

@Override
public Boolean runningOnTime(Instant lastExecutionTime) {
long delay = scheduleDelay == null ? 0 : scheduleDelay;
if (lastExecutionTime == null) {
return true;
}

Instant now = this.clock.instant();
Instant now = this.clock.instant().minusMillis(delay);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, timezone);
Optional<ZonedDateTime> expectedExecutionTime = this.executionTime.lastExecution(zonedDateTime);

Expand All @@ -147,16 +163,17 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
}
ZonedDateTime actualExecutionTime = ZonedDateTime.ofInstant(lastExecutionTime, timezone);

return ChronoUnit.SECONDS.between(expectedExecutionTime.get(), actualExecutionTime) == 0L;
return ChronoUnit.SECONDS.between(expectedExecutionTime.get().plus(delay, ChronoUnit.MILLIS), actualExecutionTime) == 0L;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.startObject(CRON_FIELD)
.field(EXPRESSION_FIELD, this.expression)
.field(TIMEZONE_FIELD, this.timezone.getId())
.endObject()
.field(TIMEZONE_FIELD, this.timezone.getId());
if (this.scheduleDelay != null) { builder.field(DELAY_FIELD, this.scheduleDelay); }
builder.endObject()
.endObject();
return builder;
}
Expand All @@ -172,17 +189,23 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
CronSchedule cronSchedule = (CronSchedule) o;
return timezone.equals(cronSchedule.timezone) &&
expression.equals(cronSchedule.expression);
expression.equals(cronSchedule.expression) &&
Objects.equals(scheduleDelay, cronSchedule.scheduleDelay);
}

@Override
public int hashCode() {
return Objects.hash(timezone, expression);
if (scheduleDelay == null) {
return Objects.hash(timezone, expression);
} else {
return Objects.hash(timezone, expression, scheduleDelay);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeZoneId(timezone);
out.writeString(expression);
out.writeOptionalLong(scheduleDelay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,39 +66,48 @@ public class IntervalSchedule implements Schedule {
SUPPORTED_UNITS = Collections.unmodifiableSet(set);
}

private Instant startTime;
private Instant initialStartTime;
private Instant startTimeWithDelay;
private int interval;
private ChronoUnit unit;
private transient long intervalInMillis;
private Clock clock;
private Long scheduleDelay;

public IntervalSchedule(Instant startTime, int interval, ChronoUnit unit) {
if (!SUPPORTED_UNITS.contains(unit)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Interval unit %s is not supported, expects %s",
unit, SUPPORTED_UNITS));
}
this.startTime = startTime;
this.initialStartTime = startTime;
this.startTimeWithDelay = startTime;
this.interval = interval;
this.unit = unit;
this.intervalInMillis = Duration.of(interval, this.unit).toMillis();
this.clock = Clock.system(ZoneId.systemDefault());
}

public IntervalSchedule(Instant startTime, int interval, ChronoUnit unit, long scheduleDelay) {
this(startTime, interval, unit);
this.startTimeWithDelay = startTime.plusMillis(scheduleDelay);
this.scheduleDelay = scheduleDelay;
}

public IntervalSchedule(StreamInput input) throws IOException {
startTime = input.readInstant();
initialStartTime = input.readInstant();
interval = input.readInt();
unit = input.readEnum(ChronoUnit.class);
scheduleDelay = input.readOptionalLong();
startTimeWithDelay = scheduleDelay == null ? initialStartTime : initialStartTime.plusMillis(scheduleDelay);
intervalInMillis = Duration.of(interval, unit).toMillis();
clock = Clock.system(ZoneId.systemDefault());
}

@VisibleForTesting
Instant getStartTime() {
return this.startTime;
public Instant getStartTime() {
return this.startTimeWithDelay;
}

@VisibleForTesting
public int getInterval() {
return this.interval;
}
Expand All @@ -107,22 +116,31 @@ public ChronoUnit getUnit() {
return this.unit;
}

public Long getDelay() { return this.scheduleDelay; }

@Override
public Instant getNextExecutionTime(Instant time) {
Instant baseTime = time == null ? this.clock.instant() : time;
long delta = (baseTime.toEpochMilli() - this.startTime.toEpochMilli()) % this.intervalInMillis;
long remaining = this.intervalInMillis - delta;

return baseTime.plus(remaining, ChronoUnit.MILLIS);
long delta = (baseTime.toEpochMilli() - this.startTimeWithDelay.toEpochMilli());
if (delta >= 0) {
long remaining = this.intervalInMillis - (delta % this.intervalInMillis);
return baseTime.plus(remaining, ChronoUnit.MILLIS);
} else {
return this.startTimeWithDelay;
}
}

@Override
public Duration nextTimeToExecute() {
long enabledTimeEpochMillis = this.startTime.toEpochMilli();
long enabledTimeEpochMillis = this.startTimeWithDelay.toEpochMilli();
Instant currentTime = this.clock.instant();
long delta = currentTime.toEpochMilli() - enabledTimeEpochMillis;
long remainingScheduleTime = intervalInMillis - (delta % intervalInMillis);
return Duration.of(remainingScheduleTime, ChronoUnit.MILLIS);
if (delta >= 0) {
long remainingScheduleTime = intervalInMillis - (delta % intervalInMillis);
return Duration.of(remainingScheduleTime, ChronoUnit.MILLIS);
} else {
return Duration.ofMillis(enabledTimeEpochMillis - currentTime.toEpochMilli());
}
Comment on lines +138 to +143

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment on Ternary operators! Simplification of code increases its readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also a big fan of ternary operators! But I try to only use them in simple cases, and I really try to not include anything that takes much thought. I agree with you on the other cases but I will leave this one as an if/else for that reason

}

@Override
Expand All @@ -139,7 +157,7 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
return true;
}

long enabledTimeEpochMillis = this.startTime.toEpochMilli();
long enabledTimeEpochMillis = this.startTimeWithDelay.toEpochMilli();
Instant now = this.clock.instant();
long expectedMillisSinceLastExecution = (now.toEpochMilli() - enabledTimeEpochMillis) % this.intervalInMillis;
if (expectedMillisSinceLastExecution < 1000) {
Expand All @@ -155,10 +173,11 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.startObject(INTERVAL_FIELD)
.field(START_TIME_FIELD, this.startTime.toEpochMilli())
.field(START_TIME_FIELD, this.initialStartTime.toEpochMilli())
.field(PERIOD_FIELD, this.interval)
.field(UNIT_FIELD, this.unit)
.endObject()
.field(UNIT_FIELD, this.unit);
if (this.scheduleDelay != null) { builder.field(DELAY_FIELD, this.scheduleDelay); }
builder.endObject()
.endObject();
return builder;
}
Expand All @@ -178,21 +197,27 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IntervalSchedule intervalSchedule = (IntervalSchedule) o;
return startTime.equals(intervalSchedule.startTime) &&
return initialStartTime.equals(intervalSchedule.initialStartTime) &&
interval == intervalSchedule.interval &&
unit == intervalSchedule.unit &&
intervalInMillis == intervalSchedule.intervalInMillis;
intervalInMillis == intervalSchedule.intervalInMillis &&
Objects.equals(scheduleDelay, intervalSchedule.scheduleDelay);
}

@Override
public int hashCode() {
return Objects.hash(startTime, interval, unit, intervalInMillis);
if (scheduleDelay == null) {
return Objects.hash(initialStartTime, interval, unit, intervalInMillis);
} else {
return Objects.hash(initialStartTime, interval, unit, intervalInMillis, scheduleDelay);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInstant(startTime);
out.writeInstant(initialStartTime);
out.writeInt(interval);
out.writeEnum(unit);
out.writeOptionalLong(scheduleDelay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@
import java.time.Instant;

public interface Schedule extends Writeable, ToXContentObject {
static final String DELAY_FIELD = "schedule_delay";

/**
* Gets next job execution time of give time parameter.
* Gets next job execution time of given time parameter.
*
* @param time base time point
* @return next exection time since time parameter.
* @return next execution time since time parameter.
*/
Instant getNextExecutionTime(Instant time);

Expand All @@ -65,4 +66,12 @@ public interface Schedule extends Writeable, ToXContentObject {
* @return true if the job executes on time, otherwise false.
*/
Boolean runningOnTime(Instant lastExecutionTime);

/**
* Gets the delay parameter of the schedule.
*
* @return the delay parameter of the schedule as a Long.
*/
Long getDelay();

}
Loading