Skip to content

Commit 438864b

Browse files
authored
Adds a delay parameter to the job scheduler (opensearch-project#61)
* Adds delay parameter to job scheduler * Adds tests for job scheduler delay parameter * Changes test and build workflow to 1.1 and corrects links Signed-off-by: Clay Downs <[email protected]>
1 parent c566f45 commit 438864b

File tree

10 files changed

+304
-56
lines changed

10 files changed

+304
-56
lines changed

.github/workflows/test-and-build-workflow.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
uses: actions/checkout@v2
3131
with:
3232
repository: 'opensearch-project/OpenSearch'
33-
ref: '1.x'
33+
ref: '1.1'
3434
path: OpenSearch
3535
- name: Build OpenSearch
3636
working-directory: ./OpenSearch

DEVELOPER_GUIDE.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ Then you will find the built artifact located at `build/distributions` directory
4949
## Install
5050
Once you have built the plugin from source code, run
5151
```bash
52-
opensearch-plugin install file://${PLUGIN_ZIP_FILE_PATH}
52+
opensearch-plugin install file:///path/to/target/releases/opensearch-job-scheduler-<version>.zip
5353
```
5454
to install the JobScheduler plugin to your OpenSearch.
5555

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ If you discover a potential security issue in this project we ask that you notif
5757

5858
## License
5959

60-
This project is licensed under the [Apache v2.0 License](./LICENSE)
60+
This project is licensed under the [Apache v2.0 License](./LICENSE.txt)
6161

6262
## Copyright
6363

spi/src/main/java/org/opensearch/jobscheduler/spi/schedule/CronSchedule.java

+49-16
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class CronSchedule implements Schedule {
6161
private String expression;
6262
private ExecutionTime executionTime;
6363
private Clock clock;
64+
private Long scheduleDelay;
6465

6566
public CronSchedule(String expression, ZoneId timezone) {
6667
this.expression = expression;
@@ -69,9 +70,15 @@ public CronSchedule(String expression, ZoneId timezone) {
6970
clock = Clock.system(timezone);
7071
}
7172

73+
public CronSchedule(String expression, ZoneId timezone, long scheduleDelay) {
74+
this(expression, timezone);
75+
this.scheduleDelay = scheduleDelay;
76+
}
77+
7278
public CronSchedule(StreamInput input) throws IOException {
7379
timezone = input.readZoneId();
7480
expression = input.readString();
81+
scheduleDelay = input.readOptionalLong();
7582
executionTime = ExecutionTime.forCron(cronParser.parse(expression));
7683
clock = Clock.system(timezone);
7784
}
@@ -86,59 +93,68 @@ void setExecutionTime(ExecutionTime executionTime) {
8693
this.executionTime = executionTime;
8794
}
8895

89-
@VisibleForTesting
90-
ZoneId getTimeZone() {
96+
public ZoneId getTimeZone() {
9197
return this.timezone;
9298
}
9399

94-
@VisibleForTesting
95-
String getCronExpression() {
100+
public String getCronExpression() {
96101
return this.expression;
97102
}
98103

104+
public Long getDelay() { return this.scheduleDelay; }
105+
99106
@Override
100107
public Instant getNextExecutionTime(Instant time) {
101108
Instant baseTime = time == null ? this.clock.instant() : time;
102-
103-
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(baseTime, this.timezone);
109+
long delay = scheduleDelay == null ? 0 : scheduleDelay;
110+
// The executionTime object doesn't know about the delay, so first subtract the delay from the baseTime in case
111+
// this moves to the previous interval, then add the delay to the returned execution time to get the correct time.
112+
// For example, say it is 10:07 AM with an hourly schedule and a delay of 15 minutes. The next execution time
113+
// should be 10:15 AM, but executionTime.nextExecution( 10:07 AM ) would return the next execution as 11 AM.
114+
// By subtracting the delay first, the ExecutionTime object is given the input time as 9:52 AM, it returns
115+
// 10:00 AM, and after adding the delay, we get the correct next execution time of 10:15 AM.
116+
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(baseTime.minusMillis(delay), this.timezone);
104117
ZonedDateTime nextExecutionTime = this.executionTime.nextExecution(zonedDateTime).orElse(null);
105118

106-
return nextExecutionTime == null ? null : nextExecutionTime.toInstant();
119+
return nextExecutionTime == null ? null : nextExecutionTime.toInstant().plusMillis(delay);
107120
}
108121

109122
@Override
110123
public Duration nextTimeToExecute() {
111-
Instant now = this.clock.instant();
124+
long delay = scheduleDelay == null ? 0 : scheduleDelay;
125+
Instant now = this.clock.instant().minusMillis(delay);
112126
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, this.timezone);
113127
Optional<Duration> timeToNextExecution = this.executionTime.timeToNextExecution(zonedDateTime);
114128
return timeToNextExecution.orElse(null);
115129
}
116130

117131
@Override
118132
public Tuple<Instant, Instant> getPeriodStartingAt(Instant startTime) {
133+
long delay = scheduleDelay == null ? 0 : scheduleDelay;
119134
Instant realStartTime;
120135
if (startTime != null) {
121136
realStartTime = startTime;
122137
} else {
123138
Instant now = this.clock.instant();
124-
Optional<ZonedDateTime> lastExecutionTime = this.executionTime.lastExecution(ZonedDateTime.ofInstant(now, this.timezone));
139+
Optional<ZonedDateTime> lastExecutionTime = this.executionTime.lastExecution(ZonedDateTime.ofInstant(now.minusMillis(delay), this.timezone));
125140
if (!lastExecutionTime.isPresent()) {
126141
return new Tuple<>(now, now);
127142
}
128-
realStartTime = lastExecutionTime.get().toInstant();
143+
realStartTime = lastExecutionTime.get().toInstant().plusMillis(delay);
129144
}
130-
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(realStartTime, this.timezone);
145+
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(realStartTime.minusMillis(delay), this.timezone);
131146
ZonedDateTime newEndTime = executionTime.nextExecution(zonedDateTime).orElse(null);
132-
return new Tuple<>(realStartTime, newEndTime == null ? null : newEndTime.toInstant());
147+
return new Tuple<>(realStartTime, newEndTime == null ? null : newEndTime.toInstant().plusMillis(delay));
133148
}
134149

135150
@Override
136151
public Boolean runningOnTime(Instant lastExecutionTime) {
152+
long delay = scheduleDelay == null ? 0 : scheduleDelay;
137153
if (lastExecutionTime == null) {
138154
return true;
139155
}
140156

141-
Instant now = this.clock.instant();
157+
Instant now = this.clock.instant().minusMillis(delay);
142158
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, timezone);
143159
Optional<ZonedDateTime> expectedExecutionTime = this.executionTime.lastExecution(zonedDateTime);
144160

@@ -147,15 +163,30 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
147163
}
148164
ZonedDateTime actualExecutionTime = ZonedDateTime.ofInstant(lastExecutionTime, timezone);
149165

150-
return ChronoUnit.SECONDS.between(expectedExecutionTime.get(), actualExecutionTime) == 0L;
166+
return ChronoUnit.SECONDS.between(expectedExecutionTime.get().plus(delay, ChronoUnit.MILLIS), actualExecutionTime) == 0L;
151167
}
152168

153169
@Override
154170
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
171+
return this.scheduleDelay == null ? toXContentNoDelay(builder) : toXContentWithDelay(builder);
172+
}
173+
174+
private XContentBuilder toXContentNoDelay(XContentBuilder builder) throws IOException {
175+
builder.startObject()
176+
.startObject(CRON_FIELD)
177+
.field(EXPRESSION_FIELD, this.expression)
178+
.field(TIMEZONE_FIELD, this.timezone.getId())
179+
.endObject()
180+
.endObject();
181+
return builder;
182+
}
183+
184+
private XContentBuilder toXContentWithDelay(XContentBuilder builder) throws IOException {
155185
builder.startObject()
156186
.startObject(CRON_FIELD)
157187
.field(EXPRESSION_FIELD, this.expression)
158188
.field(TIMEZONE_FIELD, this.timezone.getId())
189+
.field(DELAY_FIELD, this.scheduleDelay)
159190
.endObject()
160191
.endObject();
161192
return builder;
@@ -172,17 +203,19 @@ public boolean equals(Object o) {
172203
if (o == null || getClass() != o.getClass()) return false;
173204
CronSchedule cronSchedule = (CronSchedule) o;
174205
return timezone.equals(cronSchedule.timezone) &&
175-
expression.equals(cronSchedule.expression);
206+
expression.equals(cronSchedule.expression) &&
207+
Objects.equals(scheduleDelay, cronSchedule.scheduleDelay);
176208
}
177209

178210
@Override
179211
public int hashCode() {
180-
return Objects.hash(timezone, expression);
212+
return scheduleDelay == null ? Objects.hash(timezone, expression) : Objects.hash(timezone, expression, scheduleDelay);
181213
}
182214

183215
@Override
184216
public void writeTo(StreamOutput out) throws IOException {
185217
out.writeZoneId(timezone);
186218
out.writeString(expression);
219+
out.writeOptionalLong(scheduleDelay);
187220
}
188221
}

spi/src/main/java/org/opensearch/jobscheduler/spi/schedule/IntervalSchedule.java

+57-20
Original file line numberDiff line numberDiff line change
@@ -66,39 +66,48 @@ public class IntervalSchedule implements Schedule {
6666
SUPPORTED_UNITS = Collections.unmodifiableSet(set);
6767
}
6868

69-
private Instant startTime;
69+
private Instant initialStartTime;
70+
private Instant startTimeWithDelay;
7071
private int interval;
7172
private ChronoUnit unit;
7273
private transient long intervalInMillis;
7374
private Clock clock;
75+
private Long scheduleDelay;
7476

7577
public IntervalSchedule(Instant startTime, int interval, ChronoUnit unit) {
7678
if (!SUPPORTED_UNITS.contains(unit)) {
7779
throw new IllegalArgumentException(
7880
String.format(Locale.ROOT, "Interval unit %s is not supported, expects %s",
7981
unit, SUPPORTED_UNITS));
8082
}
81-
this.startTime = startTime;
83+
this.initialStartTime = startTime;
84+
this.startTimeWithDelay = startTime;
8285
this.interval = interval;
8386
this.unit = unit;
8487
this.intervalInMillis = Duration.of(interval, this.unit).toMillis();
8588
this.clock = Clock.system(ZoneId.systemDefault());
8689
}
8790

91+
public IntervalSchedule(Instant startTime, int interval, ChronoUnit unit, long scheduleDelay) {
92+
this(startTime, interval, unit);
93+
this.startTimeWithDelay = startTime.plusMillis(scheduleDelay);
94+
this.scheduleDelay = scheduleDelay;
95+
}
96+
8897
public IntervalSchedule(StreamInput input) throws IOException {
89-
startTime = input.readInstant();
98+
initialStartTime = input.readInstant();
9099
interval = input.readInt();
91100
unit = input.readEnum(ChronoUnit.class);
101+
scheduleDelay = input.readOptionalLong();
102+
startTimeWithDelay = scheduleDelay == null ? initialStartTime : initialStartTime.plusMillis(scheduleDelay);
92103
intervalInMillis = Duration.of(interval, unit).toMillis();
93104
clock = Clock.system(ZoneId.systemDefault());
94105
}
95106

96-
@VisibleForTesting
97-
Instant getStartTime() {
98-
return this.startTime;
107+
public Instant getStartTime() {
108+
return this.startTimeWithDelay;
99109
}
100110

101-
@VisibleForTesting
102111
public int getInterval() {
103112
return this.interval;
104113
}
@@ -107,22 +116,31 @@ public ChronoUnit getUnit() {
107116
return this.unit;
108117
}
109118

119+
public Long getDelay() { return this.scheduleDelay; }
120+
110121
@Override
111122
public Instant getNextExecutionTime(Instant time) {
112123
Instant baseTime = time == null ? this.clock.instant() : time;
113-
long delta = (baseTime.toEpochMilli() - this.startTime.toEpochMilli()) % this.intervalInMillis;
114-
long remaining = this.intervalInMillis - delta;
115-
116-
return baseTime.plus(remaining, ChronoUnit.MILLIS);
124+
long delta = (baseTime.toEpochMilli() - this.startTimeWithDelay.toEpochMilli());
125+
if (delta >= 0) {
126+
long remaining = this.intervalInMillis - (delta % this.intervalInMillis);
127+
return baseTime.plus(remaining, ChronoUnit.MILLIS);
128+
} else {
129+
return this.startTimeWithDelay;
130+
}
117131
}
118132

119133
@Override
120134
public Duration nextTimeToExecute() {
121-
long enabledTimeEpochMillis = this.startTime.toEpochMilli();
135+
long enabledTimeEpochMillis = this.startTimeWithDelay.toEpochMilli();
122136
Instant currentTime = this.clock.instant();
123137
long delta = currentTime.toEpochMilli() - enabledTimeEpochMillis;
124-
long remainingScheduleTime = intervalInMillis - (delta % intervalInMillis);
125-
return Duration.of(remainingScheduleTime, ChronoUnit.MILLIS);
138+
if (delta >= 0) {
139+
long remainingScheduleTime = intervalInMillis - (delta % intervalInMillis);
140+
return Duration.of(remainingScheduleTime, ChronoUnit.MILLIS);
141+
} else {
142+
return Duration.ofMillis(enabledTimeEpochMillis - currentTime.toEpochMilli());
143+
}
126144
}
127145

128146
@Override
@@ -139,7 +157,7 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
139157
return true;
140158
}
141159

142-
long enabledTimeEpochMillis = this.startTime.toEpochMilli();
160+
long enabledTimeEpochMillis = this.startTimeWithDelay.toEpochMilli();
143161
Instant now = this.clock.instant();
144162
long expectedMillisSinceLastExecution = (now.toEpochMilli() - enabledTimeEpochMillis) % this.intervalInMillis;
145163
if (expectedMillisSinceLastExecution < 1000) {
@@ -153,11 +171,27 @@ public Boolean runningOnTime(Instant lastExecutionTime) {
153171

154172
@Override
155173
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
174+
return this.scheduleDelay == null ? toXContentNoDelay(builder) : toXContentWithDelay(builder);
175+
}
176+
177+
private XContentBuilder toXContentNoDelay(XContentBuilder builder) throws IOException {
178+
builder.startObject()
179+
.startObject(INTERVAL_FIELD)
180+
.field(START_TIME_FIELD, this.initialStartTime.toEpochMilli())
181+
.field(PERIOD_FIELD, this.interval)
182+
.field(UNIT_FIELD, this.unit)
183+
.endObject()
184+
.endObject();
185+
return builder;
186+
}
187+
188+
private XContentBuilder toXContentWithDelay(XContentBuilder builder) throws IOException {
156189
builder.startObject()
157190
.startObject(INTERVAL_FIELD)
158-
.field(START_TIME_FIELD, this.startTime.toEpochMilli())
191+
.field(START_TIME_FIELD, this.initialStartTime.toEpochMilli())
159192
.field(PERIOD_FIELD, this.interval)
160193
.field(UNIT_FIELD, this.unit)
194+
.field(DELAY_FIELD, this.scheduleDelay)
161195
.endObject()
162196
.endObject();
163197
return builder;
@@ -178,21 +212,24 @@ public boolean equals(Object o) {
178212
if (this == o) return true;
179213
if (o == null || getClass() != o.getClass()) return false;
180214
IntervalSchedule intervalSchedule = (IntervalSchedule) o;
181-
return startTime.equals(intervalSchedule.startTime) &&
215+
return initialStartTime.equals(intervalSchedule.initialStartTime) &&
182216
interval == intervalSchedule.interval &&
183217
unit == intervalSchedule.unit &&
184-
intervalInMillis == intervalSchedule.intervalInMillis;
218+
intervalInMillis == intervalSchedule.intervalInMillis &&
219+
Objects.equals(scheduleDelay, intervalSchedule.scheduleDelay);
185220
}
186221

187222
@Override
188223
public int hashCode() {
189-
return Objects.hash(startTime, interval, unit, intervalInMillis);
224+
return scheduleDelay == null ? Objects.hash(initialStartTime, interval, unit, intervalInMillis) :
225+
Objects.hash(initialStartTime, interval, unit, intervalInMillis, scheduleDelay);
190226
}
191227

192228
@Override
193229
public void writeTo(StreamOutput out) throws IOException {
194-
out.writeInstant(startTime);
230+
out.writeInstant(initialStartTime);
195231
out.writeInt(interval);
196232
out.writeEnum(unit);
233+
out.writeOptionalLong(scheduleDelay);
197234
}
198235
}

spi/src/main/java/org/opensearch/jobscheduler/spi/schedule/Schedule.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@
3434
import java.time.Instant;
3535

3636
public interface Schedule extends Writeable, ToXContentObject {
37+
static final String DELAY_FIELD = "schedule_delay";
3738

3839
/**
39-
* Gets next job execution time of give time parameter.
40+
* Gets next job execution time of given time parameter.
4041
*
4142
* @param time base time point
42-
* @return next exection time since time parameter.
43+
* @return next execution time since time parameter.
4344
*/
4445
Instant getNextExecutionTime(Instant time);
4546

@@ -65,4 +66,12 @@ public interface Schedule extends Writeable, ToXContentObject {
6566
* @return true if the job executes on time, otherwise false.
6667
*/
6768
Boolean runningOnTime(Instant lastExecutionTime);
69+
70+
/**
71+
* Gets the delay parameter of the schedule.
72+
*
73+
* @return the delay parameter of the schedule as a Long.
74+
*/
75+
Long getDelay();
76+
6877
}

0 commit comments

Comments
 (0)