From ddb953ab499776bcd1a0bd2c446e2060cbc49350 Mon Sep 17 00:00:00 2001 From: Apekshit Sharma Date: Sat, 29 Feb 2020 18:03:35 -0800 Subject: [PATCH] Postgres Writer #2 - Move topic messages, contract result, file data, and live hashes from RecordFileLogger to PostgresWritingRecordParsedItemHandler - Logic for Transaction and batching will be moved together in followup Signed-off-by: Apekshit Sharma --- .../importer/domain/ContractResult.java | 5 +- .../mirror/importer/domain/FileData.java | 5 +- .../mirror/importer/domain/LiveHash.java | 5 +- .../mirror/importer/domain/TopicMessage.java | 4 + ...ostgresWritingRecordParsedItemHandler.java | 93 ++++++++++++++- .../parser/record/RecordFileLogger.java | 107 +++++------------- ...resWritingRecordParserItemHandlerTest.java | 83 ++++++++++++++ 7 files changed, 217 insertions(+), 85 deletions(-) diff --git a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/ContractResult.java b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/ContractResult.java index 584c3ca2dc0..12422ca4392 100644 --- a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/ContractResult.java +++ b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/ContractResult.java @@ -24,11 +24,14 @@ import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; - +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data @Entity +@NoArgsConstructor +@AllArgsConstructor @Table(name = "t_contract_result") public class ContractResult { diff --git a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/FileData.java b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/FileData.java index 84bfb7e8f8e..502472b07bb 100644 --- a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/FileData.java +++ b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/FileData.java @@ -23,11 +23,14 @@ import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; - +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data @Entity +@NoArgsConstructor +@AllArgsConstructor @Table(name = "t_file_data") public class FileData { diff --git a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/LiveHash.java b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/LiveHash.java index 0a43975a714..aceacfee8b6 100644 --- a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/LiveHash.java +++ b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/LiveHash.java @@ -23,11 +23,14 @@ import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; - +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data @Entity +@NoArgsConstructor +@AllArgsConstructor @Table(name = "t_livehashes") public class LiveHash { diff --git a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/TopicMessage.java b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/TopicMessage.java index d74939ca08a..72724ffd90e 100644 --- a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/TopicMessage.java +++ b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/domain/TopicMessage.java @@ -22,10 +22,14 @@ import javax.persistence.Entity; import javax.persistence.Id; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data @Entity +@NoArgsConstructor +@AllArgsConstructor public class TopicMessage { @Id diff --git a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/record/PostgresWritingRecordParsedItemHandler.java b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/record/PostgresWritingRecordParsedItemHandler.java index 3127100d4c5..b603b2e4332 100644 --- a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/record/PostgresWritingRecordParsedItemHandler.java +++ b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/record/PostgresWritingRecordParsedItemHandler.java @@ -41,6 +41,10 @@ public class PostgresWritingRecordParsedItemHandler implements RecordParsedItemHandler { private PreparedStatement sqlInsertTransferList; private PreparedStatement sqlInsertNonFeeTransfers; + private PreparedStatement sqlInsertFileData; + private PreparedStatement sqlInsertContractResult; + private PreparedStatement sqlInsertLiveHashes; + private PreparedStatement sqlInsertTopicMessage; void initSqlStatements(Connection connection) throws ParserSQLException { try { @@ -51,6 +55,22 @@ void initSqlStatements(Connection connection) throws ParserSQLException { sqlInsertNonFeeTransfers = connection.prepareStatement("insert into non_fee_transfers" + " (consensus_timestamp, amount, realm_num, entity_num)" + " values (?, ?, ?, ?)"); + + sqlInsertFileData = connection.prepareStatement("INSERT INTO t_file_data" + + " (consensus_timestamp, file_data)" + + " VALUES (?, ?)"); + + sqlInsertContractResult = connection.prepareStatement("INSERT INTO t_contract_result" + + " (consensus_timestamp, function_params, gas_supplied, call_result, gas_used)" + + " VALUES (?, ?, ?, ?, ?)"); + + sqlInsertLiveHashes = connection.prepareStatement("INSERT INTO t_livehashes" + + " (consensus_timestamp, livehash)" + + " VALUES (?, ?)"); + + sqlInsertTopicMessage = connection.prepareStatement("insert into topic_message" + + " (consensus_timestamp, realm_num, topic_num, message, running_hash, sequence_number)" + + " values (?, ?, ?, ?, ?, ?)"); } catch (SQLException e) { throw new ParserSQLException("Unable to prepare SQL statements", e); } @@ -69,6 +89,10 @@ private void closeStatements() { try { sqlInsertTransferList.close(); sqlInsertNonFeeTransfers.close(); + sqlInsertFileData.close(); + sqlInsertContractResult.close(); + sqlInsertLiveHashes.close(); + sqlInsertTopicMessage.close(); } catch (SQLException e) { throw new ParserSQLException("Error closing connection", e); } @@ -78,7 +102,14 @@ void executeBatches() { try { int[] transferLists = sqlInsertTransferList.executeBatch(); int[] nonFeeTransfers = sqlInsertNonFeeTransfers.executeBatch(); - log.info("Inserted {} transfer lists, {} non-fee transfers", transferLists.length, nonFeeTransfers.length); + int[] fileData = sqlInsertFileData.executeBatch(); + int[] contractResult = sqlInsertContractResult.executeBatch(); + int[] liveHashes = sqlInsertLiveHashes.executeBatch(); + int[] topicMessages = sqlInsertTopicMessage.executeBatch(); + log.info("Inserted {} transfer lists, {} files, {} contracts, {} claims, {} topic messages, " + + "{} non-fee transfers", + transferLists.length, fileData.length, contractResult.length, liveHashes.length, + topicMessages.length, nonFeeTransfers.length); } catch (SQLException e) { log.error("Error committing sql insert batch ", e); throw new ParserSQLException(e); @@ -120,22 +151,57 @@ public void onNonFeeTransfer(NonFeeTransfer nonFeeTransfer) throws ImporterExcep @Override public void onTopicMessage(TopicMessage topicMessage) throws ImporterException { - // to be implemented in followup change + try { + sqlInsertTopicMessage.setLong(F_TOPICMESSAGE.CONSENSUS_TIMESTAMP.ordinal(), + topicMessage.getConsensusTimestamp()); + sqlInsertTopicMessage.setShort(F_TOPICMESSAGE.REALM_NUM.ordinal(), (short) topicMessage.getRealmNum()); + sqlInsertTopicMessage.setInt(F_TOPICMESSAGE.TOPIC_NUM.ordinal(), topicMessage.getTopicNum()); + sqlInsertTopicMessage.setBytes(F_TOPICMESSAGE.MESSAGE.ordinal(), topicMessage.getMessage()); + sqlInsertTopicMessage.setBytes(F_TOPICMESSAGE.RUNNING_HASH.ordinal(), topicMessage.getRunningHash()); + sqlInsertTopicMessage.setLong(F_TOPICMESSAGE.SEQUENCE_NUMBER.ordinal(), topicMessage.getSequenceNumber()); + sqlInsertTopicMessage.addBatch(); + } catch (SQLException e) { + throw new ParserSQLException(e); + } } @Override public void onContractResult(ContractResult contractResult) throws ImporterException { - // to be implemented in followup change + try { + sqlInsertContractResult.setLong(F_CONTRACT_RESULT.CONSENSUS_TIMESTAMP.ordinal(), + contractResult.getConsensusTimestamp()); + sqlInsertContractResult + .setBytes(F_CONTRACT_RESULT.FUNCTION_PARAMS.ordinal(), contractResult.getFunctionParameters()); + sqlInsertContractResult.setLong(F_CONTRACT_RESULT.GAS_SUPPLIED.ordinal(), contractResult.getGasSupplied()); + sqlInsertContractResult.setBytes(F_CONTRACT_RESULT.CALL_RESULT.ordinal(), contractResult.getCallResult()); + sqlInsertContractResult.setLong(F_CONTRACT_RESULT.GAS_USED.ordinal(), contractResult.getGasUsed()); + sqlInsertContractResult.addBatch(); + } catch (SQLException e) { + throw new ParserSQLException(e); + } } @Override public void onFileData(FileData fileData) throws ImporterException { - // to be implemented in followup change + try { + sqlInsertFileData.setLong(F_FILE_DATA.CONSENSUS_TIMESTAMP.ordinal(), fileData.getConsensusTimestamp()); + sqlInsertFileData.setBytes(F_FILE_DATA.FILE_DATA.ordinal(), fileData.getFileData()); + sqlInsertFileData.addBatch(); + } catch (SQLException e) { + throw new ParserSQLException(e); + } } @Override public void onLiveHash(LiveHash liveHash) throws ImporterException { - // to be implemented in followup change + try { + sqlInsertLiveHashes + .setLong(F_LIVEHASHES.CONSENSUS_TIMESTAMP.ordinal(), liveHash.getConsensusTimestamp()); + sqlInsertLiveHashes.setBytes(F_LIVEHASHES.LIVEHASH.ordinal(), liveHash.getLivehash()); + sqlInsertLiveHashes.addBatch(); + } catch (SQLException e) { + throw new ParserSQLException(e); + } } @Override @@ -152,4 +218,21 @@ enum F_NONFEETRANSFER { ZERO // column indices start at 1, this creates the necessary offset , CONSENSUS_TIMESTAMP, AMOUNT, REALM_NUM, ENTITY_NUM } + + enum F_TOPICMESSAGE { + ZERO // column indices start at 1, this creates the necessary offset + , CONSENSUS_TIMESTAMP, REALM_NUM, TOPIC_NUM, MESSAGE, RUNNING_HASH, SEQUENCE_NUMBER + } + + enum F_FILE_DATA { + ZERO, CONSENSUS_TIMESTAMP, FILE_DATA + } + + enum F_CONTRACT_RESULT { + ZERO, CONSENSUS_TIMESTAMP, FUNCTION_PARAMS, GAS_SUPPLIED, CALL_RESULT, GAS_USED + } + + enum F_LIVEHASHES { + ZERO, CONSENSUS_TIMESTAMP, LIVEHASH + } } diff --git a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/record/RecordFileLogger.java b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/record/RecordFileLogger.java index 14f1393b12e..033722c761b 100644 --- a/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/record/RecordFileLogger.java +++ b/hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/parser/record/RecordFileLogger.java @@ -53,11 +53,15 @@ import lombok.extern.log4j.Log4j2; import com.hedera.mirror.importer.addressbook.NetworkAddressBook; +import com.hedera.mirror.importer.domain.ContractResult; import com.hedera.mirror.importer.domain.CryptoTransfer; import com.hedera.mirror.importer.domain.Entities; import com.hedera.mirror.importer.domain.EntityId; import com.hedera.mirror.importer.domain.EntityType; +import com.hedera.mirror.importer.domain.FileData; +import com.hedera.mirror.importer.domain.LiveHash; import com.hedera.mirror.importer.domain.NonFeeTransfer; +import com.hedera.mirror.importer.domain.TopicMessage; import com.hedera.mirror.importer.parser.CommonParserProperties; import com.hedera.mirror.importer.repository.EntityRepository; import com.hedera.mirror.importer.repository.EntityTypeRepository; @@ -81,10 +85,6 @@ public class RecordFileLogger { private static long batch_count = 0; private static PreparedStatement sqlInsertTransaction; - private static PreparedStatement sqlInsertFileData; - private static PreparedStatement sqlInsertContractCall; - private static PreparedStatement sqlInsertClaimData; - private static PreparedStatement sqlInsertTopicMessage; public RecordFileLogger(CommonParserProperties commonParserProperties, RecordParserProperties parserProperties, NetworkAddressBook networkAddressBook, EntityRepository entityRepository, @@ -134,22 +134,6 @@ public static boolean start() { + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); postgresWriter.initSqlStatements(connect); - - sqlInsertFileData = connect.prepareStatement("INSERT INTO t_file_data" - + " (consensus_timestamp, file_data)" - + " VALUES (?, ?)"); - - sqlInsertContractCall = connect.prepareStatement("INSERT INTO t_contract_result" - + " (consensus_timestamp, function_params, gas_supplied, call_result, gas_used)" - + " VALUES (?, ?, ?, ?, ?)"); - - sqlInsertClaimData = connect.prepareStatement("INSERT INTO t_livehashes" - + " (consensus_timestamp, livehash)" - + " VALUES (?, ?)"); - - sqlInsertTopicMessage = connect.prepareStatement("insert into topic_message" - + " (consensus_timestamp, realm_num, topic_num, message, running_hash, sequence_number)" - + " values (?, ?, ?, ?, ?, ?)"); } catch (SQLException e) { log.error("Unable to prepare SQL statements", e); return false; @@ -160,11 +144,7 @@ public static boolean start() { public static boolean finish() { try { - sqlInsertFileData.close(); sqlInsertTransaction.close(); - sqlInsertContractCall.close(); - sqlInsertClaimData.close(); - sqlInsertTopicMessage.close(); postgresWriter.finish(); connect = DatabaseUtilities.closeDatabase(connect); @@ -559,8 +539,9 @@ public static void storeRecord(Transaction transaction, TransactionRecord txReco } /** - * Should the given transaction/record generate non_fee_transfers based on what type the transaction is, - * it's status, and run-time configuration concerning which situations warrant storing. + * Should the given transaction/record generate non_fee_transfers based on what type the transaction is, it's + * status, and run-time configuration concerning which situations warrant storing. + * * @param body * @param transactionRecord * @return @@ -748,39 +729,33 @@ private static EntityId storeConsensusSubmitMessage(TransactionBody body, } private static void insertConsensusTopicMessage(ConsensusSubmitMessageTransactionBody transactionBody, - TransactionRecord transactionRecord) throws SQLException { + TransactionRecord transactionRecord) { var receipt = transactionRecord.getReceipt(); - var ts = transactionRecord.getConsensusTimestamp(); var topicId = transactionBody.getTopicID(); - sqlInsertTopicMessage.setLong(1, Utility.timeStampInNanos(ts)); - sqlInsertTopicMessage.setShort(2, (short) topicId.getRealmNum()); - sqlInsertTopicMessage.setInt(3, (int) topicId.getTopicNum()); - sqlInsertTopicMessage.setBytes(4, transactionBody.getMessage().toByteArray()); - sqlInsertTopicMessage.setBytes(5, receipt.getTopicRunningHash().toByteArray()); - sqlInsertTopicMessage.setLong(6, receipt.getTopicSequenceNumber()); - sqlInsertTopicMessage.addBatch(); + TopicMessage topicMessage = new TopicMessage( + Utility.timeStampInNanos(transactionRecord.getConsensusTimestamp()), + transactionBody.getMessage().toByteArray(), (int) topicId.getRealmNum(), + receipt.getTopicRunningHash().toByteArray(), receipt.getTopicSequenceNumber(), + (int) topicId.getTopicNum()); + postgresWriter.onTopicMessage(topicMessage); } private static void insertFileCreate(long consensusTimestamp, FileCreateTransactionBody transactionBody, - TransactionRecord transactionRecord) throws SQLException { + TransactionRecord transactionRecord) { if (parserProperties.isPersistFiles() || (parserProperties.isPersistSystemFiles() && transactionRecord.getReceipt().getFileID() .getFileNum() < 1000)) { byte[] contents = transactionBody.getContents().toByteArray(); - sqlInsertFileData.setLong(F_FILE_DATA.CONSENSUS_TIMESTAMP.ordinal(), consensusTimestamp); - sqlInsertFileData.setBytes(F_FILE_DATA.FILE_DATA.ordinal(), contents); - sqlInsertFileData.addBatch(); + postgresWriter.onFileData(new FileData(consensusTimestamp, contents)); } } private static void insertFileAppend(long consensusTimestamp, FileAppendTransactionBody transactionBody) - throws SQLException, IOException { + throws IOException { if (parserProperties.isPersistFiles() || (parserProperties.isPersistSystemFiles() && transactionBody.getFileID().getFileNum() < 1000)) { byte[] contents = transactionBody.getContents().toByteArray(); - sqlInsertFileData.setLong(F_FILE_DATA.CONSENSUS_TIMESTAMP.ordinal(), consensusTimestamp); - sqlInsertFileData.setBytes(F_FILE_DATA.FILE_DATA.ordinal(), contents); - sqlInsertFileData.addBatch(); + postgresWriter.onFileData(new FileData(consensusTimestamp, contents)); // update the local address book if (isFileAddressBook(transactionBody.getFileID())) { @@ -791,13 +766,10 @@ private static void insertFileAppend(long consensusTimestamp, FileAppendTransact } private static void insertCryptoAddClaim(long consensusTimestamp, - CryptoAddClaimTransactionBody transactionBody) throws SQLException { + CryptoAddClaimTransactionBody transactionBody) { if (parserProperties.isPersistClaims()) { byte[] claim = transactionBody.getClaim().getHash().toByteArray(); - - sqlInsertClaimData.setLong(F_LIVEHASH_DATA.CONSENSUS_TIMESTAMP.ordinal(), consensusTimestamp); - sqlInsertClaimData.setBytes(F_LIVEHASH_DATA.LIVEHASH.ordinal(), claim); - sqlInsertClaimData.addBatch(); + postgresWriter.onLiveHash(new LiveHash(consensusTimestamp, claim)); } } @@ -813,9 +785,7 @@ private static void insertContractCall(long consensusTimestamp, callResult = transactionRecord.getContractCallResult().toByteArray(); gasUsed = transactionRecord.getContractCallResult().getGasUsed(); } - - insertContractResults(sqlInsertContractCall, consensusTimestamp, functionParams, gasSupplied, callResult, - gasUsed); + insertContractResults(consensusTimestamp, functionParams, gasSupplied, callResult, gasUsed); } } @@ -831,9 +801,7 @@ private static void insertContractCreateInstance(long consensusTimestamp, callResult = transactionRecord.getContractCreateResult().toByteArray(); gasUsed = transactionRecord.getContractCreateResult().getGasUsed(); } - - insertContractResults(sqlInsertContractCall, consensusTimestamp, functionParams, gasSupplied, callResult, - gasUsed); + insertContractResults(consensusTimestamp, functionParams, gasSupplied, callResult, gasUsed); } } @@ -851,8 +819,7 @@ private static void insertCryptoCreateTransferList(long consensusTimestamp, TransactionRecord txRecord, TransactionBody body, AccountID createdAccountId, - AccountID payerAccountId) - throws SQLException { + AccountID payerAccountId) { long initialBalance = 0; long createdAccountNum = 0; @@ -868,7 +835,6 @@ private static void insertCryptoCreateTransferList(long consensusTimestamp, TransferList transferList = txRecord.getTransferList(); for (int i = 0; i < transferList.getAccountAmountsCount(); ++i) { var aa = transferList.getAccountAmounts(i); - long amount = aa.getAmount(); var accountId = aa.getAccountID(); long accountNum = accountId.getAccountNum(); createEntity(getEntity(accountId)); @@ -899,14 +865,12 @@ private static boolean isFileAddressBook(FileID fileId) { } private static void insertFileUpdate(long consensusTimestamp, FileUpdateTransactionBody transactionBody) - throws SQLException, IOException { + throws IOException { FileID fileId = transactionBody.getFileID(); if (parserProperties.isPersistFiles() || (parserProperties.isPersistSystemFiles() && fileId.getFileNum() < 1000)) { byte[] contents = transactionBody.getContents().toByteArray(); - sqlInsertFileData.setLong(F_FILE_DATA.CONSENSUS_TIMESTAMP.ordinal(), consensusTimestamp); - sqlInsertFileData.setBytes(F_FILE_DATA.FILE_DATA.ordinal(), contents); - sqlInsertFileData.addBatch(); + postgresWriter.onFileData(new FileData(consensusTimestamp, contents)); } // update the local address book @@ -941,27 +905,16 @@ private static int getTransactionType(TransactionBody body) { return dataCase.getNumber(); } - public static void insertContractResults(PreparedStatement insert, long consensusTimestamp, - byte[] functionParams, long gasSupplied, - byte[] callResult, long gasUsed) throws SQLException { - insert.setLong(F_CONTRACT_CALL.CONSENSUS_TIMESTAMP.ordinal(), consensusTimestamp); - insert.setBytes(F_CONTRACT_CALL.FUNCTION_PARAMS.ordinal(), functionParams); - insert.setLong(F_CONTRACT_CALL.GAS_SUPPLIED.ordinal(), gasSupplied); - insert.setBytes(F_CONTRACT_CALL.CALL_RESULT.ordinal(), callResult); - insert.setLong(F_CONTRACT_CALL.GAS_USED.ordinal(), gasUsed); - - insert.addBatch(); + private static void insertContractResults( + long consensusTimestamp, byte[] functionParams, long gasSupplied, byte[] callResult, long gasUsed) { + postgresWriter.onContractResult( + new ContractResult(consensusTimestamp, functionParams, gasSupplied, callResult, gasUsed)); } private static void executeBatches() throws SQLException { int[] transactions = sqlInsertTransaction.executeBatch(); - int[] fileData = sqlInsertFileData.executeBatch(); - int[] contractCalls = sqlInsertContractCall.executeBatch(); - int[] claimData = sqlInsertClaimData.executeBatch(); - int[] topicMessages = sqlInsertTopicMessage.executeBatch(); postgresWriter.executeBatches(); - log.info("Inserted {} transactions, {} files, {} contracts, {} claims, {} topic messages", - transactions.length, fileData.length, contractCalls.length, claimData.length, topicMessages.length); + log.info("Inserted {} transactions", transactions.length); } public static Entities getEntity(AccountID accountID) { diff --git a/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/PostgresWritingRecordParserItemHandlerTest.java b/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/PostgresWritingRecordParserItemHandlerTest.java index 4133c4bdc4b..55cf0002105 100644 --- a/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/PostgresWritingRecordParserItemHandlerTest.java +++ b/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/PostgresWritingRecordParserItemHandlerTest.java @@ -25,17 +25,29 @@ import java.sql.Connection; import java.util.Optional; import javax.annotation.Resource; + +import com.hedera.mirror.importer.domain.ContractResult; +import com.hedera.mirror.importer.domain.FileData; + +import com.hedera.mirror.importer.domain.LiveHash; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.data.repository.CrudRepository; import org.springframework.test.context.jdbc.Sql; +import org.testcontainers.shaded.org.bouncycastle.util.Strings; import com.hedera.mirror.importer.IntegrationTest; import com.hedera.mirror.importer.domain.CryptoTransfer; import com.hedera.mirror.importer.domain.NonFeeTransfer; +import com.hedera.mirror.importer.domain.TopicMessage; +import com.hedera.mirror.importer.repository.ContractResultRepository; import com.hedera.mirror.importer.repository.CryptoTransferRepository; +import com.hedera.mirror.importer.repository.FileDataRepository; +import com.hedera.mirror.importer.repository.LiveHashRepository; import com.hedera.mirror.importer.repository.NonFeeTransferRepository; +import com.hedera.mirror.importer.repository.TopicMessageRepository; import com.hedera.mirror.importer.util.DatabaseUtilities; @Sql(executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD, scripts = "classpath:db/scripts/cleanup.sql") @@ -48,6 +60,18 @@ public class PostgresWritingRecordParserItemHandlerTest extends IntegrationTest @Resource protected NonFeeTransferRepository nonFeeTransferRepository; + @Resource + protected ContractResultRepository contractResultRepository; + + @Resource + protected LiveHashRepository liveHashRepository; + + @Resource + protected FileDataRepository fileDataRepository; + + @Resource + protected TopicMessageRepository topicMessageRepository; + @Resource protected PostgresWritingRecordParsedItemHandler postgresWriter; @@ -104,6 +128,65 @@ void onNonFeeTransfer() throws Exception { assertExistsAndEquals(nonFeeTransferRepository, nonFeeTransfer2, 2L); } + @Test + void onTopicMessage() throws Exception { + // setup + byte[] message = Strings.toByteArray("test message"); + byte[] runningHash = Strings.toByteArray("running hash"); + TopicMessage expectedTopicMessage = new TopicMessage(1L, message, 0, runningHash, 10L, 1001); + + // when + postgresWriter.onTopicMessage(expectedTopicMessage); + completeFileAndCommit(); + + // expect + assertEquals(1, topicMessageRepository.count()); + assertExistsAndEquals(topicMessageRepository, expectedTopicMessage, 1L); + } + + @Test + void onFileData() throws Exception { + // setup + FileData expectedFileData = new FileData(11L, Strings.toByteArray("file data")); + + // when + postgresWriter.onFileData(expectedFileData); + completeFileAndCommit(); + + // expect + assertEquals(1, fileDataRepository.count()); + assertExistsAndEquals(fileDataRepository, expectedFileData, 11L); + } + + @Test + void onContractResult() throws Exception { + // setup + ContractResult expectedContractResult = new ContractResult(15L, Strings.toByteArray("function parameters"), + 10000L, Strings.toByteArray("call result"), 10000L); + + // when + postgresWriter.onContractResult(expectedContractResult); + completeFileAndCommit(); + + // expect + assertEquals(1, contractResultRepository.count()); + assertExistsAndEquals(contractResultRepository, expectedContractResult, 15L); + } + + @Test + void onLiveHash() throws Exception { + // setup + LiveHash expectedLiveHash = new LiveHash(20L, Strings.toByteArray("live hash")); + + // when + postgresWriter.onLiveHash(expectedLiveHash); + completeFileAndCommit(); + + // expect + assertEquals(1, liveHashRepository.count()); + assertExistsAndEquals(liveHashRepository, expectedLiveHash, 20L); + } + @Test void rollback() throws Exception { // when