Skip to content

Commit

Permalink
Fix ZeroRowFileCreator to use DataSink instead of OutputStream interface
Browse files Browse the repository at this point in the history
  • Loading branch information
pgupta2 authored and arhimondr committed May 20, 2021
1 parent 5eb36ae commit a6d5270
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
*/
package com.facebook.presto.common.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

public interface DataSink
extends Closeable
{
/**
* Number of bytes written to this sink so far.
Expand All @@ -38,6 +40,7 @@ void write(List<DataOutput> outputData)
/**
* File is complete
*/
@Override
void close()
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,32 @@
package com.facebook.presto.hive;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.io.DataOutput;
import com.facebook.presto.common.io.DataSink;
import com.facebook.presto.hive.datasink.DataSinkFactory;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.slice.Slices;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.mapred.JobConf;

import javax.inject.Inject;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.presto.common.io.DataOutput.createDataOutput;
import static com.facebook.presto.hive.HiveCompressionCodec.NONE;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static com.facebook.presto.hive.HiveWriteUtils.initializeSerializer;
Expand Down Expand Up @@ -79,7 +84,7 @@ public void createFiles(ConnectorSession session, HdfsContext hdfsContext, Path
List<ListenableFuture<?>> commitFutures = new ArrayList<>();

for (String fileName : fileNames) {
commitFutures.add(executor.submit(() -> createFile(hdfsContext, new Path(destinationDirectory, fileName), fileContent)));
commitFutures.add(executor.submit(() -> createFile(hdfsContext, new Path(destinationDirectory, fileName), fileContent, session)));
}

ListenableFuture<?> listenableFutureAggregate = whenAllSucceed(commitFutures).call(() -> null, directExecutor());
Expand Down Expand Up @@ -144,10 +149,14 @@ private byte[] generateZeroRowFile(
}
}

private void createFile(HdfsContext hdfsContext, Path path, byte[] content)
private void createFile(HdfsContext hdfsContext, Path path, byte[] content, ConnectorSession session)
{
try (OutputStream outputStream = hdfsEnvironment.getFileSystem(hdfsContext, path).create(path, false)) {
outputStream.write(content);
try {
FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path);
try (DataSink dataSink = dataSinkFactory.createDataSink(session, fs, path)) {
DataOutput dataOutput = createDataOutput(Slices.wrappedBuffer(content));
dataSink.write(ImmutableList.of(dataOutput));
}
}
catch (IOException e) {
throw new PrestoException(HIVE_WRITER_CLOSE_ERROR, "Error write zero-row file to Hive", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,8 @@ public static void createEmptyPageFile(
FileSystem fileSystem,
Path path)
{
try {
DataSink dataSink = dataSinkFactory.createDataSink(session, fileSystem, path);
try (DataSink dataSink = dataSinkFactory.createDataSink(session, fileSystem, path)) {
dataSink.write(ImmutableList.of(createEmptyPageFileFooterOutput()));
dataSink.close();
}
catch (IOException e) {
throw new PrestoException(HIVE_WRITER_OPEN_ERROR, "Error creating empty pagefile", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ public void testTableWrite()
45000);
}

/**
 * Creates an 8-bucket table (bucketed on {@code orderkey}) via CTAS from
 * {@code orders_bucketed}, filtered to {@code orderkey = 1}, and asserts an
 * update count of 1.
 *
 * NOTE(review): presumably the single-row filter leaves most buckets empty so
 * that the zero-row file creation path is exercised -- confirm against
 * ZeroRowFileCreator.
 */
@Test
public void testZeroFileCreatorForBucketedTable()
{
    String createBucketedTableSql = format(
            "CREATE TABLE hive.hive_test.test_hive_orders_bucketed_join_zero_file WITH (bucketed_by=array['orderkey'], bucket_count=8) AS " +
                    "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                    "FROM orders_bucketed " +
                    "WHERE orderkey = 1");

    assertUpdate(getSession(), createBucketedTableSql, 1);
}

@Test
public void testBucketedTableWriteSimple()
{
Expand Down

0 comments on commit a6d5270

Please sign in to comment.