From a6d52705ee6b80fba898b9e4dff1e132b8b56779 Mon Sep 17 00:00:00 2001 From: Arjun Gupta Date: Tue, 18 May 2021 16:29:40 -0700 Subject: [PATCH] Fix ZeroRowFileCreator to use DataSink instead of OutputStream interface --- .../facebook/presto/common/io/DataSink.java | 3 +++ .../presto/hive/HiveZeroRowFileCreator.java | 19 ++++++++++++++----- .../hive/pagefile/PageFileWriterFactory.java | 4 +--- .../spark/TestPrestoSparkQueryRunner.java | 12 ++++++++++++ 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/presto-common/src/main/java/com/facebook/presto/common/io/DataSink.java b/presto-common/src/main/java/com/facebook/presto/common/io/DataSink.java index 0bcb5bd1eed63..6f5dfdfc8c284 100644 --- a/presto-common/src/main/java/com/facebook/presto/common/io/DataSink.java +++ b/presto-common/src/main/java/com/facebook/presto/common/io/DataSink.java @@ -13,10 +13,12 @@ */ package com.facebook.presto.common.io; +import java.io.Closeable; import java.io.IOException; import java.util.List; public interface DataSink + extends Closeable { /** * Number of bytes written to this sink so far. @@ -38,6 +40,7 @@ void write(List outputData) /** * File is complete */ + @Override void close() throws IOException; } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveZeroRowFileCreator.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveZeroRowFileCreator.java index f4c969b0945c2..081551c7010f7 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveZeroRowFileCreator.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveZeroRowFileCreator.java @@ -14,12 +14,17 @@ package com.facebook.presto.hive; import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.io.DataOutput; +import com.facebook.presto.common.io.DataSink; import com.facebook.presto.hive.datasink.DataSinkFactory; import com.facebook.presto.hive.metastore.StorageFormat; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PrestoException; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import io.airlift.slice.Slices; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.mapred.JobConf; @@ -27,7 +32,6 @@ import javax.inject.Inject; import java.io.IOException; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.nio.file.Paths; import java.util.ArrayList; @@ -35,6 +39,7 @@ import java.util.Properties; import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; +import static com.facebook.presto.common.io.DataOutput.createDataOutput; import static com.facebook.presto.hive.HiveCompressionCodec.NONE; import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; import static com.facebook.presto.hive.HiveWriteUtils.initializeSerializer; @@ -79,7 +84,7 @@ public void createFiles(ConnectorSession session, HdfsContext hdfsContext, Path List> commitFutures = new ArrayList<>(); for (String fileName : fileNames) { - commitFutures.add(executor.submit(() -> createFile(hdfsContext, new Path(destinationDirectory, fileName), fileContent))); + commitFutures.add(executor.submit(() -> createFile(hdfsContext, new Path(destinationDirectory, fileName), fileContent, session))); } ListenableFuture listenableFutureAggregate = whenAllSucceed(commitFutures).call(() -> null, directExecutor()); @@ -144,10 +149,14 @@ private byte[] generateZeroRowFile( } } - private void createFile(HdfsContext hdfsContext, Path path, byte[] content) + private void createFile(HdfsContext hdfsContext, Path path, byte[] content, ConnectorSession session) { - try (OutputStream outputStream = hdfsEnvironment.getFileSystem(hdfsContext, path).create(path, false)) { - outputStream.write(content); + try { + FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path); + try (DataSink dataSink = dataSinkFactory.createDataSink(session, fs, path)) { + DataOutput dataOutput = createDataOutput(Slices.wrappedBuffer(content)); + dataSink.write(ImmutableList.of(dataOutput)); + } } catch (IOException e) { throw new PrestoException(HIVE_WRITER_CLOSE_ERROR, "Error write zero-row file to Hive", e); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFileWriterFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFileWriterFactory.java index aa0a8b8b0b0a0..8381270ce2cc3 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFileWriterFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFileWriterFactory.java @@ -120,10 +120,8 @@ public static void createEmptyPageFile( FileSystem fileSystem, Path path) { - try { - DataSink dataSink = dataSinkFactory.createDataSink(session, fileSystem, path); + try (DataSink dataSink = dataSinkFactory.createDataSink(session, fileSystem, path)) { dataSink.write(ImmutableList.of(createEmptyPageFileFooterOutput())); - dataSink.close(); } catch (IOException e) { throw new PrestoException(HIVE_WRITER_OPEN_ERROR, "Error creating empty pagefile", e); diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkQueryRunner.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkQueryRunner.java index e893d04d63d42..3de57624083ec 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkQueryRunner.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkQueryRunner.java @@ -102,6 +102,18 @@ public void testTableWrite() 45000); } + @Test + public void testZeroFileCreatorForBucketedTable() + { + assertUpdate( + getSession(), + format("CREATE TABLE hive.hive_test.test_hive_orders_bucketed_join_zero_file WITH (bucketed_by=array['orderkey'], bucket_count=8) AS " + + "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " + + "FROM orders_bucketed " + + "WHERE orderkey = 1"), + 1); + } + @Test public void testBucketedTableWriteSimple() {