Skip to content

Commit

Permalink
fix: chunked transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Ricky Saechao <[email protected]>
  • Loading branch information
RickyLB committed Feb 5, 2024
1 parent f47f427 commit c351c2f
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 32 deletions.
16 changes: 10 additions & 6 deletions Sources/Hedera/ChunkedTransaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ public class ChunkedTransaction: Transaction {
let resp = try await executeAny(
client,
FirstChunkView(transaction: self, totalChunks: usedChunks),
timeoutPerChunk
timeoutPerChunk,
nil,
0
)

if waitForReceipt {
Expand All @@ -157,7 +159,9 @@ public class ChunkedTransaction: Transaction {
ChunkView(
transaction: self, initialTransactionId: initialTransactionId, currentChunk: chunk,
totalChunks: usedChunks),
timeoutPerChunk
timeoutPerChunk,
initialTransactionId,
chunk
)

if waitForReceipt {
Expand Down Expand Up @@ -260,16 +264,16 @@ extension ChunkedTransaction.ChunkView: Execute {
) {
assert(transaction.isFrozen)

guard let transactionId = transactionId else {
throw HError.noPayerAccountOrTransactionId
}
let currentTransactionId = TransactionId(
accountId: initialTransactionId.accountId,
validStart: initialTransactionId.validStart.adding(nanos: UInt64(currentChunk)))

return transaction.makeRequestInner(
chunkInfo: .init(
current: currentChunk,
total: totalChunks,
initialTransactionId: initialTransactionId,
currentTransactionId: transactionId,
currentTransactionId: currentTransactionId,
nodeAccountId: nodeAccountId
)
)
Expand Down
33 changes: 26 additions & 7 deletions Sources/Hedera/Execute.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,26 @@ private struct ExecuteContext {
fileprivate let grpcTimeout: Duration?
}

internal func executeAny<E: Execute & ValidateChecksums>(_ client: Client, _ executable: E, _ timeout: TimeInterval?)
internal func executeAny<E: Execute & ValidateChecksums>(
_ client: Client, _ executable: E, _ timeout: TimeInterval?, _ initialTransactionId: TransactionId?, _ index: Int
)
async throws -> E.Response
{
let timeout = timeout ?? LegacyExponentialBackoff.defaultMaxElapsedTime

if client.isAutoValidateChecksumsEnabled() {
try executable.validateChecksums(on: client)
}

let operatorAccountId: AccountId?

// Where the transactionId is set
do {
if executable.explicitTransactionId != nil
|| !(executable.regenerateTransactionId ?? client.defaultRegenerateTransactionId)
{
operatorAccountId = nil
} else {
operatorAccountId = executable.operatorAccountId ?? client.operator?.accountId
operatorAccountId = initialTransactionId?.accountId ?? client.operator?.accountId
}
}

Expand All @@ -119,6 +122,13 @@ internal func executeAny<E: Execute & ValidateChecksums>(_ client: Client, _ exe
maxElapsedTime: .limited(timeout)
)

var firstTransactionId: TransactionId? = nil

// Keep track of the initialTransactionId to reference in chunked Transactions.
if let initialTransactionId = initialTransactionId {
firstTransactionId = initialTransactionId
}

// let backoff = client.backoff();
// let mut backoff_builder = ExponentialBackoffBuilder::new();

Expand All @@ -138,18 +148,23 @@ internal func executeAny<E: Execute & ValidateChecksums>(_ client: Client, _ exe
maxAttempts: backoff.maxAttempts,
grpcTimeout: nil
),
executable: executable)
executable: executable,
initialTransactionId: firstTransactionId,
index: index)
}

private func executeAnyInner<E: Execute>(ctx: ExecuteContext, executable: E) async throws -> E.Response {
private func executeAnyInner<E: Execute>(
ctx: ExecuteContext, executable: E, initialTransactionId: TransactionId?, index: Int
) async throws -> E.Response {
let explicitTransactionId = executable.explicitTransactionId

var backoff = ctx.backoffConfig
var lastError: HError?

var transactionId =
executable.requiresTransactionId
? (explicitTransactionId ?? executable.operatorAccountId.map(TransactionId.generateFrom)
? (explicitTransactionId ?? TransactionId.generateFromInitial(initialTransactionId, index) ?? executable
.operatorAccountId.map(TransactionId.generateFrom)
?? ctx.operatorAccountId.map(TransactionId.generateFrom)) : nil

let explicitNodeIndexes = try executable.nodeAccountIds.map(ctx.network.nodeIndexesForIds)
Expand Down Expand Up @@ -216,6 +231,7 @@ private func executeAnyInner<E: Execute>(ctx: ExecuteContext, executable: E) asy
&& ctx.operatorAccountId != nil:
// the transaction that was generated has since expired
// re-generate the transaction ID and try again, immediately

lastError = executable.makeErrorPrecheck(precheckStatus, transactionId)
transactionId =
executable.operatorAccountId.map(TransactionId.generateFrom)
Expand Down Expand Up @@ -244,6 +260,7 @@ private func executeAnyInner<E: Execute>(ctx: ExecuteContext, executable: E) asy

try await Task.sleep(nanoseconds: UInt64(timeout * 1e9))
}

}

internal func randomIndexes(upTo count: Int, amount: Int) -> [Int] {
Expand Down Expand Up @@ -292,7 +309,9 @@ private struct NodeIndexesGeneratorMap: AsyncSequence, AsyncIteratorProtocol {
maxAttempts: ctx.maxAttempts,
grpcTimeout: ctx.grpcTimeout
),
executable: request
executable: request,
initialTransactionId: nil,
index: 0
)

return res != nil
Expand Down
4 changes: 3 additions & 1 deletion Sources/Hedera/PingQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal struct PingQuery {
private let nodeAccountId: AccountId

internal func execute(_ client: Client, timeout: TimeInterval? = nil) async throws {
try await executeAny(client, self, timeout)
try await executeAny(client, self, timeout, nil, 0)
}
}

Expand All @@ -55,6 +55,8 @@ extension PingQuery: Execute {

internal var explicitTransactionId: TransactionId? { nil }

internal var firstTransactionId: TransactionId? { nil }

internal var operatorAccountId: AccountId? {
nil
}
Expand Down
6 changes: 5 additions & 1 deletion Sources/Hedera/Query.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public class Query<Response>: ValidateChecksums {
try payment.freezeWith(client)
}

return try await executeAny(client, self, timeout)
return try await executeAny(client, self, timeout, nil, 0)
}

internal func validateChecksums(on ledgerId: LedgerId) throws {
Expand All @@ -197,6 +197,10 @@ extension Query: Execute {
payment.transactionId
}

internal var firstTransactionId: TransactionId? {
payment.transactionId
}

internal var requiresTransactionId: Bool {
self.requiresPayment
}
Expand Down
4 changes: 3 additions & 1 deletion Sources/Hedera/QueryCost.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal struct QueryCost<T, U: Query<T>> {
}

internal func execute(_ client: Client, _ timeout: TimeInterval? = nil) async throws -> Response {
try await executeAny(client, self, timeout)
try await executeAny(client, self, timeout, nil, 0)
}
}

Expand All @@ -49,6 +49,8 @@ extension QueryCost: Execute {

internal var explicitTransactionId: TransactionId? { nil }

internal var firstTransactionId: TransactionId? { nil }

internal var operatorAccountId: AccountId? {
nil
}
Expand Down
59 changes: 48 additions & 11 deletions Sources/Hedera/Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,6 @@ public class Transaction: ValidateChecksums {
return self
}

guard let transactionId = self.transactionId ?? client?.operator?.generateTransactionId() else {
throw HError(
kind: .noPayerAccountOrTransactionId,
description: "transaction frozen without client or explicit transaction ID")
}

guard let nodeAccountIds = self.nodeAccountIds ?? client?.net.randomNodeIds() else {
throw HError(
kind: .freezeUnsetNodeAccountIds, description: "transaction frozen without client or explicit node IDs")
Expand All @@ -327,15 +321,12 @@ public class Transaction: ValidateChecksums {

let `operator` = client?.operator

self.transactionId = transactionId
self.nodeAccountIds = nodeAccountIds
self.maxTransactionFee = maxTransactionFee
self.`operator` = `operator`

isFrozen = true

self.sources = try TransactionSources.init(transactions: try self.makeTransactionList())

if client?.isAutoValidateChecksumsEnabled() == true {
try validateChecksums(on: client!)
}
Expand Down Expand Up @@ -363,7 +354,7 @@ public class Transaction: ValidateChecksums {
return try await SourceTransaction(inner: self, sources: sources).execute(client, timeout: timeout)
}

return try await executeAny(client, self, timeout)
return try await executeAny(client, self, timeout, nil, 0)
}

public static func fromBytes(_ bytes: Data) throws -> Transaction {
Expand Down Expand Up @@ -530,7 +521,6 @@ extension Transaction {

internal func makeRequestInner(chunkInfo: ChunkInfo) -> (Proto_Transaction, TransactionHash) {
assert(self.isFrozen)

let body: Proto_TransactionBody = self.toTransactionBodyProtobuf(chunkInfo)

// swiftlint:disable:next force_try
Expand Down Expand Up @@ -698,3 +688,50 @@ private func protoTransactionBodyEqual(_ lhs: Proto_TransactionBody, _ rhs: Prot

return true
}

protocol TransactionIdCollection {
associatedtype TransactionId
func append(_ item: TransactionId)
var count: Int { get }
func getTransactionId(at index: Int) -> TransactionId?

// You can add more methods as required.
}

class TransactionIdArray<T>: TransactionIdCollection {
private var array: [T] = []
private let lock = NSLock()

func append(_ item: T) {
lock.lock()
defer { lock.unlock() }
array.append(item)
}

var count: Int {
lock.lock()
defer { lock.unlock() }
return array.count
}

func getTransactionId(at index: Int) -> T? {
lock.lock()
defer { lock.unlock() }
guard index >= 0 && index < array.count else { return nil }
return array[index]
}

func performLocked(_ closure: () -> Void) {
lock.lock()
defer { lock.unlock() }
closure()
}

var isEmpty: Bool {
lock.lock()
defer { lock.unlock() }
return array.isEmpty
}

// Implement other protocol methods here.
}
5 changes: 3 additions & 2 deletions Sources/Hedera/Transaction/TransactionSources.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ internal struct TransactionSources: Sendable {
(0..<chunksCount).lazy.map { SourceChunk(map: self.guts, index: $0) }
}
}

let fileName = ("\(#file)" as NSString).lastPathComponent
extension TransactionSources {
// this is every bit as insane as the rust method I ported it from :/
// swiftlint:disable:next function_body_length
Expand Down Expand Up @@ -239,6 +239,7 @@ extension TransactionSources {

for chunk in chunks {
let transactionId = transactionInfo[chunk.startIndex].transactionId

guard !transactionIdsTmp.contains(transactionId) else {
throw HError.fromProtobuf("duplicate transaction ID between chunked transaction chunks")
}
Expand Down Expand Up @@ -358,7 +359,7 @@ internal struct SourceTransaction<Tx: Transaction> {

for chunk in sources.chunks {
let response = try await executeAny(
client, SourceTransactionExecuteView(inner: inner, chunk: chunk), timeoutPerChunk)
client, SourceTransactionExecuteView(inner: inner, chunk: chunk), timeoutPerChunk, nil, 0)

if waitForReceipt {
_ = try await response.getReceipt(client)
Expand Down
10 changes: 10 additions & 0 deletions Sources/Hedera/TransactionId.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ public struct TransactionId: Sendable, Equatable, Hashable, ExpressibleByStringL
return Self(accountId: accountId, validStart: validStart)
}

/// Generates a new transaction ID for chunks.
public static func generateFromInitial(_ initialTransactionId: TransactionId?, _ index: Int) -> Self? {
guard let initialTransactionId = initialTransactionId else {
return nil
}
return TransactionId(
accountId: initialTransactionId.accountId,
validStart: initialTransactionId.validStart.adding(nanos: UInt64(index)))
}

/// Creates a new transaction Id with the given account id and valid start.
public static func withValidStart(_ accountId: AccountId, _ validStart: Timestamp) -> Self {
Self(accountId: accountId, validStart: validStart)
Expand Down
6 changes: 3 additions & 3 deletions Tests/HederaE2ETests/Topic/TopicMessageSubmit.swift
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ internal class TopicMessageSubmit: XCTestCase {
let message = "12" // 2 bytes message
let chunkSize = 1 // 1 byte chunk size
let transactionId = TransactionId.generateFrom(payerAccountId) // custom transactionId

// Transaction creation
let transaction = TopicMessageSubmitTransaction()
.transactionId(transactionId)
Expand All @@ -180,7 +179,8 @@ internal class TopicMessageSubmit: XCTestCase {

let txResponses = try await doubleSignedTx.executeAll(payerClient)

// Compare the transaction payer accounts with each other
XCTAssertEqual(txResponses[0].transactionId.accountId, txResponses[1].transactionId.accountId)
// Checks if TransactionId have the same operator and if the second transactionId valid start
// is incremented by 1
XCTAssertEqual(TransactionId.generateFromInitial(txResponses[0].transactionId, 1), txResponses[1].transactionId)
}
}

0 comments on commit c351c2f

Please sign in to comment.