Workflow slows down as the number of execution pointers grows #1028

Open
qazq opened this issue Apr 8, 2022 · 5 comments


qazq commented Apr 8, 2022

Hi,

I use workflow-core to control a manufacturing process. A single product can take 1-2 months, so the number of execution pointers grows to almost 3000. At that point, executing one step takes 1-2 seconds. I used sample 10 to reproduce the issue:

    public class WhileWorkflow : IWorkflow<MyData>
    {
        public string Id => "While";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyData> builder)
        {
            builder
                .StartWith<SayHello>()
                .While(data => data.Counter < 3000)  // increased to 3000 iterations
                    .Do(x => x
                        .StartWith<DoSomething>()
                        .Then<IncrementStep>()
                            .Input(step => step.Value1, data => data.Counter)
                            .Output(data => data.Counter, step => step.Value2))
                .Then<SayGoodbye>();
        }
    }

I added debug timing output to the following two classes.

EntityFrameworkPersistenceProvider

public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
{
    long t1 = 0, t2 = 0, t3 = 0, t4 = 0;
    var watch = Stopwatch.StartNew();

    using (var db = ConstructDbContext())
    {
        t1 = watch.ElapsedMilliseconds;
        watch.Restart();

        var uid = new Guid(workflow.Id);
        var existingEntity = await db.Set<PersistedWorkflow>()
            .Where(x => x.InstanceId == uid)
            .Include(wf => wf.ExecutionPointers)
            .ThenInclude(ep => ep.ExtensionAttributes)
            .Include(wf => wf.ExecutionPointers)
            .AsTracking()
            .FirstAsync(cancellationToken);
        t2 = watch.ElapsedMilliseconds;
        watch.Restart();

        var persistable = workflow.ToPersistable(existingEntity);
        t3 = watch.ElapsedMilliseconds;
        watch.Restart();

        await db.SaveChangesAsync(cancellationToken);
        t4 = watch.ElapsedMilliseconds;
        watch.Restart();
    }

    Console.WriteLine($"\n\n  PersistWorkflow >>> p1={t1}, p2={t2}, p3={t3}, p4={t4}\n\n");
}

WorkflowConsumer

        protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
        {
            if (!await _lockProvider.AcquireLock(itemId, cancellationToken))
            {
                Logger.LogInformation("Workflow locked {0}", itemId);
                return;
            }

            WorkflowInstance workflow = null;
            WorkflowExecutorResult result = null;

            long t1, t2, t3, t4, t5;
            t1 = t2 = t3 = t4 = t5 = 0;
            var watch = Stopwatch.StartNew();

            Logger.LogDebug("ProcessItem >>> step.1");
            try
            {
                cancellationToken.ThrowIfCancellationRequested();
                workflow = await _persistenceStore.GetWorkflowInstance(itemId, cancellationToken);
                t1 = watch.ElapsedMilliseconds;
                watch.Restart();
                if (workflow.Status == WorkflowStatus.Runnable)
                {
                    try
                    {
                        result = await _executor.Execute(workflow, cancellationToken);
                        t2 = watch.ElapsedMilliseconds;
                        watch.Restart();
                    }
                    finally
                    {
                        await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
                        t3 = watch.ElapsedMilliseconds;
                        watch.Restart();
                        await QueueProvider.QueueWork(itemId, QueueType.Index);
                        t4 = watch.ElapsedMilliseconds;
                        watch.Restart();
                        _greylist.Remove($"wf:{itemId}");
                    }
                }
            }
            finally
            {
                await _lockProvider.ReleaseLock(itemId);
                if ((workflow != null) && (result != null))
                {
                    foreach (var sub in result.Subscriptions)
                    {
                        await SubscribeEvent(sub, _persistenceStore, cancellationToken);
                    }

                    await _persistenceStore.PersistErrors(result.Errors, cancellationToken);

                    if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
                    {
                        var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
                        if (workflow.NextExecution.Value < readAheadTicks)
                        {
                            new Task(() => FutureQueue(workflow, cancellationToken)).Start();
                        }
                        else
                        {
                            if (_persistenceStore.SupportsScheduledCommands)
                            {
                                await _persistenceStore.ScheduleCommand(new ScheduledCommand()
                                {
                                    CommandName = ScheduledCommand.ProcessWorkflow,
                                    Data = workflow.Id,
                                    ExecuteTime = workflow.NextExecution.Value
                                });
                            }
                        }
                    }
                }
                t5 = watch.ElapsedMilliseconds;
                watch.Restart();
            }

            Logger.LogDebug($"ProcessItem >>> t1={t1}, t2={t2}, t3={t3}, t4={t4}, t5={t5}");
        }
[11:02:34.106] info: Microsoft.EntityFrameworkCore.Infrastructure[10403]
      Entity Framework Core 5.0.1 initialized 'SqlServerContext' using provider 'Microsoft.EntityFrameworkCore.SqlServer' with options: NoTracking
dbug: WorkflowCore.Services.BackgroundTasks.WorkflowConsumer[0]
      ProcessItem >>> step.1
info: Microsoft.EntityFrameworkCore.Infrastructure[10403]
      Entity Framework Core 5.0.1 initialized 'SqlServerContext' using provider 'Microsoft.EntityFrameworkCore.SqlServer' with options: NoTracking
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (14ms) [Parameters=[@__uid_0='?' (DbType = Guid)], CommandType='Text', CommandTimeout='30']
      SELECT [t].[PersistenceId], [t].[CompleteTime], [t].[CreateTime], [t].[Data], [t].[Description], [t].[InstanceId], [t].[NextExecution], [t].[Reference], [t].[Status], [t].[Version], [t].[WorkflowDefinitionId], [t0].[PersistenceId], [t0].[Active], [t0].[Children], [t0].[ContextItem], [t0].[EndTime], [t0].[EventData], [t0].[EventKey], [t0].[EventName], [t0].[EventPublished], [t0].[Id], [t0].[Outcome], [t0].[PersistenceData], [t0].[PredecessorId], [t0].[RetryCount], [t0].[Scope], [t0].[SleepUntil], [t0].[StartTime], [t0].[Status], [t0].[StepId], [t0].[StepName], [t0].[WorkflowId], [t0].[PersistenceId0], [t0].[AttributeKey], [t0].[AttributeValue], [t0].[ExecutionPointerId]
      FROM (
          SELECT TOP(1) [w].[PersistenceId], [w].[CompleteTime], [w].[CreateTime], [w].[Data], [w].[Description], [w].[InstanceId], [w].[NextExecution], [w].[Reference], [w].[Status], [w].[Version], [w].[WorkflowDefinitionId]
          FROM [wfc].[Workflow] AS [w]
          WHERE [w].[InstanceId] = @__uid_0
      ) AS [t]
      LEFT JOIN (
          SELECT [e].[PersistenceId], [e].[Active], [e].[Children], [e].[ContextItem], [e].[EndTime], [e].[EventData], [e].[EventKey], [e].[EventName], [e].[EventPublished], [e].[Id], [e].[Outcome], [e].[PersistenceData], [e].[PredecessorId], [e].[RetryCount], [e].[Scope], [e].[SleepUntil], [e].[StartTime], [e].[Status], [e].[StepId], [e].[StepName], [e].[WorkflowId], [e0].[PersistenceId] AS [PersistenceId0], [e0].[AttributeKey], [e0].[AttributeValue], [e0].[ExecutionPointerId]
          FROM [wfc].[ExecutionPointer] AS [e]
          LEFT JOIN [wfc].[ExtensionAttribute] AS [e0] ON [e].[PersistenceId] = [e0].[ExecutionPointerId]
      ) AS [t0] ON [t].[PersistenceId] = [t0].[WorkflowId]
      ORDER BY [t].[PersistenceId], [t0].[PersistenceId], [t0].[PersistenceId0]
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (15ms) [Parameters=[@__uid_0='?' (DbType = Guid)], CommandType='Text', CommandTimeout='30']
      SELECT [t].[PersistenceId], [t].[CompleteTime], [t].[CreateTime], [t].[Data], [t].[Description], [t].[InstanceId], [t].[NextExecution], [t].[Reference], [t].[Status], [t].[Version], [t].[WorkflowDefinitionId], [t0].[PersistenceId], [t0].[Active], [t0].[Children], [t0].[ContextItem], [t0].[EndTime], [t0].[EventData], [t0].[EventKey], [t0].[EventName], [t0].[EventPublished], [t0].[Id], [t0].[Outcome], [t0].[PersistenceData], [t0].[PredecessorId], [t0].[RetryCount], [t0].[Scope], [t0].[SleepUntil], [t0].[StartTime], [t0].[Status], [t0].[StepId], [t0].[StepName], [t0].[WorkflowId], [t0].[PersistenceId0], [t0].[AttributeKey], [t0].[AttributeValue], [t0].[ExecutionPointerId]
      FROM (
          SELECT TOP(1) [w].[PersistenceId], [w].[CompleteTime], [w].[CreateTime], [w].[Data], [w].[Description], [w].[InstanceId], [w].[NextExecution], [w].[Reference], [w].[Status], [w].[Version], [w].[WorkflowDefinitionId]
          FROM [wfc].[Workflow] AS [w]
          WHERE [w].[InstanceId] = @__uid_0
      ) AS [t]
      LEFT JOIN (
          SELECT [e].[PersistenceId], [e].[Active], [e].[Children], [e].[ContextItem], [e].[EndTime], [e].[EventData], [e].[EventKey], [e].[EventName], [e].[EventPublished], [e].[Id], [e].[Outcome], [e].[PersistenceData], [e].[PredecessorId], [e].[RetryCount], [e].[Scope], [e].[SleepUntil], [e].[StartTime], [e].[Status], [e].[StepId], [e].[StepName], [e].[WorkflowId], [e0].[PersistenceId] AS [PersistenceId0], [e0].[AttributeKey], [e0].[AttributeValue], [e0].[ExecutionPointerId]
          FROM [wfc].[ExecutionPointer] AS [e]
          LEFT JOIN [wfc].[ExtensionAttribute] AS [e0] ON [e].[PersistenceId] = [e0].[ExecutionPointerId]
      ) AS [t0] ON [t].[PersistenceId] = [t0].[WorkflowId]
      ORDER BY [t].[PersistenceId], [t0].[PersistenceId], [t0].[PersistenceId0]
[11:02:34.310] dbug: WorkflowCore.Services.WorkflowExecutor[0]
      Starting step (null) on workflow c3ec1e32-c549-490b-81d2-94f2250781bf
info: Microsoft.EntityFrameworkCore.Infrastructure[10403]
      Entity Framework Core 5.0.1 initialized 'SqlServerContext' using provider 'Microsoft.EntityFrameworkCore.SqlServer' with options: NoTracking
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (12ms) [Parameters=[@__uid_0='?' (DbType = Guid)], CommandType='Text', CommandTimeout='30']
      SELECT [t].[PersistenceId], [t].[CompleteTime], [t].[CreateTime], [t].[Data], [t].[Description], [t].[InstanceId], [t].[NextExecution], [t].[Reference], [t].[Status], [t].[Version], [t].[WorkflowDefinitionId], [t0].[PersistenceId], [t0].[Active], [t0].[Children], [t0].[ContextItem], [t0].[EndTime], [t0].[EventData], [t0].[EventKey], [t0].[EventName], [t0].[EventPublished], [t0].[Id], [t0].[Outcome], [t0].[PersistenceData], [t0].[PredecessorId], [t0].[RetryCount], [t0].[Scope], [t0].[SleepUntil], [t0].[StartTime], [t0].[Status], [t0].[StepId], [t0].[StepName], [t0].[WorkflowId], [t0].[PersistenceId0], [t0].[AttributeKey], [t0].[AttributeValue], [t0].[ExecutionPointerId]
      FROM (
          SELECT TOP(1) [w].[PersistenceId], [w].[CompleteTime], [w].[CreateTime], [w].[Data], [w].[Description], [w].[InstanceId], [w].[NextExecution], [w].[Reference], [w].[Status], [w].[Version], [w].[WorkflowDefinitionId]
          FROM [wfc].[Workflow] AS [w]
          WHERE [w].[InstanceId] = @__uid_0
      ) AS [t]
      LEFT JOIN (
          SELECT [e].[PersistenceId], [e].[Active], [e].[Children], [e].[ContextItem], [e].[EndTime], [e].[EventData], [e].[EventKey], [e].[EventName], [e].[EventPublished], [e].[Id], [e].[Outcome], [e].[PersistenceData], [e].[PredecessorId], [e].[RetryCount], [e].[Scope], [e].[SleepUntil], [e].[StartTime], [e].[Status], [e].[StepId], [e].[StepName], [e].[WorkflowId], [e0].[PersistenceId] AS [PersistenceId0], [e0].[AttributeKey], [e0].[AttributeValue], [e0].[ExecutionPointerId]
          FROM [wfc].[ExecutionPointer] AS [e]
          LEFT JOIN [wfc].[ExtensionAttribute] AS [e0] ON [e].[PersistenceId] = [e0].[ExecutionPointerId]
      ) AS [t0] ON [t].[PersistenceId] = [t0].[WorkflowId]
      ORDER BY [t].[PersistenceId], [t0].[PersistenceId], [t0].[PersistenceId0]
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (10ms) [Parameters=[@p2='?' (DbType = Int64), @p0='?' (Size = -1), @p1='?' (Size = 4000)], CommandType='Text', CommandTimeout='30']
      SET NOCOUNT ON;
      UPDATE [wfc].[ExecutionPointer] SET [Children] = @p0, [PersistenceData] = @p1
      WHERE [PersistenceId] = @p2;
      SELECT @@ROWCOUNT;
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (5ms) [Parameters=[@p0='?' (DbType = Boolean), @p1='?' (Size = 4000), @p2='?' (Size = 4000), @p3='?' (DbType = DateTime2), @p4='?' (Size = 4000), @p5='?' (Size = 100), @p6='?' (Size = 100), @p7='?' (DbType = Boolean), @p8='?' (Size = 50), @p9='?' (Size = 4000), @p10='?' (Size = 4000), @p11='?' (Size = 100), @p12='?' (DbType = Int32), @p13='?' (Size = 4000), @p14='?' (DbType = DateTime2), @p15='?' (DbType = DateTime2), @p16='?' (DbType = Int32), @p17='?' (DbType = Int32), @p18='?' (Size = 100), @p19='?' (DbType = Int64)], CommandType='Text', CommandTimeout='30']
      SET NOCOUNT ON;
      INSERT INTO [wfc].[ExecutionPointer] ([Active], [Children], [ContextItem], [EndTime], [EventData], [EventKey], [EventName], [EventPublished], [Id], [Outcome], [PersistenceData], [PredecessorId], [RetryCount], [Scope], [SleepUntil], [StartTime], [Status], [StepId], [StepName], [WorkflowId])
      VALUES (@p0, @p1, @p2, @p3, @p4, @p5, @p6, @p7, @p8, @p9, @p10, @p11, @p12, @p13, @p14, @p15, @p16, @p17, @p18, @p19);
      SELECT [PersistenceId]
      FROM [wfc].[ExecutionPointer]
      WHERE @@ROWCOUNT = 1 AND [PersistenceId] = scope_identity();


  PersistWorkflow >>> p1=0, p2=123, p3=21, p4=41

dbug: WorkflowCore.Services.BackgroundTasks.WorkflowConsumer[0]
      ProcessItem >>> t1=219, t2=1, t3=186, t4=0, t5=0

In ProcessItem, t1 is the time to load the workflow instance from the database. It is large (219 ms in the log above) because there are so many execution pointers.

t3 is the time to persist the workflow (186 ms). PersistWorkflow breaks it down into p1 to p4: p1 is creating the DbContext, p2 is querying the workflow from the database again (123 ms), p3 is ToPersistable() (21 ms), and p4 is SaveChangesAsync() (41 ms).
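Keeping t1 to t5 and p1 to p4 in separate variables makes it easy to mislabel or skip a phase (for example, t2 stays 0 when the workflow is not runnable). A minimal sketch of a named lap recorder built only on System.Diagnostics.Stopwatch; Lap and Report are hypothetical helper names, not workflow-core APIs, and the sleeps stand in for the real work:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Hypothetical lap recorder (not part of workflow-core): each Lap call stores the
// milliseconds elapsed since the previous lap under a readable name.
var watch = Stopwatch.StartNew();
var laps = new List<(string Name, long Ms)>();

void Lap(string name)
{
    laps.Add((name, watch.ElapsedMilliseconds));
    watch.Restart();
}

string Report() => string.Join(", ", laps.ConvertAll(l => $"{l.Name}={l.Ms}ms"));

// Usage mirroring PersistWorkflow's phases; Thread.Sleep simulates each phase.
Thread.Sleep(20); Lap("createContext");
Thread.Sleep(40); Lap("query");
Thread.Sleep(10); Lap("toPersistable");
Thread.Sleep(10); Lap("saveChanges");
Console.WriteLine($"PersistWorkflow >>> {Report()}");
```

Each lap restarts the stopwatch, so the recorded values add up to the total time, just like the manual Restart() calls in the instrumented code.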

There are 1595 execution pointers in the database, and the Children value is very long (a string of 29489 characters).
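The logged UPDATE statement passes [Children] as an nvarchar(max) parameter (Size = -1), so the whole string is written back on every persist. Assuming Children holds child pointer ids, each a 36-character Guid string followed by a ';' terminator (an assumption about the storage format, not something stated in this issue), 37 characters per child means the reported 29489 characters correspond to exactly 797 child ids, roughly half of the 1595 pointers. A small sketch of that arithmetic:

```csharp
using System;
using System.Linq;

// Assumption: Children is stored as child pointer ids, each a Guid.ToString()
// value ("D" format, 36 characters) followed by a ';' terminator.
const int CharsPerChild = 36 + 1;

string SerializeChildren(int childCount) =>
    string.Concat(Enumerable.Range(0, childCount)
        .Select(_ => Guid.NewGuid().ToString() + ";"));

int reportedLength = 29489;                      // length reported in this issue
int impliedChildren = reportedLength / CharsPerChild;
string children = SerializeChildren(impliedChildren);

// The value grows by CharsPerChild per child and, per the logged UPDATE,
// is sent in full on every persist.
Console.WriteLine($"{impliedChildren} children -> {children.Length} characters");
// prints "797 children -> 29489 characters"
```

Under this assumption the column grows linearly with the number of loop iterations, so each save sends more data than the previous one.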


Maybe we can optimize by:

  • Reducing the number of execution pointers loaded from the database (only return active execution pointers?).
  • Preventing Children from growing. (I'm not sure what Children is for, but it looks like Scope could be used instead?)

Thanks!

danielgerlag self-assigned this Apr 12, 2022

dthemg commented May 4, 2022

Hi! My team and I are experiencing a similar issue: execution becomes slower and slower as the number of execution pointers increases. In my application, each evaluation (shown on the x-axis) adds another 10 or so execution pointers:

As you can see, the application steadily slows down as the number of execution pointers increases.
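Assuming every step loads all of the instance's execution pointers (as the SQL logged in the first comment does) and each evaluation adds about 10 pointers, the work per evaluation grows linearly and the cumulative work grows quadratically. With P_k the number of pointers loaded during evaluation k:

```math
P_k \approx 10k, \qquad
\sum_{k=1}^{N} P_k \approx 10 \sum_{k=1}^{N} k = 5N(N+1)
```

For N = 100 evaluations, the last one loads about 1,000 pointers, and the total is about 5 · 100 · 101 = 50,500 pointer rows.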


dthemg commented May 5, 2022

Update: I changed my persistence provider to remove Cancelled and Complete execution pointers before saving. After this change, continuing the test above showed better behavior:

Changes:

public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
{
    // Start of change
    // ToList() (System.Linq) snapshots the matching pointers, in case FindByStatus
    // returns a lazy view over the collection that is modified below.
    var cancelledPointers = workflow.ExecutionPointers.FindByStatus(PointerStatus.Cancelled).ToList();
    var completedPointers = workflow.ExecutionPointers.FindByStatus(PointerStatus.Complete).ToList();
    foreach (var pointer in cancelledPointers)
        workflow.ExecutionPointers.Remove(pointer);
    foreach (var pointer in completedPointers)
        workflow.ExecutionPointers.Remove(pointer);
    // End of change

    await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow, cancellationToken: cancellationToken);
}


vladimir-kovalyuk commented May 19, 2022

@dthemg This issue is well known; I commented on it in a couple of earlier discussions. Retrieving only the active execution pointers (EPs) works around the problem for iterative loops, but what about parallel loops?


LingDian2019 commented May 17, 2024

I have also encountered this problem: the jump from one node to the next gets slower and slower, and after restarting the program it is fast again. How can I solve this?

I stress-tested the same workflow as follows.

The workflow has three nodes; the second one waits 3 seconds and then throws an exception.

public class ConveyorBeginJob : WorkflowStepBody
{
    public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
    {
        PassingData.ExecutionStartTime = DateTime.Now;
        return await Task.FromResult(ExecutionResult.Next());
    }
}
public class ThrowExceptionJobV1 : WorkflowStepBody
{
    public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
    {
        // Simulate 3 seconds of work, then fail so that the workflow error handler runs.
        await Task.Delay(TimeSpan.FromSeconds(3));
        throw new Exception("My Exception");
    }
}
public class ConveyorEndJob : WorkflowStepBody
{
    public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
    {
        PassingData.ExecutionEndTime = DateTime.Now;
        return await Task.FromResult(ExecutionResult.Next());
    }
}
public class WorkflowErrorCommonHandler : IWorkflowErrorHandler
{
    public WorkflowErrorHandling Type => WorkflowErrorHandling.Terminate;

    public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, Exception exception, Queue<ExecutionPointer> bubbleUpQueue)
    {
        try
        {
            var workflowStepPassingData = (WorkflowStepPassingData)workflow.Data;
            workflowStepPassingData.EndTime = DateTime.Now;
           
            foreach (var stepItem in workflow.ExecutionPointers)
            {
                if (!string.IsNullOrWhiteSpace(stepItem.StepName) && stepItem.StartTime != null)
                {
                    long stepTotalMilliseconds = -1;
                    if (stepItem.EndTime != null)
                    {
                        stepTotalMilliseconds = stepItem.EndTime.ToTotalMilliseconds(stepItem.StartTime);
                    }

                    var jobElapsedTimeName = $"({stepItem.StepName})-({stepItem.Status})-({stepItem.StartTime?.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff")})-({stepItem.EndTime?.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff")})";
                    workflowStepPassingData.JobElapsedTime.TryAdd(jobElapsedTimeName, stepTotalMilliseconds);
                }
            }

            // Logging workflow
        }
        catch (Exception ex)
        {
            _log4NetService.WriteLog(LogNameKey.SystemError, ex);
        }
    }
}

At 500 concurrent workflows, execution takes a long time. From the results below:
1. The jump between nodes takes a long time.
2. It takes a long time before IWorkflowErrorHandler is entered.

{
  "id": "7199437576333133140",
  "createTime": "2024-05-23 15:54:37.279",
  "executionStartTime": "2024-05-23 15:54:44.796",
  "executionEndTime": null,
  "endTime": "2024-05-23 15:55:36.984",
  "jobElapsedTime": {
    "(ConveyorBeginJob)-(Complete)-(2024-05-23 15:54:44.796)-(2024-05-23 15:54:44.796)": "0",
    "(ThrowExceptionJobV1)-(Failed)-(2024-05-23 15:54:47.174)-()": "-1"
  },
  "condition": {},
  "data": {}
}
{
  "id": "7199459854026519216",
  "equipmentCode": "1112",
  "createTime": "2024-05-23 17:23:08.695",
  "executionStartTime": "2024-05-23 17:23:41.791",
  "executionEndTime": null,
  "endTime": "2024-05-23 17:26:25.098",
  "jobElapsedTime": {
    "(ConveyorBeginJob)-(Complete)-(2024-05-23 17:23:41.791)-(2024-05-23 17:23:41.791)": "0",
    "(ThrowExceptionJobV1)-(Failed)-(2024-05-23 17:24:37.686)-()": "-1"
  },
  "condition": {},
  "data": {}
}
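The delays can be read directly from the two records. A sketch that parses the timestamps (format taken from the JSON above) and prints three gaps per record: creation to the first step, the jump from ConveyorBeginJob to ThrowExceptionJobV1, and the time from the start of ThrowExceptionJobV1 to endTime minus its 3-second Task.Delay. It assumes endTime is the value set in WorkflowErrorCommonHandler; how WorkflowStepPassingData maps to this JSON is not shown in the comment. Worked out, the gaps are about 7.5 s and 33.1 s before the first step, 2.4 s and 55.9 s for the node jump, and about 46.8 s and 104.4 s before the error handler ran.

```csharp
using System;
using System.Globalization;

DateTime T(string s) =>
    DateTime.ParseExact(s, "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);

// createTime, executionStartTime, ThrowExceptionJobV1 start time and endTime of each record.
var records = new[]
{
    (Id: "7199437576333133140",
     Created: T("2024-05-23 15:54:37.279"), Started: T("2024-05-23 15:54:44.796"),
     ThrowStart: T("2024-05-23 15:54:47.174"), Ended: T("2024-05-23 15:55:36.984")),
    (Id: "7199459854026519216",
     Created: T("2024-05-23 17:23:08.695"), Started: T("2024-05-23 17:23:41.791"),
     ThrowStart: T("2024-05-23 17:24:37.686"), Ended: T("2024-05-23 17:26:25.098")),
};

foreach (var r in records)
{
    double queued = (r.Started - r.Created).TotalSeconds;   // wait before the first step ran
    double jump = (r.ThrowStart - r.Started).TotalSeconds;  // ConveyorBeginJob ended at Started
    // Subtract the 3 s Task.Delay to isolate the wait before the error handler ran.
    double toHandler = (r.Ended - r.ThrowStart).TotalSeconds - 3;
    Console.WriteLine($"{r.Id}: queued {queued:F3}s, node jump {jump:F3}s, handler delay ~{toHandler:F3}s");
}
```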

@q913777031

I don't need persistence. Can I use the in-memory mode and clear workflows in a terminated state, to prevent memory leaks and improve performance?
