From 791fcafa6010269926503e40539f57dc8766079a Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 30 Dec 2024 21:03:57 +0800 Subject: [PATCH 01/29] feat(coordinator): assign static prover first and avoid reassigning failing task to same prover --- coordinator/conf/config.json | 1 + coordinator/internal/config/config.go | 2 + .../logic/provertask/batch_prover_task.go | 57 ++++++-- .../logic/provertask/bundle_prover_task.go | 57 ++++++-- .../logic/provertask/chunk_prover_task.go | 53 +++++-- .../internal/logic/provertask/prover_task.go | 5 + coordinator/internal/orm/batch.go | 60 ++++---- coordinator/internal/orm/bundle.go | 49 ++++--- coordinator/internal/orm/chunk.go | 48 ++++--- coordinator/internal/orm/prover_task.go | 17 +++ rollup/config.json | 130 ++++++++++++++++++ 11 files changed, 371 insertions(+), 108 deletions(-) create mode 100644 rollup/config.json diff --git a/coordinator/conf/config.json b/coordinator/conf/config.json index de8944a7b9..fb738926c6 100644 --- a/coordinator/conf/config.json +++ b/coordinator/conf/config.json @@ -2,6 +2,7 @@ "prover_manager": { "provers_per_session": 1, "session_attempts": 5, + "external_prover_threshold": 32, "bundle_collection_time_sec": 180, "batch_collection_time_sec": 180, "chunk_collection_time_sec": 180, diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go index dbdaa40b02..52a9158d0b 100644 --- a/coordinator/internal/config/config.go +++ b/coordinator/internal/config/config.go @@ -16,6 +16,8 @@ type ProverManager struct { // Number of attempts that a session can be retried if previous attempts failed. // Currently we only consider proving timeout as failure here. SessionAttempts uint8 `json:"session_attempts"` + // Threshold for activating the external prover based on unassigned task count. + ExternalProverThreshold int64 `json:"external_prover_threshold"` // Zk verifier config. 
Verifier *VerifierConfig `json:"verifier"` // BatchCollectionTimeSec batch Proof collection time (in seconds). diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index 7a472c4baf..3ecd053a11 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "github.com/gin-gonic/gin" @@ -63,29 +64,59 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts + if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { + unassignedBatchCount, err := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) + if err != nil { + log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) + return nil, ErrCoordinatorInternalFailure + } + // Assign external prover if unassigned task number exceeds threshold + if unassignedBatchCount < bp.cfg.ProverManager.ExternalProverThreshold { + return nil, nil + } + } + var batchTask *orm.Batch for i := 0; i < 5; i++ { var getTaskError error var tmpBatchTask *orm.Batch - tmpBatchTask, getTaskError = bp.batchOrm.GetAssignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) + var assignedOffset, unassignedOffset = 0, 0 + tmpAssignedBatchTasks, getTaskError := bp.batchOrm.GetAssignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) if getTaskError != nil { - log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - // Why here need get again? 
In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` - // batch to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. - if tmpBatchTask == nil { - tmpBatchTask, getTaskError = bp.batchOrm.GetUnassignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) - if getTaskError != nil { - log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) - return nil, ErrCoordinatorInternalFailure - } + // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. + tmpUnassignedBatchTask, getTaskError := bp.batchOrm.GetUnassignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) + if getTaskError != nil { + log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + return nil, ErrCoordinatorInternalFailure } + for { + tmpBatchTask = nil + if assignedOffset < len(tmpAssignedBatchTasks) { + tmpBatchTask = tmpAssignedBatchTasks[assignedOffset] + assignedOffset++ + } else if unassignedOffset < len(tmpUnassignedBatchTask) { + tmpBatchTask = tmpUnassignedBatchTask[unassignedOffset] + unassignedOffset++ + } + + if tmpBatchTask == nil { + log.Debug("get empty batch", "height", getTaskParameter.ProverHeight) + return nil, nil + } - if tmpBatchTask == nil { - log.Debug("get empty batch", "height", getTaskParameter.ProverHeight) - return nil, nil + // Don't dispatch the same failing job to the same prover + proverTask, err := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if err != nil { + log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) + return nil, ErrCoordinatorInternalFailure 
+ } + if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { + break + } } rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts) diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index c8901d2d4e..c7c609da84 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "github.com/gin-gonic/gin" @@ -63,29 +64,59 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts + if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { + unassignedBundleCount, err := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) + if err != nil { + log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) + return nil, ErrCoordinatorInternalFailure + } + // Assign external prover if unassigned task number exceeds threshold + if unassignedBundleCount < bp.cfg.ProverManager.ExternalProverThreshold { + return nil, nil + } + } + var bundleTask *orm.Bundle for i := 0; i < 5; i++ { var getTaskError error var tmpBundleTask *orm.Bundle - tmpBundleTask, getTaskError = bp.bundleOrm.GetAssignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) + var assignedOffset, unassignedOffset = 0, 0 + tmpAssignedBundleTasks, getTaskError := bp.bundleOrm.GetAssignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) if getTaskError != nil { - log.Error("failed to get assigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + 
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` - // bundle to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. - if tmpBundleTask == nil { - tmpBundleTask, getTaskError = bp.bundleOrm.GetUnassignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) - if getTaskError != nil { - log.Error("failed to get unassigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) - return nil, ErrCoordinatorInternalFailure - } + // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. + tmpUnassignedBundleTask, getTaskError := bp.bundleOrm.GetUnassignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) + if getTaskError != nil { + log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + return nil, ErrCoordinatorInternalFailure } + for { + tmpBundleTask = nil + if assignedOffset < len(tmpAssignedBundleTasks) { + tmpBundleTask = tmpAssignedBundleTasks[assignedOffset] + assignedOffset++ + } else if unassignedOffset < len(tmpUnassignedBundleTask) { + tmpBundleTask = tmpUnassignedBundleTask[unassignedOffset] + unassignedOffset++ + } + + if tmpBundleTask == nil { + log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight) + return nil, nil + } - if tmpBundleTask == nil { - log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight) - return nil, nil + // Don't dispatch the same failing job to the same prover + proverTask, err := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if err != nil { + log.Error("failed to get prover task 
of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) + return nil, ErrCoordinatorInternalFailure + } + if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { + break + } } rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts) diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index 56e82a91d3..b48dba38f3 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "github.com/gin-gonic/gin" @@ -61,29 +62,59 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts + if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { + unassignedChunkCount, err := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) + if err != nil { + log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) + return nil, ErrCoordinatorInternalFailure + } + // Assign external prover if unassigned task number exceeds threshold + if unassignedChunkCount < cp.cfg.ProverManager.ExternalProverThreshold { + return nil, nil + } + } + var chunkTask *orm.Chunk for i := 0; i < 5; i++ { var getTaskError error var tmpChunkTask *orm.Chunk - tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) + var assignedOffset, unassignedOffset = 0, 0 + 
tmpAssignedChunkTasks, getTaskError := cp.chunkOrm.GetAssignedChunks(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50) if getTaskError != nil { log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. - if tmpChunkTask == nil { - tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) - if getTaskError != nil { - log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) - return nil, ErrCoordinatorInternalFailure - } + tmpUnassignedChunkTask, getTaskError := cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50) + if getTaskError != nil { + log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + return nil, ErrCoordinatorInternalFailure } + for { + tmpChunkTask = nil + if assignedOffset < len(tmpAssignedChunkTasks) { + tmpChunkTask = tmpAssignedChunkTasks[assignedOffset] + assignedOffset++ + } else if unassignedOffset < len(tmpUnassignedChunkTask) { + tmpChunkTask = tmpUnassignedChunkTask[unassignedOffset] + unassignedOffset++ + } + + if tmpChunkTask == nil { + log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight) + return nil, nil + } - if tmpChunkTask == nil { - log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight) - return nil, nil + // Don't dispatch the same failing job to the same prover + proverTask, err := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, 
tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if err != nil { + log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) + return nil, ErrCoordinatorInternalFailure + } + if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { + break + } } rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts) diff --git a/coordinator/internal/logic/provertask/prover_task.go b/coordinator/internal/logic/provertask/prover_task.go index 14568e120e..507f3cad0d 100644 --- a/coordinator/internal/logic/provertask/prover_task.go +++ b/coordinator/internal/logic/provertask/prover_task.go @@ -27,6 +27,11 @@ var ( getTaskCounterVec *prometheus.CounterVec = nil ) +var ( + // ExternalProverNamePrefix prefix of prover name + ExternalProverNamePrefix = "external" +) + // ProverTask the interface of a collector who send data to prover type ProverTask interface { Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) diff --git a/coordinator/internal/orm/batch.go b/coordinator/internal/orm/batch.go index 3dd8412e58..147b9f6e92 100644 --- a/coordinator/internal/orm/batch.go +++ b/coordinator/internal/orm/batch.go @@ -78,38 +78,47 @@ func (*Batch) TableName() string { return "batch" } -// GetUnassignedBatch retrieves unassigned batch based on the specified limit. +// GetUnassignedBatches retrieves unassigned batches based on the specified limit. // The returned batches are sorted in ascending order by their index. 
-func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { - var batch Batch +func (o *Batch) GetUnassignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) { + var batch []*Batch db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;", - int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady)) + sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;", + int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit) err := db.Raw(sql).Scan(&batch).Error if err != nil { - return nil, fmt.Errorf("Batch.GetUnassignedBatch error: %w", err) + return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err) } - if batch.Hash == "" { - return nil, nil + return batch, nil +} + +// GetUnassignedBatchCount returns the number of unassigned batches whose total and active attempts are below the given maximums. +func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) { + var count int64 + db := o.db.WithContext(ctx) + db = db.Model(&Batch{}) + db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned)) + db = db.Where("total_attempts < ?", maxTotalAttempts) + db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("batch.deleted_at IS NULL") + if err := db.Count(&count).Error; err != nil { + return 0, fmt.Errorf("Batch.GetUnassignedBatchCount error: %w", err) } - return &batch, nil + return count, nil } -// GetAssignedBatch retrieves assigned batch based on the specified limit. 
+// GetAssignedBatches retrieves assigned batches based on the specified limit. // The returned batches are sorted in ascending order by their index. -func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { - var batch Batch +func (o *Batch) GetAssignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) { + var batch []*Batch db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;", - int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady)) + sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;", + int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit) err := db.Raw(sql).Scan(&batch).Error if err != nil { - return nil, fmt.Errorf("Batch.GetAssignedBatch error: %w", err) - } - if batch.Hash == "" { - return nil, nil + return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err) } - return &batch, nil + return batch, nil } // GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready @@ -132,19 +141,6 @@ func (o *Batch) GetUnassignedAndChunksUnreadyBatches(ctx context.Context, offset return batches, nil } -// GetAssignedBatches retrieves all batches whose proving_status is either types.ProvingTaskAssigned. 
-func (o *Batch) GetAssignedBatches(ctx context.Context) ([]*Batch, error) { - db := o.db.WithContext(ctx) - db = db.Model(&Batch{}) - db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned)) - - var assignedBatches []*Batch - if err := db.Find(&assignedBatches).Error; err != nil { - return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err) - } - return assignedBatches, nil -} - // GetProvingStatusByHash retrieves the proving status of a batch given its hash. func (o *Batch) GetProvingStatusByHash(ctx context.Context, hash string) (types.ProvingStatus, error) { db := o.db.WithContext(ctx) diff --git a/coordinator/internal/orm/bundle.go b/coordinator/internal/orm/bundle.go index 5deeff1114..0bf6efa5b8 100644 --- a/coordinator/internal/orm/bundle.go +++ b/coordinator/internal/orm/bundle.go @@ -54,38 +54,47 @@ func (*Bundle) TableName() string { return "bundle" } -// GetUnassignedBundle retrieves unassigned bundle based on the specified limit. +// GetUnassignedBundles retrieves unassigned bundle based on the specified limit. // The returned batch sorts in ascending order by their index. 
-func (o *Bundle) GetUnassignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) { - var bundle Bundle +func (o *Bundle) GetUnassignedBundles(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Bundle, error) { + var bundle []*Bundle db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;", - int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady)) + sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT %d;", + int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady), limit) err := db.Raw(sql).Scan(&bundle).Error if err != nil { - return nil, fmt.Errorf("Batch.GetUnassignedBundle error: %w", err) - } - if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" { - return nil, nil + return nil, fmt.Errorf("Batch.GetUnassignedBundles error: %w", err) } - return &bundle, nil + return bundle, nil +} + +// GetUnassignedBundleCount returns the number of unassigned bundles whose total and active attempts are below the given maximums. 
+func (o *Bundle) GetUnassignedBundleCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) { + var count int64 + db := o.db.WithContext(ctx) + db = db.Model(&Bundle{}) + db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned)) + db = db.Where("total_attempts < ?", maxTotalAttempts) + db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("bundle.deleted_at IS NULL") + if err := db.Count(&count).Error; err != nil { + return 0, fmt.Errorf("Bundle.GetUnassignedBundleCount error: %w", err) + } + return count, nil } -// GetAssignedBundle retrieves assigned bundle based on the specified limit. +// GetAssignedBundles retrieves assigned bundles based on the specified limit. // The returned bundle sorts in ascending order by their index. -func (o *Bundle) GetAssignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) { - var bundle Bundle +func (o *Bundle) GetAssignedBundles(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Bundle, error) { + var bundle []*Bundle db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;", - int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady)) + sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT %d;", + int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady), limit) err := db.Raw(sql).Scan(&bundle).Error if err != nil { - return nil, fmt.Errorf("Bundle.GetAssignedBatch error: %w", err) + return nil, fmt.Errorf("Bundle.GetAssignedBundles error: %w", err) } - if bundle.StartBatchHash == "" || 
bundle.EndBatchHash == "" { - return nil, nil - } - return &bundle, nil + return bundle, nil } // GetProvingStatusByHash retrieves the proving status of a bundle given its hash. diff --git a/coordinator/internal/orm/chunk.go b/coordinator/internal/orm/chunk.go index 3f1964c400..ce73f3cbb9 100644 --- a/coordinator/internal/orm/chunk.go +++ b/coordinator/internal/orm/chunk.go @@ -73,36 +73,46 @@ func (*Chunk) TableName() string { // GetUnassignedChunk retrieves unassigned chunk based on the specified limit. // The returned chunks are sorted in ascending order by their index. -func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) { - var chunk Chunk +func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height, limit uint64) ([]*Chunk, error) { + var chunks []*Chunk db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", - int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height) - err := db.Raw(sql).Scan(&chunk).Error + sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT %d;", + int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height, limit) + err := db.Raw(sql).Scan(&chunks).Error if err != nil { return nil, fmt.Errorf("Chunk.GetUnassignedChunk error: %w", err) } - if chunk.Hash == "" { - return nil, nil + return chunks, nil +} + +// GetUnassignedChunkCount returns the number of unassigned chunks ending at or below the given height whose total and active attempts are below the given maximums. 
+func (o *Chunk) GetUnassignedChunkCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (int64, error) { + var count int64 + db := o.db.WithContext(ctx) + db = db.Model(&Chunk{}) + db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned)) + db = db.Where("total_attempts < ?", maxTotalAttempts) + db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("end_block_number <= ?", height) + db = db.Where("chunk.deleted_at IS NULL") + if err := db.Count(&count).Error; err != nil { + return 0, fmt.Errorf("Chunk.GetUnassignedChunkCount error: %w", err) } - return &chunk, nil + return count, nil } -// GetAssignedChunk retrieves assigned chunk based on the specified limit. +// GetAssignedChunks retrieves assigned chunks based on the specified limit. // The returned chunks are sorted in ascending order by their index. -func (o *Chunk) GetAssignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) { - var chunk Chunk +func (o *Chunk) GetAssignedChunks(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64, limit uint64) ([]*Chunk, error) { + var chunks []*Chunk db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", - int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height) - err := db.Raw(sql).Scan(&chunk).Error + sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT %d;", + int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height, limit) + err := db.Raw(sql).Scan(&chunks).Error if err != nil { - return nil, fmt.Errorf("Chunk.GetAssignedChunk error: %w", err) + return nil, 
fmt.Errorf("Chunk.GetAssignedChunks error: %w", err) } - if chunk.Hash == "" { - return nil, nil - } - return &chunk, nil + return chunks, nil } // GetChunksByBatchHash retrieves the chunks associated with a specific batch hash. diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index 00d8b36167..841f9a061e 100644 --- a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -148,6 +148,23 @@ func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType return proverTasks, nil } +// GetTaskOfProver gets the prover task of the given type and task ID for the prover identified by public key and version +func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) { + db := o.db.WithContext(ctx) + db = db.Model(&ProverTask{}) + db = db.Where("task_type", int(taskType)) + db = db.Where("task_id", taskID) + db = db.Where("prover_public_key", proverPublicKey) + db = db.Where("prover_version", proverVersion) + + var proverTask ProverTask + err := db.First(&proverTask).Error + if err != nil { + return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey) + } + return &proverTask, nil +} + // GetProvingStatusByTaskID retrieves the proving status of a prover task func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskType message.ProofType, taskID string) (types.ProverProveStatus, error) { db := o.db.WithContext(ctx) diff --git a/rollup/config.json b/rollup/config.json new file mode 100644 index 0000000000..f68450d829 --- /dev/null +++ b/rollup/config.json @@ -0,0 +1,130 @@ +{ + "l1_config": { + "endpoint": "https://alien-flashy-arm.ethereum-sepolia.quiknode.pro/2aeb75414e5ee0e930b64c2e7feff59efb537f30", + "start_height": 0, + "relayer_config": { + "gas_price_oracle_contract_address": "0x5300000000000000000000000000000000000002", + "sender_config": { + "endpoint": 
"http://l2-rpc.scrollsdk", + "escalate_blocks": 100, + "escalate_multiple_num": 11, + "escalate_multiple_den": 10, + "min_gas_tip": 1, + "max_gas_price": 10000000000000, + "tx_type": "DynamicFeeTx", + "check_pending_time": 3, + "confirmations": "0x0" + }, + "gas_oracle_config": { + "min_gas_price": 0, + "gas_price_diff": 50000, + "l1_base_fee_weight": 0.086, + "l1_blob_base_fee_weight": 0.030, + "check_committed_batches_window_minutes": 5, + "l1_base_fee_default": 15000000000, + "l1_blob_base_fee_default": 1, + "alternative_gas_token_config": { + "enabled": false, + "mode": "Fixed", + "fixed_exchange_rate": 0.001, + "token_symbol_pair": "" + } + }, + "gas_oracle_sender_signer_config": { + "signer_type": "PrivateKey", + "private_key_signer_config": { + "private_key": "1313131313131313131313131313131313131313131313131313131313131313" + } + } + } + }, + "l2_config": { + "confirmations": "0x10", + "endpoint": "http://l2-rpc.scrollsdk", + "l2_message_queue_address": "0x5300000000000000000000000000000000000000", + "relayer_config": { + "rollup_contract_address": "0xBAA5Cc4a4Ca1c596CbF33183A43148c832a53CC5", + "gas_price_oracle_contract_address": "0x30D802Ba5E7BF1145cA35E67de07388e4C5B8487", + "sender_config": { + "endpoint": "https://alien-flashy-arm.ethereum-sepolia.quiknode.pro/2aeb75414e5ee0e930b64c2e7feff59efb537f30", + "escalate_blocks": 4, + "escalate_multiple_num": 12, + "escalate_multiple_den": 10, + "min_gas_tip": 100000000, + "max_gas_price": 200000000000, + "max_blob_gas_price": 200000000000, + "tx_type": "DynamicFeeTx", + "check_pending_time": 10, + "confirmations": "0x0", + "max_pending_blob_txs": 3 + }, + "gas_oracle_config": { + "min_gas_price": 0, + "gas_price_diff": 50000, + "alternative_gas_token_config": { + "enabled": false, + "mode": "Fixed", + "fixed_exchange_rate": 0.001, + "token_symbol_pair": "" + } + }, + "chain_monitor": { + "enabled": true, + "timeout": 3, + "try_times": 5, + "base_url": "http://chain-monitor:8080" + }, + 
"enable_test_env_bypass_features": false, + "finalize_batch_without_proof_timeout_sec": 300, + "finalize_bundle_without_proof_timeout_sec": 300, + "gas_oracle_sender_signer_config": { + "signer_type": "PrivateKey", + "private_key_signer_config": { + "private_key": "1313131313131313131313131313131313131313131313131313131313131313" + } + }, + "commit_sender_signer_config": { + "signer_type": "PrivateKey", + "private_key_signer_config": { + "private_key": "1414141414141414141414141414141414141414141414141414141414141414" + } + }, + "finalize_sender_signer_config": { + "signer_type": "PrivateKey", + "private_key_signer_config": { + "private_key": "1515151515151515151515151515151515151515151515151515151515151515" + } + }, + "l1_commit_gas_limit_multiplier": 1.2 + }, + "chunk_proposer_config": { + "propose_interval_milliseconds": 10000, + "max_block_num_per_chunk": 1000000, + "max_tx_num_per_chunk": 1000000, + "max_l1_commit_gas_per_chunk": 50000000, + "max_l1_commit_calldata_size_per_chunk": 1100000, + "chunk_timeout_sec": 27000, + "max_row_consumption_per_chunk": 10000000, + "gas_cost_increase_multiplier": 1.2, + "max_uncompressed_batch_bytes_size": 634880 + }, + "batch_proposer_config": { + "propose_interval_milliseconds": 1000, + "max_l1_commit_gas_per_batch": 5000000, + "max_l1_commit_calldata_size_per_batch": 110000, + "batch_timeout_sec": 2700, + "gas_cost_increase_multiplier": 1.2, + "max_uncompressed_batch_bytes_size": 634880 + }, + "bundle_proposer_config": { + "max_batch_num_per_bundle": 30, + "bundle_timeout_sec": 36000 + } + }, + "db_config": { + "driver_name": "postgres", + "dsn": "postgres://rollup_node:0.qfxlf8tgld@morty-11-28-do-user-9610937-0.i.db.ondigitalocean.com:25060/scroll_rollup?sslmode=require", + "maxOpenNum": 50, + "maxIdleNum": 20 + } +} From 71acdb3b88c42f384576c5cc3f976d554f7fc101 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 30 Dec 2024 22:00:20 +0800 Subject: [PATCH 02/29] fix: lint --- .../internal/logic/provertask/batch_prover_task.go 
| 10 +++++----- .../internal/logic/provertask/bundle_prover_task.go | 10 +++++----- .../internal/logic/provertask/chunk_prover_task.go | 10 +++++----- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index 3ecd053a11..871546f005 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -65,8 +65,8 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { - unassignedBatchCount, err := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) - if err != nil { + unassignedBatchCount, getCountError := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) + if getCountError != nil { log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) return nil, ErrCoordinatorInternalFailure } @@ -102,15 +102,15 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato tmpBatchTask = tmpUnassignedBatchTask[unassignedOffset] unassignedOffset++ } - + if tmpBatchTask == nil { log.Debug("get empty batch", "height", getTaskParameter.ProverHeight) return nil, nil } // Don't dispatch the same failing job to the same prover - proverTask, err := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) - if err != nil { + proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if getTaskError != nil { log.Error("failed to get prover task of prover", 
"proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) return nil, ErrCoordinatorInternalFailure } diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index c7c609da84..a16b307b6a 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -65,8 +65,8 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { - unassignedBundleCount, err := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) - if err != nil { + unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) + if getCountError != nil { log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) return nil, ErrCoordinatorInternalFailure } @@ -102,15 +102,15 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat tmpBundleTask = tmpUnassignedBundleTask[unassignedOffset] unassignedOffset++ } - + if tmpBundleTask == nil { log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight) return nil, nil } // Don't dispatch the same failing job to the same prover - proverTask, err := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) - if err != nil { + proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if getTaskError != nil { log.Error("failed to get prover task 
of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) return nil, ErrCoordinatorInternalFailure } diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index b48dba38f3..306090cac6 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -63,8 +63,8 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { - unassignedChunkCount, err := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) - if err != nil { + unassignedChunkCount, getCountError := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) + if getCountError != nil { log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) return nil, ErrCoordinatorInternalFailure } @@ -100,15 +100,15 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato tmpChunkTask = tmpUnassignedChunkTask[unassignedOffset] unassignedOffset++ } - + if tmpChunkTask == nil { log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight) return nil, nil } // Don't dispatch the same failing job to the same prover - proverTask, err := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) - if err != nil { + proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if 
getTaskError != nil { log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) return nil, ErrCoordinatorInternalFailure } From 8ce51217863c885f9fdd31292122d799c5bdd1f2 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 30 Dec 2024 22:06:12 +0800 Subject: [PATCH 03/29] fix: GetUnassignedBatchCount --- coordinator/internal/orm/batch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/coordinator/internal/orm/batch.go b/coordinator/internal/orm/batch.go index 147b9f6e92..b3907c9ec8 100644 --- a/coordinator/internal/orm/batch.go +++ b/coordinator/internal/orm/batch.go @@ -100,6 +100,7 @@ func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned)) db = db.Where("total_attempts < ?", maxTotalAttempts) db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady)) db = db.Where("batch.deleted_at IS NULL") if err := db.Count(&count).Error; err != nil { return 0, fmt.Errorf("Batch.GetUnassignedBatchCount error: %w", err) From df926160bab9e9040b45420e671d2690e4cd7368 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 30 Dec 2024 22:08:56 +0800 Subject: [PATCH 04/29] chore: remove extra files --- rollup/config.json | 130 --------------------------------------------- 1 file changed, 130 deletions(-) delete mode 100644 rollup/config.json diff --git a/rollup/config.json b/rollup/config.json deleted file mode 100644 index f68450d829..0000000000 --- a/rollup/config.json +++ /dev/null @@ -1,130 +0,0 @@ -{ - "l1_config": { - "endpoint": "https://alien-flashy-arm.ethereum-sepolia.quiknode.pro/2aeb75414e5ee0e930b64c2e7feff59efb537f30", - "start_height": 0, - "relayer_config": { - "gas_price_oracle_contract_address": "0x5300000000000000000000000000000000000002", - "sender_config": { - 
"endpoint": "http://l2-rpc.scrollsdk", - "escalate_blocks": 100, - "escalate_multiple_num": 11, - "escalate_multiple_den": 10, - "min_gas_tip": 1, - "max_gas_price": 10000000000000, - "tx_type": "DynamicFeeTx", - "check_pending_time": 3, - "confirmations": "0x0" - }, - "gas_oracle_config": { - "min_gas_price": 0, - "gas_price_diff": 50000, - "l1_base_fee_weight": 0.086, - "l1_blob_base_fee_weight": 0.030, - "check_committed_batches_window_minutes": 5, - "l1_base_fee_default": 15000000000, - "l1_blob_base_fee_default": 1, - "alternative_gas_token_config": { - "enabled": false, - "mode": "Fixed", - "fixed_exchange_rate": 0.001, - "token_symbol_pair": "" - } - }, - "gas_oracle_sender_signer_config": { - "signer_type": "PrivateKey", - "private_key_signer_config": { - "private_key": "1313131313131313131313131313131313131313131313131313131313131313" - } - } - } - }, - "l2_config": { - "confirmations": "0x10", - "endpoint": "http://l2-rpc.scrollsdk", - "l2_message_queue_address": "0x5300000000000000000000000000000000000000", - "relayer_config": { - "rollup_contract_address": "0xBAA5Cc4a4Ca1c596CbF33183A43148c832a53CC5", - "gas_price_oracle_contract_address": "0x30D802Ba5E7BF1145cA35E67de07388e4C5B8487", - "sender_config": { - "endpoint": "https://alien-flashy-arm.ethereum-sepolia.quiknode.pro/2aeb75414e5ee0e930b64c2e7feff59efb537f30", - "escalate_blocks": 4, - "escalate_multiple_num": 12, - "escalate_multiple_den": 10, - "min_gas_tip": 100000000, - "max_gas_price": 200000000000, - "max_blob_gas_price": 200000000000, - "tx_type": "DynamicFeeTx", - "check_pending_time": 10, - "confirmations": "0x0", - "max_pending_blob_txs": 3 - }, - "gas_oracle_config": { - "min_gas_price": 0, - "gas_price_diff": 50000, - "alternative_gas_token_config": { - "enabled": false, - "mode": "Fixed", - "fixed_exchange_rate": 0.001, - "token_symbol_pair": "" - } - }, - "chain_monitor": { - "enabled": true, - "timeout": 3, - "try_times": 5, - "base_url": "http://chain-monitor:8080" - }, - 
"enable_test_env_bypass_features": false, - "finalize_batch_without_proof_timeout_sec": 300, - "finalize_bundle_without_proof_timeout_sec": 300, - "gas_oracle_sender_signer_config": { - "signer_type": "PrivateKey", - "private_key_signer_config": { - "private_key": "1313131313131313131313131313131313131313131313131313131313131313" - } - }, - "commit_sender_signer_config": { - "signer_type": "PrivateKey", - "private_key_signer_config": { - "private_key": "1414141414141414141414141414141414141414141414141414141414141414" - } - }, - "finalize_sender_signer_config": { - "signer_type": "PrivateKey", - "private_key_signer_config": { - "private_key": "1515151515151515151515151515151515151515151515151515151515151515" - } - }, - "l1_commit_gas_limit_multiplier": 1.2 - }, - "chunk_proposer_config": { - "propose_interval_milliseconds": 10000, - "max_block_num_per_chunk": 1000000, - "max_tx_num_per_chunk": 1000000, - "max_l1_commit_gas_per_chunk": 50000000, - "max_l1_commit_calldata_size_per_chunk": 1100000, - "chunk_timeout_sec": 27000, - "max_row_consumption_per_chunk": 10000000, - "gas_cost_increase_multiplier": 1.2, - "max_uncompressed_batch_bytes_size": 634880 - }, - "batch_proposer_config": { - "propose_interval_milliseconds": 1000, - "max_l1_commit_gas_per_batch": 5000000, - "max_l1_commit_calldata_size_per_batch": 110000, - "batch_timeout_sec": 2700, - "gas_cost_increase_multiplier": 1.2, - "max_uncompressed_batch_bytes_size": 634880 - }, - "bundle_proposer_config": { - "max_batch_num_per_bundle": 30, - "bundle_timeout_sec": 36000 - } - }, - "db_config": { - "driver_name": "postgres", - "dsn": "postgres://rollup_node:0.qfxlf8tgld@morty-11-28-do-user-9610937-0.i.db.ondigitalocean.com:25060/scroll_rollup?sslmode=require", - "maxOpenNum": 50, - "maxIdleNum": 20 - } -} From a75075d384f3d98441a9e5969b8f3085529b699a Mon Sep 17 00:00:00 2001 From: Morty Date: Tue, 31 Dec 2024 00:18:18 +0800 Subject: [PATCH 05/29] fix: err log --- 
coordinator/internal/logic/provertask/batch_prover_task.go | 2 +- coordinator/internal/logic/provertask/bundle_prover_task.go | 2 +- coordinator/internal/logic/provertask/chunk_prover_task.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index 871546f005..f0c692552b 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -111,7 +111,7 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato // Don't dispatch the same failing job to the same prover proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) + log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index a16b307b6a..d89630c587 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -111,7 +111,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat // Don't dispatch the same failing job to the same prover proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), 
message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) + log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index 306090cac6..5aa396e2f2 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -109,7 +109,7 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato // Don't dispatch the same failing job to the same prover proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err) + log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { From 1c5d88dc7c69fddb0dc32f430cf14ae05a4ad39b Mon Sep 17 00:00:00 2001 From: Morty Date: Tue, 31 
Dec 2024 00:35:52 +0800 Subject: [PATCH 06/29] fix: orm GetTaskOfProver --- coordinator/internal/orm/prover_task.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index 841f9a061e..53aea7a383 100644 --- a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -156,9 +156,10 @@ func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.Proof db = db.Where("task_id", taskID) db = db.Where("prover_public_key", proverPublicKey) db = db.Where("prover_version", proverVersion) + db = db.Limit(1) var proverTask ProverTask - err := db.First(&proverTask).Error + err := db.Find(&proverTask).Error if err != nil { return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey) } From e4c0779e7399cdd4f1f53a297564a13b43928ef9 Mon Sep 17 00:00:00 2001 From: Morty Date: Tue, 31 Dec 2024 01:46:12 +0800 Subject: [PATCH 07/29] fix: comments --- coordinator/internal/logic/provertask/batch_prover_task.go | 6 +++--- coordinator/internal/logic/provertask/bundle_prover_task.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index f0c692552b..08fa468fb0 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -67,7 +67,7 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { unassignedBatchCount, getCountError := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getCountError != nil { - log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) + log.Error("failed to get 
unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) return nil, ErrCoordinatorInternalFailure } // Assign external prover if unassigned task number exceeds threshold @@ -83,14 +83,14 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato var assignedOffset, unassignedOffset = 0, 0 tmpAssignedBatchTasks, getTaskError := bp.batchOrm.GetAssignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) if getTaskError != nil { - log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. 
tmpUnassignedBatchTask, getTaskError := bp.batchOrm.GetUnassignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) if getTaskError != nil { - log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } for { diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index d89630c587..a13823e10d 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -67,7 +67,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getCountError != nil { - log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) + log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) return nil, ErrCoordinatorInternalFailure } // Assign external prover if unassigned task number exceeds threshold @@ -83,14 +83,14 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat var assignedOffset, unassignedOffset = 0, 0 tmpAssignedBundleTasks, getTaskError := bp.bundleOrm.GetAssignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) if getTaskError != nil { - log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } 
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. tmpUnassignedBundleTask, getTaskError := bp.bundleOrm.GetUnassignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) if getTaskError != nil { - log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } for { From 0d1c303934fa50246623e3e7c665c075f2c98f25 Mon Sep 17 00:00:00 2001 From: yiweichi Date: Mon, 6 Jan 2025 07:10:40 +0000 Subject: [PATCH 08/29] =?UTF-8?q?chore:=20auto=20version=20bump=E2=80=89[b?= =?UTF-8?q?ot]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/version/version.go b/common/version/version.go index 6fbb114b60..b3ccf9b841 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.4.84" +var tag = "v4.4.85" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { From 25796dfe0b17ac2fabd2bf85465fe954734dc84d Mon Sep 17 00:00:00 2001 From: Morty Date: Thu, 9 Jan 2025 04:11:54 +0800 Subject: [PATCH 09/29] fix: comments --- .../logic/provertask/batch_prover_task.go | 51 ++++++++---------- .../logic/provertask/bundle_prover_task.go | 53 ++++++++----------- .../logic/provertask/chunk_prover_task.go | 48 +++++++---------- coordinator/internal/orm/batch.go | 49 +++++++++++------ coordinator/internal/orm/bundle.go | 36 +++++++------ coordinator/internal/orm/chunk.go | 36 +++++++------ 6 files changed, 141 insertions(+), 132 deletions(-) diff --git 
a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index 08fa468fb0..1cc7179788 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -80,43 +80,36 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato for i := 0; i < 5; i++ { var getTaskError error var tmpBatchTask *orm.Batch - var assignedOffset, unassignedOffset = 0, 0 - tmpAssignedBatchTasks, getTaskError := bp.batchOrm.GetAssignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) + tmpBatchTask, getTaskError = bp.batchOrm.GetAssignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getTaskError != nil { log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` - // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. 
- tmpUnassignedBatchTask, getTaskError := bp.batchOrm.GetUnassignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) - if getTaskError != nil { - log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) - return nil, ErrCoordinatorInternalFailure - } - for { - tmpBatchTask = nil - if assignedOffset < len(tmpAssignedBatchTasks) { - tmpBatchTask = tmpAssignedBatchTasks[assignedOffset] - assignedOffset++ - } else if unassignedOffset < len(tmpUnassignedBatchTask) { - tmpBatchTask = tmpUnassignedBatchTask[unassignedOffset] - unassignedOffset++ - } - if tmpBatchTask == nil { - log.Debug("get empty batch", "height", getTaskParameter.ProverHeight) - return nil, nil - } - - // Don't dispatch the same failing job to the same prover - proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` + // batch to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. 
+ if tmpBatchTask == nil { + tmpBatchTask, getTaskError = bp.batchOrm.GetUnassignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { - break - } + } + + if tmpBatchTask == nil { + log.Debug("get empty batch", "height", getTaskParameter.ProverHeight) + return nil, nil + } + + // Don't dispatch the same failing job to the same prover + proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if getTaskError != nil { + log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + return nil, ErrCoordinatorInternalFailure + } + if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { + log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight) + return nil, nil } rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts) diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index a13823e10d..52d237dc3e 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go 
@@ -80,43 +80,36 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat for i := 0; i < 5; i++ { var getTaskError error var tmpBundleTask *orm.Bundle - var assignedOffset, unassignedOffset = 0, 0 - tmpAssignedBundleTasks, getTaskError := bp.bundleOrm.GetAssignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) + tmpBundleTask, getTaskError = bp.bundleOrm.GetAssignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getTaskError != nil { - log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + log.Error("failed to get assigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` - // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. 
- tmpUnassignedBundleTask, getTaskError := bp.bundleOrm.GetUnassignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) - if getTaskError != nil { - log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) - return nil, ErrCoordinatorInternalFailure - } - for { - tmpBundleTask = nil - if assignedOffset < len(tmpAssignedBundleTasks) { - tmpBundleTask = tmpAssignedBundleTasks[assignedOffset] - assignedOffset++ - } else if unassignedOffset < len(tmpUnassignedBundleTask) { - tmpBundleTask = tmpUnassignedBundleTask[unassignedOffset] - unassignedOffset++ - } - if tmpBundleTask == nil { - log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight) - return nil, nil - } - - // Don't dispatch the same failing job to the same prover - proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` + // bundle to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. 
+ if tmpBundleTask == nil { + tmpBundleTask, getTaskError = bp.bundleOrm.GetUnassignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get unassigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { - break - } + } + + if tmpBundleTask == nil { + log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight) + return nil, nil + } + + // Don't dispatch the same failing job to the same prover + proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if getTaskError != nil { + log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + return nil, ErrCoordinatorInternalFailure + } + if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { + log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight) + return nil, nil } rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts) diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index 5aa396e2f2..f076c068fc 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ 
b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -78,45 +78,37 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato for i := 0; i < 5; i++ { var getTaskError error var tmpChunkTask *orm.Chunk - var assignedOffset, unassignedOffset = 0, 0 - tmpAssignedChunkTasks, getTaskError := cp.chunkOrm.GetAssignedChunks(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50) + tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) if getTaskError != nil { log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } + // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. 
- tmpUnassignedChunkTask, getTaskError := cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50) - if getTaskError != nil { - log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) - return nil, ErrCoordinatorInternalFailure - } - for { - tmpChunkTask = nil - if assignedOffset < len(tmpAssignedChunkTasks) { - tmpChunkTask = tmpAssignedChunkTasks[assignedOffset] - assignedOffset++ - } else if unassignedOffset < len(tmpUnassignedChunkTask) { - tmpChunkTask = tmpUnassignedChunkTask[unassignedOffset] - unassignedOffset++ - } - - if tmpChunkTask == nil { - log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight) - return nil, nil - } - - // Don't dispatch the same failing job to the same prover - proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + if tmpChunkTask == nil { + tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { - break - } } + if tmpChunkTask == nil { + log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight) + return nil, nil + } + + // Don't dispatch the same failing job to the same prover + proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, 
taskCtx.PublicKey, taskCtx.ProverVersion) + if getTaskError != nil { + log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + return nil, ErrCoordinatorInternalFailure + } + if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { + log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight) + return nil, nil + } rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts) if updateAttemptsErr != nil { log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) diff --git a/coordinator/internal/orm/batch.go b/coordinator/internal/orm/batch.go index b3907c9ec8..6ed5aec0ca 100644 --- a/coordinator/internal/orm/batch.go +++ b/coordinator/internal/orm/batch.go @@ -78,18 +78,21 @@ func (*Batch) TableName() string { return "batch" } -// GetUnassignedBatches retrieves unassigned batches based on the specified limit. +// GetUnassignedBatch retrieves unassigned batch based on the specified limit. // The returned batches are sorted in ascending order by their index. 
-func (o *Batch) GetUnassignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) { - var batch []*Batch +func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { + var batch Batch db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;", - int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit) + sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;", + int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady)) err := db.Raw(sql).Scan(&batch).Error if err != nil { - return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err) + return nil, fmt.Errorf("Batch.GetUnassignedBatch error: %w", err) + } + if batch.Hash == "" { + return nil, nil } - return batch, nil + return &batch, nil } // GetUnassignedBatchCount retrieves unassigned batch count based on the specified limit. @@ -108,18 +111,21 @@ func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, return count, nil } -// GetAssignedBatches retrieves assigned batches based on the specified limit. +// GetAssignedBatch retrieves assigned batch based on the specified limit. // The returned batches are sorted in ascending order by their index. 
-func (o *Batch) GetAssignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) { - var batch []*Batch +func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { + var batch Batch db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;", - int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit) + sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;", + int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady)) err := db.Raw(sql).Scan(&batch).Error if err != nil { - return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err) + return nil, fmt.Errorf("Batch.GetAssignedBatch error: %w", err) + } + if batch.Hash == "" { + return nil, nil } - return batch, nil + return &batch, nil } // GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready @@ -142,6 +148,19 @@ func (o *Batch) GetUnassignedAndChunksUnreadyBatches(ctx context.Context, offset return batches, nil } +// GetAssignedBatches retrieves all batches whose proving_status is either types.ProvingTaskAssigned. 
+func (o *Batch) GetAssignedBatches(ctx context.Context) ([]*Batch, error) { + db := o.db.WithContext(ctx) + db = db.Model(&Batch{}) + db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned)) + + var assignedBatches []*Batch + if err := db.Find(&assignedBatches).Error; err != nil { + return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err) + } + return assignedBatches, nil +} + // GetProvingStatusByHash retrieves the proving status of a batch given its hash. func (o *Batch) GetProvingStatusByHash(ctx context.Context, hash string) (types.ProvingStatus, error) { db := o.db.WithContext(ctx) @@ -435,4 +454,4 @@ func (o *Batch) DecreaseActiveAttemptsByHash(ctx context.Context, batchHash stri log.Warn("No rows were affected in DecreaseActiveAttemptsByHash", "batch hash", batchHash) } return nil -} +} \ No newline at end of file diff --git a/coordinator/internal/orm/bundle.go b/coordinator/internal/orm/bundle.go index 0bf6efa5b8..7ed8e60f09 100644 --- a/coordinator/internal/orm/bundle.go +++ b/coordinator/internal/orm/bundle.go @@ -54,18 +54,21 @@ func (*Bundle) TableName() string { return "bundle" } -// GetUnassignedBundles retrieves unassigned bundle based on the specified limit. +// GetUnassignedBundle retrieves unassigned bundle based on the specified limit. // The returned batch sorts in ascending order by their index. 
-func (o *Bundle) GetUnassignedBundles(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Bundle, error) { - var bundle []*Bundle +func (o *Bundle) GetUnassignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) { + var bundle Bundle db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT %d;", - int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady), limit) + sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;", + int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady)) err := db.Raw(sql).Scan(&bundle).Error if err != nil { - return nil, fmt.Errorf("Batch.GetUnassignedBundles error: %w", err) + return nil, fmt.Errorf("Batch.GetUnassignedBundle error: %w", err) + } + if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" { + return nil, nil } - return bundle, nil + return &bundle, nil } // GetUnassignedBundleCount retrieves unassigned bundle count based on the specified limit. @@ -83,18 +86,21 @@ func (o *Bundle) GetUnassignedBundleCount(ctx context.Context, maxActiveAttempts return count, nil } -// GetAssignedBundles retrieves assigned bundles based on the specified limit. +// GetAssignedBundle retrieves assigned bundle based on the specified limit. // The returned bundle sorts in ascending order by their index. 
-func (o *Bundle) GetAssignedBundles(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Bundle, error) { - var bundle []*Bundle +func (o *Bundle) GetAssignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) { + var bundle Bundle db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT %d;", - int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady), limit) + sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;", + int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady)) err := db.Raw(sql).Scan(&bundle).Error if err != nil { - return nil, fmt.Errorf("Bundle.GetAssignedBundles error: %w", err) + return nil, fmt.Errorf("Bundle.GetAssignedBatch error: %w", err) } - return bundle, nil + if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" { + return nil, nil + } + return &bundle, nil } // GetProvingStatusByHash retrieves the proving status of a bundle given its hash. @@ -234,4 +240,4 @@ func (o *Bundle) DecreaseActiveAttemptsByHash(ctx context.Context, bundleHash st log.Warn("No rows were affected in DecreaseActiveAttemptsByHash", "bundle hash", bundleHash) } return nil -} +} \ No newline at end of file diff --git a/coordinator/internal/orm/chunk.go b/coordinator/internal/orm/chunk.go index ce73f3cbb9..7f82740140 100644 --- a/coordinator/internal/orm/chunk.go +++ b/coordinator/internal/orm/chunk.go @@ -73,16 +73,19 @@ func (*Chunk) TableName() string { // GetUnassignedChunk retrieves unassigned chunk based on the specified limit. 
// The returned chunks are sorted in ascending order by their index. -func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height, limit uint64) ([]*Chunk, error) { - var chunks []*Chunk +func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) { + var chunk Chunk db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT %d;", - int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height, limit) - err := db.Raw(sql).Scan(&chunks).Error + sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", + int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height) + err := db.Raw(sql).Scan(&chunk).Error if err != nil { return nil, fmt.Errorf("Chunk.GetUnassignedChunk error: %w", err) } - return chunks, nil + if chunk.Hash == "" { + return nil, nil + } + return &chunk, nil } // GetUnassignedChunkCount retrieves unassigned chunk count based on the specified limit. @@ -101,18 +104,21 @@ func (o *Chunk) GetUnassignedChunkCount(ctx context.Context, maxActiveAttempts, return count, nil } -// GetAssignedChunks retrieves assigned chunks based on the specified limit. +// GetAssignedChunk retrieves assigned chunk based on the specified limit. // The returned chunks are sorted in ascending order by their index. 
-func (o *Chunk) GetAssignedChunks(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64, limit uint64) ([]*Chunk, error) { - var chunks []*Chunk +func (o *Chunk) GetAssignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) { + var chunk Chunk db := o.db.WithContext(ctx) - sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT %d;", - int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height, limit) - err := db.Raw(sql).Scan(&chunks).Error + sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", + int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height) + err := db.Raw(sql).Scan(&chunk).Error if err != nil { - return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err) + return nil, fmt.Errorf("Chunk.GetAssignedChunk error: %w", err) } - return chunks, nil + if chunk.Hash == "" { + return nil, nil + } + return &chunk, nil } // GetChunksByBatchHash retrieves the chunks associated with a specific batch hash. 
@@ -428,4 +434,4 @@ func (o *Chunk) DecreaseActiveAttemptsByHash(ctx context.Context, chunkHash stri log.Warn("No rows were affected in DecreaseActiveAttemptsByHash", "chunk hash", chunkHash) } return nil -} +} \ No newline at end of file From e51e95d0695960f433d7acc335f60a1d060a20a5 Mon Sep 17 00:00:00 2001 From: yiweichi Date: Wed, 8 Jan 2025 20:13:14 +0000 Subject: [PATCH 10/29] =?UTF-8?q?chore:=20auto=20version=20bump=E2=80=89[b?= =?UTF-8?q?ot]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/version/version.go b/common/version/version.go index b3ccf9b841..7a3b2af81e 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.4.85" +var tag = "v4.4.86" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { From e0aa5be44a8f8d77d3d9c03102ba3c635a9433ac Mon Sep 17 00:00:00 2001 From: Morty Date: Thu, 9 Jan 2025 04:15:05 +0800 Subject: [PATCH 11/29] fix: lint --- coordinator/internal/orm/batch.go | 2 +- coordinator/internal/orm/bundle.go | 2 +- coordinator/internal/orm/chunk.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/coordinator/internal/orm/batch.go b/coordinator/internal/orm/batch.go index 6ed5aec0ca..466fbc9f13 100644 --- a/coordinator/internal/orm/batch.go +++ b/coordinator/internal/orm/batch.go @@ -454,4 +454,4 @@ func (o *Batch) DecreaseActiveAttemptsByHash(ctx context.Context, batchHash stri log.Warn("No rows were affected in DecreaseActiveAttemptsByHash", "batch hash", batchHash) } return nil -} \ No newline at end of file +} diff --git a/coordinator/internal/orm/bundle.go b/coordinator/internal/orm/bundle.go index 7ed8e60f09..3c8a003172 100644 --- a/coordinator/internal/orm/bundle.go +++ b/coordinator/internal/orm/bundle.go @@ -240,4 +240,4 @@ func (o *Bundle) DecreaseActiveAttemptsByHash(ctx 
context.Context, bundleHash st log.Warn("No rows were affected in DecreaseActiveAttemptsByHash", "bundle hash", bundleHash) } return nil -} \ No newline at end of file +} diff --git a/coordinator/internal/orm/chunk.go b/coordinator/internal/orm/chunk.go index 7f82740140..445b894101 100644 --- a/coordinator/internal/orm/chunk.go +++ b/coordinator/internal/orm/chunk.go @@ -434,4 +434,4 @@ func (o *Chunk) DecreaseActiveAttemptsByHash(ctx context.Context, chunkHash stri log.Warn("No rows were affected in DecreaseActiveAttemptsByHash", "chunk hash", chunkHash) } return nil -} \ No newline at end of file +} From 77d3a75a2ad43b2c45184d97c2121a3ac9b410b6 Mon Sep 17 00:00:00 2001 From: Morty <70688412+yiweichi@users.noreply.github.com> Date: Thu, 9 Jan 2025 17:22:02 +0800 Subject: [PATCH 12/29] Update coordinator/internal/logic/provertask/bundle_prover_task.go Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- coordinator/internal/logic/provertask/bundle_prover_task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index 52d237dc3e..2dd6c6aa97 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -67,7 +67,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getCountError != nil { - log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) + log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError) return nil, ErrCoordinatorInternalFailure } // Assign external prover if 
unassigned task number exceeds threshold From 5ee2388b08955aedf4035c93a0be35555e2b2193 Mon Sep 17 00:00:00 2001 From: Morty <70688412+yiweichi@users.noreply.github.com> Date: Thu, 9 Jan 2025 17:22:13 +0800 Subject: [PATCH 13/29] Update coordinator/internal/logic/provertask/batch_prover_task.go Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- coordinator/internal/logic/provertask/batch_prover_task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index 1cc7179788..364ce51daf 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -67,7 +67,7 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { unassignedBatchCount, getCountError := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getCountError != nil { - log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) + log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError) return nil, ErrCoordinatorInternalFailure } // Assign external prover if unassigned task number exceeds threshold From 816d5cef852c0314c202152dda1d21cf72d0b7b7 Mon Sep 17 00:00:00 2001 From: Morty <70688412+yiweichi@users.noreply.github.com> Date: Thu, 9 Jan 2025 17:22:32 +0800 Subject: [PATCH 14/29] Update coordinator/internal/logic/provertask/batch_prover_task.go Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- coordinator/internal/logic/provertask/batch_prover_task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go 
b/coordinator/internal/logic/provertask/batch_prover_task.go index 364ce51daf..31f571537a 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -104,7 +104,7 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato // Don't dispatch the same failing job to the same prover proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get prover task of prover", "proof type", message.ProofTypeBatch.String(), "task ID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "prover version", taskCtx.ProverVersion, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { From 8f0ab505be2a7e7158842a33b19d5c7be7c23d01 Mon Sep 17 00:00:00 2001 From: Morty <70688412+yiweichi@users.noreply.github.com> Date: Thu, 9 Jan 2025 17:22:40 +0800 Subject: [PATCH 15/29] Update coordinator/internal/logic/provertask/bundle_prover_task.go Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- coordinator/internal/logic/provertask/bundle_prover_task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index 2dd6c6aa97..34887f48b3 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -104,7 +104,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat 
// Don't dispatch the same failing job to the same prover proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get prover task of prover", "proof type", message.ProofTypeBatch.String(), "task ID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "prover version", taskCtx.ProverVersion, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { From d8ad10f032b33b5e5fcefff1ca34c859bc010385 Mon Sep 17 00:00:00 2001 From: Morty <70688412+yiweichi@users.noreply.github.com> Date: Thu, 9 Jan 2025 17:22:50 +0800 Subject: [PATCH 16/29] Update coordinator/internal/logic/provertask/chunk_prover_task.go Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- coordinator/internal/logic/provertask/chunk_prover_task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index f076c068fc..2103bf819e 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -102,7 +102,7 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato // Don't dispatch the same failing job to the same prover proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", 
message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get prover task of prover", "proof type", message.ProofTypeChunk.String(), "task ID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "prover version", taskCtx.ProverVersion, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { From 64e88cecfd47d9b035ba4b87d52128e4f1f74819 Mon Sep 17 00:00:00 2001 From: Morty <70688412+yiweichi@users.noreply.github.com> Date: Thu, 9 Jan 2025 17:23:02 +0800 Subject: [PATCH 17/29] Update coordinator/internal/orm/prover_task.go Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- coordinator/internal/orm/prover_task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index 53aea7a383..9670889468 100644 --- a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -161,7 +161,7 @@ func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.Proof var proverTask ProverTask err := db.Find(&proverTask).Error if err != nil { - return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey) + return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey: %s", err, taskID, proverPublicKey) } return &proverTask, nil } From dd3c9e19c935156ee5e424e0377709a12da9690f Mon Sep 17 00:00:00 2001 From: Morty <70688412+yiweichi@users.noreply.github.com> Date: Thu, 9 Jan 2025 17:23:16 +0800 Subject: [PATCH 18/29] Update coordinator/internal/logic/provertask/chunk_prover_task.go Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- coordinator/internal/logic/provertask/chunk_prover_task.go | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index 2103bf819e..1f76d1dad2 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -65,7 +65,7 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { unassignedChunkCount, getCountError := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) if getCountError != nil { - log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) + log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError) return nil, ErrCoordinatorInternalFailure } // Assign external prover if unassigned task number exceeds threshold From b11f010827f39b4a350e96dc6f2071126451f300 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 13 Jan 2025 01:30:21 +0800 Subject: [PATCH 19/29] feat: handle could prover name when assign task --- coordinator/internal/controller/api/auth.go | 9 +++-- coordinator/internal/logic/auth/login.go | 6 +++ .../logic/provertask/batch_prover_task.go | 17 ++++---- .../logic/provertask/bundle_prover_task.go | 17 ++++---- .../logic/provertask/chunk_prover_task.go | 18 +++++---- .../internal/logic/provertask/prover_task.go | 15 +++++-- coordinator/internal/orm/prover_task.go | 39 ++++++++++--------- coordinator/internal/types/auth.go | 13 ++++--- coordinator/internal/types/prover.go | 23 +++++++++++ coordinator/internal/utils/prover_name.go | 15 +++++++ 10 files changed, 120 insertions(+), 52 deletions(-) create mode 100644 coordinator/internal/utils/prover_name.go diff --git a/coordinator/internal/controller/api/auth.go 
b/coordinator/internal/controller/api/auth.go index 07c676bba4..b5ca65eca2 100644 --- a/coordinator/internal/controller/api/auth.go +++ b/coordinator/internal/controller/api/auth.go @@ -70,10 +70,11 @@ func (a *AuthController) PayloadFunc(data interface{}) jwt.MapClaims { } return jwt.MapClaims{ - types.HardForkName: v.HardForkName, - types.PublicKey: v.PublicKey, - types.ProverName: v.Message.ProverName, - types.ProverVersion: v.Message.ProverVersion, + types.HardForkName: v.HardForkName, + types.PublicKey: v.PublicKey, + types.ProverName: v.Message.ProverName, + types.ProverVersion: v.Message.ProverVersion, + types.ProverProviderTypeKey: v.Message.ProverProviderType, } } diff --git a/coordinator/internal/logic/auth/login.go b/coordinator/internal/logic/auth/login.go index cf9d3e4fb4..48513df43c 100644 --- a/coordinator/internal/logic/auth/login.go +++ b/coordinator/internal/logic/auth/login.go @@ -106,6 +106,12 @@ func (l *LoginLogic) Check(login *types.LoginParameter) error { } } } + + if login.Message.ProverProviderType != types.ProverProviderTypeInternal && login.Message.ProverProviderType != types.ProverProviderTypeExternal { + log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", login.Message.ProverName, "prover_version", login.Message.ProverVersion) + return errors.New("invalid prover provider type.") + } + return nil } diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index 1cc7179788..a44522eccf 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "strings" "time" "github.com/gin-gonic/gin" @@ -23,6 +22,7 @@ import ( "scroll-tech/coordinator/internal/config" "scroll-tech/coordinator/internal/orm" coordinatorType "scroll-tech/coordinator/internal/types" + cutils 
"scroll-tech/coordinator/internal/utils" ) // BatchProverTask is prover task implement for batch proof @@ -64,7 +64,7 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts - if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { + if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) { unassignedBatchCount, getCountError := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getCountError != nil { log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) @@ -102,14 +102,17 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato } // Don't dispatch the same failing job to the same prover - proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, 2) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get prover tasks", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } - if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { - log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight) - return nil, nil + for i := 0; i < len(proverTasks); i++ { + if proverTasks[i].ProverName == taskCtx.ProverName || + taskCtx.ProverProviderType == 
uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) { + log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight) + return nil, nil + } } rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts) diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index 52d237dc3e..8f6b8601eb 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "strings" "time" "github.com/gin-gonic/gin" @@ -22,6 +21,7 @@ import ( "scroll-tech/coordinator/internal/config" "scroll-tech/coordinator/internal/orm" coordinatorType "scroll-tech/coordinator/internal/types" + cutils "scroll-tech/coordinator/internal/utils" ) // BundleProverTask is prover task implement for bundle proof @@ -64,7 +64,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts - if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { + if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) { unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getCountError != nil { log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) @@ -102,14 +102,17 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat } // Don't dispatch the same failing job to the same prover - proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), 
message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBundle, tmpBundleTask.Hash, 2) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get prover tasks", "proof_type", message.ProofTypeBundle.String(), "taskID", tmpBundleTask.Hash, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } - if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { - log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight) - return nil, nil + for i := 0; i < len(proverTasks); i++ { + if proverTasks[i].ProverName == taskCtx.ProverName || + taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) { + log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight) + return nil, nil + } } rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts) diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index f076c068fc..a82f3548a3 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "strings" "time" "github.com/gin-gonic/gin" @@ -22,6 +21,7 @@ import ( "scroll-tech/coordinator/internal/config" "scroll-tech/coordinator/internal/orm" coordinatorType 
"scroll-tech/coordinator/internal/types" + cutils "scroll-tech/coordinator/internal/utils" ) // ChunkProverTask the chunk prover task @@ -62,7 +62,7 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts - if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { + if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) { unassignedChunkCount, getCountError := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) if getCountError != nil { log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) @@ -100,15 +100,19 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato } // Don't dispatch the same failing job to the same prover - proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) + proverTasks, getTaskError := cp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, 2) if getTaskError != nil { - log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) + log.Error("failed to get prover tasks", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "error", getTaskError) return nil, ErrCoordinatorInternalFailure } - if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid { - log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight) - return nil, nil + for i := 0; i < len(proverTasks); i++ { + if proverTasks[i].ProverName == 
taskCtx.ProverName || + taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) { + log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight) + return nil, nil + } } + rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts) if updateAttemptsErr != nil { log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) diff --git a/coordinator/internal/logic/provertask/prover_task.go b/coordinator/internal/logic/provertask/prover_task.go index 507f3cad0d..02b90784da 100644 --- a/coordinator/internal/logic/provertask/prover_task.go +++ b/coordinator/internal/logic/provertask/prover_task.go @@ -52,10 +52,11 @@ type BaseProverTask struct { } type proverTaskContext struct { - PublicKey string - ProverName string - ProverVersion string - HardForkNames map[string]struct{} + PublicKey string + ProverName string + ProverVersion string + ProverProviderType uint8 + HardForkNames map[string]struct{} } // checkParameter check the prover task parameter illegal @@ -81,6 +82,12 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context) (*proverTaskContext, e } ptc.ProverVersion = proverVersion.(string) + ProverProviderType, ProverProviderTypeExist := ctx.Get(coordinatorType.ProverProviderTypeKey) + if !ProverProviderTypeExist { + return nil, errors.New("get prover provider type from context failed") + } + ptc.ProverProviderType = ProverProviderType.(uint8) + hardForkNamesStr, hardForkNameExist := ctx.Get(coordinatorType.HardForkName) if !hardForkNameExist { return nil, errors.New("get hard fork name from context failed") diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index 53aea7a383..4fc56b2675 100644 --- 
a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -117,6 +117,27 @@ func (o *ProverTask) GetProverTasksByHashes(ctx context.Context, taskType messag return proverTasks, nil } +// GetFailedProverTasksByHash retrieves the failed ProverTask records associated with the specified hash. +// The returned prover task objects are sorted in descending order by their ids. +func (o *ProverTask) GetFailedProverTasksByHash(ctx context.Context, taskType message.ProofType, hash string, limit int) ([]*ProverTask, error) { + db := o.db.WithContext(ctx) + db = db.Model(&ProverTask{}) + db = db.Where("task_type", int(taskType)) + db = db.Where("task_id ?", hash) + db = db.Where("proving_status = ?", int(types.ProverProofInvalid)) + db = db.Order("id desc") + + if limit != 0 { + db = db.Limit(limit) + } + + var proverTasks []*ProverTask + if err := db.Find(&proverTasks).Error; err != nil { + return nil, fmt.Errorf("ProverTask.GetFailedProverTasksByHash error: %w, hash: %v", err, hash) + } + return proverTasks, nil +} + // GetProverTaskByUUIDAndPublicKey get prover task taskID by uuid and public key func (o *ProverTask) GetProverTaskByUUIDAndPublicKey(ctx context.Context, uuid, publicKey string) (*ProverTask, error) { db := o.db.WithContext(ctx) @@ -148,24 +169,6 @@ func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType return proverTasks, nil } -// GetTaskOfOtherProvers get the chunk/batch task of prover -func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) { - db := o.db.WithContext(ctx) - db = db.Model(&ProverTask{}) - db = db.Where("task_type", int(taskType)) - db = db.Where("task_id", taskID) - db = db.Where("prover_public_key", proverPublicKey) - db = db.Where("prover_version", proverVersion) - db = db.Limit(1) - - var proverTask ProverTask - err := db.Find(&proverTask).Error - if err != nil { - return nil, 
fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey) - } - return &proverTask, nil -} - // GetProvingStatusByTaskID retrieves the proving status of a prover task func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskType message.ProofType, taskID string) (types.ProverProveStatus, error) { db := o.db.WithContext(ctx) diff --git a/coordinator/internal/types/auth.go b/coordinator/internal/types/auth.go index a8b1c669ec..093f4272bf 100644 --- a/coordinator/internal/types/auth.go +++ b/coordinator/internal/types/auth.go @@ -18,6 +18,8 @@ const ( ProverName = "prover_name" // ProverVersion the prover version for context ProverVersion = "prover_version" + // ProverProviderTypeKey the prover provider type for context + ProverProviderTypeKey = "prover_provider_type" // HardForkName the hard fork name for context HardForkName = "hard_fork_name" ) @@ -30,11 +32,12 @@ type LoginSchema struct { // Message the login message struct type Message struct { - Challenge string `form:"challenge" json:"challenge" binding:"required"` - ProverVersion string `form:"prover_version" json:"prover_version" binding:"required"` - ProverName string `form:"prover_name" json:"prover_name" binding:"required"` - ProverTypes []ProverType `form:"prover_types" json:"prover_types"` - VKs []string `form:"vks" json:"vks"` + Challenge string `form:"challenge" json:"challenge" binding:"required"` + ProverVersion string `form:"prover_version" json:"prover_version" binding:"required"` + ProverName string `form:"prover_name" json:"prover_name" binding:"required"` + ProverProviderType ProverProviderType `form:"prover_provider_type" json:"prover_provider_type" binding:"required"` + ProverTypes []ProverType `form:"prover_types" json:"prover_types"` + VKs []string `form:"vks" json:"vks"` } // LoginParameterWithHardForkName constructs new payload for login diff --git a/coordinator/internal/types/prover.go b/coordinator/internal/types/prover.go 
index 0957755fd7..9de1222eaf 100644 --- a/coordinator/internal/types/prover.go +++ b/coordinator/internal/types/prover.go @@ -40,3 +40,26 @@ func MakeProverType(proofType message.ProofType) ProverType { return ProverTypeUndefined } } + +// ProverProviderType represents the type of prover provider. +type ProverProviderType uint8 + +func (r ProverProviderType) String() string { + switch r { + case ProverProviderTypeInternal: + return "prover provider type internal" + case ProverProviderTypeExternal: + return "prover provider type external" + default: + return fmt.Sprintf("prover provider type type: %d", r) + } +} + +const ( + // ProverProviderTypeUndefined is an unknown prover provider type + ProverProviderTypeUndefined ProverProviderType = iota + // ProverProviderTypeInternal is an internal prover provider type + ProverProviderTypeInternal + // ProverProviderTypeExternal is an external prover provider type + ProverProviderTypeExternal +) diff --git a/coordinator/internal/utils/prover_name.go b/coordinator/internal/utils/prover_name.go new file mode 100644 index 0000000000..fd52919ee6 --- /dev/null +++ b/coordinator/internal/utils/prover_name.go @@ -0,0 +1,15 @@ +package utils + +import "strings" + +func IsExternalProverNameMatch(localName, remoteName string) bool { + local := strings.Split(localName, "_") + remote := strings.Split(remoteName, "_") + + if len(local) < 3 || len(remote) < 3 { + return false + } + + // note the name of cloud prover is in fact in the format of "cloud_prover_{provider-name}_index" + return local[0] == remote[0] && local[1] == remote[1] && local[2] == remote[2] +} From 37deef1700d2bf2e1959892f4390b419e6bf8e67 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 13 Jan 2025 02:06:39 +0800 Subject: [PATCH 20/29] fix: test --- coordinator/internal/logic/provertask/prover_task.go | 5 ----- coordinator/test/mock_prover.go | 11 ++++++----- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git 
a/coordinator/internal/logic/provertask/prover_task.go b/coordinator/internal/logic/provertask/prover_task.go index 02b90784da..23389251fb 100644 --- a/coordinator/internal/logic/provertask/prover_task.go +++ b/coordinator/internal/logic/provertask/prover_task.go @@ -27,11 +27,6 @@ var ( getTaskCounterVec *prometheus.CounterVec = nil ) -var ( - // ExternalProverNamePrefix prefix of prover name - ExternalProverNamePrefix = "external" -) - // ProverTask the interface of a collector who send data to prover type ProverTask interface { Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) diff --git a/coordinator/test/mock_prover.go b/coordinator/test/mock_prover.go index 06e97185de..b6f2f1b341 100644 --- a/coordinator/test/mock_prover.go +++ b/coordinator/test/mock_prover.go @@ -79,11 +79,12 @@ func (r *mockProver) challenge(t *testing.T) string { func (r *mockProver) login(t *testing.T, challengeString string, proverTypes []types.ProverType) (string, int, string) { authMsg := types.LoginParameter{ Message: types.Message{ - Challenge: challengeString, - ProverName: r.proverName, - ProverVersion: r.proverVersion, - ProverTypes: proverTypes, - VKs: []string{"mock_vk"}, + Challenge: challengeString, + ProverName: r.proverName, + ProverVersion: r.proverVersion, + ProverProviderType: types.ProverProviderTypeInternal, + ProverTypes: proverTypes, + VKs: []string{"mock_vk"}, }, PublicKey: r.publicKey(), } From 078ac88734beab7ac2e754eb84216b4a1d0ca154 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 13 Jan 2025 02:09:24 +0800 Subject: [PATCH 21/29] add jwt IdentityHandler --- coordinator/internal/controller/api/auth.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/coordinator/internal/controller/api/auth.go b/coordinator/internal/controller/api/auth.go index b5ca65eca2..7073d123ab 100644 --- a/coordinator/internal/controller/api/auth.go +++ b/coordinator/internal/controller/api/auth.go @@ -97,5 +97,9 @@ func 
(a *AuthController) IdentityHandler(c *gin.Context) interface{} { c.Set(types.HardForkName, hardForkName) } + if providerType, ok := claims[types.ProverProviderTypeKey]; ok { + c.Set(types.ProverProviderTypeKey, providerType) + } + return nil } From 27ae17402946f5ec8d37743fb71ab49ff7546956 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 13 Jan 2025 02:39:23 +0800 Subject: [PATCH 22/29] fix: test --- coordinator/internal/types/auth_test.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/coordinator/internal/types/auth_test.go b/coordinator/internal/types/auth_test.go index 9287dae3eb..2aa833b678 100644 --- a/coordinator/internal/types/auth_test.go +++ b/coordinator/internal/types/auth_test.go @@ -18,11 +18,12 @@ func TestAuthMessageSignAndVerify(t *testing.T) { t.Run("sign", func(t *testing.T) { authMsg = LoginParameter{ Message: Message{ - ProverName: "test1", - ProverVersion: "v0.0.1", - Challenge: "abcdef", - ProverTypes: []ProverType{ProverTypeBatch}, - VKs: []string{"vk1", "vk2"}, + ProverName: "test1", + ProverVersion: "v0.0.1", + Challenge: "abcdef", + ProverProviderType: ProverProviderTypeInternal, + ProverTypes: []ProverType{ProverTypeBatch}, + VKs: []string{"vk1", "vk2"}, }, PublicKey: publicKeyHex, } @@ -59,11 +60,12 @@ func TestGenerateSignature(t *testing.T) { authMsg := LoginParameter{ Message: Message{ - ProverName: "test", - ProverVersion: "v4.4.45-37af5ef5-38a68e2-1c5093c", - Challenge: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjQ4Mzg0ODUsIm9yaWdfaWF0IjoxNzI0ODM0ODg1LCJyYW5kb20iOiJ6QmdNZGstNGc4UzNUNTFrVEFsYk1RTXg2TGJ4SUs4czY3ejM2SlNuSFlJPSJ9.x9PvihhNx2w4_OX5uCrv8QJCNYVQkIi-K2k8XFXYmik", - ProverTypes: []ProverType{ProverTypeChunk}, - VKs: []string{"mock_vk"}, + ProverName: "test", + ProverVersion: "v4.4.45-37af5ef5-38a68e2-1c5093c", + Challenge: 
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjQ4Mzg0ODUsIm9yaWdfaWF0IjoxNzI0ODM0ODg1LCJyYW5kb20iOiJ6QmdNZGstNGc4UzNUNTFrVEFsYk1RTXg2TGJ4SUs4czY3ejM2SlNuSFlJPSJ9.x9PvihhNx2w4_OX5uCrv8QJCNYVQkIi-K2k8XFXYmik", + ProverProviderType: ProverProviderTypeInternal, + ProverTypes: []ProverType{ProverTypeChunk}, + VKs: []string{"mock_vk"}, }, PublicKey: publicKeyHex, } From 37ac2157486df5c8f5b610b9092006a40d85ece3 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 13 Jan 2025 14:08:31 +0800 Subject: [PATCH 23/29] fix: test --- coordinator/internal/logic/provertask/prover_task.go | 2 +- coordinator/internal/orm/prover_task.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/coordinator/internal/logic/provertask/prover_task.go b/coordinator/internal/logic/provertask/prover_task.go index 23389251fb..512e06718d 100644 --- a/coordinator/internal/logic/provertask/prover_task.go +++ b/coordinator/internal/logic/provertask/prover_task.go @@ -81,7 +81,7 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context) (*proverTaskContext, e if !ProverProviderTypeExist { return nil, errors.New("get prover provider type from context failed") } - ptc.ProverProviderType = ProverProviderType.(uint8) + ptc.ProverProviderType = uint8(ProverProviderType.(float64)) hardForkNamesStr, hardForkNameExist := ctx.Get(coordinatorType.HardForkName) if !hardForkNameExist { diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index 66e5649fcf..283b92cb6e 100644 --- a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -123,7 +123,7 @@ func (o *ProverTask) GetFailedProverTasksByHash(ctx context.Context, taskType me db := o.db.WithContext(ctx) db = db.Model(&ProverTask{}) db = db.Where("task_type", int(taskType)) - db = db.Where("task_id ?", hash) + db = db.Where("task_id", hash) db = db.Where("proving_status = ?", int(types.ProverProofInvalid)) db = db.Order("id desc") From 
cc577008ee434861e25c448f26c56683f148e8f2 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 13 Jan 2025 14:19:57 +0800 Subject: [PATCH 24/29] fix: typos --- coordinator/internal/logic/provertask/bundle_prover_task.go | 2 +- coordinator/internal/orm/batch.go | 2 +- coordinator/internal/orm/bundle.go | 2 +- coordinator/internal/orm/chunk.go | 2 +- coordinator/internal/orm/prover_task.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index cdc8505229..7eedd59621 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -67,7 +67,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) { unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) if getCountError != nil { - log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError) + log.Error("failed to get unassigned bundle proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError) return nil, ErrCoordinatorInternalFailure } // Assign external prover if unassigned task number exceeds threshold diff --git a/coordinator/internal/orm/batch.go b/coordinator/internal/orm/batch.go index 466fbc9f13..336a6e1318 100644 --- a/coordinator/internal/orm/batch.go +++ b/coordinator/internal/orm/batch.go @@ -95,7 +95,7 @@ func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTo return &batch, nil } -// GetUnassignedBatchCount retrieves unassigned batch count based on the specified limit. +// GetUnassignedBatchCount retrieves unassigned batch count. 
func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) { var count int64 db := o.db.WithContext(ctx) diff --git a/coordinator/internal/orm/bundle.go b/coordinator/internal/orm/bundle.go index 3c8a003172..54dc8a91c4 100644 --- a/coordinator/internal/orm/bundle.go +++ b/coordinator/internal/orm/bundle.go @@ -71,7 +71,7 @@ func (o *Bundle) GetUnassignedBundle(ctx context.Context, maxActiveAttempts, max return &bundle, nil } -// GetUnassignedBundleCount retrieves unassigned bundle count based on the specified limit. +// GetUnassignedBundleCount retrieves unassigned bundle count. func (o *Bundle) GetUnassignedBundleCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) { var count int64 db := o.db.WithContext(ctx) diff --git a/coordinator/internal/orm/chunk.go b/coordinator/internal/orm/chunk.go index 445b894101..dd128c8e41 100644 --- a/coordinator/internal/orm/chunk.go +++ b/coordinator/internal/orm/chunk.go @@ -88,7 +88,7 @@ func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTo return &chunk, nil } -// GetUnassignedChunkCount retrieves unassigned chunk count based on the specified limit. +// GetUnassignedChunkCount retrieves unassigned chunk count. 
func (o *Chunk) GetUnassignedChunkCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (int64, error) { var count int64 db := o.db.WithContext(ctx) diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index 283b92cb6e..3049acbd6c 100644 --- a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -182,7 +182,7 @@ func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.Proof var proverTask ProverTask err := db.Find(&proverTask).Error if err != nil { - return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey) + return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey: %s", err, taskID, proverPublicKey) } return &proverTask, nil } From f725c2eadc75a929a44722e2ba61aa46c6863c12 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 13 Jan 2025 17:18:53 +0800 Subject: [PATCH 25/29] fix: comments --- coordinator/internal/logic/auth/login.go | 10 ++++++++-- .../logic/provertask/batch_prover_task.go | 2 +- .../logic/provertask/bundle_prover_task.go | 2 +- .../logic/provertask/chunk_prover_task.go | 2 +- coordinator/internal/orm/bundle.go | 1 + coordinator/internal/orm/prover_task.go | 18 ------------------ coordinator/internal/types/auth.go | 2 +- 7 files changed, 13 insertions(+), 24 deletions(-) diff --git a/coordinator/internal/logic/auth/login.go b/coordinator/internal/logic/auth/login.go index 48513df43c..1b3a38ffbd 100644 --- a/coordinator/internal/logic/auth/login.go +++ b/coordinator/internal/logic/auth/login.go @@ -107,9 +107,15 @@ func (l *LoginLogic) Check(login *types.LoginParameter) error { } } + if login.Message.ProverProviderType != types.ProverProviderTypeInternal && login.Message.ProverProviderType != types.ProverProviderTypeExternal { - log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", 
login.Message.ProverName, "prover_version", login.Message.ProverVersion) - return errors.New("invalid prover provider type.") + // for backward compatibility, set ProverProviderType as internal + if login.Message.ProverProviderType == types.ProverProviderTypeUndefined { + login.Message.ProverProviderType = types.ProverProviderTypeInternal + } else { + log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", login.Message.ProverName, "prover_version", login.Message.ProverVersion) + return errors.New("invalid prover provider type.") + } } return nil diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index bb463be738..1a1eb8c06e 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -108,7 +108,7 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato return nil, ErrCoordinatorInternalFailure } for i := 0; i < len(proverTasks); i++ { - if proverTasks[i].ProverName == taskCtx.ProverName || + if proverTasks[i].ProverPublicKey == taskCtx.PublicKey || taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) { log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight) return nil, nil diff --git a/coordinator/internal/logic/provertask/bundle_prover_task.go b/coordinator/internal/logic/provertask/bundle_prover_task.go index 7eedd59621..2fdb9ccfa2 100644 --- a/coordinator/internal/logic/provertask/bundle_prover_task.go +++ b/coordinator/internal/logic/provertask/bundle_prover_task.go @@ -108,7 +108,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat return nil, ErrCoordinatorInternalFailure } for i := 0; i < len(proverTasks); i++ { - if 
proverTasks[i].ProverName == taskCtx.ProverName || + if proverTasks[i].ProverPublicKey == taskCtx.PublicKey || taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) { log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight) return nil, nil diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index 52e7b33ba2..90084bb6a7 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -106,7 +106,7 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato return nil, ErrCoordinatorInternalFailure } for i := 0; i < len(proverTasks); i++ { - if proverTasks[i].ProverName == taskCtx.ProverName || + if proverTasks[i].ProverPublicKey == taskCtx.PublicKey || taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) { log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight) return nil, nil diff --git a/coordinator/internal/orm/bundle.go b/coordinator/internal/orm/bundle.go index 54dc8a91c4..3ccd12cb50 100644 --- a/coordinator/internal/orm/bundle.go +++ b/coordinator/internal/orm/bundle.go @@ -79,6 +79,7 @@ func (o *Bundle) GetUnassignedBundleCount(ctx context.Context, maxActiveAttempts db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned)) db = db.Where("total_attempts < ?", maxTotalAttempts) db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("batch_proofs_status = ?", int(types.BatchProofsStatusReady)) db = db.Where("bundle.deleted_at IS NULL") if err := db.Count(&count).Error; err != nil { return 0, fmt.Errorf("Bundle.GetUnassignedBundleCount error: 
%w", err) diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index 3049acbd6c..408592d2c0 100644 --- a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -169,24 +169,6 @@ func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType return proverTasks, nil } -// GetTaskOfOtherProvers get the chunk/batch task of prover -func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) { - db := o.db.WithContext(ctx) - db = db.Model(&ProverTask{}) - db = db.Where("task_type", int(taskType)) - db = db.Where("task_id", taskID) - db = db.Where("prover_public_key", proverPublicKey) - db = db.Where("prover_version", proverVersion) - db = db.Limit(1) - - var proverTask ProverTask - err := db.Find(&proverTask).Error - if err != nil { - return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey: %s", err, taskID, proverPublicKey) - } - return &proverTask, nil -} - // GetProvingStatusByTaskID retrieves the proving status of a prover task func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskType message.ProofType, taskID string) (types.ProverProveStatus, error) { db := o.db.WithContext(ctx) diff --git a/coordinator/internal/types/auth.go b/coordinator/internal/types/auth.go index 093f4272bf..5acbb19ec4 100644 --- a/coordinator/internal/types/auth.go +++ b/coordinator/internal/types/auth.go @@ -35,7 +35,7 @@ type Message struct { Challenge string `form:"challenge" json:"challenge" binding:"required"` ProverVersion string `form:"prover_version" json:"prover_version" binding:"required"` ProverName string `form:"prover_name" json:"prover_name" binding:"required"` - ProverProviderType ProverProviderType `form:"prover_provider_type" json:"prover_provider_type" binding:"required"` + ProverProviderType ProverProviderType `form:"prover_provider_type" 
json:"prover_provider_type,omitempty"` ProverTypes []ProverType `form:"prover_types" json:"prover_types"` VKs []string `form:"vks" json:"vks"` } From aae19ab74ea1de89fee9f31c5093d1275accd5fb Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 13 Jan 2025 17:35:00 +0800 Subject: [PATCH 26/29] fix: lint --- coordinator/internal/logic/auth/login.go | 1 - 1 file changed, 1 deletion(-) diff --git a/coordinator/internal/logic/auth/login.go b/coordinator/internal/logic/auth/login.go index 1b3a38ffbd..a6fab1315b 100644 --- a/coordinator/internal/logic/auth/login.go +++ b/coordinator/internal/logic/auth/login.go @@ -107,7 +107,6 @@ func (l *LoginLogic) Check(login *types.LoginParameter) error { } } - if login.Message.ProverProviderType != types.ProverProviderTypeInternal && login.Message.ProverProviderType != types.ProverProviderTypeExternal { // for backward compatibility, set ProverProviderType as internal if login.Message.ProverProviderType == types.ProverProviderTypeUndefined { From 35f22b4928570a595861e33f6af2530a4941d74b Mon Sep 17 00:00:00 2001 From: Morty Date: Tue, 14 Jan 2025 02:39:41 +0800 Subject: [PATCH 27/29] fix: login verify --- coordinator/internal/types/auth.go | 45 +++++++++++++++++++++++------- go.work.sum | 6 +--- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/coordinator/internal/types/auth.go b/coordinator/internal/types/auth.go index 5acbb19ec4..6e9021962b 100644 --- a/coordinator/internal/types/auth.go +++ b/coordinator/internal/types/auth.go @@ -30,6 +30,14 @@ type LoginSchema struct { Token string `json:"token"` } +type MessageWithoutProverProviderType struct { + Challenge string `json:"challenge"` + ProverVersion string `json:"prover_version"` + ProverName string `json:"prover_name"` + ProverTypes []ProverType `json:"prover_types"` + VKs []string `json:"vks"` +} + // Message the login message struct type Message struct { Challenge string `form:"challenge" json:"challenge" binding:"required"` @@ -56,7 +64,7 @@ type LoginParameter 
struct { // SignWithKey auth message with private key and set public key in auth message's Identity func (a *LoginParameter) SignWithKey(priv *ecdsa.PrivateKey) error { // Hash identity content - hash, err := a.Message.Hash() + hash, err := Hash(a.Message) if err != nil { return err } @@ -73,7 +81,14 @@ func (a *LoginParameter) SignWithKey(priv *ecdsa.PrivateKey) error { // Verify verifies the message of auth. func (a *LoginParameter) Verify() (bool, error) { - hash, err := a.Message.Hash() + var hash []byte + var err error + if a.Message.ProverProviderType == ProverProviderTypeUndefined { + // for backward compatibility, calculate hash without ProverProviderType + hash, err = Hash(a.Message.ToMessageWithoutProverProviderType()) + } else { + hash, err = Hash(a.Message) + } if err != nil { return false, err } @@ -88,15 +103,14 @@ func (a *LoginParameter) Verify() (bool, error) { return isValid, nil } -// Hash returns the hash of the auth message, which should be the message used -// to construct the Signature. -func (i *Message) Hash() ([]byte, error) { - byt, err := rlp.EncodeToBytes(i) - if err != nil { - return nil, err +func (m *Message) ToMessageWithoutProverProviderType() MessageWithoutProverProviderType { + return MessageWithoutProverProviderType{ + Challenge: m.Challenge, + ProverVersion: m.ProverVersion, + ProverName: m.ProverName, + ProverTypes: m.ProverTypes, + VKs: m.VKs, } - hash := crypto.Keccak256Hash(byt) - return hash[:], nil } // DecodeAndUnmarshalPubkey decodes a hex-encoded public key and unmarshal it into an ecdsa.PublicKey @@ -114,3 +128,14 @@ func (i *Message) DecodeAndUnmarshalPubkey(pubKeyHex string) (*ecdsa.PublicKey, } return pubKey, nil } + +// Hash returns the hash of the auth message, which should be the message used +// to construct the Signature. 
+func Hash(i interface{}) ([]byte, error) { + byt, err := rlp.EncodeToBytes(i) + if err != nil { + return nil, err + } + hash := crypto.Keccak256Hash(byt) + return hash[:], nil +} diff --git a/go.work.sum b/go.work.sum index 68035f0823..29e37dcc28 100644 --- a/go.work.sum +++ b/go.work.sum @@ -448,8 +448,6 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06/go.mod h1:7erjKLwalezA0k99cWs5L11HWOAPNjdUZ6RxH1BXbbM= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= -github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= -github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/aclements/go-moremath v0.0.0-20210112150236-f10218a38794/go.mod h1:7e+I0LQFUI9AXWxOfsQROs9xPhoJtbsyWcjJqDd4KPY= github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg= github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo= @@ -862,6 +860,7 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl/v2 v2.19.1 h1://i05Jqznmb2EXqa39Nsvyan2o5XyMowW5fnCKW5RPI= github.com/hashicorp/hcl/v2 v2.19.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= @@ -1142,8 +1141,6 @@ 
github.com/scroll-tech/da-codec v0.1.1-0.20241014152913-2703f226fb0b/go.mod h1:4 github.com/scroll-tech/go-ethereum v1.10.14-0.20240607130425-e2becce6a1a4/go.mod h1:byf/mZ8jLYUCnUePTicjJWn+RvKdxDn7buS6glTnMwQ= github.com/scroll-tech/go-ethereum v1.10.14-0.20240821074444-b3fa00861e5e/go.mod h1:swB5NSp8pKNDuYsTxfR08bHS6L56i119PBx8fxvV8Cs= github.com/scroll-tech/go-ethereum v1.10.14-0.20241010064814-3d88e870ae22/go.mod h1:r9FwtxCtybMkTbWYCyBuevT9TW3zHmOTHqD082Uh+Oo= -github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d h1:vuv7fGKEDtoeetI6RkKt8RAByJsYZBWk9Vo6gShv65c= -github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d/go.mod h1:PWEOTg6LeWlJAlFJauO0msSLXWnpHmE+mVh5txtfeRM= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0 h1:HtCSf6B4gN/87yc5qTl7WsxPKQIIGXLPPM1bMCPOsoY= @@ -1439,7 +1436,6 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= From 1f05c6ee15c9da536b896fa73fe61cb3aae42e8f Mon Sep 17 00:00:00 2001 From: Morty <70688412+yiweichi@users.noreply.github.com> Date: Tue, 14 Jan 2025 14:33:23 +0800 Subject: [PATCH 28/29] Apply suggestions from code review Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- 
coordinator/internal/logic/auth/login.go | 2 +- coordinator/internal/types/prover.go | 2 +- coordinator/internal/utils/prover_name.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/coordinator/internal/logic/auth/login.go b/coordinator/internal/logic/auth/login.go index a6fab1315b..41e7dce56d 100644 --- a/coordinator/internal/logic/auth/login.go +++ b/coordinator/internal/logic/auth/login.go @@ -112,7 +112,7 @@ func (l *LoginLogic) Check(login *types.LoginParameter) error { if login.Message.ProverProviderType == types.ProverProviderTypeUndefined { login.Message.ProverProviderType = types.ProverProviderTypeInternal } else { - log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", login.Message.ProverName, "prover_version", login.Message.ProverVersion) + log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", login.Message.ProverName, "prover version", login.Message.ProverVersion) return errors.New("invalid prover provider type.") } } diff --git a/coordinator/internal/types/prover.go b/coordinator/internal/types/prover.go index 9de1222eaf..219d63a58e 100644 --- a/coordinator/internal/types/prover.go +++ b/coordinator/internal/types/prover.go @@ -51,7 +51,7 @@ func (r ProverProviderType) String() string { case ProverProviderTypeExternal: return "prover provider type external" default: - return fmt.Sprintf("prover provider type type: %d", r) + return fmt.Sprintf("prover provider type: %d", r) } } diff --git a/coordinator/internal/utils/prover_name.go b/coordinator/internal/utils/prover_name.go index fd52919ee6..ecca948bf3 100644 --- a/coordinator/internal/utils/prover_name.go +++ b/coordinator/internal/utils/prover_name.go @@ -10,6 +10,6 @@ func IsExternalProverNameMatch(localName, remoteName string) bool { return false } - // note the name of cloud prover is in fact in the format of "cloud_prover_{provider-name}_index" + // note the name of cloud prover is in 
the format of "cloud_prover_{provider-name}_index" return local[0] == remote[0] && local[1] == remote[1] && local[2] == remote[2] } From 1e63677c54d903e124807317b2f822a6b3a37d0b Mon Sep 17 00:00:00 2001 From: Morty Date: Tue, 14 Jan 2025 21:27:53 +0800 Subject: [PATCH 29/29] add comments for IsExternalProverNameMatch --- coordinator/internal/utils/prover_name.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/coordinator/internal/utils/prover_name.go b/coordinator/internal/utils/prover_name.go index ecca948bf3..d07b120774 100644 --- a/coordinator/internal/utils/prover_name.go +++ b/coordinator/internal/utils/prover_name.go @@ -2,6 +2,8 @@ package utils import "strings" +// IsExternalProverNameMatch checks if the local and remote external prover names belong to the same provider. +// It returns true if they do, otherwise false. func IsExternalProverNameMatch(localName, remoteName string) bool { local := strings.Split(localName, "_") remote := strings.Split(remoteName, "_")