From 266e233d46b8a1c38b1374a7ffae65d9cc5835e3 Mon Sep 17 00:00:00 2001 From: Wout Scheepers Date: Tue, 12 Nov 2019 11:48:03 +0100 Subject: [PATCH] Be able to pass success to BigQueryOperation --- .../contrib/bigquery/BigQueryOperation.java | 8 ++++++++ .../bigquery/BigQueryOperatorTest.java | 20 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/contrib/flo-bigquery/src/main/java/com/spotify/flo/contrib/bigquery/BigQueryOperation.java b/contrib/flo-bigquery/src/main/java/com/spotify/flo/contrib/bigquery/BigQueryOperation.java index 8382e5d9d..8fc882ada 100644 --- a/contrib/flo-bigquery/src/main/java/com/spotify/flo/contrib/bigquery/BigQueryOperation.java +++ b/contrib/flo-bigquery/src/main/java/com/spotify/flo/contrib/bigquery/BigQueryOperation.java @@ -61,6 +61,10 @@ static BigQueryOperation ofJob(Fn job) { return new BigQueryOperation().job(job); } + static BigQueryOperation ofJob(Fn job, F1 success) { + return new BigQueryOperation().job(job).success(success); + } + public static class Provider implements Serializable { private static final long serialVersionUID = 1L; @@ -95,5 +99,9 @@ public BigQueryOperation job(Fn jobInfo) { public BigQueryOperation job(JobInfo jobInfo) { return BigQueryOperation.ofJob(() -> jobInfo); } + + public BigQueryOperation job(JobInfo jobInfo, F1 success) { + return BigQueryOperation.ofJob(() -> jobInfo, success); + } } } diff --git a/contrib/flo-bigquery/src/test/java/com/spotify/flo/contrib/bigquery/BigQueryOperatorTest.java b/contrib/flo-bigquery/src/test/java/com/spotify/flo/contrib/bigquery/BigQueryOperatorTest.java index 6cf09bcaa..061be85c5 100644 --- a/contrib/flo-bigquery/src/test/java/com/spotify/flo/contrib/bigquery/BigQueryOperatorTest.java +++ b/contrib/flo-bigquery/src/test/java/com/spotify/flo/contrib/bigquery/BigQueryOperatorTest.java @@ -120,6 +120,26 @@ public void shouldRunLoadJobInTestMode() throws Exception { } } + @Test + public void shouldRunLoadJobInTestModeWithSuccess() throws Exception { + final TableId dstTable = TableId.of("foo", "bar", "baz"); + final String srcUri = "gs://foo/bar"; + + final Task task = Task.named("task") + .ofType(TableId.class) + .operator(BigQueryOperator.create()) + .process(bq -> bq.job( + JobInfo.of(LoadJobConfiguration.of(dstTable, srcUri)), response -> dstTable)); + + try (TestScope scope = FloTesting.scope()) { + + final TableId result = FloRunner.runTask(task).future() + .get(30, SECONDS); + + assertThat(result, is(dstTable)); + } + } + @Test public void shouldRunExtractJobInTestMode() throws Exception { final TableId srcTable = TableId.of("foo", "bar", "baz");