add csv loader #27

Merged (3 commits, May 12, 2023)
DuckDbParquetExtractor.java → DuckDbExtractor.java (renamed)
@@ -8,11 +8,11 @@
 /**
  * Handles the connection with DuckDb.
  */
-public class DuckDbParquetExtractor implements Closeable {
+public class DuckDbExtractor implements Closeable {

     private final Connection connection;

-    public DuckDbParquetExtractor() throws SQLException {
+    public DuckDbExtractor() throws SQLException {
         connection = DriverManager.getConnection("jdbc:duckdb:");
     }
ParquetLoader.java → FileLoader.java (renamed)
@@ -15,13 +15,13 @@
 import org.ldbcouncil.finbench.driver.util.Tuple2;

 /**
- * Class to read Parquet files for operation streams.
+ * Class to read Parquet/CSV files for operation streams.
  */
-public class ParquetLoader {
+public class FileLoader {

-    private final DuckDbParquetExtractor db;
+    private final DuckDbExtractor db;

-    public ParquetLoader(DuckDbParquetExtractor db) throws SQLException {
+    public FileLoader(DuckDbExtractor db) throws SQLException {
         this.db = db;
     }

@@ -33,7 +33,12 @@ public Iterator<Operation> loadOperationStream(String path, EventStreamReader.EventDecoder<Operation> decoder)
         try {
             Connection connection = db.getConnection();
             stmt = connection.createStatement();
-            ResultSet rs = stmt.executeQuery("SELECT * FROM read_parquet('" + path + "');");
+            ResultSet rs;
+            if (path.contains(".parquet")) {
+                rs = stmt.executeQuery("SELECT * FROM read_parquet('" + path + "');");
+            } else {
+                rs = stmt.executeQuery("SELECT * FROM read_csv('" + path + "', delim='|', AUTO_DETECT=TRUE);");
+            }
             while (rs.next()) {
                 Operation obj = decoder.decodeEvent(rs);
                 results.add(obj);
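For reference, a minimal standalone sketch of the same suffix-based dispatch, assuming the DuckDB JDBC driver (org.duckdb:duckdb_jdbc) is on the classpath; the class name and the default file name "complex_1_param.csv" are hypothetical, not taken from the patch:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class FileLoaderSketch {
    public static void main(String[] args) throws SQLException {
        String path = args.length > 0 ? args[0] : "complex_1_param.csv"; // hypothetical sample path
        // In-memory DuckDB connection, opened the same way DuckDbExtractor opens it.
        try (Connection connection = DriverManager.getConnection("jdbc:duckdb:");
             Statement stmt = connection.createStatement()) {
            // Same routing as FileLoader: Parquet files go through read_parquet,
            // anything else through read_csv with a '|' delimiter and type auto-detection.
            String query = path.contains(".parquet")
                ? "SELECT * FROM read_parquet('" + path + "');"
                : "SELECT * FROM read_csv('" + path + "', delim='|', AUTO_DETECT=TRUE);";
            try (ResultSet rs = stmt.executeQuery(query)) {
                while (rs.next()) {
                    System.out.println(rs.getString(1)); // print the first column of each row
                }
            }
        }
    }
}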
BatchedOperationStreamReader.java
@@ -5,16 +5,16 @@
 import java.util.Iterator;
 import org.ldbcouncil.finbench.driver.Operation;
 import org.ldbcouncil.finbench.driver.WorkloadException;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
 import org.ldbcouncil.finbench.driver.generator.EventStreamReader;
 import org.ldbcouncil.finbench.driver.util.Tuple2;

 public class BatchedOperationStreamReader {

-    private final ParquetLoader loader;
+    private final FileLoader loader;

     public BatchedOperationStreamReader(
-        ParquetLoader loader
+        FileLoader loader
     ) {
         this.loader = loader;
     }
LdbcFinBenchTransactionWorkload.java
@@ -32,8 +32,8 @@
 import org.ldbcouncil.finbench.driver.WorkloadStreams;
 import org.ldbcouncil.finbench.driver.control.ConsoleAndFileDriverConfiguration;
 import org.ldbcouncil.finbench.driver.control.OperationMode;
-import org.ldbcouncil.finbench.driver.csv.DuckDbParquetExtractor;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.DuckDbExtractor;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
 import org.ldbcouncil.finbench.driver.generator.BufferedIterator;
 import org.ldbcouncil.finbench.driver.generator.EventStreamReader;
 import org.ldbcouncil.finbench.driver.generator.GeneratorFactory;
@@ -60,6 +60,7 @@ public class LdbcFinBenchTransactionWorkload extends Workload {
     private Map<Integer, Long> longReadInterleavesAsMilli;
     private File parametersDir;
     private File updatesDir;
+    private String fileSuffix;
     private long updateInterleaveAsMilli;
     private double compressionRatio;
     private double simpleReadDissipationFactor;
@@ -124,6 +125,11 @@ public void onInit(Map<String, String> params) throws WorkloadException {
         }

         if (operationMode != OperationMode.VALIDATE_DATABASE) {
+            if (params.containsKey(LdbcFinBenchTransactionWorkloadConfiguration.FILES_SUFFIX)) {
+                fileSuffix = params.get(LdbcFinBenchTransactionWorkloadConfiguration.FILES_SUFFIX).trim();
+            } else {
+                fileSuffix = LdbcFinBenchTransactionWorkloadConfiguration.DEFAULT_FILE_SUFFIX;
+            }
             parametersDir =
                 new File(params.get(LdbcFinBenchTransactionWorkloadConfiguration.PARAMETERS_DIRECTORY).trim());
             if (!parametersDir.exists()) {
@@ -132,7 +138,8 @@ public void onInit(Map<String, String> params) throws WorkloadException {
             }
             for (String readOperationParamsFilename :
                 LdbcFinBenchTransactionWorkloadConfiguration.COMPLEX_READ_OPERATION_PARAMS_FILENAMES.values()) {
-                File readOperationParamsFile = new File(parametersDir, readOperationParamsFilename);
+                File readOperationParamsFile = new File(parametersDir, readOperationParamsFilename
+                    + LdbcFinBenchTransactionWorkloadConfiguration.FILE_SEPARATOR + fileSuffix);
                 if (!readOperationParamsFile.exists()) {
                     throw new WorkloadException(
                         format("Read operation parameters file does not exist: %s",
@@ -391,7 +398,7 @@ private long getOperationStreamStartTime(
     private List<Iterator<?>> getOperationStreams(
         GeneratorFactory gf,
         long workloadStartTimeAsMilli,
-        ParquetLoader loader
+        FileLoader loader
     ) throws WorkloadException {
         List<Iterator<?>> asynchronousNonDependencyStreamsList = new ArrayList<>();
         /*
@@ -405,7 +412,8 @@ private List<Iterator<?>> getOperationStreams(
             Iterator<Operation> eventOperationStream = readOperationStream.readOperationStream(
                 decoders.get(type),
                 new File(parametersDir,
-                    LdbcFinBenchTransactionWorkloadConfiguration.COMPLEX_READ_OPERATION_PARAMS_FILENAMES.get(type))
+                    LdbcFinBenchTransactionWorkloadConfiguration.COMPLEX_READ_OPERATION_PARAMS_FILENAMES.get(type)
+                        + LdbcFinBenchTransactionWorkloadConfiguration.FILE_SEPARATOR + fileSuffix)
             );
             long readOperationInterleaveAsMilli = longReadInterleavesAsMilli.get(type);
             Iterator<Long> operationStartTimes =
@@ -484,18 +492,18 @@ protected WorkloadStreams getStreams(GeneratorFactory gf, boolean hasDbConnected
         dependencyAsynchronousOperationTypes.addAll(enabledUpdateOperationTypes);
         // dependentAsynchronousOperationTypes.addAll(enabledLongReadOperationTypes);

-        ParquetLoader loader;
+        FileLoader loader;
         try {
-            DuckDbParquetExtractor db = new DuckDbParquetExtractor();
-            loader = new ParquetLoader(db);
+            DuckDbExtractor db = new DuckDbExtractor();
+            loader = new FileLoader(db);
         } catch (SQLException e) {
             throw new WorkloadException(format("Error creating loader for operation streams %s", e));
         }

-        ParquetLoader updateLoader;
+        FileLoader updateLoader;
         try {
-            DuckDbParquetExtractor db = new DuckDbParquetExtractor();
-            updateLoader = new ParquetLoader(db);
+            DuckDbExtractor db = new DuckDbExtractor();
+            updateLoader = new FileLoader(db);
         } catch (SQLException e) {
             throw new WorkloadException(format("Error creating updateLoader for operation streams %s", e));
         }
@@ -564,7 +572,7 @@ public Set<Class> enabledValidationOperations() {
     private Iterator<Operation> setBatchedUpdateStreams(
         GeneratorFactory gf,
         long workloadStartTimeAsMilli,
-        ParquetLoader loader
+        FileLoader loader
     ) throws WorkloadException {
         long batchSizeInMillis = Math.round(TimeUnit.HOURS.toMillis(1) * batchSize);
LdbcFinBenchTransactionWorkloadConfiguration.java
@@ -62,6 +62,9 @@ public abstract class LdbcFinBenchTransactionWorkloadConfiguration {
     public static final String PARAMETERS_DIRECTORY = LDBC_FINBENCH_TRANSACTION_PARAM_NAME_PREFIX + "parameters_dir";
     // directory containing forum and person update event streams
     public static final String UPDATES_DIRECTORY = LDBC_FINBENCH_TRANSACTION_PARAM_NAME_PREFIX + "updates_dir";
+    // the parameter and updates files suffix
+    public static final String FILES_SUFFIX = LDBC_FINBENCH_TRANSACTION_PARAM_NAME_PREFIX + "files_suffix";
+

     // Simple reads random walk dissipation rate, in the interval [1.0-0.0]
     // Higher values translate to shorter walks and therefore fewer simple reads
@@ -85,6 +88,8 @@
     public static final int BUFFERED_QUEUE_SIZE = 4;
     public static final String INSERTS_DIRECTORY = "inserts";
     public static final String INSERTS_DATE_COLUMN = "creationDate";
+    public static final String FILE_SEPARATOR = ".";
+    public static final String DEFAULT_FILE_SUFFIX = "csv";

     public static final String LDBC_FINBENCH_TRANSACTION_PACKAGE_PREFIX =
         removeSuffix(ComplexRead1.class.getName(), ComplexRead1.class.getSimpleName());
@@ -250,19 +255,18 @@
     /*
      * Read Operation Parameters
      */
-    public static final String COMPLEX_READ_OPERATION_1_PARAMS_FILENAME = "complex_1_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_2_PARAMS_FILENAME = "complex_2_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_3_PARAMS_FILENAME = "complex_3_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_4_PARAMS_FILENAME = "complex_4_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_5_PARAMS_FILENAME = "complex_5_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_6_PARAMS_FILENAME = "complex_6_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_7_PARAMS_FILENAME = "complex_7_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_8_PARAMS_FILENAME = "complex_8_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_9_PARAMS_FILENAME = "complex_9_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_10_PARAMS_FILENAME = "complex_10_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_11_PARAMS_FILENAME = "complex_11_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_12_PARAMS_FILENAME = "complex_12_param.parquet";
-    public static final String COMPLEX_READ_OPERATION_13_PARAMS_FILENAME = "complex_13_param.parquet";
+    public static final String COMPLEX_READ_OPERATION_1_PARAMS_FILENAME = "complex_1_param";
+    public static final String COMPLEX_READ_OPERATION_2_PARAMS_FILENAME = "complex_2_param";
+    public static final String COMPLEX_READ_OPERATION_3_PARAMS_FILENAME = "complex_3_param";
+    public static final String COMPLEX_READ_OPERATION_4_PARAMS_FILENAME = "complex_4_param";
+    public static final String COMPLEX_READ_OPERATION_5_PARAMS_FILENAME = "complex_5_param";
+    public static final String COMPLEX_READ_OPERATION_6_PARAMS_FILENAME = "complex_6_param";
+    public static final String COMPLEX_READ_OPERATION_7_PARAMS_FILENAME = "complex_7_param";
+    public static final String COMPLEX_READ_OPERATION_8_PARAMS_FILENAME = "complex_8_param";
+    public static final String COMPLEX_READ_OPERATION_9_PARAMS_FILENAME = "complex_9_param";
+    public static final String COMPLEX_READ_OPERATION_10_PARAMS_FILENAME = "complex_10_param";
+    public static final String COMPLEX_READ_OPERATION_11_PARAMS_FILENAME = "complex_11_param";
+    public static final String COMPLEX_READ_OPERATION_12_PARAMS_FILENAME = "complex_12_param";
     public static final Map<Integer, String> COMPLEX_READ_OPERATION_PARAMS_FILENAMES =
         typeToOperationParameterFilename();
     /*
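Taken together, the new constants compose parameter file names at runtime; a small sketch (not part of the patch, with "parameters_dir" standing in for the configured directory):

import java.io.File;

class ParamFileNameSketch {
    public static void main(String[] args) {
        String base = "complex_1_param";  // COMPLEX_READ_OPERATION_1_PARAMS_FILENAME
        String suffix = "csv";            // DEFAULT_FILE_SUFFIX; "parquet" when files_suffix=parquet
        // FILE_SEPARATOR is "." -> base name joined to the configured suffix
        File paramsFile = new File("parameters_dir", base + "." + suffix);
        System.out.println(paramsFile.getPath()); // parameters_dir/complex_1_param.csv
    }
}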
OperationStreamReader.java
@@ -5,7 +5,7 @@
 import java.util.Iterator;
 import org.ldbcouncil.finbench.driver.Operation;
 import org.ldbcouncil.finbench.driver.WorkloadException;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
 import org.ldbcouncil.finbench.driver.generator.EventStreamReader;

 /**
@@ -18,9 +18,9 @@

 public class OperationStreamReader {

-    private final ParquetLoader loader;
+    private final FileLoader loader;

-    public OperationStreamReader(ParquetLoader loader) {
+    public OperationStreamReader(FileLoader loader) {
         this.loader = loader;
     }
RunnableOperationStreamBatchLoader.java
@@ -11,22 +11,22 @@
 import java.util.concurrent.BlockingQueue;
 import org.ldbcouncil.finbench.driver.Operation;
 import org.ldbcouncil.finbench.driver.WorkloadException;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
 import org.ldbcouncil.finbench.driver.generator.EventStreamReader;
 import org.ldbcouncil.finbench.driver.generator.GeneratorFactory;
 import org.ldbcouncil.finbench.driver.util.Tuple2;

 public class RunnableOperationStreamBatchLoader extends Thread {

-    private final ParquetLoader loader;
+    private final FileLoader loader;
     private final long batchSize;
     private final GeneratorFactory gf;
     private final File updatesDir;
     private final Set<Class<? extends Operation>> enabledUpdateOperationTypes;
     private final BlockingQueue<Iterator<Operation>> blockingQueue;

     public RunnableOperationStreamBatchLoader(
-        ParquetLoader loader,
+        FileLoader loader,
         GeneratorFactory gf,
         File updatesDir,
         BlockingQueue<Iterator<Operation>> blockingQueue,
New parameter CSV files (pipe-delimited):

@@ -0,0 +1,4 @@
+pid1|pid2|startTime|endTime
+1|2|0|99999
+5|10|0|99999
+3|6|0|99999

@@ -0,0 +1,4 @@
+id|startTime|endTime|truncationLimit|truncationOrder
+1|0|99999|10000|ASC
+2|0|99999|10000|ASC
+3|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id|startTime|endTime|truncationLimit|truncationOrder
+1|0|99999|10000|ASC
+2|0|99999|10000|ASC
+3|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id|startTime|endTime|truncationLimit|truncationOrder
+1|0|99999|10000|ASC
+2|0|99999|10000|ASC
+3|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id|startTime|endTime|truncationLimit|truncationOrder
+1|0|99999|10000|ASC
+2|0|99999|10000|ASC
+3|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id1|id2|startTime|endTime|truncationLimit|truncationOrder
+1|2|0|99999|10000|ASC
+5|10|0|99999|10000|ASC
+3|6|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id1|id2|startTime|endTime|truncationLimit|truncationOrder
+1|2|0|99999|10000|ASC
+5|10|0|99999|10000|ASC
+3|6|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id|startTime|endTime|truncationLimit|truncationOrder
+1|0|99999|10000|ASC
+2|0|99999|10000|ASC
+3|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id|threshold1|threshold2|startTime|endTime|truncationLimit|truncationOrder
+1|0|50|0|99999|10000|ASC
+2|50|100|0|99999|10000|ASC
+3|0|100|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id|threshold|startTime|endTime|truncationLimit|truncationOrder
+1|0|0|99999|10000|ASC
+2|50|0|99999|10000|ASC
+3|100|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id|threshold|startTime|endTime|truncationLimit|truncationOrder
+1|0|0|99999|10000|ASC
+2|50|0|99999|10000|ASC
+3|100|0|99999|10000|ASC

@@ -0,0 +1,4 @@
+id|threshold|startTime|endTime|truncationLimit|truncationOrder
+1|0|0|99999|10000|ASC
+2|50|0|99999|10000|ASC
+3|100|0|99999|10000|ASC
src/main/resources/example/example.properties (2 additions)
@@ -32,6 +32,8 @@ validate_database=validation_params.csv
 warmup=5
 ldbc.finbench.transaction.queries.parameters_dir=src/main/resources/example/data/read_params
 ldbc.finbench.transaction.queries.updates_dir=src/main/resources/example/data/incremental_data
+# param and update files suffix, `csv` or `parquet`, default is `csv`
+ldbc.finbench.transaction.queries.files_suffix=csv
 ldbc.finbench.transaction.queries.simple_read_dissipation=0.2
 ldbc.finbench.transaction.queries.update_interleave=156
 ldbc.finbench.transaction.queries.scale_factor=1
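To run the example against Parquet parameter files instead, set ldbc.finbench.transaction.queries.files_suffix=parquet; the resolved file names then end in ".parquet", so FileLoader routes them through read_parquet rather than read_csv.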
TransactionReadEventStreamReaderTest.java
@@ -17,14 +17,14 @@
 import org.junit.jupiter.api.Test;
 import org.ldbcouncil.finbench.driver.Operation;
 import org.ldbcouncil.finbench.driver.WorkloadException;
-import org.ldbcouncil.finbench.driver.csv.DuckDbParquetExtractor;
-import org.ldbcouncil.finbench.driver.csv.ParquetLoader;
+import org.ldbcouncil.finbench.driver.csv.DuckDbExtractor;
+import org.ldbcouncil.finbench.driver.csv.FileLoader;
 import org.ldbcouncil.finbench.driver.generator.EventStreamReader;
 import org.ldbcouncil.finbench.driver.workloads.transaction.queries.ComplexRead1;


 public class TransactionReadEventStreamReaderTest {
-    private DuckDbParquetExtractor db;
+    private DuckDbExtractor db;
     private Statement stmt;

     /**
@@ -35,7 +35,7 @@ public class TransactionReadEventStreamReaderTest {
     @BeforeEach
     public void init() throws SQLException {
         Connection connection = mock(Connection.class);
-        db = mock(DuckDbParquetExtractor.class);
+        db = mock(DuckDbExtractor.class);
         when(db.getConnection()).thenReturn(connection);
         stmt = mock(Statement.class);
         when(connection.createStatement()).thenReturn(stmt);
@@ -86,7 +86,7 @@ public void shouldParseAllComplexRead1Events() throws WorkloadException, SQLException {
             .thenReturn("DESC")
             .thenReturn("ASC");*/
         EventStreamReader.EventDecoder<Operation> decoder = new QueryEventStreamReader.ComplexRead1Decoder();
-        ParquetLoader loader = new ParquetLoader(db);
+        FileLoader loader = new FileLoader(db);
         Iterator<Operation> opStream = loader.loadOperationStream("/somepath", decoder);

         // Act