add csv loader (#27)
* add csv loader and add csv params file

* add file suffix comment
BingTong0 authored May 12, 2023
1 parent d50bfc0 commit 699e650
Showing 22 changed files with 118 additions and 51 deletions.
@@ -8,11 +8,11 @@
/**
* Handles the connection with DuckDb.
*/
-public class DuckDbParquetExtractor implements Closeable {
+public class DuckDbExtractor implements Closeable {

private final Connection connection;

-public DuckDbParquetExtractor() throws SQLException {
+public DuckDbExtractor() throws SQLException {
connection = DriverManager.getConnection("jdbc:duckdb:");
}

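The rename makes the wrapper format-agnostic: DuckDbExtractor only owns an in-memory DuckDB JDBC connection, and the file format is decided by the loader below. A minimal caller sketch, not part of this commit, assuming the duckdb_jdbc driver is on the classpath and the getConnection() accessor used by the loader:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import org.ldbcouncil.finbench.driver.csv.DuckDbExtractor;

public class DuckDbExtractorSketch {
    public static void main(String[] args) throws Exception {
        // try-with-resources works because DuckDbExtractor implements Closeable
        try (DuckDbExtractor db = new DuckDbExtractor()) {
            Connection connection = db.getConnection();
            try (Statement stmt = connection.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 1 AS one")) {
                while (rs.next()) {
                    System.out.println(rs.getInt("one")); // prints 1
                }
            }
        }
    }
}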
@@ -15,13 +15,13 @@
import org.ldbcouncil.finbench.driver.util.Tuple2;

/**
- * Class to read Parquet files for operation streams.
+ * Class to read Parquet/CSV files for operation streams.
*/
-public class ParquetLoader {
+public class FileLoader {

-private final DuckDbParquetExtractor db;
+private final DuckDbExtractor db;

-public ParquetLoader(DuckDbParquetExtractor db) throws SQLException {
+public FileLoader(DuckDbExtractor db) throws SQLException {
this.db = db;
}

@@ -33,7 +33,12 @@ public Iterator<Operation> loadOperationStream(String path, EventStreamReader.Ev
try {
Connection connection = db.getConnection();
stmt = connection.createStatement();
-ResultSet rs = stmt.executeQuery("SELECT * FROM read_parquet('" + path + "');");
+ResultSet rs;
+if (path.contains(".parquet")) {
+    rs = stmt.executeQuery("SELECT * FROM read_parquet('" + path + "');");
+} else {
+    rs = stmt.executeQuery("SELECT * FROM read_csv('" + path + "', delim='|', AUTO_DETECT=TRUE);");
+}
while (rs.next()) {
Operation obj = decoder.decodeEvent(rs);
results.add(obj);
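The new branch is the core of the CSV support: paths containing ".parquet" keep going through DuckDB's read_parquet table function, while everything else is read as a pipe-delimited CSV via read_csv with AUTO_DETECT. A self-contained sketch of the same dispatch against a bare JDBC connection (hypothetical helper class and file names, not part of this commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class FormatDispatchSketch {
    // Mirrors FileLoader: Parquet paths use read_parquet, anything else read_csv.
    // The statement is intentionally left open; closing the connection closes it.
    static ResultSet openStream(Connection connection, String path) throws SQLException {
        Statement stmt = connection.createStatement();
        if (path.contains(".parquet")) {
            return stmt.executeQuery("SELECT * FROM read_parquet('" + path + "');");
        }
        return stmt.executeQuery("SELECT * FROM read_csv('" + path + "', delim='|', AUTO_DETECT=TRUE);");
    }

    public static void main(String[] args) throws SQLException {
        try (Connection connection = DriverManager.getConnection("jdbc:duckdb:")) {
            // Hypothetical files, one per branch; DuckDB throws if they do not exist.
            openStream(connection, "complex_1_param.parquet").close();
            openStream(connection, "complex_1_param.csv").close();
        }
    }
}

Note that the check uses contains rather than endsWith, so any path with ".parquet" anywhere in it is routed to the Parquet reader.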
@@ -5,16 +5,16 @@
import java.util.Iterator;
import org.ldbcouncil.finbench.driver.Operation;
import org.ldbcouncil.finbench.driver.WorkloadException;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
import org.ldbcouncil.finbench.driver.generator.EventStreamReader;
import org.ldbcouncil.finbench.driver.util.Tuple2;

public class BatchedOperationStreamReader {

-private final ParquetLoader loader;
+private final FileLoader loader;

public BatchedOperationStreamReader(
-ParquetLoader loader
+FileLoader loader
) {
this.loader = loader;
}
@@ -32,8 +32,8 @@
import org.ldbcouncil.finbench.driver.WorkloadStreams;
import org.ldbcouncil.finbench.driver.control.ConsoleAndFileDriverConfiguration;
import org.ldbcouncil.finbench.driver.control.OperationMode;
-import org.ldbcouncil.finbench.driver.csv.DuckDbParquetExtractor;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.DuckDbExtractor;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
import org.ldbcouncil.finbench.driver.generator.BufferedIterator;
import org.ldbcouncil.finbench.driver.generator.EventStreamReader;
import org.ldbcouncil.finbench.driver.generator.GeneratorFactory;
@@ -60,6 +60,7 @@ public class LdbcFinBenchTransactionWorkload extends Workload {
private Map<Integer, Long> longReadInterleavesAsMilli;
private File parametersDir;
private File updatesDir;
+private String fileSuffix;
private long updateInterleaveAsMilli;
private double compressionRatio;
private double simpleReadDissipationFactor;
@@ -124,6 +125,11 @@ public void onInit(Map<String, String> params) throws WorkloadException {
}

if (operationMode != OperationMode.VALIDATE_DATABASE) {
+if (params.containsKey(LdbcFinBenchTransactionWorkloadConfiguration.FILES_SUFFIX)) {
+    fileSuffix = params.get(LdbcFinBenchTransactionWorkloadConfiguration.FILES_SUFFIX).trim();
+} else {
+    fileSuffix = LdbcFinBenchTransactionWorkloadConfiguration.DEFAULT_FILE_SUFFIX;
+}
parametersDir =
new File(params.get(LdbcFinBenchTransactionWorkloadConfiguration.PARAMETERS_DIRECTORY).trim());
if (!parametersDir.exists()) {
@@ -132,7 +138,8 @@ public void onInit(Map<String, String> params) throws WorkloadException {
}
for (String readOperationParamsFilename :
LdbcFinBenchTransactionWorkloadConfiguration.COMPLEX_READ_OPERATION_PARAMS_FILENAMES.values()) {
-File readOperationParamsFile = new File(parametersDir, readOperationParamsFilename);
+File readOperationParamsFile = new File(parametersDir, readOperationParamsFilename
+    + LdbcFinBenchTransactionWorkloadConfiguration.FILE_SEPARATOR + fileSuffix);
if (!readOperationParamsFile.exists()) {
throw new WorkloadException(
format("Read operation parameters file does not exist: %s",
@@ -391,7 +398,7 @@ private long getOperationStreamStartTime(
private List<Iterator<?>> getOperationStreams(
GeneratorFactory gf,
long workloadStartTimeAsMilli,
-ParquetLoader loader
+FileLoader loader
) throws WorkloadException {
List<Iterator<?>> asynchronousNonDependencyStreamsList = new ArrayList<>();
/*
@@ -405,7 +412,8 @@ private List<Iterator<?>> getOperationStreams(
Iterator<Operation> eventOperationStream = readOperationStream.readOperationStream(
decoders.get(type),
new File(parametersDir,
-LdbcFinBenchTransactionWorkloadConfiguration.COMPLEX_READ_OPERATION_PARAMS_FILENAMES.get(type))
+LdbcFinBenchTransactionWorkloadConfiguration.COMPLEX_READ_OPERATION_PARAMS_FILENAMES.get(type)
+    + LdbcFinBenchTransactionWorkloadConfiguration.FILE_SEPARATOR + fileSuffix)
);
long readOperationInterleaveAsMilli = longReadInterleavesAsMilli.get(type);
Iterator<Long> operationStartTimes =
@@ -484,18 +492,18 @@ protected WorkloadStreams getStreams(GeneratorFactory gf, boolean hasDbConnected
dependencyAsynchronousOperationTypes.addAll(enabledUpdateOperationTypes);
// dependentAsynchronousOperationTypes.addAll(enabledLongReadOperationTypes);

-ParquetLoader loader;
+FileLoader loader;
try {
-DuckDbParquetExtractor db = new DuckDbParquetExtractor();
-loader = new ParquetLoader(db);
+DuckDbExtractor db = new DuckDbExtractor();
+loader = new FileLoader(db);
} catch (SQLException e) {
throw new WorkloadException(format("Error creating loader for operation streams %s", e));
}

-ParquetLoader updateLoader;
+FileLoader updateLoader;
try {
-DuckDbParquetExtractor db = new DuckDbParquetExtractor();
-updateLoader = new ParquetLoader(db);
+DuckDbExtractor db = new DuckDbExtractor();
+updateLoader = new FileLoader(db);
} catch (SQLException e) {
throw new WorkloadException(format("Error creating updateLoader for operation streams %s", e));
}
@@ -564,7 +572,7 @@ public Set<Class> enabledValidationOperations()
private Iterator<Operation> setBatchedUpdateStreams(
GeneratorFactory gf,
long workloadStartTimeAsMilli,
-ParquetLoader loader
+FileLoader loader
) throws WorkloadException {
long batchSizeInMillis = Math.round(TimeUnit.HOURS.toMillis(1) * batchSize);

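Taken together, onInit() resolves the suffix once (falling back to the csv default) and every parameter file is opened as <base>.<suffix>. A condensed, standalone sketch of that resolution, with the configuration constants inlined as literals (the property key is the one shown in example.properties below):

import java.io.File;
import java.util.Map;

public class FileSuffixSketch {
    // Literals mirror the constants added in this commit.
    static final String FILES_SUFFIX = "ldbc.finbench.transaction.queries.files_suffix";
    static final String DEFAULT_FILE_SUFFIX = "csv";
    static final String FILE_SEPARATOR = ".";

    static File resolveParamsFile(Map<String, String> params, File parametersDir, String baseName) {
        String fileSuffix = params.containsKey(FILES_SUFFIX)
            ? params.get(FILES_SUFFIX).trim()
            : DEFAULT_FILE_SUFFIX;
        return new File(parametersDir, baseName + FILE_SEPARATOR + fileSuffix);
    }

    public static void main(String[] args) {
        File f = resolveParamsFile(Map.of(), new File("read_params"), "complex_1_param");
        System.out.println(f); // read_params/complex_1_param.csv (Unix-style separator)
    }
}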
@@ -62,6 +62,9 @@ public abstract class LdbcFinBenchTransactionWorkloadConfiguration {
public static final String PARAMETERS_DIRECTORY = LDBC_FINBENCH_TRANSACTION_PARAM_NAME_PREFIX + "parameters_dir";
// directory containing forum and person update event streams
public static final String UPDATES_DIRECTORY = LDBC_FINBENCH_TRANSACTION_PARAM_NAME_PREFIX + "updates_dir";
+// the parameter and updates files suffix
+public static final String FILES_SUFFIX = LDBC_FINBENCH_TRANSACTION_PARAM_NAME_PREFIX + "files_suffix";


// Simple reads random walk dissipation rate, in the interval [1.0-0.0]
// Higher values translate to shorter walks and therefore fewer simple reads
@@ -85,6 +88,8 @@ public abstract class LdbcFinBenchTransactionWorkloadConfiguration {
public static final int BUFFERED_QUEUE_SIZE = 4;
public static final String INSERTS_DIRECTORY = "inserts";
public static final String INSERTS_DATE_COLUMN = "creationDate";
+public static final String FILE_SEPARATOR = ".";
+public static final String DEFAULT_FILE_SUFFIX = "csv";

public static final String LDBC_FINBENCH_TRANSACTION_PACKAGE_PREFIX =
removeSuffix(ComplexRead1.class.getName(), ComplexRead1.class.getSimpleName());
@@ -250,19 +255,18 @@ public abstract class LdbcFinBenchTransactionWorkloadConfiguration {
/*
* Read Operation Parameters
*/
-public static final String COMPLEX_READ_OPERATION_1_PARAMS_FILENAME = "complex_1_param.parquet";
-public static final String COMPLEX_READ_OPERATION_2_PARAMS_FILENAME = "complex_2_param.parquet";
-public static final String COMPLEX_READ_OPERATION_3_PARAMS_FILENAME = "complex_3_param.parquet";
-public static final String COMPLEX_READ_OPERATION_4_PARAMS_FILENAME = "complex_4_param.parquet";
-public static final String COMPLEX_READ_OPERATION_5_PARAMS_FILENAME = "complex_5_param.parquet";
-public static final String COMPLEX_READ_OPERATION_6_PARAMS_FILENAME = "complex_6_param.parquet";
-public static final String COMPLEX_READ_OPERATION_7_PARAMS_FILENAME = "complex_7_param.parquet";
-public static final String COMPLEX_READ_OPERATION_8_PARAMS_FILENAME = "complex_8_param.parquet";
-public static final String COMPLEX_READ_OPERATION_9_PARAMS_FILENAME = "complex_9_param.parquet";
-public static final String COMPLEX_READ_OPERATION_10_PARAMS_FILENAME = "complex_10_param.parquet";
-public static final String COMPLEX_READ_OPERATION_11_PARAMS_FILENAME = "complex_11_param.parquet";
-public static final String COMPLEX_READ_OPERATION_12_PARAMS_FILENAME = "complex_12_param.parquet";
-public static final String COMPLEX_READ_OPERATION_13_PARAMS_FILENAME = "complex_13_param.parquet";
+public static final String COMPLEX_READ_OPERATION_1_PARAMS_FILENAME = "complex_1_param";
+public static final String COMPLEX_READ_OPERATION_2_PARAMS_FILENAME = "complex_2_param";
+public static final String COMPLEX_READ_OPERATION_3_PARAMS_FILENAME = "complex_3_param";
+public static final String COMPLEX_READ_OPERATION_4_PARAMS_FILENAME = "complex_4_param";
+public static final String COMPLEX_READ_OPERATION_5_PARAMS_FILENAME = "complex_5_param";
+public static final String COMPLEX_READ_OPERATION_6_PARAMS_FILENAME = "complex_6_param";
+public static final String COMPLEX_READ_OPERATION_7_PARAMS_FILENAME = "complex_7_param";
+public static final String COMPLEX_READ_OPERATION_8_PARAMS_FILENAME = "complex_8_param";
+public static final String COMPLEX_READ_OPERATION_9_PARAMS_FILENAME = "complex_9_param";
+public static final String COMPLEX_READ_OPERATION_10_PARAMS_FILENAME = "complex_10_param";
+public static final String COMPLEX_READ_OPERATION_11_PARAMS_FILENAME = "complex_11_param";
+public static final String COMPLEX_READ_OPERATION_12_PARAMS_FILENAME = "complex_12_param";
public static final Map<Integer, String> COMPLEX_READ_OPERATION_PARAMS_FILENAMES =
typeToOperationParameterFilename();
/*
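With the hard-coded .parquet suffix dropped from the constants, the same base names now serve both formats; which concrete file is opened is decided at runtime by the suffix logic shown above. A trivial illustration (values as defined in this diff):

public class ParamsFilenameExample {
    public static void main(String[] args) {
        String base = "complex_1_param"; // COMPLEX_READ_OPERATION_1_PARAMS_FILENAME
        String sep = ".";                // FILE_SEPARATOR
        System.out.println(base + sep + "csv");     // complex_1_param.csv (the default)
        System.out.println(base + sep + "parquet"); // complex_1_param.parquet
    }
}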
@@ -5,7 +5,7 @@
import java.util.Iterator;
import org.ldbcouncil.finbench.driver.Operation;
import org.ldbcouncil.finbench.driver.WorkloadException;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
import org.ldbcouncil.finbench.driver.generator.EventStreamReader;

/**
@@ -18,9 +18,9 @@

public class OperationStreamReader {

-private final ParquetLoader loader;
+private final FileLoader loader;

-public OperationStreamReader(ParquetLoader loader) {
+public OperationStreamReader(FileLoader loader) {
this.loader = loader;
}

@@ -11,22 +11,22 @@
import java.util.concurrent.BlockingQueue;
import org.ldbcouncil.finbench.driver.Operation;
import org.ldbcouncil.finbench.driver.WorkloadException;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
import org.ldbcouncil.finbench.driver.generator.EventStreamReader;
import org.ldbcouncil.finbench.driver.generator.GeneratorFactory;
import org.ldbcouncil.finbench.driver.util.Tuple2;

public class RunnableOperationStreamBatchLoader extends Thread {

-private final ParquetLoader loader;
+private final FileLoader loader;
private final long batchSize;
private final GeneratorFactory gf;
private final File updatesDir;
private final Set<Class<? extends Operation>> enabledUpdateOperationTypes;
private final BlockingQueue<Iterator<Operation>> blockingQueue;

public RunnableOperationStreamBatchLoader(
-ParquetLoader loader,
+FileLoader loader,
GeneratorFactory gf,
File updatesDir,
BlockingQueue<Iterator<Operation>> blockingQueue,
@@ -0,0 +1,4 @@
pid1|pid2|startTime|endTime
1|2|0|99999
5|10|0|99999
3|6|0|99999
@@ -0,0 +1,4 @@
id|startTime|endTime|truncationLimit|truncationOrder
1|0|99999|10000|ASC
2|0|99999|10000|ASC
3|0|99999|10000|ASC
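The new parameter files are pipe-delimited with a header row, exactly what the read_csv call in FileLoader expects: delim='|' matches the separator, and AUTO_DETECT=TRUE infers column names from the header and types from the data. A hypothetical direct read of a file with this layout (path assumed, not taken from this diff):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ParamsCsvReadSketch {
    public static void main(String[] args) throws SQLException {
        String path = "read_params/complex_1_param.csv"; // assumed location
        try (Connection conn = DriverManager.getConnection("jdbc:duckdb:");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT * FROM read_csv('" + path + "', delim='|', AUTO_DETECT=TRUE);")) {
            while (rs.next()) {
                // Column names come straight from the header row above
                System.out.println(rs.getLong("id") + " " + rs.getLong("startTime") + " "
                    + rs.getLong("endTime") + " " + rs.getLong("truncationLimit") + " "
                    + rs.getString("truncationOrder"));
            }
        }
    }
}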
@@ -0,0 +1,4 @@
id|startTime|endTime|truncationLimit|truncationOrder
1|0|99999|10000|ASC
2|0|99999|10000|ASC
3|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id|startTime|endTime|truncationLimit|truncationOrder
1|0|99999|10000|ASC
2|0|99999|10000|ASC
3|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id|startTime|endTime|truncationLimit|truncationOrder
1|0|99999|10000|ASC
2|0|99999|10000|ASC
3|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id1|id2|startTime|endTime|truncationLimit|truncationOrder
1|2|0|99999|10000|ASC
5|10|0|99999|10000|ASC
3|6|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id1|id2|startTime|endTime|truncationLimit|truncationOrder
1|2|0|99999|10000|ASC
5|10|0|99999|10000|ASC
3|6|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id|startTime|endTime|truncationLimit|truncationOrder
1|0|99999|10000|ASC
2|0|99999|10000|ASC
3|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id|threshold1|threshold2|startTime|endTime|truncationLimit|truncationOrder
1|0|50|0|99999|10000|ASC
2|50|100|0|99999|10000|ASC
3|0|100|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id|threshold|startTime|endTime|truncationLimit|truncationOrder
1|0|0|99999|10000|ASC
2|50|0|99999|10000|ASC
3|100|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id|threshold|startTime|endTime|truncationLimit|truncationOrder
1|0|0|99999|10000|ASC
2|50|0|99999|10000|ASC
3|100|0|99999|10000|ASC
@@ -0,0 +1,4 @@
id|threshold|startTime|endTime|truncationLimit|truncationOrder
1|0|0|99999|10000|ASC
2|50|0|99999|10000|ASC
3|100|0|99999|10000|ASC
2 changes: 2 additions & 0 deletions src/main/resources/example/example.properties
@@ -32,6 +32,8 @@ validate_database=validation_params.csv
warmup=5
ldbc.finbench.transaction.queries.parameters_dir=src/main/resources/example/data/read_params
ldbc.finbench.transaction.queries.updates_dir=src/main/resources/example/data/incremental_data
+# param and update files suffix, `csv` or `parquet`, default is `csv`
+ldbc.finbench.transaction.queries.files_suffix=csv
ldbc.finbench.transaction.queries.simple_read_dissipation=0.2
ldbc.finbench.transaction.queries.update_interleave=156
ldbc.finbench.transaction.queries.scale_factor=1
@@ -17,14 +17,14 @@
import org.junit.jupiter.api.Test;
import org.ldbcouncil.finbench.driver.Operation;
import org.ldbcouncil.finbench.driver.WorkloadException;
-import org.ldbcouncil.finbench.driver.csv.DuckDbParquetExtractor;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.DuckDbExtractor;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
import org.ldbcouncil.finbench.driver.generator.EventStreamReader;
import org.ldbcouncil.finbench.driver.workloads.transaction.queries.ComplexRead1;


public class TransactionReadEventStreamReaderTest {
-private DuckDbParquetExtractor db;
+private DuckDbExtractor db;
private Statement stmt;

/**
@@ -35,7 +35,7 @@ public class TransactionReadEventStreamReaderTest {
@BeforeEach
public void init() throws SQLException {
Connection connection = mock(Connection.class);
-db = mock(DuckDbParquetExtractor.class);
+db = mock(DuckDbExtractor.class);
when(db.getConnection()).thenReturn(connection);
stmt = mock(Statement.class);
when(connection.createStatement()).thenReturn(stmt);
@@ -86,7 +86,7 @@ public void shouldParseAllComplexRead1Events() throws WorkloadException, SQLExce
.thenReturn("DESC")
.thenReturn("ASC");*/
EventStreamReader.EventDecoder<Operation> decoder = new QueryEventStreamReader.ComplexRead1Decoder();
-ParquetLoader loader = new ParquetLoader(db);
+FileLoader loader = new FileLoader(db);
Iterator<Operation> opStream = loader.loadOperationStream("/somepath", decoder);

// Act
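The test keeps its existing Mockito plumbing and only swaps in the renamed classes. For completeness, a sketch of how the remaining stubbing could look so that FileLoader.loadOperationStream() sees a single row; the ResultSet stubbing is a hypothetical continuation, not shown in this diff:

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.ldbcouncil.finbench.driver.csv.DuckDbExtractor;

public class FileLoaderMockSketch {
    public static void stubSingleRow() throws SQLException {
        // Same wiring as the test's init() above
        Connection connection = mock(Connection.class);
        DuckDbExtractor db = mock(DuckDbExtractor.class);
        when(db.getConnection()).thenReturn(connection);
        Statement stmt = mock(Statement.class);
        when(connection.createStatement()).thenReturn(stmt);

        // Hypothetical continuation: any query yields a ResultSet with one row
        ResultSet rs = mock(ResultSet.class);
        when(stmt.executeQuery(anyString())).thenReturn(rs);
        when(rs.next()).thenReturn(true, false);
    }
}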