From 8ec08719029be3e070a89546e251f2e98eab56f1 Mon Sep 17 00:00:00 2001 From: sramazzina Date: Fri, 26 Apr 2024 14:51:13 +0200 Subject: [PATCH 1/2] fix #3872 - Parquet input: opened files are never closed --- .../transforms/0072-prepare-parquet-file.hpl | 283 ++++++++++++++++++ .../transforms/0072-read-parquet-file.hpl | 218 ++++++++++++++ .../main-0072-parquet-input-test-delete.hwf | 142 +++++++++ .../transforms/input/ParquetInput.java | 37 ++- .../transforms/input/ParquetInputData.java | 8 + 5 files changed, 682 insertions(+), 6 deletions(-) create mode 100644 integration-tests/transforms/0072-prepare-parquet-file.hpl create mode 100644 integration-tests/transforms/0072-read-parquet-file.hpl create mode 100644 integration-tests/transforms/main-0072-parquet-input-test-delete.hwf diff --git a/integration-tests/transforms/0072-prepare-parquet-file.hpl b/integration-tests/transforms/0072-prepare-parquet-file.hpl new file mode 100644 index 00000000000..30e45d33b33 --- /dev/null +++ b/integration-tests/transforms/0072-prepare-parquet-file.hpl @@ -0,0 +1,283 @@ + + + + + 0072-prepare-parquet-file + Y + + + + Normal + + + N + 1000 + 100 + - + 2024/02/28 17:00:11.950 + - + 2024/02/28 17:00:11.950 + + + + + + Generate rows + Fake data + Y + + + Fake data + Generate random value + Y + + + Generate random value + Add sequence + Y + + + Add sequence + Calculator + Y + + + Calculator + Parquet File Output + Y + + + + Add sequence + Sequence + + Y + + 1 + + none + + + duckdb + 1 + 999999999 + SEQ_ + 1 + Y + N + seq + + + 688 + 144 + + + + Calculator + Calculator + + Y + + 1 + + none + + + + CONSTANT + c:\temp\test-parquet + base_filename + Y + -1 + -1 + String + + + CONSTANT + 10000 + den + Y + -1 + -1 + Integer + + + DIVIDE + seq + den + ratio + N + -1 + -1 + Integer + + + ADD + base_filename + ratio + filename + N + -1 + -1 + String + + Y + + + 848 + 144 + + + + Fake data + Fake + + Y + + 1 + + none + + + + + Name + name + Name + + + CompanyName + name + Company + + + Industry + industry + Company + + + en + + + 416 + 144 + + + + Generate random value + RandomValue + + Y + + 1 + + none + + + + + IntNumber + random integer + + + + + 544 + 144 + + + + Generate rows + RowGenerator + + Y + + 1 + + none + + + + + 5000 + FiveSecondsAgo + 10 + N + now + + + 304 + 144 + + + + Parquet File Output + ParquetFileOutput + + Y + + 1 + + none + + + UNCOMPRESSED + 8192 + 1048576 + + + Name + Name + + + CompanyName + CompanyName + + + Industry + Industry + + + IntNumber + Int_Number + + + ${PROJECT_HOME}/files/parquet-test + Y + yyyyMMdd-HHmmss + parquet + Y + N + N + Y + N + 100 + 268435456 + 2.0 + + + 1040 + 144 + + + + + + diff --git a/integration-tests/transforms/0072-read-parquet-file.hpl b/integration-tests/transforms/0072-read-parquet-file.hpl new file mode 100644 index 00000000000..49d0c631684 --- /dev/null +++ b/integration-tests/transforms/0072-read-parquet-file.hpl @@ -0,0 +1,218 @@ + + + + + 0072-read-parquet-file + Y + + + + Normal + + + N + 1000 + 100 + - + 2024/02/28 17:03:34.971 + - + 2024/02/28 17:03:34.971 + + + + + + Parquet File Input + Dummy (do nothing) + Y + + + Generate rows + PROJECT_HOME + Y + + + PROJECT_HOME + Calculator + Y + + + Calculator + Parquet File Input + Y + + + + Calculator + Calculator + + Y + + 1 + + none + + + + CONSTANT + / + sep + Y + -1 + -1 + String + + + ADD3 + prj_home + sep + file + filename + N + -1 + -1 + String + + Y + + + 688 + 224 + + + + Dummy (do nothing) + Dummy + + Y + + 1 + + none + + + + + 1056 + 224 + + + + Generate rows + RowGenerator + + Y + + 1 + + none + + + + + + + + + -1 + file + files/parquet-test-00-0001.parquet + -1 + N + String + + + 5000 + FiveSecondsAgo + 1 + N + now + + + 416 + 224 + + + + PROJECT_HOME + GetVariable + + Y + + 1 + + none + + + + + -1 + prj_home + -1 + none + String + ${PROJECT_HOME} + + + + + 544 + 224 + + + + Parquet File Input + ParquetFileInput + + Y + + 1 + + none + + + + + Name + Name + String + + + CompanyName + CompanyName + String + + + Industry + Industry + String + + + filename + + + 832 + 224 + + + + + + diff --git a/integration-tests/transforms/main-0072-parquet-input-test-delete.hwf b/integration-tests/transforms/main-0072-parquet-input-test-delete.hwf new file mode 100644 index 00000000000..8ad4d17b976 --- /dev/null +++ b/integration-tests/transforms/main-0072-parquet-input-test-delete.hwf @@ -0,0 +1,142 @@ + + + + main-0072-parquet-input-test-delete + Y + + + + - + 2024/04/19 16:34:12.724 + - + 2024/04/19 16:34:12.724 + + + + + Start + + SPECIAL + + 1 + 12 + 60 + 0 + 0 + N + 0 + 1 + N + 208 + 192 + + + + 0072-prepare-parquet-file.hpl + + PIPELINE + + N + N + N + N + N + N + ${PROJECT_HOME}/0072-prepare-parquet-file.hpl + Basic + + Y + + N + local + N + N + Y + N + 432 + 192 + + + + 0072-read-parquet-file.hpl + + PIPELINE + + N + N + N + N + N + N + ${PROJECT_HOME}/0072-read-parquet-file.hpl + Basic + + Y + + N + local + N + N + Y + N + 640 + 192 + + + + Delete file + + DELETE_FILE + + N + ${PROJECT_HOME}/files/parquet-test-00-0001.parquet + N + 864 + 192 + + + + + + Start + 0072-prepare-parquet-file.hpl + Y + Y + Y + + + 0072-prepare-parquet-file.hpl + 0072-read-parquet-file.hpl + Y + N + Y + + + 0072-read-parquet-file.hpl + Delete file + Y + N + Y + + + + + + diff --git a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInput.java b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInput.java index ba5d4fd60de..52aaf6cc413 100644 --- a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInput.java +++ b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInput.java @@ -31,7 +31,12 @@ import org.apache.hop.pipeline.transform.TransformMeta; import org.apache.parquet.hadoop.ParquetReader; -public class ParquetInput extends BaseTransform { +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +public class ParquetInput extends BaseTransform +{ public ParquetInput( TransformMeta transformMeta, ParquetInputMeta meta, @@ -48,6 +53,7 @@ public boolean processRow() throws HopException { Object[] row = getRow(); if (row == null) { // No more files, we're done. + closeFile(); setOutputDone(); return false; } @@ -75,25 +81,25 @@ public boolean processRow() throws HopException { try { long size = fileObject.getContent().getSize(); - InputStream inputStream = HopVfs.getInputStream(fileObject); + data.inputStream = HopVfs.getInputStream(fileObject); // Reads the whole file into memory... // ByteArrayOutputStream outputStream = new ByteArrayOutputStream((int) size); - IOUtils.copy(inputStream, outputStream); + IOUtils.copy(data.inputStream, outputStream); ParquetStream inputFile = new ParquetStream(outputStream.toByteArray(), filename); ParquetReadSupport readSupport = new ParquetReadSupport(meta.getFields()); - ParquetReader reader = + data.reader = new ParquetReaderBuilder<>(readSupport, inputFile).build(); - RowMetaAndData r = reader.read(); + RowMetaAndData r = data.reader.read(); while (r != null && !isStopped()) { // Add r to the input rows... // Object[] outputRow = RowDataUtil.addRowData(row, getInputRowMeta().size(), r.getData()); putRow(data.outputRowMeta, outputRow); - r = reader.read(); + r = data.reader.read(); } } catch (Exception e) { throw new HopException("Error read file " + filename, e); @@ -101,4 +107,23 @@ public boolean processRow() throws HopException { return true; } + + public void closeFile() { + if (!data.readerClosed) { + try { + data.reader.close(); + data.inputStream.close(); + } catch (IOException e) { + logError("Unable to properly close parquet reader!"); + } + data.readerClosed = true; + } + } + + @Override + public void dispose() { + super.dispose(); + + closeFile(); + } } diff --git a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInputData.java b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInputData.java index ae9ccde57a9..4e9c1801928 100644 --- a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInputData.java +++ b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInputData.java @@ -17,13 +17,21 @@ package org.apache.hop.parquet.transforms.input; +import org.apache.hop.core.RowMetaAndData; import org.apache.hop.core.row.IRowMeta; import org.apache.hop.pipeline.transform.BaseTransformData; import org.apache.hop.pipeline.transform.ITransformData; +import org.apache.parquet.hadoop.ParquetReader; + +import java.io.InputStream; public class ParquetInputData extends BaseTransformData implements ITransformData { + public IRowMeta outputRowMeta; public int filenameFieldIndex; + public ParquetReader reader; + public InputStream inputStream; + public boolean readerClosed = false; public ParquetInputData() { super(); From fa0da2232e6667072e50e392ba0e33e1b7bb7a96 Mon Sep 17 00:00:00 2001 From: Hans Van Akelyen Date: Mon, 29 Apr 2024 21:16:21 +0200 Subject: [PATCH 2/2] Apply spotless, #3872 --- .../transforms/input/ParquetInput.java | 27 +++++++------------ .../transforms/input/ParquetInputData.java | 5 ++-- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInput.java b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInput.java index 52aaf6cc413..9ae8826c19a 100644 --- a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInput.java +++ b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInput.java @@ -18,7 +18,7 @@ package org.apache.hop.parquet.transforms.input; import java.io.ByteArrayOutputStream; -import java.io.InputStream; +import java.io.IOException; import org.apache.commons.io.IOUtils; import org.apache.commons.vfs2.FileObject; import org.apache.hop.core.RowMetaAndData; @@ -29,14 +29,8 @@ import org.apache.hop.pipeline.PipelineMeta; import org.apache.hop.pipeline.transform.BaseTransform; import org.apache.hop.pipeline.transform.TransformMeta; -import org.apache.parquet.hadoop.ParquetReader; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; - -public class ParquetInput extends BaseTransform -{ +public class ParquetInput extends BaseTransform { public ParquetInput( TransformMeta transformMeta, ParquetInputMeta meta, @@ -90,8 +84,7 @@ public boolean processRow() throws HopException { ParquetStream inputFile = new ParquetStream(outputStream.toByteArray(), filename); ParquetReadSupport readSupport = new ParquetReadSupport(meta.getFields()); - data.reader = - new ParquetReaderBuilder<>(readSupport, inputFile).build(); + data.reader = new ParquetReaderBuilder<>(readSupport, inputFile).build(); RowMetaAndData r = data.reader.read(); while (r != null && !isStopped()) { @@ -110,13 +103,13 @@ public boolean processRow() throws HopException { public void closeFile() { if (!data.readerClosed) { - try { - data.reader.close(); - data.inputStream.close(); - } catch (IOException e) { - logError("Unable to properly close parquet reader!"); - } - data.readerClosed = true; + try { + data.reader.close(); + data.inputStream.close(); + } catch (IOException e) { + logError("Unable to properly close parquet reader!"); + } + data.readerClosed = true; } } diff --git a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInputData.java b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInputData.java index 4e9c1801928..01daffc373f 100644 --- a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInputData.java +++ b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/input/ParquetInputData.java @@ -17,19 +17,18 @@ package org.apache.hop.parquet.transforms.input; +import java.io.InputStream; import org.apache.hop.core.RowMetaAndData; import org.apache.hop.core.row.IRowMeta; import org.apache.hop.pipeline.transform.BaseTransformData; import org.apache.hop.pipeline.transform.ITransformData; import org.apache.parquet.hadoop.ParquetReader; -import java.io.InputStream; - public class ParquetInputData extends BaseTransformData implements ITransformData { public IRowMeta outputRowMeta; public int filenameFieldIndex; - public ParquetReader reader; + public ParquetReader reader; public InputStream inputStream; public boolean readerClosed = false;