refine write/readwrite query #28

Merged 10 commits on May 13, 2023
11 changes: 9 additions & 2 deletions src/main/java/org/ldbcouncil/finbench/driver/csv/FileLoader.java
@@ -37,7 +37,7 @@ public Iterator<Operation> loadOperationStream(String path, EventStreamReader.Ev
         if (path.contains(".parquet")) {
             rs = stmt.executeQuery("SELECT * FROM read_parquet('" + path + "');");
         } else {
-            rs = stmt.executeQuery("SELECT * FROM read_csv('" + path + "', delim='|', AUTO_DETECT=TRUE);");
+            rs = stmt.executeQuery("SELECT * FROM read_csv_auto('" + path + "', delim='|', header=TRUE);");
         }
         while (rs.next()) {
             Operation obj = decoder.decodeEvent(rs);
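
The replaced query swaps DuckDB's generic read_csv call (with AUTO_DETECT=TRUE) for the read_csv_auto table function, which infers column types on its own while still taking the pipe delimiter and header flag explicitly. A minimal standalone sketch of the same call through the DuckDB JDBC driver; the file path is a hypothetical example, not one used by the driver:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReadCsvAutoSketch {
    public static void main(String[] args) throws Exception {
        // In-memory DuckDB database through the official JDBC driver
        try (Connection conn = DriverManager.getConnection("jdbc:duckdb:");
             Statement stmt = conn.createStatement();
             // read_csv_auto infers column types; delimiter and header stay explicit
             ResultSet rs = stmt.executeQuery(
                 "SELECT * FROM read_csv_auto('params/write_1_param.csv', delim='|', header=TRUE);")) {
            while (rs.next()) {
                System.out.println(rs.getString(1)); // first column of each row
            }
        }
    }
}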
@@ -112,7 +112,14 @@ public void createViewOnParquetFile(String path, String viewName) throws Workloa
         try {
             Connection connection = db.getConnection();
             stmt = connection.createStatement();
-            stmt.execute("CREATE VIEW " + viewName + " AS SELECT * FROM read_parquet('" + path + "');");
+
+            if (path.contains(".parquet")) {
+                stmt.execute("CREATE VIEW " + viewName + " AS SELECT * FROM read_parquet('" + path + "');");
+            } else {
+                stmt.execute("CREATE VIEW " + viewName + " AS SELECT * FROM read_csv_auto("
+                    + "'" + path + "', delim='|', header=TRUE);");
+            }
+
         } catch (SQLException e) {
             e.printStackTrace();
             throw new WorkloadException(format("Error creating view on temporary database: %s", path), e);
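
With the added branch, createViewOnParquetFile now handles CSV inputs as well, dispatching on the file extension. The logic boils down to the sketch below; ViewSqlSketch and buildViewSql are hypothetical names written for illustration, not parts of the driver:

public class ViewSqlSketch {
    // Hypothetical helper mirroring the new extension-based dispatch.
    static String buildViewSql(String viewName, String path) {
        String reader = path.contains(".parquet")
            ? "read_parquet('" + path + "')"
            : "read_csv_auto('" + path + "', delim='|', header=TRUE)";
        return "CREATE VIEW " + viewName + " AS SELECT * FROM " + reader + ";";
    }

    public static void main(String[] args) {
        System.out.println(buildViewSql("Write1", "params/write_1_param.csv"));
        System.out.println(buildViewSql("Write1", "params/write_1_param.parquet"));
    }
}

Note that the test is contains(".parquet") rather than endsWith(".parquet"), so any path merely mentioning .parquet is routed to the Parquet reader; the same holds for loadOperationStream above.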
@@ -589,6 +589,7 @@ private Iterator<Operation> setBatchedUpdateStreams(
             loader,
             gf,
             updatesDir,
+            fileSuffix,
             blockingQueue,
             dependencyUpdateOperationTypes,
             batchSizeInMillis
@@ -237,6 +237,9 @@ public abstract class LdbcFinBenchTransactionWorkloadConfiguration {
     public static final String WRITE_OPERATION_11_ENABLE_KEY = asEnableKey(Write11.class);
     public static final String WRITE_OPERATION_12_ENABLE_KEY = asEnableKey(Write12.class);
     public static final String WRITE_OPERATION_13_ENABLE_KEY = asEnableKey(Write13.class);
+    public static final String READ_WRITE_OPERATION_1_ENABLE_KEY = asEnableKey(ReadWrite1.class);
+    public static final String READ_WRITE_OPERATION_2_ENABLE_KEY = asEnableKey(ReadWrite2.class);
+    public static final String READ_WRITE_OPERATION_3_ENABLE_KEY = asEnableKey(ReadWrite3.class);
     public static final List<String> WRITE_OPERATION_ENABLE_KEYS = Lists.newArrayList(
         WRITE_OPERATION_1_ENABLE_KEY,
         WRITE_OPERATION_2_ENABLE_KEY,
@@ -250,7 +253,10 @@ public abstract class LdbcFinBenchTransactionWorkloadConfiguration {
         WRITE_OPERATION_10_ENABLE_KEY,
         WRITE_OPERATION_11_ENABLE_KEY,
         WRITE_OPERATION_12_ENABLE_KEY,
-        WRITE_OPERATION_13_ENABLE_KEY
+        WRITE_OPERATION_13_ENABLE_KEY,
+        READ_WRITE_OPERATION_1_ENABLE_KEY,
+        READ_WRITE_OPERATION_2_ENABLE_KEY,
+        READ_WRITE_OPERATION_3_ENABLE_KEY
     );
     /*
      * Read Operation Parameters
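
The three new keys plug the read-write operations into the same enable/disable mechanism that already governs Write1 through Write13, since WRITE_OPERATION_ENABLE_KEYS now carries all sixteen. A hedged sketch, assuming the driver consumes these keys as boolean flags in a string-valued parameter map (the consuming side is not part of this diff, and the sketch compiles only inside the driver's source tree):

import java.util.HashMap;
import java.util.Map;

public class EnableAllWriteOps {
    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        // The list now also contains the ReadWrite1..ReadWrite3 keys
        for (String key : LdbcFinBenchTransactionWorkloadConfiguration.WRITE_OPERATION_ENABLE_KEYS) {
            params.put(key, "true");
        }
        System.out.println(params.size() + " write/read-write operations enabled");
    }
}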
@@ -505,6 +511,9 @@ public static Map<Integer, Class<? extends Operation>> operationTypeToClassMappi
         operationTypeToClassMapping.put(Write11.TYPE, Write11.class);
         operationTypeToClassMapping.put(Write12.TYPE, Write12.class);
         operationTypeToClassMapping.put(Write13.TYPE, Write13.class);
+        operationTypeToClassMapping.put(ReadWrite1.TYPE, ReadWrite1.class);
+        operationTypeToClassMapping.put(ReadWrite2.TYPE, ReadWrite2.class);
+        operationTypeToClassMapping.put(ReadWrite3.TYPE, ReadWrite3.class);
         return operationTypeToClassMapping;
     }

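The TYPE-to-class map is what lets stream decoders turn a numeric type code back into a concrete operation class, and the three new puts register the read-write operations there too. An illustrative lookup, assuming the public static factory whose signature appears in the hunk header (this sketch also compiles only inside the driver's source tree):

import java.util.Map;

public class OperationLookupSketch {
    public static void main(String[] args) {
        Map<Integer, Class<? extends Operation>> mapping =
            LdbcFinBenchTransactionWorkloadConfiguration.operationTypeToClassMapping();
        // Resolving a read-write operation from its numeric type code
        System.out.println(mapping.get(ReadWrite1.TYPE)); // -> ReadWrite1.class
    }
}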
@@ -552,22 +561,22 @@ public static List<File> personUpdateFilesInDirectory(File directory) {
     public static Map<Class<? extends Operation>, String> getUpdateStreamClassToPathMapping() {
         Map<Class<? extends Operation>, String> classToFileNameMapping = new HashMap<>();
         // Inserts TODO INSERTS_DIRECTORY
-        classToFileNameMapping.put(Write1.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write2.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write3.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write4.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write5.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write6.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write7.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write8.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write9.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write10.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write11.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write12.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(Write13.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(ReadWrite1.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(ReadWrite2.class, INSERTS_DIRECTORY + "/Person.parquet");
-        classToFileNameMapping.put(ReadWrite3.class, INSERTS_DIRECTORY + "/Person.parquet");
+        classToFileNameMapping.put(Write1.class, "/write_1_param");
+        classToFileNameMapping.put(Write2.class, "/write_2_param");
+        classToFileNameMapping.put(Write3.class, "/write_3_param");
+        classToFileNameMapping.put(Write4.class, "/write_4_param");
+        classToFileNameMapping.put(Write5.class, "/write_5_param");
+        classToFileNameMapping.put(Write6.class, "/write_6_param");
+        classToFileNameMapping.put(Write7.class, "/write_7_param");
+        classToFileNameMapping.put(Write8.class, "/write_8_param");
+        classToFileNameMapping.put(Write9.class, "/write_9_param");
+        classToFileNameMapping.put(Write10.class, "/write_10_param");
+        classToFileNameMapping.put(Write11.class, "/write_11_param");
+        classToFileNameMapping.put(Write12.class, "/write_12_param");
+        classToFileNameMapping.put(Write13.class, "/write_13_param");
+        classToFileNameMapping.put(ReadWrite1.class, "/read_write_1_param");
+        classToFileNameMapping.put(ReadWrite2.class, "/read_write_2_param");
+        classToFileNameMapping.put(ReadWrite3.class, "/read_write_3_param");
         return classToFileNameMapping;
     }

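Previously every operation class mapped to the same INSERTS_DIRECTORY + "/Person.parquet" placeholder; each now maps to its own parameter-file stem, with the file extension supplied separately through the new fileSuffix plumbing shown below. An illustrative dump of the reworked mapping (compiles only inside the driver's source tree):

import java.util.Map;

public class PrintParamStems {
    public static void main(String[] args) {
        Map<Class<? extends Operation>, String> stems =
            LdbcFinBenchTransactionWorkloadConfiguration.getUpdateStreamClassToPathMapping();
        // e.g. Write1 -> /write_1_param, ReadWrite3 -> /read_write_3_param
        stems.forEach((op, stem) -> System.out.println(op.getSimpleName() + " -> " + stem));
    }
}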
@@ -22,20 +22,23 @@ public class RunnableOperationStreamBatchLoader extends Thread {
     private final long batchSize;
     private final GeneratorFactory gf;
     private final File updatesDir;
+    private final String fileSuffix;
     private final Set<Class<? extends Operation>> enabledUpdateOperationTypes;
     private final BlockingQueue<Iterator<Operation>> blockingQueue;

     public RunnableOperationStreamBatchLoader(
         FileLoader loader,
         GeneratorFactory gf,
         File updatesDir,
+        String fileSuffix,
         BlockingQueue<Iterator<Operation>> blockingQueue,
         Set<Class<? extends Operation>> enabledUpdateOperationTypes,
         long batchSize
     ) {
         this.loader = loader;
         this.gf = gf;
         this.updatesDir = updatesDir;
+        this.fileSuffix = fileSuffix;
         this.blockingQueue = blockingQueue;
         this.enabledUpdateOperationTypes = enabledUpdateOperationTypes;
         this.batchSize = batchSize;
@@ -60,7 +63,8 @@ public void run() {
             String viewName = enabledClass.getSimpleName();
             // Initialize the batch reader to set the view in DuckDB on the parquet file
             Tuple2<Long, Long> boundaries = updateOperationStream.init(
-                new File(updatesDir, filename),
+                new File(updatesDir, filename
+                    + LdbcFinBenchTransactionWorkloadConfiguration.FILE_SEPARATOR + fileSuffix),
                 viewName,
                 batchColumn
            );
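
The init call now assembles the parameter-file path from the mapped stem plus the configurable suffix, joined by LdbcFinBenchTransactionWorkloadConfiguration.FILE_SEPARATOR. That constant's value is not visible in this diff; the sketch below assumes "." so that a stem like write_1_param with suffix csv resolves to write_1_param.csv, and both values are illustrative:

import java.io.File;

public class ParamPathSketch {
    public static void main(String[] args) {
        File updatesDir = new File("update_streams"); // assumed directory name
        String filename = "write_1_param";            // stem from the class-to-path mapping
        String separator = ".";                       // assumed value of FILE_SEPARATOR
        String fileSuffix = "csv";                    // assumed suffix value
        File paramFile = new File(updatesDir, filename + separator + fileSuffix);
        System.out.println(paramFile.getPath());      // update_streams/write_1_param.csv
    }
}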