From 1ec7c0f904ac006dc94e688990a1da23e75cce50 Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Sat, 25 Nov 2017 18:03:18 +0200 Subject: [PATCH 1/9] Initial impl of postgresql adapter --- .../MR.AspNetCore.Jobs.EFHost.csproj | 1 + .../JobsDbContextFactory.cs | 31 +++++++++++++ .../LoggerExtensions.cs | 6 +++ .../MR.AspNetCore.Jobs.PostgreSQL.csproj | 29 +++++++++++++ .../Models/JobsDbContext.cs | 21 +++++++++ .../PostgreSQLBootstrapper.cs | 19 ++++++++ .../PostgreSQLJobsOptionsExtension.cs | 43 +++++++++++++++++++ .../PostgreSQLJobsOptionsExtensions.cs | 25 +++++++++++ .../PostgreSQLOptions.cs | 6 +++ .../PostgreSQLStorage.cs | 16 +++++++ .../PostgreSQLStorageConnection.cs | 39 +++++++++++++++++ .../PostgreSQLStorageTransaction.cs | 12 ++++++ .../Server/ExpirationManager.cs | 26 +++++++++++ 13 files changed, 274 insertions(+) create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/LoggerExtensions.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/MR.AspNetCore.Jobs.PostgreSQL.csproj create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLBootstrapper.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtension.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtensions.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLOptions.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorage.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageTransaction.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs diff --git a/host/MR.AspNetCore.Jobs.EFHost/MR.AspNetCore.Jobs.EFHost.csproj b/host/MR.AspNetCore.Jobs.EFHost/MR.AspNetCore.Jobs.EFHost.csproj index baa493a..34c09cc 100644 --- a/host/MR.AspNetCore.Jobs.EFHost/MR.AspNetCore.Jobs.EFHost.csproj +++ b/host/MR.AspNetCore.Jobs.EFHost/MR.AspNetCore.Jobs.EFHost.csproj @@ -7,6 +7,7 @@ + diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs new file mode 100644 index 0000000..7a39eb0 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs @@ -0,0 +1,31 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Design; +using Microsoft.Extensions.DependencyInjection; +using MR.AspNetCore.Jobs.Models; + +namespace MR.AspNetCore.Jobs +{ + public class JobsDbContextFactory : IDesignTimeDbContextFactory + { + internal static string DevConnectionString = + @"Server=.\sqlexpress;Database=MR.AspNetCore.Jobs.Dev;Trusted_Connection=True;"; + + public JobsDbContext CreateDbContext(string[] args) + { + var services = new ServiceCollection(); + + services.AddSingleton(new PostgreSQLOptions()); + services.AddDbContext(opts => + { + opts.UseNpgsql(DevConnectionString, sqlOpts => + { + sqlOpts.MigrationsHistoryTable( + EFCoreOptions.DefaultMigrationsHistoryTableName, + EFCoreOptions.DefaultSchema); + }); + }); + + return services.BuildServiceProvider().GetRequiredService(); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/LoggerExtensions.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/LoggerExtensions.cs new file mode 100644 index 0000000..ca3f500 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/LoggerExtensions.cs @@ -0,0 +1,6 @@ +namespace MR.AspNetCore.Jobs +{ + internal static class LoggerExtensions + { + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/MR.AspNetCore.Jobs.PostgreSQL.csproj b/src/MR.AspNetCore.Jobs.PostgreSQL/MR.AspNetCore.Jobs.PostgreSQL.csproj new file mode 100644 index 0000000..353935a --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/MR.AspNetCore.Jobs.PostgreSQL.csproj @@ -0,0 +1,29 @@ + + + + + + netstandard2.0 + MR.AspNetCore.Jobs + true + true + + + + A PostgreSQL adapter for MR.AspNetCore.Jobs. + aspnetcore;background;jobs;sql;postgresql;netstandard + + + + + + + + + + + + + + + diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs new file mode 100644 index 0000000..70fa004 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs @@ -0,0 +1,21 @@ +using Microsoft.EntityFrameworkCore; + +namespace MR.AspNetCore.Jobs.Models +{ + public class JobsDbContext : EFCoreJobsDbContext + { + private PostgreSQLOptions _PostgreSQLOptions; + + public JobsDbContext() + { + } + + public JobsDbContext( + DbContextOptions options, + PostgreSQLOptions PostgreSQLOptions) + : base(options, PostgreSQLOptions) + { + _PostgreSQLOptions = PostgreSQLOptions; + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLBootstrapper.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLBootstrapper.cs new file mode 100644 index 0000000..27bd5da --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLBootstrapper.cs @@ -0,0 +1,19 @@ +using System; +using Microsoft.AspNetCore.Hosting; +using MR.AspNetCore.Jobs.Server; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLBootstrapper : BootstrapperBase + { + public PostgreSQLBootstrapper( + JobsOptions options, + IStorage storage, + IProcessingServer server, + IApplicationLifetime appLifetime, + IServiceProvider provider) + : base(options, storage, server, appLifetime, provider) + { + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtension.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtension.cs new file mode 100644 index 0000000..f686e83 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtension.cs @@ -0,0 +1,43 @@ +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using MR.AspNetCore.Jobs.Models; +using MR.AspNetCore.Jobs.Server; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLJobsOptionsExtension : IJobsOptionsExtension + { + private Action _configure; + + public PostgreSQLJobsOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddScoped(); + + services.AddSingleton(); + + var PostgreSQLOptions = new PostgreSQLOptions(); + _configure(PostgreSQLOptions); + + services.AddSingleton(PostgreSQLOptions); + services.AddSingleton(PostgreSQLOptions); + + services.AddDbContext(options => + { + options.UseNpgsql(PostgreSQLOptions.ConnectionString, sqlOpts => + { + sqlOpts.MigrationsHistoryTable( + PostgreSQLOptions.MigrationsHistoryTableName, + PostgreSQLOptions.MigrationsHistoryTableSchema ?? PostgreSQLOptions.Schema); + }); + }); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtensions.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtensions.cs new file mode 100644 index 0000000..46d4456 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtensions.cs @@ -0,0 +1,25 @@ +using System; +using MR.AspNetCore.Jobs; + +namespace Microsoft.Extensions.DependencyInjection +{ + public static class PostgreSQLJobsOptionsExtensions + { + public static JobsOptions UsePostgreSQL(this JobsOptions options, string connectionString) + { + return options.UsePostgreSQL(opts => + { + opts.ConnectionString = connectionString; + }); + } + + public static JobsOptions UsePostgreSQL(this JobsOptions options, Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + options.RegisterExtension(new PostgreSQLJobsOptionsExtension(configure)); + + return options; + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLOptions.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLOptions.cs new file mode 100644 index 0000000..37b8d2f --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLOptions.cs @@ -0,0 +1,6 @@ +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLOptions : EFCoreOptions + { + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorage.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorage.cs new file mode 100644 index 0000000..636cfc6 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorage.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.Logging; +using MR.AspNetCore.Jobs.Models; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLStorage : EFCoreStorage + { + public PostgreSQLStorage( + IServiceProvider provider, + ILogger logger) + : base(provider, logger) + { + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs new file mode 100644 index 0000000..3f519fc --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs @@ -0,0 +1,39 @@ +using MR.AspNetCore.Jobs.Models; +using MR.AspNetCore.Jobs.Server.States; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLStorageConnection : EFCoreStorageConnection + { + public PostgreSQLStorageConnection( + JobsDbContext context, + PostgreSQLOptions options) + : base(context, options) + { + } + + public override IStorageTransaction CreateTransaction() + { + return new PostgreSQLStorageTransaction(this); + } + + protected override string CreateFetchNextJobQuery() + { + var table = nameof(EFCoreJobsDbContext.JobQueue); + return $@" +DELETE +FROM [{Options.Schema}].[{table}] --WITH (readpast, updlock, rowlock) +RETURNING {table}.JobId +LIMIT 1"; + } + + protected override string CreateGetNextJobToBeEnqueuedQuery() + { + return $@" +SELECT * +FROM [{Options.Schema}].[{nameof(EFCoreJobsDbContext.Jobs)}] --WITH (readpast) +WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{ScheduledState.StateName}' +LIMIT 1"; + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageTransaction.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageTransaction.cs new file mode 100644 index 0000000..b47e643 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageTransaction.cs @@ -0,0 +1,12 @@ +using MR.AspNetCore.Jobs.Models; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLStorageTransaction : EFCoreStorageTransaction + { + public PostgreSQLStorageTransaction(PostgreSQLStorageConnection connection) + : base(connection) + { + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs new file mode 100644 index 0000000..dd028c7 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs @@ -0,0 +1,26 @@ +using System; +using Microsoft.Extensions.Logging; +using MR.AspNetCore.Jobs.Models; + +namespace MR.AspNetCore.Jobs.Server +{ + public class PostgreSQLExpirationManager : EFCoreExpirationManager + { + public PostgreSQLExpirationManager( + ILogger logger, + PostgreSQLOptions options, + IServiceProvider provider) + : base(logger, options, provider) + { + } + + protected override string CreateDeleteTopQuery(string schema, string table) + { + return $@" +DELETE +FROM [{schema}].[{table}] --WITH (readpast) +WHERE ExpiresAt < @now +LIMIT @count"; + } + } +} From 6aea52a8cdb2a3caa7f43bcbe3153e7cbad1ac8c Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Sat, 25 Nov 2017 18:07:00 +0200 Subject: [PATCH 2/9] Add postgresql initial migration --- .../20171125160624_InitialCreate.Designer.cs | 98 +++++++++++++++ .../20171125160624_InitialCreate.cs | 112 ++++++++++++++++++ .../Migrations/JobsDbContextModelSnapshot.cs | 97 +++++++++++++++ 3 files changed, 307 insertions(+) create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.Designer.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.Designer.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.Designer.cs new file mode 100644 index 0000000..a3fbb3a --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.Designer.cs @@ -0,0 +1,98 @@ +// +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using MR.AspNetCore.Jobs.Models; +using System; + +namespace MR.AspNetCore.Jobs.Migrations +{ + [DbContext(typeof(JobsDbContext))] + [Migration("20171125160624_InitialCreate")] + partial class InitialCreate + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("Jobs") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn) + .HasAnnotation("ProductVersion", "2.0.1-rtm-125"); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.CronJob", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Cron"); + + b.Property("LastRun"); + + b.Property("Name") + .IsRequired(); + + b.Property("TypeName"); + + b.HasKey("Id"); + + b.HasIndex("Name") + .IsUnique(); + + b.ToTable("CronJobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.Job", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Data"); + + b.Property("Due"); + + b.Property("ExpiresAt"); + + b.Property("Retries"); + + b.Property("StateName") + .IsRequired(); + + b.HasKey("Id"); + + b.HasIndex("StateName"); + + b.HasIndex("Due", "StateName"); + + b.ToTable("Jobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("JobId"); + + b.HasKey("Id"); + + b.HasIndex("JobId"); + + b.ToTable("JobQueue"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.HasOne("MR.AspNetCore.Jobs.Models.Job", "Job") + .WithMany() + .HasForeignKey("JobId") + .OnDelete(DeleteBehavior.Cascade); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.cs new file mode 100644 index 0000000..27f625a --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.cs @@ -0,0 +1,112 @@ +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using System; +using System.Collections.Generic; + +namespace MR.AspNetCore.Jobs.Migrations +{ + public partial class InitialCreate : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.EnsureSchema( + name: "Jobs"); + + migrationBuilder.CreateTable( + name: "CronJobs", + schema: "Jobs", + columns: table => new + { + Id = table.Column(nullable: false), + Cron = table.Column(nullable: true), + LastRun = table.Column(nullable: false), + Name = table.Column(nullable: false), + TypeName = table.Column(nullable: true) + }, + constraints: table => + { + table.PrimaryKey("PK_CronJobs", x => x.Id); + }); + + migrationBuilder.CreateTable( + name: "Jobs", + schema: "Jobs", + columns: table => new + { + Id = table.Column(nullable: false) + .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn), + Added = table.Column(nullable: false), + Data = table.Column(nullable: true), + Due = table.Column(nullable: true), + ExpiresAt = table.Column(nullable: true), + Retries = table.Column(nullable: false), + StateName = table.Column(nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_Jobs", x => x.Id); + }); + + migrationBuilder.CreateTable( + name: "JobQueue", + schema: "Jobs", + columns: table => new + { + Id = table.Column(nullable: false) + .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn), + JobId = table.Column(nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_JobQueue", x => x.Id); + table.ForeignKey( + name: "FK_JobQueue_Jobs_JobId", + column: x => x.JobId, + principalSchema: "Jobs", + principalTable: "Jobs", + principalColumn: "Id", + onDelete: ReferentialAction.Cascade); + }); + + migrationBuilder.CreateIndex( + name: "IX_CronJobs_Name", + schema: "Jobs", + table: "CronJobs", + column: "Name", + unique: true); + + migrationBuilder.CreateIndex( + name: "IX_JobQueue_JobId", + schema: "Jobs", + table: "JobQueue", + column: "JobId"); + + migrationBuilder.CreateIndex( + name: "IX_Jobs_StateName", + schema: "Jobs", + table: "Jobs", + column: "StateName"); + + migrationBuilder.CreateIndex( + name: "IX_Jobs_Due_StateName", + schema: "Jobs", + table: "Jobs", + columns: new[] { "Due", "StateName" }); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "CronJobs", + schema: "Jobs"); + + migrationBuilder.DropTable( + name: "JobQueue", + schema: "Jobs"); + + migrationBuilder.DropTable( + name: "Jobs", + schema: "Jobs"); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs new file mode 100644 index 0000000..2fa168c --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs @@ -0,0 +1,97 @@ +// +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using MR.AspNetCore.Jobs.Models; +using System; + +namespace MR.AspNetCore.Jobs.Migrations +{ + [DbContext(typeof(JobsDbContext))] + partial class JobsDbContextModelSnapshot : ModelSnapshot + { + protected override void BuildModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("Jobs") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn) + .HasAnnotation("ProductVersion", "2.0.1-rtm-125"); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.CronJob", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Cron"); + + b.Property("LastRun"); + + b.Property("Name") + .IsRequired(); + + b.Property("TypeName"); + + b.HasKey("Id"); + + b.HasIndex("Name") + .IsUnique(); + + b.ToTable("CronJobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.Job", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Data"); + + b.Property("Due"); + + b.Property("ExpiresAt"); + + b.Property("Retries"); + + b.Property("StateName") + .IsRequired(); + + b.HasKey("Id"); + + b.HasIndex("StateName"); + + b.HasIndex("Due", "StateName"); + + b.ToTable("Jobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("JobId"); + + b.HasKey("Id"); + + b.HasIndex("JobId"); + + b.ToTable("JobQueue"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.HasOne("MR.AspNetCore.Jobs.Models.Job", "Job") + .WithMany() + .HasForeignKey("JobId") + .OnDelete(DeleteBehavior.Cascade); + }); +#pragma warning restore 612, 618 + } + } +} From 7896306f7efc417f08f97ce056e6451518d42bc1 Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Sat, 25 Nov 2017 18:07:41 +0200 Subject: [PATCH 3/9] Clean, remove commented table hints --- .../PostgreSQLStorageConnection.cs | 4 ++-- src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs index 3f519fc..4e5bfc2 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs @@ -22,7 +22,7 @@ protected override string CreateFetchNextJobQuery() var table = nameof(EFCoreJobsDbContext.JobQueue); return $@" DELETE -FROM [{Options.Schema}].[{table}] --WITH (readpast, updlock, rowlock) +FROM [{Options.Schema}].[{table}] RETURNING {table}.JobId LIMIT 1"; } @@ -31,7 +31,7 @@ protected override string CreateGetNextJobToBeEnqueuedQuery() { return $@" SELECT * -FROM [{Options.Schema}].[{nameof(EFCoreJobsDbContext.Jobs)}] --WITH (readpast) +FROM [{Options.Schema}].[{nameof(EFCoreJobsDbContext.Jobs)}] WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{ScheduledState.StateName}' LIMIT 1"; } diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs index dd028c7..5f8a8c9 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs @@ -18,7 +18,7 @@ protected override string CreateDeleteTopQuery(string schema, string table) { return $@" DELETE -FROM [{schema}].[{table}] --WITH (readpast) +FROM [{schema}].[{table}] WHERE ExpiresAt < @now LIMIT @count"; } From bddaf87e5e1881e1a7850728e461f43613e9cc94 Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Sat, 25 Nov 2017 18:19:53 +0200 Subject: [PATCH 4/9] Edit pg dev connection string --- src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs index 7a39eb0..d65723f 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs @@ -1,4 +1,5 @@ -using Microsoft.EntityFrameworkCore; +using System; +using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Design; using Microsoft.Extensions.DependencyInjection; using MR.AspNetCore.Jobs.Models; @@ -8,7 +9,8 @@ namespace MR.AspNetCore.Jobs public class JobsDbContextFactory : IDesignTimeDbContextFactory { internal static string DevConnectionString = - @"Server=.\sqlexpress;Database=MR.AspNetCore.Jobs.Dev;Trusted_Connection=True;"; + Environment.GetEnvironmentVariable("MR_ASPNETCORE_JOBS_POSTGRESQL_CS_DEV") ?? + @"Server=127.0.0.1;Port=5432;Database=MR.AspNetCore.Jobs.Dev;User Id=postgres;Password=password;"; public JobsDbContext CreateDbContext(string[] args) { From 222166942fddf80ee547c5caadfe01271c90e9fa Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Sun, 26 Nov 2017 10:28:05 +0200 Subject: [PATCH 5/9] Brackets to quotes --- .../PostgreSQLStorageConnection.cs | 8 ++++---- .../Server/ExpirationManager.cs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs index 4e5bfc2..38db69c 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs @@ -22,8 +22,8 @@ protected override string CreateFetchNextJobQuery() var table = nameof(EFCoreJobsDbContext.JobQueue); return $@" DELETE -FROM [{Options.Schema}].[{table}] -RETURNING {table}.JobId +FROM ""{Options.Schema}"".""{table}"" +RETURNING ""{table}"".JobId LIMIT 1"; } @@ -31,8 +31,8 @@ protected override string CreateGetNextJobToBeEnqueuedQuery() { return $@" SELECT * -FROM [{Options.Schema}].[{nameof(EFCoreJobsDbContext.Jobs)}] -WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{ScheduledState.StateName}' +FROM ""{Options.Schema}"".""{nameof(EFCoreJobsDbContext.Jobs)}"" +WHERE (Due IS NULL OR Due < NOW() AT TIME ZONE 'UTC') AND StateName = '{ScheduledState.StateName}' LIMIT 1"; } } diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs index 5f8a8c9..5ad2859 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs @@ -18,7 +18,7 @@ protected override string CreateDeleteTopQuery(string schema, string table) { return $@" DELETE -FROM [{schema}].[{table}] +FROM ""{schema}"".""{table}"" WHERE ExpiresAt < @now LIMIT @count"; } From c56123ee3217ac4159e6f7f496fe486d3fb11555 Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Sat, 9 Dec 2017 11:49:36 +0200 Subject: [PATCH 6/9] Virtualize creating queries in timeout job --- .../Server/SqlTimeoutFetchedJob.cs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/MR.AspNetCore.Jobs.EFCore/Server/SqlTimeoutFetchedJob.cs b/src/MR.AspNetCore.Jobs.EFCore/Server/SqlTimeoutFetchedJob.cs index 724ce5b..880b675 100644 --- a/src/MR.AspNetCore.Jobs.EFCore/Server/SqlTimeoutFetchedJob.cs +++ b/src/MR.AspNetCore.Jobs.EFCore/Server/SqlTimeoutFetchedJob.cs @@ -44,7 +44,7 @@ public Task RemoveFromQueueAsync() var connection = storageConnection.GetDbConnection(); connection.Execute( - $"DELETE FROM {storageConnection.BaseOptions.Schema}.JobQueue WHERE Id = @id", + CreateRemoveFromQueueQuery(storageConnection), new { id = Id }); _removedFromQueue = true; @@ -54,6 +54,11 @@ public Task RemoveFromQueueAsync() return Task.CompletedTask; } + protected virtual string CreateRemoveFromQueueQuery(EFCoreStorageConnection storageConnection) + { + return $"DELETE FROM {storageConnection.BaseOptions.Schema}.JobQueue WHERE Id = @id"; + } + public Task RequeueAsync() { lock (_lock) @@ -64,7 +69,7 @@ public Task RequeueAsync() var connection = storageConnection.GetDbConnection(); connection.Execute( - $"UPDATE {storageConnection.BaseOptions.Schema}.JobQueue SET FetchedAt = null WHERE Id = @id", + CreateRequeueQuery(storageConnection), new { id = Id }); _requeued = true; @@ -74,6 +79,11 @@ public Task RequeueAsync() return Task.CompletedTask; } + protected virtual string CreateRequeueQuery(EFCoreStorageConnection storageConnection) + { + return $"UPDATE {storageConnection.BaseOptions.Schema}.JobQueue SET FetchedAt = NULL WHERE Id = @id"; + } + public void Dispose() { if (_disposed) return; From 5f9fe1684d532873ecb3b676007dcb67a14647d2 Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Sat, 9 Dec 2017 11:53:00 +0200 Subject: [PATCH 7/9] React to timeout fetched job usage --- .../EFCoreStorageConnection.cs | 5 + ...9095228_add_JobQueue.FetchedAt.Designer.cs | 108 ++++++++++++++++++ .../20171209095228_add_JobQueue.FetchedAt.cs | 70 ++++++++++++ .../Migrations/JobsDbContextModelSnapshot.cs | 10 ++ .../Models/JobsDbContext.cs | 27 +++++ .../PostgreSQLStorageConnection.cs | 31 ++++- .../Server/PostgresSQLTimeoutFetchedJob.cs | 25 ++++ 7 files changed, 270 insertions(+), 6 deletions(-) create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.Designer.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.cs create mode 100644 src/MR.AspNetCore.Jobs.PostgreSQL/Server/PostgresSQLTimeoutFetchedJob.cs diff --git a/src/MR.AspNetCore.Jobs.EFCore/EFCoreStorageConnection.cs b/src/MR.AspNetCore.Jobs.EFCore/EFCoreStorageConnection.cs index 60836e7..068925c 100644 --- a/src/MR.AspNetCore.Jobs.EFCore/EFCoreStorageConnection.cs +++ b/src/MR.AspNetCore.Jobs.EFCore/EFCoreStorageConnection.cs @@ -194,6 +194,11 @@ private async Task FetchNextDelayedJobUsingTimeoutAsync(string sql) return null; } + return CreateSqlTimeoutFetchedJob(fetchedJob); + } + + protected virtual IFetchedJob CreateSqlTimeoutFetchedJob(FetchedJob fetchedJob) + { return new SqlTimeoutFetchedJob( Services, fetchedJob.Id, diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.Designer.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.Designer.cs new file mode 100644 index 0000000..3973559 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.Designer.cs @@ -0,0 +1,108 @@ +// +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using MR.AspNetCore.Jobs.Models; +using System; + +namespace MR.AspNetCore.Jobs.Migrations +{ + [DbContext(typeof(JobsDbContext))] + [Migration("20171209095228_add_JobQueue.FetchedAt")] + partial class add_JobQueueFetchedAt + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("Jobs") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn) + .HasAnnotation("ProductVersion", "2.0.1-rtm-125"); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.CronJob", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Cron"); + + b.Property("LastRun"); + + b.Property("Name") + .IsRequired(); + + b.Property("TypeName"); + + b.HasKey("Id"); + + b.HasIndex("Name") + .IsUnique(); + + b.ToTable("CronJobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.Job", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Data"); + + b.Property("Due"); + + b.Property("ExpiresAt"); + + b.Property("Retries"); + + b.Property("StateName") + .IsRequired(); + + b.Property("Updated"); + + b.HasKey("Id"); + + b.HasIndex("Added"); + + b.HasIndex("StateName"); + + b.HasIndex("Updated"); + + b.HasIndex("Due", "StateName"); + + b.ToTable("Jobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("FetchedAt"); + + b.Property("JobId"); + + b.HasKey("Id"); + + b.HasIndex("FetchedAt"); + + b.HasIndex("JobId"); + + b.ToTable("JobQueue"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.HasOne("MR.AspNetCore.Jobs.Models.Job", "Job") + .WithMany() + .HasForeignKey("JobId") + .OnDelete(DeleteBehavior.Cascade); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.cs new file mode 100644 index 0000000..d455fe2 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.cs @@ -0,0 +1,70 @@ +using Microsoft.EntityFrameworkCore.Migrations; +using System; +using System.Collections.Generic; + +namespace MR.AspNetCore.Jobs.Migrations +{ + public partial class add_JobQueueFetchedAt : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AddColumn( + name: "Updated", + schema: "Jobs", + table: "Jobs", + nullable: true); + + migrationBuilder.AddColumn( + name: "FetchedAt", + schema: "Jobs", + table: "JobQueue", + nullable: true); + + migrationBuilder.CreateIndex( + name: "IX_Jobs_Added", + schema: "Jobs", + table: "Jobs", + column: "Added"); + + migrationBuilder.CreateIndex( + name: "IX_Jobs_Updated", + schema: "Jobs", + table: "Jobs", + column: "Updated"); + + migrationBuilder.CreateIndex( + name: "IX_JobQueue_FetchedAt", + schema: "Jobs", + table: "JobQueue", + column: "FetchedAt"); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "IX_Jobs_Added", + schema: "Jobs", + table: "Jobs"); + + migrationBuilder.DropIndex( + name: "IX_Jobs_Updated", + schema: "Jobs", + table: "Jobs"); + + migrationBuilder.DropIndex( + name: "IX_JobQueue_FetchedAt", + schema: "Jobs", + table: "JobQueue"); + + migrationBuilder.DropColumn( + name: "Updated", + schema: "Jobs", + table: "Jobs"); + + migrationBuilder.DropColumn( + name: "FetchedAt", + schema: "Jobs", + table: "JobQueue"); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs index 2fa168c..a4e32cb 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs @@ -61,10 +61,16 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("StateName") .IsRequired(); + b.Property("Updated"); + b.HasKey("Id"); + b.HasIndex("Added"); + b.HasIndex("StateName"); + b.HasIndex("Updated"); + b.HasIndex("Due", "StateName"); b.ToTable("Jobs"); @@ -75,10 +81,14 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("Id") .ValueGeneratedOnAdd(); + b.Property("FetchedAt"); + b.Property("JobId"); b.HasKey("Id"); + b.HasIndex("FetchedAt"); + b.HasIndex("JobId"); b.ToTable("JobQueue"); diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs index 70fa004..69d9378 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs @@ -17,5 +17,32 @@ public JobsDbContext( { _PostgreSQLOptions = PostgreSQLOptions; } + + protected override void OnModelCreating(ModelBuilder builder) + { + builder.HasDefaultSchema(RelationalOptions.Schema); + + builder.Entity(b => + { + b.Property(x => x.StateName).IsRequired(); + + b.HasIndex(x => new { x.Due, x.StateName }); + b.HasIndex(x => x.StateName); + b.HasIndex(x => x.Added); + b.HasIndex(x => x.Updated); + }); + + builder.Entity(b => + { + b.Property(x => x.Name).IsRequired(); + + b.HasIndex(x => x.Name).IsUnique(); + }); + + builder.Entity(b => + { + b.HasIndex(x => x.FetchedAt); + }); + } } } diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs index 38db69c..f2dc9ea 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs @@ -1,4 +1,6 @@ +using System; using MR.AspNetCore.Jobs.Models; +using MR.AspNetCore.Jobs.Server; using MR.AspNetCore.Jobs.Server.States; namespace MR.AspNetCore.Jobs @@ -7,11 +9,14 @@ public class PostgreSQLStorageConnection : EFCoreStorageConnection false; + public override IStorageTransaction CreateTransaction() { return new PostgreSQLStorageTransaction(this); @@ -20,11 +25,17 @@ public override IStorageTransaction CreateTransaction() protected override string CreateFetchNextJobQuery() { var table = nameof(EFCoreJobsDbContext.JobQueue); + var timeoutSeconds = TimeSpan.FromMinutes(1).Negate().TotalSeconds; + return $@" -DELETE -FROM ""{Options.Schema}"".""{table}"" -RETURNING ""{table}"".JobId -LIMIT 1"; +UPDATE ""{Options.Schema}"".""{table}"" +SET ""FetchedAt"" = NOW() AT TIME ZONE 'UTC' +WHERE CTID IN ( + SELECT CTID FROM ""{Options.Schema}"".""{table}"" + WHERE ""FetchedAt"" IS NULL OR ""FetchedAt"" < NOW() AT TIME ZONE 'UTC' + INTERVAL '{timeoutSeconds} SECONDS' + LIMIT 1 +) +RETURNING ""{table}"".""JobId"""; } protected override string CreateGetNextJobToBeEnqueuedQuery() @@ -35,5 +46,13 @@ protected override string CreateGetNextJobToBeEnqueuedQuery() WHERE (Due IS NULL OR Due < NOW() AT TIME ZONE 'UTC') AND StateName = '{ScheduledState.StateName}' LIMIT 1"; } + + protected override IFetchedJob CreateSqlTimeoutFetchedJob(FetchedJob fetchedJob) + { + return new PostgresSQLTimeoutFetchedJob( + Services, + fetchedJob.Id, + fetchedJob.JobId); + } } } diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/PostgresSQLTimeoutFetchedJob.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/PostgresSQLTimeoutFetchedJob.cs new file mode 100644 index 0000000..8eae751 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/PostgresSQLTimeoutFetchedJob.cs @@ -0,0 +1,25 @@ +using System; + +namespace MR.AspNetCore.Jobs.Server +{ + public class PostgresSQLTimeoutFetchedJob : SqlTimeoutFetchedJob + { + public PostgresSQLTimeoutFetchedJob( + IServiceProvider services, + int id, + int jobId) + : base(services, id, jobId) + { + } + + protected override string CreateRemoveFromQueueQuery(EFCoreStorageConnection storageConnection) + { + return $@"DELETE FROM ""{storageConnection.BaseOptions.Schema}"".""JobQueue"" WHERE ""Id"" = @id"; + } + + protected override string CreateRequeueQuery(EFCoreStorageConnection storageConnection) + { + return $@"UPDATE ""{storageConnection.BaseOptions.Schema}"".""JobQueue"" SET ""FetchedAt"" = NULL WHERE ""Id"" = @id"; + } + } +} From dd8635c7a40cdf7fbc0d081e499861f7d0e28f83 Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Sat, 9 Dec 2017 12:21:42 +0200 Subject: [PATCH 8/9] Quote everything --- .../PostgreSQLStorageConnection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs index f2dc9ea..b29e9c8 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs @@ -43,7 +43,7 @@ protected override string CreateGetNextJobToBeEnqueuedQuery() return $@" SELECT * FROM ""{Options.Schema}"".""{nameof(EFCoreJobsDbContext.Jobs)}"" -WHERE (Due IS NULL OR Due < NOW() AT TIME ZONE 'UTC') AND StateName = '{ScheduledState.StateName}' +WHERE (""Due"" IS NULL OR ""Due"" < NOW() AT TIME ZONE 'UTC') AND ""StateName"" = '{ScheduledState.StateName}' LIMIT 1"; } From 768e7d940c65b20e9e2d4fab53e49e0a78b1de3b Mon Sep 17 00:00:00 2001 From: Mohammad Rahhal Date: Tue, 16 Jan 2018 15:11:47 +0200 Subject: [PATCH 9/9] fix: delete expired entities query in postgres --- .../Server/ExpirationManager.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs index 5ad2859..7b089e6 100644 --- a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs @@ -19,8 +19,13 @@ protected override string CreateDeleteTopQuery(string schema, string table) return $@" DELETE FROM ""{schema}"".""{table}"" -WHERE ExpiresAt < @now -LIMIT @count"; +WHERE ""ExpiresAt"" < @now +AND ctid IN ( + SELECT ctid + FROM ""{schema}"".""{table}"" + ORDER BY ""ExpiresAt"" + LIMIT @count +)"; } } }