From 6b580b36a9ea3e1149670b1b48eb397fb5f2306e Mon Sep 17 00:00:00 2001 From: robertsami Date: Wed, 3 Mar 2021 14:31:56 -0500 Subject: [PATCH] Pull TPCCBenchmark functionality up into BenchmarkModule (#84) --- config/plugin.xml | 4 - src/com/oltpbenchmark/DBWorkload.java | 5 +- .../oltpbenchmark/api/BenchmarkModule.java | 167 +++++++++++++-- src/com/oltpbenchmark/api/Loader.java | 4 +- src/com/oltpbenchmark/api/Worker.java | 7 +- .../tpcc => api}/ddls/tpcc-ddl.sql | 0 .../tpcc => api}/ddls/tpcc-pg-ddl.sql | 0 .../tpcc => api}/ddls/tpcc-postgres-ddl.sql | 0 .../benchmarks/tpcc/TPCCBenchmark.java | 195 ------------------ 9 files changed, 154 insertions(+), 228 deletions(-) delete mode 100644 config/plugin.xml rename src/com/oltpbenchmark/{benchmarks/tpcc => api}/ddls/tpcc-ddl.sql (100%) rename src/com/oltpbenchmark/{benchmarks/tpcc => api}/ddls/tpcc-pg-ddl.sql (100%) rename src/com/oltpbenchmark/{benchmarks/tpcc => api}/ddls/tpcc-postgres-ddl.sql (100%) delete mode 100644 src/com/oltpbenchmark/benchmarks/tpcc/TPCCBenchmark.java diff --git a/config/plugin.xml b/config/plugin.xml deleted file mode 100644 index d91c437..0000000 --- a/config/plugin.xml +++ /dev/null @@ -1,4 +0,0 @@ - - - com.oltpbenchmark.benchmarks.tpcc.TPCCBenchmark - diff --git a/src/com/oltpbenchmark/DBWorkload.java b/src/com/oltpbenchmark/DBWorkload.java index 6b8d3b9..a85392d 100644 --- a/src/com/oltpbenchmark/DBWorkload.java +++ b/src/com/oltpbenchmark/DBWorkload.java @@ -22,7 +22,6 @@ import java.text.DecimalFormat; import java.util.*; -import com.oltpbenchmark.benchmarks.tpcc.TPCCBenchmark; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -365,9 +364,9 @@ public static void main(String[] args) throws Exception { // ---------------------------------------------------------------- // CREATE BENCHMARK MODULE // ---------------------------------------------------------------- - BenchmarkModule bench = new TPCCBenchmark(wrkld); + BenchmarkModule bench = new BenchmarkModule(wrkld); Map initDebug = new ListOrderedMap<>(); - initDebug.put("Benchmark", String.format("%s {%s}", plugin.toUpperCase(), "TPCCBenchmark")); + initDebug.put("Benchmark", String.format("%s {%s}", plugin.toUpperCase(), "BenchmarkModule")); initDebug.put("Configuration", configFile); initDebug.put("Type", wrkld.getDBType()); initDebug.put("Driver", wrkld.getDBDriver()); diff --git a/src/com/oltpbenchmark/api/BenchmarkModule.java b/src/com/oltpbenchmark/api/BenchmarkModule.java index a57b7ba..a632537 100644 --- a/src/com/oltpbenchmark/api/BenchmarkModule.java +++ b/src/com/oltpbenchmark/api/BenchmarkModule.java @@ -20,14 +20,13 @@ import java.io.File; import java.io.IOException; import java.net.URL; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; +import java.sql.*; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.oltpbenchmark.benchmarks.tpcc.TPCCConfig; import com.oltpbenchmark.benchmarks.tpcc.procedures.*; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; @@ -45,7 +44,7 @@ /** * Base class for all benchmark implementations */ -public abstract class BenchmarkModule { +public class BenchmarkModule { private static final Logger LOG = Logger.getLogger(BenchmarkModule.class); /** @@ -74,10 +73,10 @@ public abstract class BenchmarkModule { */ private final Random rng = new Random(); - public BenchmarkModule(String benchmarkName, WorkloadConfiguration workConf) { + public BenchmarkModule(WorkloadConfiguration workConf) { assert (workConf != null) : "The WorkloadConfiguration instance is null."; - this.benchmarkName = benchmarkName; + this.benchmarkName = "tpcc"; this.workConf = workConf; this.catalog = new Catalog(this); if (workConf.getNeedsExecution()) { @@ -159,19 +158,25 @@ public final HikariDataSource getDataSource() { // IMPLEMENTING CLASS INTERFACE // -------------------------------------------------------------------------- - protected abstract List makeWorkersImpl(); + protected List makeWorkersImpl() { + ArrayList workers = new ArrayList<>(); + try { + List terminals = createTerminals(); + workers.addAll(terminals); + } catch (Exception e) { + e.printStackTrace(); + } - /** - * Each BenchmarkModule needs to implement this method to load a sample - * dataset into the database. The Connection handle will already be - * configured for you, and the base class will commit+close it once this - * method returns - * - * @return TODO - */ - protected abstract Loader makeLoaderImpl(); + return workers; + } + + protected Loader makeLoaderImpl() { + return new Loader(this); + } - protected abstract Package getProcedurePackageImpl(); + protected Package getProcedurePackageImpl() { + return (NewOrder.class.getPackage()); + } // -------------------------------------------------------------------------- // PUBLIC INTERFACE @@ -385,9 +390,131 @@ public Map getProcedures() { return (proc_xref); } - public abstract void enableForeignKeys() throws Exception; + protected ArrayList createTerminals() { + + // The array 'terminals' contains a terminal associated to a {warehouse, district}. + Worker[] terminals = new Worker[workConf.getTerminals()]; + + int numWarehouses = workConf.getNumWarehouses(); + if (numWarehouses <= 0) { + numWarehouses = 1; + } + int numTerminals = workConf.getTerminals(); + assert (numTerminals >= numWarehouses) : + String.format("Insufficient number of terminals '%d' [numWarehouses=%d]", + numTerminals, numWarehouses); + + // TODO: This is currently broken: fix it! + int warehouseOffset = Integer.getInteger("warehouseOffset", 1); + assert warehouseOffset == 1; + + // We distribute terminals evenly across the warehouses + // Eg. if there are 10 terminals across 7 warehouses, they + // are distributed as + // 1, 1, 2, 1, 2, 1, 2 + final double terminalsPerWarehouse = (double) numTerminals + / numWarehouses; + int workerId = 0; + assert terminalsPerWarehouse >= 1; + int k = 0; + for (int w = workConf.getStartWarehouseIdForShard() - 1; + w < numWarehouses + workConf.getStartWarehouseIdForShard() - 1; + w++) { + // Compute the number of terminals in *this* warehouse + int lowerTerminalId = (int) (w * terminalsPerWarehouse); + int upperTerminalId = (int) ((w + 1) * terminalsPerWarehouse); + // protect against double rounding errors + int w_id = w + 1; + if (w_id == numWarehouses) + upperTerminalId = numTerminals; + int numWarehouseTerminals = upperTerminalId - lowerTerminalId; + + if (BenchmarkModule.LOG.isDebugEnabled()) + BenchmarkModule.LOG.debug(String.format("w_id %d = %d terminals [lower=%d / upper%d]", + w_id, numWarehouseTerminals, lowerTerminalId, upperTerminalId)); + + final double districtsPerTerminal = + TPCCConfig.configDistPerWhse / (double) numWarehouseTerminals; + assert districtsPerTerminal >= 1 : + String.format("Too many terminals [districtsPerTerminal=%.2f, numWarehouseTerminals=%d]", + districtsPerTerminal, numWarehouseTerminals); + for (int terminalId = 0; terminalId < numWarehouseTerminals; terminalId++) { + int lowerDistrictId = (int) (terminalId * districtsPerTerminal); + int upperDistrictId = (int) ((terminalId + 1) * districtsPerTerminal); + if (terminalId + 1 == numWarehouseTerminals) { + upperDistrictId = TPCCConfig.configDistPerWhse; + } + lowerDistrictId += 1; + + Worker terminal = new Worker(this, workerId++, + w_id, lowerDistrictId, upperDistrictId); + terminals[k++] = terminal; + } + } + assert terminals[terminals.length - 1] != null; + + ArrayList ret = new ArrayList<>(); + Collections.addAll(ret, terminals); + return ret; + } + + /** + * Hack to support postgres-specific timestamps + * @param time - millis since epoch + * @return Timestamp + */ + public Timestamp getTimestamp(long time) { + Timestamp timestamp; + + // HACK: NoisePage doesn't support JDBC timestamps. + // We have to use the postgres-specific type + if (this.workConf.getDBType() == DatabaseType.NOISEPAGE) { + timestamp = new org.postgresql.util.PGTimestamp(time); + } else { + timestamp = new Timestamp(time); + } + return (timestamp); + } + + public void enableForeignKeys() throws Exception { + Loader loader = new Loader(this); + loader.EnableForeignKeyConstraints(makeConnection()); + } - public abstract void createSqlProcedures() throws Exception; + // This function creates SQL procedures that the execution would need. Currently we have procedures to update the + // Stock table, and a procedure to get stock levels of items recently ordered. + public void createSqlProcedures() throws Exception { + try { + Connection conn = makeConnection(); + Statement st = conn.createStatement(); + + StringBuilder argsSb = new StringBuilder(); + StringBuilder updateStatements = new StringBuilder(); + + argsSb.append("wid int"); + for (int i = 1; i <= 15; ++i) { + argsSb.append(String.format(", i%d int, q%d int, y%d int, r%d int", i, i, i, i)); + updateStatements.append(String.format( + "UPDATE STOCK SET S_QUANTITY = q%d, S_YTD = y%d, S_ORDER_CNT = S_ORDER_CNT + 1, " + + "S_REMOTE_CNT = r%d WHERE S_W_ID = wid AND S_I_ID = i%d;", + i, i, i, i)); + String updateStmt = + String.format("CREATE PROCEDURE updatestock%d (%s) AS '%s' LANGUAGE SQL;", + i, argsSb.toString(), updateStatements.toString()); + + st.execute(String.format("DROP PROCEDURE IF EXISTS updatestock%d", i)); + st.execute(updateStmt); + } + + StockLevel.InitializeGetStockCountProc(conn); + } catch (SQLException se) { + BenchmarkModule.LOG.error(se.getMessage()); + throw se; + } + } - public void test() throws Exception {} + public void test() throws Exception { + Worker worker = new Worker(this, 1 /* worker_id */, 1, 1, 1); + worker.test(makeConnection()); + } } diff --git a/src/com/oltpbenchmark/api/Loader.java b/src/com/oltpbenchmark/api/Loader.java index f991122..8281675 100644 --- a/src/com/oltpbenchmark/api/Loader.java +++ b/src/com/oltpbenchmark/api/Loader.java @@ -38,7 +38,7 @@ public class Loader { private static final Logger LOG = Logger.getLogger(Loader.class); private static final int FIRST_UNPROCESSED_O_ID = 2101; - protected final TPCCBenchmark benchmark; + protected final BenchmarkModule benchmark; protected final WorkloadConfiguration workConf; protected final int numWarehouses; private final Histogram tableSizes = new Histogram<>(); @@ -75,7 +75,7 @@ public final void run() { public abstract void load(Connection conn); } - public Loader(TPCCBenchmark benchmark) { + public Loader(BenchmarkModule benchmark) { this.benchmark = benchmark; this.workConf = benchmark.getWorkloadConfiguration(); this.numWarehouses = workConf.getNumWarehouses(); diff --git a/src/com/oltpbenchmark/api/Worker.java b/src/com/oltpbenchmark/api/Worker.java index a1c32b4..17d9fd5 100644 --- a/src/com/oltpbenchmark/api/Worker.java +++ b/src/com/oltpbenchmark/api/Worker.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger; -import com.oltpbenchmark.benchmarks.tpcc.TPCCBenchmark; import com.oltpbenchmark.benchmarks.tpcc.TPCCConfig; import com.oltpbenchmark.benchmarks.tpcc.TPCCUtil; import com.oltpbenchmark.benchmarks.tpcc.procedures.StockLevel; @@ -63,7 +62,7 @@ public class Worker implements Runnable { private final AtomicInteger intervalRequests = new AtomicInteger(0); private final int id; - private final TPCCBenchmark benchmarkModule; + private final BenchmarkModule benchmarkModule; protected final HikariDataSource dataSource; protected static WorkloadConfiguration wrkld; protected TransactionTypes transactionTypes; @@ -76,7 +75,7 @@ public class Worker implements Runnable { int totalAttemptsPerTransaction = 1; public Worker( - TPCCBenchmark benchmarkModule, int id, int terminalWarehouseID, int terminalDistrictLowerID, + BenchmarkModule benchmarkModule, int id, int terminalWarehouseID, int terminalDistrictLowerID, int terminalDistrictUpperID) { this.id = id; this.benchmarkModule = benchmarkModule; @@ -116,7 +115,7 @@ public final void InitializeProcedures() { /** * Get the BenchmarkModule managing this Worker */ - public final TPCCBenchmark getBenchmarkModule() { + public final BenchmarkModule getBenchmarkModule() { return (this.benchmarkModule); } diff --git a/src/com/oltpbenchmark/benchmarks/tpcc/ddls/tpcc-ddl.sql b/src/com/oltpbenchmark/api/ddls/tpcc-ddl.sql similarity index 100% rename from src/com/oltpbenchmark/benchmarks/tpcc/ddls/tpcc-ddl.sql rename to src/com/oltpbenchmark/api/ddls/tpcc-ddl.sql diff --git a/src/com/oltpbenchmark/benchmarks/tpcc/ddls/tpcc-pg-ddl.sql b/src/com/oltpbenchmark/api/ddls/tpcc-pg-ddl.sql similarity index 100% rename from src/com/oltpbenchmark/benchmarks/tpcc/ddls/tpcc-pg-ddl.sql rename to src/com/oltpbenchmark/api/ddls/tpcc-pg-ddl.sql diff --git a/src/com/oltpbenchmark/benchmarks/tpcc/ddls/tpcc-postgres-ddl.sql b/src/com/oltpbenchmark/api/ddls/tpcc-postgres-ddl.sql similarity index 100% rename from src/com/oltpbenchmark/benchmarks/tpcc/ddls/tpcc-postgres-ddl.sql rename to src/com/oltpbenchmark/api/ddls/tpcc-postgres-ddl.sql diff --git a/src/com/oltpbenchmark/benchmarks/tpcc/TPCCBenchmark.java b/src/com/oltpbenchmark/benchmarks/tpcc/TPCCBenchmark.java deleted file mode 100644 index d93d7e3..0000000 --- a/src/com/oltpbenchmark/benchmarks/tpcc/TPCCBenchmark.java +++ /dev/null @@ -1,195 +0,0 @@ -/****************************************************************************** - * Copyright 2015 by OLTPBenchmark Project * - * * - * Licensed under the Apache License, Version 2.0 (the "License"); * - * you may not use this file except in compliance with the License. * - * You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, software * - * distributed under the License is distributed on an "AS IS" BASIS, * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * - * See the License for the specific language governing permissions and * - * limitations under the License. * - ******************************************************************************/ - - -package com.oltpbenchmark.benchmarks.tpcc; - -import java.sql.Connection; -import java.sql.Statement; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import com.oltpbenchmark.benchmarks.tpcc.procedures.StockLevel; -import org.apache.log4j.Logger; - -import com.oltpbenchmark.WorkloadConfiguration; -import com.oltpbenchmark.api.BenchmarkModule; -import com.oltpbenchmark.api.Loader; -import com.oltpbenchmark.api.Worker; -import com.oltpbenchmark.benchmarks.tpcc.procedures.NewOrder; -import com.oltpbenchmark.types.DatabaseType; - -public class TPCCBenchmark extends BenchmarkModule { - private static final Logger LOG = Logger.getLogger(TPCCBenchmark.class); - - public TPCCBenchmark(WorkloadConfiguration workConf) { - super("tpcc", workConf); - } - - @Override - protected Package getProcedurePackageImpl() { - return (NewOrder.class.getPackage()); - } - - @Override - protected List makeWorkersImpl() { - ArrayList workers = new ArrayList<>(); - try { - List terminals = createTerminals(); - workers.addAll(terminals); - } catch (Exception e) { - e.printStackTrace(); - } - - return workers; - } - - @Override - protected Loader makeLoaderImpl() { - return new Loader(this); - } - - protected ArrayList createTerminals() { - - // The array 'terminals' contains a terminal associated to a {warehouse, district}. - Worker[] terminals = new Worker[workConf.getTerminals()]; - - int numWarehouses = workConf.getNumWarehouses(); - if (numWarehouses <= 0) { - numWarehouses = 1; - } - int numTerminals = workConf.getTerminals(); - assert (numTerminals >= numWarehouses) : - String.format("Insufficient number of terminals '%d' [numWarehouses=%d]", - numTerminals, numWarehouses); - - // TODO: This is currently broken: fix it! - int warehouseOffset = Integer.getInteger("warehouseOffset", 1); - assert warehouseOffset == 1; - - // We distribute terminals evenly across the warehouses - // Eg. if there are 10 terminals across 7 warehouses, they - // are distributed as - // 1, 1, 2, 1, 2, 1, 2 - final double terminalsPerWarehouse = (double) numTerminals - / numWarehouses; - int workerId = 0; - assert terminalsPerWarehouse >= 1; - int k = 0; - for (int w = workConf.getStartWarehouseIdForShard() - 1; - w < numWarehouses + workConf.getStartWarehouseIdForShard() - 1; - w++) { - // Compute the number of terminals in *this* warehouse - int lowerTerminalId = (int) (w * terminalsPerWarehouse); - int upperTerminalId = (int) ((w + 1) * terminalsPerWarehouse); - // protect against double rounding errors - int w_id = w + 1; - if (w_id == numWarehouses) - upperTerminalId = numTerminals; - int numWarehouseTerminals = upperTerminalId - lowerTerminalId; - - if (LOG.isDebugEnabled()) - LOG.debug(String.format("w_id %d = %d terminals [lower=%d / upper%d]", - w_id, numWarehouseTerminals, lowerTerminalId, upperTerminalId)); - - final double districtsPerTerminal = - TPCCConfig.configDistPerWhse / (double) numWarehouseTerminals; - assert districtsPerTerminal >= 1 : - String.format("Too many terminals [districtsPerTerminal=%.2f, numWarehouseTerminals=%d]", - districtsPerTerminal, numWarehouseTerminals); - for (int terminalId = 0; terminalId < numWarehouseTerminals; terminalId++) { - int lowerDistrictId = (int) (terminalId * districtsPerTerminal); - int upperDistrictId = (int) ((terminalId + 1) * districtsPerTerminal); - if (terminalId + 1 == numWarehouseTerminals) { - upperDistrictId = TPCCConfig.configDistPerWhse; - } - lowerDistrictId += 1; - - Worker terminal = new Worker(this, workerId++, - w_id, lowerDistrictId, upperDistrictId); - terminals[k++] = terminal; - } - } - assert terminals[terminals.length - 1] != null; - - ArrayList ret = new ArrayList<>(); - Collections.addAll(ret, terminals); - return ret; - } - - /** - * Hack to support postgres-specific timestamps - * @param time - millis since epoch - * @return Timestamp - */ - public Timestamp getTimestamp(long time) { - Timestamp timestamp; - - // HACK: NoisePage doesn't support JDBC timestamps. - // We have to use the postgres-specific type - if (this.workConf.getDBType() == DatabaseType.NOISEPAGE) { - timestamp = new org.postgresql.util.PGTimestamp(time); - } else { - timestamp = new java.sql.Timestamp(time); - } - return (timestamp); - } - - public void enableForeignKeys() throws Exception { - Loader loader = new Loader(this); - loader.EnableForeignKeyConstraints(makeConnection()); - } - - // This function creates SQL procedures that the execution would need. Currently we have procedures to update the - // Stock table, and a procedure to get stock levels of items recently ordered. - public void createSqlProcedures() throws Exception { - try { - Connection conn = makeConnection(); - Statement st = conn.createStatement(); - - StringBuilder argsSb = new StringBuilder(); - StringBuilder updateStatements = new StringBuilder(); - - argsSb.append("wid int"); - for (int i = 1; i <= 15; ++i) { - argsSb.append(String.format(", i%d int, q%d int, y%d int, r%d int", i, i, i, i)); - updateStatements.append(String.format( - "UPDATE STOCK SET S_QUANTITY = q%d, S_YTD = y%d, S_ORDER_CNT = S_ORDER_CNT + 1, " + - "S_REMOTE_CNT = r%d WHERE S_W_ID = wid AND S_I_ID = i%d;", - i, i, i, i)); - String updateStmt = - String.format("CREATE PROCEDURE updatestock%d (%s) AS '%s' LANGUAGE SQL;", - i, argsSb.toString(), updateStatements.toString()); - - st.execute(String.format("DROP PROCEDURE IF EXISTS updatestock%d", i)); - st.execute(updateStmt); - } - - StockLevel.InitializeGetStockCountProc(conn); - } catch (SQLException se) { - LOG.error(se.getMessage()); - throw se; - } - } - - public void test() throws Exception { - Worker worker = new Worker(this, 1 /* worker_id */, 1, 1, 1); - worker.test(makeConnection()); - } -}