Skip to content

Commit

Permalink
Pull TPCCBenchmark functionality up into BenchmarkModule (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsami authored Mar 3, 2021
1 parent f6d1713 commit 6b580b3
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 228 deletions.
4 changes: 0 additions & 4 deletions config/plugin.xml

This file was deleted.

5 changes: 2 additions & 3 deletions src/com/oltpbenchmark/DBWorkload.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.text.DecimalFormat;
import java.util.*;

import com.oltpbenchmark.benchmarks.tpcc.TPCCBenchmark;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
Expand Down Expand Up @@ -365,9 +364,9 @@ public static void main(String[] args) throws Exception {
// ----------------------------------------------------------------
// CREATE BENCHMARK MODULE
// ----------------------------------------------------------------
BenchmarkModule bench = new TPCCBenchmark(wrkld);
BenchmarkModule bench = new BenchmarkModule(wrkld);
Map<String, Object> initDebug = new ListOrderedMap<>();
initDebug.put("Benchmark", String.format("%s {%s}", plugin.toUpperCase(), "TPCCBenchmark"));
initDebug.put("Benchmark", String.format("%s {%s}", plugin.toUpperCase(), "BenchmarkModule"));
initDebug.put("Configuration", configFile);
initDebug.put("Type", wrkld.getDBType());
initDebug.put("Driver", wrkld.getDBDriver());
Expand Down
167 changes: 147 additions & 20 deletions src/com/oltpbenchmark/api/BenchmarkModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.oltpbenchmark.benchmarks.tpcc.TPCCConfig;
import com.oltpbenchmark.benchmarks.tpcc.procedures.*;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
Expand All @@ -45,7 +44,7 @@
/**
* Base class for all benchmark implementations
*/
public abstract class BenchmarkModule {
public class BenchmarkModule {
private static final Logger LOG = Logger.getLogger(BenchmarkModule.class);

/**
Expand Down Expand Up @@ -74,10 +73,10 @@ public abstract class BenchmarkModule {
*/
private final Random rng = new Random();

public BenchmarkModule(String benchmarkName, WorkloadConfiguration workConf) {
public BenchmarkModule(WorkloadConfiguration workConf) {
assert (workConf != null) : "The WorkloadConfiguration instance is null.";

this.benchmarkName = benchmarkName;
this.benchmarkName = "tpcc";
this.workConf = workConf;
this.catalog = new Catalog(this);
if (workConf.getNeedsExecution()) {
Expand Down Expand Up @@ -159,19 +158,25 @@ public final HikariDataSource getDataSource() {
// IMPLEMENTING CLASS INTERFACE
// --------------------------------------------------------------------------

protected abstract List<Worker> makeWorkersImpl();
protected List<Worker> makeWorkersImpl() {
ArrayList<Worker> workers = new ArrayList<>();
try {
List<Worker> terminals = createTerminals();
workers.addAll(terminals);
} catch (Exception e) {
e.printStackTrace();
}

/**
* Each BenchmarkModule needs to implement this method to load a sample
* dataset into the database. The Connection handle will already be
* configured for you, and the base class will commit+close it once this
* method returns
*
* @return TODO
*/
protected abstract Loader makeLoaderImpl();
return workers;
}

protected Loader makeLoaderImpl() {
return new Loader(this);
}

protected abstract Package getProcedurePackageImpl();
protected Package getProcedurePackageImpl() {
return (NewOrder.class.getPackage());
}

// --------------------------------------------------------------------------
// PUBLIC INTERFACE
Expand Down Expand Up @@ -385,9 +390,131 @@ public Map<TransactionType, Procedure> getProcedures() {
return (proc_xref);
}

public abstract void enableForeignKeys() throws Exception;
protected ArrayList<Worker> createTerminals() {

// The array 'terminals' contains a terminal associated to a {warehouse, district}.
Worker[] terminals = new Worker[workConf.getTerminals()];

int numWarehouses = workConf.getNumWarehouses();
if (numWarehouses <= 0) {
numWarehouses = 1;
}
int numTerminals = workConf.getTerminals();
assert (numTerminals >= numWarehouses) :
String.format("Insufficient number of terminals '%d' [numWarehouses=%d]",
numTerminals, numWarehouses);

// TODO: This is currently broken: fix it!
int warehouseOffset = Integer.getInteger("warehouseOffset", 1);
assert warehouseOffset == 1;

// We distribute terminals evenly across the warehouses
// Eg. if there are 10 terminals across 7 warehouses, they
// are distributed as
// 1, 1, 2, 1, 2, 1, 2
final double terminalsPerWarehouse = (double) numTerminals
/ numWarehouses;
int workerId = 0;
assert terminalsPerWarehouse >= 1;
int k = 0;
for (int w = workConf.getStartWarehouseIdForShard() - 1;
w < numWarehouses + workConf.getStartWarehouseIdForShard() - 1;
w++) {
// Compute the number of terminals in *this* warehouse
int lowerTerminalId = (int) (w * terminalsPerWarehouse);
int upperTerminalId = (int) ((w + 1) * terminalsPerWarehouse);
// protect against double rounding errors
int w_id = w + 1;
if (w_id == numWarehouses)
upperTerminalId = numTerminals;
int numWarehouseTerminals = upperTerminalId - lowerTerminalId;

if (BenchmarkModule.LOG.isDebugEnabled())
BenchmarkModule.LOG.debug(String.format("w_id %d = %d terminals [lower=%d / upper%d]",
w_id, numWarehouseTerminals, lowerTerminalId, upperTerminalId));

final double districtsPerTerminal =
TPCCConfig.configDistPerWhse / (double) numWarehouseTerminals;
assert districtsPerTerminal >= 1 :
String.format("Too many terminals [districtsPerTerminal=%.2f, numWarehouseTerminals=%d]",
districtsPerTerminal, numWarehouseTerminals);
for (int terminalId = 0; terminalId < numWarehouseTerminals; terminalId++) {
int lowerDistrictId = (int) (terminalId * districtsPerTerminal);
int upperDistrictId = (int) ((terminalId + 1) * districtsPerTerminal);
if (terminalId + 1 == numWarehouseTerminals) {
upperDistrictId = TPCCConfig.configDistPerWhse;
}
lowerDistrictId += 1;

Worker terminal = new Worker(this, workerId++,
w_id, lowerDistrictId, upperDistrictId);
terminals[k++] = terminal;
}
}
assert terminals[terminals.length - 1] != null;

ArrayList<Worker> ret = new ArrayList<>();
Collections.addAll(ret, terminals);
return ret;
}

/**
* Hack to support postgres-specific timestamps
* @param time - millis since epoch
* @return Timestamp
*/
public Timestamp getTimestamp(long time) {
Timestamp timestamp;

// HACK: NoisePage doesn't support JDBC timestamps.
// We have to use the postgres-specific type
if (this.workConf.getDBType() == DatabaseType.NOISEPAGE) {
timestamp = new org.postgresql.util.PGTimestamp(time);
} else {
timestamp = new Timestamp(time);
}
return (timestamp);
}

public void enableForeignKeys() throws Exception {
Loader loader = new Loader(this);
loader.EnableForeignKeyConstraints(makeConnection());
}

public abstract void createSqlProcedures() throws Exception;
// This function creates SQL procedures that the execution would need. Currently we have procedures to update the
// Stock table, and a procedure to get stock levels of items recently ordered.
public void createSqlProcedures() throws Exception {
try {
Connection conn = makeConnection();
Statement st = conn.createStatement();

StringBuilder argsSb = new StringBuilder();
StringBuilder updateStatements = new StringBuilder();

argsSb.append("wid int");
for (int i = 1; i <= 15; ++i) {
argsSb.append(String.format(", i%d int, q%d int, y%d int, r%d int", i, i, i, i));
updateStatements.append(String.format(
"UPDATE STOCK SET S_QUANTITY = q%d, S_YTD = y%d, S_ORDER_CNT = S_ORDER_CNT + 1, " +
"S_REMOTE_CNT = r%d WHERE S_W_ID = wid AND S_I_ID = i%d;",
i, i, i, i));
String updateStmt =
String.format("CREATE PROCEDURE updatestock%d (%s) AS '%s' LANGUAGE SQL;",
i, argsSb.toString(), updateStatements.toString());

st.execute(String.format("DROP PROCEDURE IF EXISTS updatestock%d", i));
st.execute(updateStmt);
}

StockLevel.InitializeGetStockCountProc(conn);
} catch (SQLException se) {
BenchmarkModule.LOG.error(se.getMessage());
throw se;
}
}

public void test() throws Exception {}
public void test() throws Exception {
Worker worker = new Worker(this, 1 /* worker_id */, 1, 1, 1);
worker.test(makeConnection());
}
}
4 changes: 2 additions & 2 deletions src/com/oltpbenchmark/api/Loader.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class Loader {
private static final Logger LOG = Logger.getLogger(Loader.class);
private static final int FIRST_UNPROCESSED_O_ID = 2101;

protected final TPCCBenchmark benchmark;
protected final BenchmarkModule benchmark;
protected final WorkloadConfiguration workConf;
protected final int numWarehouses;
private final Histogram<String> tableSizes = new Histogram<>();
Expand Down Expand Up @@ -75,7 +75,7 @@ public final void run() {
public abstract void load(Connection conn);
}

public Loader(TPCCBenchmark benchmark) {
public Loader(BenchmarkModule benchmark) {
this.benchmark = benchmark;
this.workConf = benchmark.getWorkloadConfiguration();
this.numWarehouses = workConf.getNumWarehouses();
Expand Down
7 changes: 3 additions & 4 deletions src/com/oltpbenchmark/api/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.atomic.AtomicInteger;


import com.oltpbenchmark.benchmarks.tpcc.TPCCBenchmark;
import com.oltpbenchmark.benchmarks.tpcc.TPCCConfig;
import com.oltpbenchmark.benchmarks.tpcc.TPCCUtil;
import com.oltpbenchmark.benchmarks.tpcc.procedures.StockLevel;
Expand Down Expand Up @@ -63,7 +62,7 @@ public class Worker implements Runnable {
private final AtomicInteger intervalRequests = new AtomicInteger(0);

private final int id;
private final TPCCBenchmark benchmarkModule;
private final BenchmarkModule benchmarkModule;
protected final HikariDataSource dataSource;
protected static WorkloadConfiguration wrkld;
protected TransactionTypes transactionTypes;
Expand All @@ -76,7 +75,7 @@ public class Worker implements Runnable {
int totalAttemptsPerTransaction = 1;

public Worker(
TPCCBenchmark benchmarkModule, int id, int terminalWarehouseID, int terminalDistrictLowerID,
BenchmarkModule benchmarkModule, int id, int terminalWarehouseID, int terminalDistrictLowerID,
int terminalDistrictUpperID) {
this.id = id;
this.benchmarkModule = benchmarkModule;
Expand Down Expand Up @@ -116,7 +115,7 @@ public final void InitializeProcedures() {
/**
* Get the BenchmarkModule managing this Worker
*/
public final TPCCBenchmark getBenchmarkModule() {
public final BenchmarkModule getBenchmarkModule() {
return (this.benchmarkModule);
}

Expand Down
File renamed without changes.
Loading

0 comments on commit 6b580b3

Please sign in to comment.