diff --git a/host/MR.AspNetCore.Jobs.EFHost/MR.AspNetCore.Jobs.EFHost.csproj b/host/MR.AspNetCore.Jobs.EFHost/MR.AspNetCore.Jobs.EFHost.csproj index baa493a..34c09cc 100644 --- a/host/MR.AspNetCore.Jobs.EFHost/MR.AspNetCore.Jobs.EFHost.csproj +++ b/host/MR.AspNetCore.Jobs.EFHost/MR.AspNetCore.Jobs.EFHost.csproj @@ -7,6 +7,7 @@ + diff --git a/src/MR.AspNetCore.Jobs.EFCore/EFCoreStorageConnection.cs b/src/MR.AspNetCore.Jobs.EFCore/EFCoreStorageConnection.cs index 60836e7..068925c 100644 --- a/src/MR.AspNetCore.Jobs.EFCore/EFCoreStorageConnection.cs +++ b/src/MR.AspNetCore.Jobs.EFCore/EFCoreStorageConnection.cs @@ -194,6 +194,11 @@ private async Task FetchNextDelayedJobUsingTimeoutAsync(string sql) return null; } + return CreateSqlTimeoutFetchedJob(fetchedJob); + } + + protected virtual IFetchedJob CreateSqlTimeoutFetchedJob(FetchedJob fetchedJob) + { return new SqlTimeoutFetchedJob( Services, fetchedJob.Id, diff --git a/src/MR.AspNetCore.Jobs.EFCore/Server/SqlTimeoutFetchedJob.cs b/src/MR.AspNetCore.Jobs.EFCore/Server/SqlTimeoutFetchedJob.cs index 724ce5b..880b675 100644 --- a/src/MR.AspNetCore.Jobs.EFCore/Server/SqlTimeoutFetchedJob.cs +++ b/src/MR.AspNetCore.Jobs.EFCore/Server/SqlTimeoutFetchedJob.cs @@ -44,7 +44,7 @@ public Task RemoveFromQueueAsync() var connection = storageConnection.GetDbConnection(); connection.Execute( - $"DELETE FROM {storageConnection.BaseOptions.Schema}.JobQueue WHERE Id = @id", + CreateRemoveFromQueueQuery(storageConnection), new { id = Id }); _removedFromQueue = true; @@ -54,6 +54,11 @@ public Task RemoveFromQueueAsync() return Task.CompletedTask; } + protected virtual string CreateRemoveFromQueueQuery(EFCoreStorageConnection storageConnection) + { + return $"DELETE FROM {storageConnection.BaseOptions.Schema}.JobQueue WHERE Id = @id"; + } + public Task RequeueAsync() { lock (_lock) @@ -64,7 +69,7 @@ public Task RequeueAsync() var connection = storageConnection.GetDbConnection(); connection.Execute( - $"UPDATE {storageConnection.BaseOptions.Schema}.JobQueue SET FetchedAt = null WHERE Id = @id", + CreateRequeueQuery(storageConnection), new { id = Id }); _requeued = true; @@ -74,6 +79,11 @@ public Task RequeueAsync() return Task.CompletedTask; } + protected virtual string CreateRequeueQuery(EFCoreStorageConnection storageConnection) + { + return $"UPDATE {storageConnection.BaseOptions.Schema}.JobQueue SET FetchedAt = NULL WHERE Id = @id"; + } + public void Dispose() { if (_disposed) return; diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs new file mode 100644 index 0000000..d65723f --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/JobsDbContextFactory.cs @@ -0,0 +1,33 @@ +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Design; +using Microsoft.Extensions.DependencyInjection; +using MR.AspNetCore.Jobs.Models; + +namespace MR.AspNetCore.Jobs +{ + public class JobsDbContextFactory : IDesignTimeDbContextFactory + { + internal static string DevConnectionString = + Environment.GetEnvironmentVariable("MR_ASPNETCORE_JOBS_POSTGRESQL_CS_DEV") ?? + @"Server=127.0.0.1;Port=5432;Database=MR.AspNetCore.Jobs.Dev;User Id=postgres;Password=password;"; + + public JobsDbContext CreateDbContext(string[] args) + { + var services = new ServiceCollection(); + + services.AddSingleton(new PostgreSQLOptions()); + services.AddDbContext(opts => + { + opts.UseNpgsql(DevConnectionString, sqlOpts => + { + sqlOpts.MigrationsHistoryTable( + EFCoreOptions.DefaultMigrationsHistoryTableName, + EFCoreOptions.DefaultSchema); + }); + }); + + return services.BuildServiceProvider().GetRequiredService(); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/LoggerExtensions.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/LoggerExtensions.cs new file mode 100644 index 0000000..ca3f500 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/LoggerExtensions.cs @@ -0,0 +1,6 @@ +namespace MR.AspNetCore.Jobs +{ + internal static class LoggerExtensions + { + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/MR.AspNetCore.Jobs.PostgreSQL.csproj b/src/MR.AspNetCore.Jobs.PostgreSQL/MR.AspNetCore.Jobs.PostgreSQL.csproj new file mode 100644 index 0000000..353935a --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/MR.AspNetCore.Jobs.PostgreSQL.csproj @@ -0,0 +1,29 @@ + + + + + + netstandard2.0 + MR.AspNetCore.Jobs + true + true + + + + A PostgreSQL adapter for MR.AspNetCore.Jobs. + aspnetcore;background;jobs;sql;postgresql;netstandard + + + + + + + + + + + + + + + diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.Designer.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.Designer.cs new file mode 100644 index 0000000..a3fbb3a --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.Designer.cs @@ -0,0 +1,98 @@ +// +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using MR.AspNetCore.Jobs.Models; +using System; + +namespace MR.AspNetCore.Jobs.Migrations +{ + [DbContext(typeof(JobsDbContext))] + [Migration("20171125160624_InitialCreate")] + partial class InitialCreate + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("Jobs") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn) + .HasAnnotation("ProductVersion", "2.0.1-rtm-125"); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.CronJob", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Cron"); + + b.Property("LastRun"); + + b.Property("Name") + .IsRequired(); + + b.Property("TypeName"); + + b.HasKey("Id"); + + b.HasIndex("Name") + .IsUnique(); + + b.ToTable("CronJobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.Job", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Data"); + + b.Property("Due"); + + b.Property("ExpiresAt"); + + b.Property("Retries"); + + b.Property("StateName") + .IsRequired(); + + b.HasKey("Id"); + + b.HasIndex("StateName"); + + b.HasIndex("Due", "StateName"); + + b.ToTable("Jobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("JobId"); + + b.HasKey("Id"); + + b.HasIndex("JobId"); + + b.ToTable("JobQueue"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.HasOne("MR.AspNetCore.Jobs.Models.Job", "Job") + .WithMany() + .HasForeignKey("JobId") + .OnDelete(DeleteBehavior.Cascade); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.cs new file mode 100644 index 0000000..27f625a --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171125160624_InitialCreate.cs @@ -0,0 +1,112 @@ +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using System; +using System.Collections.Generic; + +namespace MR.AspNetCore.Jobs.Migrations +{ + public partial class InitialCreate : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.EnsureSchema( + name: "Jobs"); + + migrationBuilder.CreateTable( + name: "CronJobs", + schema: "Jobs", + columns: table => new + { + Id = table.Column(nullable: false), + Cron = table.Column(nullable: true), + LastRun = table.Column(nullable: false), + Name = table.Column(nullable: false), + TypeName = table.Column(nullable: true) + }, + constraints: table => + { + table.PrimaryKey("PK_CronJobs", x => x.Id); + }); + + migrationBuilder.CreateTable( + name: "Jobs", + schema: "Jobs", + columns: table => new + { + Id = table.Column(nullable: false) + .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn), + Added = table.Column(nullable: false), + Data = table.Column(nullable: true), + Due = table.Column(nullable: true), + ExpiresAt = table.Column(nullable: true), + Retries = table.Column(nullable: false), + StateName = table.Column(nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_Jobs", x => x.Id); + }); + + migrationBuilder.CreateTable( + name: "JobQueue", + schema: "Jobs", + columns: table => new + { + Id = table.Column(nullable: false) + .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn), + JobId = table.Column(nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_JobQueue", x => x.Id); + table.ForeignKey( + name: "FK_JobQueue_Jobs_JobId", + column: x => x.JobId, + principalSchema: "Jobs", + principalTable: "Jobs", + principalColumn: "Id", + onDelete: ReferentialAction.Cascade); + }); + + migrationBuilder.CreateIndex( + name: "IX_CronJobs_Name", + schema: "Jobs", + table: "CronJobs", + column: "Name", + unique: true); + + migrationBuilder.CreateIndex( + name: "IX_JobQueue_JobId", + schema: "Jobs", + table: "JobQueue", + column: "JobId"); + + migrationBuilder.CreateIndex( + name: "IX_Jobs_StateName", + schema: "Jobs", + table: "Jobs", + column: "StateName"); + + migrationBuilder.CreateIndex( + name: "IX_Jobs_Due_StateName", + schema: "Jobs", + table: "Jobs", + columns: new[] { "Due", "StateName" }); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "CronJobs", + schema: "Jobs"); + + migrationBuilder.DropTable( + name: "JobQueue", + schema: "Jobs"); + + migrationBuilder.DropTable( + name: "Jobs", + schema: "Jobs"); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.Designer.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.Designer.cs new file mode 100644 index 0000000..3973559 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.Designer.cs @@ -0,0 +1,108 @@ +// +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using MR.AspNetCore.Jobs.Models; +using System; + +namespace MR.AspNetCore.Jobs.Migrations +{ + [DbContext(typeof(JobsDbContext))] + [Migration("20171209095228_add_JobQueue.FetchedAt")] + partial class add_JobQueueFetchedAt + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("Jobs") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn) + .HasAnnotation("ProductVersion", "2.0.1-rtm-125"); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.CronJob", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Cron"); + + b.Property("LastRun"); + + b.Property("Name") + .IsRequired(); + + b.Property("TypeName"); + + b.HasKey("Id"); + + b.HasIndex("Name") + .IsUnique(); + + b.ToTable("CronJobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.Job", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Data"); + + b.Property("Due"); + + b.Property("ExpiresAt"); + + b.Property("Retries"); + + b.Property("StateName") + .IsRequired(); + + b.Property("Updated"); + + b.HasKey("Id"); + + b.HasIndex("Added"); + + b.HasIndex("StateName"); + + b.HasIndex("Updated"); + + b.HasIndex("Due", "StateName"); + + b.ToTable("Jobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("FetchedAt"); + + b.Property("JobId"); + + b.HasKey("Id"); + + b.HasIndex("FetchedAt"); + + b.HasIndex("JobId"); + + b.ToTable("JobQueue"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.HasOne("MR.AspNetCore.Jobs.Models.Job", "Job") + .WithMany() + .HasForeignKey("JobId") + .OnDelete(DeleteBehavior.Cascade); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.cs new file mode 100644 index 0000000..d455fe2 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/20171209095228_add_JobQueue.FetchedAt.cs @@ -0,0 +1,70 @@ +using Microsoft.EntityFrameworkCore.Migrations; +using System; +using System.Collections.Generic; + +namespace MR.AspNetCore.Jobs.Migrations +{ + public partial class add_JobQueueFetchedAt : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AddColumn( + name: "Updated", + schema: "Jobs", + table: "Jobs", + nullable: true); + + migrationBuilder.AddColumn( + name: "FetchedAt", + schema: "Jobs", + table: "JobQueue", + nullable: true); + + migrationBuilder.CreateIndex( + name: "IX_Jobs_Added", + schema: "Jobs", + table: "Jobs", + column: "Added"); + + migrationBuilder.CreateIndex( + name: "IX_Jobs_Updated", + schema: "Jobs", + table: "Jobs", + column: "Updated"); + + migrationBuilder.CreateIndex( + name: "IX_JobQueue_FetchedAt", + schema: "Jobs", + table: "JobQueue", + column: "FetchedAt"); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "IX_Jobs_Added", + schema: "Jobs", + table: "Jobs"); + + migrationBuilder.DropIndex( + name: "IX_Jobs_Updated", + schema: "Jobs", + table: "Jobs"); + + migrationBuilder.DropIndex( + name: "IX_JobQueue_FetchedAt", + schema: "Jobs", + table: "JobQueue"); + + migrationBuilder.DropColumn( + name: "Updated", + schema: "Jobs", + table: "Jobs"); + + migrationBuilder.DropColumn( + name: "FetchedAt", + schema: "Jobs", + table: "JobQueue"); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs new file mode 100644 index 0000000..a4e32cb --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Migrations/JobsDbContextModelSnapshot.cs @@ -0,0 +1,107 @@ +// +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using MR.AspNetCore.Jobs.Models; +using System; + +namespace MR.AspNetCore.Jobs.Migrations +{ + [DbContext(typeof(JobsDbContext))] + partial class JobsDbContextModelSnapshot : ModelSnapshot + { + protected override void BuildModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("Jobs") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn) + .HasAnnotation("ProductVersion", "2.0.1-rtm-125"); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.CronJob", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Cron"); + + b.Property("LastRun"); + + b.Property("Name") + .IsRequired(); + + b.Property("TypeName"); + + b.HasKey("Id"); + + b.HasIndex("Name") + .IsUnique(); + + b.ToTable("CronJobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.Job", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("Added"); + + b.Property("Data"); + + b.Property("Due"); + + b.Property("ExpiresAt"); + + b.Property("Retries"); + + b.Property("StateName") + .IsRequired(); + + b.Property("Updated"); + + b.HasKey("Id"); + + b.HasIndex("Added"); + + b.HasIndex("StateName"); + + b.HasIndex("Updated"); + + b.HasIndex("Due", "StateName"); + + b.ToTable("Jobs"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.Property("Id") + .ValueGeneratedOnAdd(); + + b.Property("FetchedAt"); + + b.Property("JobId"); + + b.HasKey("Id"); + + b.HasIndex("FetchedAt"); + + b.HasIndex("JobId"); + + b.ToTable("JobQueue"); + }); + + modelBuilder.Entity("MR.AspNetCore.Jobs.Models.JobQueue", b => + { + b.HasOne("MR.AspNetCore.Jobs.Models.Job", "Job") + .WithMany() + .HasForeignKey("JobId") + .OnDelete(DeleteBehavior.Cascade); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs new file mode 100644 index 0000000..69d9378 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Models/JobsDbContext.cs @@ -0,0 +1,48 @@ +using Microsoft.EntityFrameworkCore; + +namespace MR.AspNetCore.Jobs.Models +{ + public class JobsDbContext : EFCoreJobsDbContext + { + private PostgreSQLOptions _PostgreSQLOptions; + + public JobsDbContext() + { + } + + public JobsDbContext( + DbContextOptions options, + PostgreSQLOptions PostgreSQLOptions) + : base(options, PostgreSQLOptions) + { + _PostgreSQLOptions = PostgreSQLOptions; + } + + protected override void OnModelCreating(ModelBuilder builder) + { + builder.HasDefaultSchema(RelationalOptions.Schema); + + builder.Entity(b => + { + b.Property(x => x.StateName).IsRequired(); + + b.HasIndex(x => new { x.Due, x.StateName }); + b.HasIndex(x => x.StateName); + b.HasIndex(x => x.Added); + b.HasIndex(x => x.Updated); + }); + + builder.Entity(b => + { + b.Property(x => x.Name).IsRequired(); + + b.HasIndex(x => x.Name).IsUnique(); + }); + + builder.Entity(b => + { + b.HasIndex(x => x.FetchedAt); + }); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLBootstrapper.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLBootstrapper.cs new file mode 100644 index 0000000..27bd5da --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLBootstrapper.cs @@ -0,0 +1,19 @@ +using System; +using Microsoft.AspNetCore.Hosting; +using MR.AspNetCore.Jobs.Server; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLBootstrapper : BootstrapperBase + { + public PostgreSQLBootstrapper( + JobsOptions options, + IStorage storage, + IProcessingServer server, + IApplicationLifetime appLifetime, + IServiceProvider provider) + : base(options, storage, server, appLifetime, provider) + { + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtension.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtension.cs new file mode 100644 index 0000000..f686e83 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtension.cs @@ -0,0 +1,43 @@ +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using MR.AspNetCore.Jobs.Models; +using MR.AspNetCore.Jobs.Server; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLJobsOptionsExtension : IJobsOptionsExtension + { + private Action _configure; + + public PostgreSQLJobsOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddScoped(); + + services.AddSingleton(); + + var PostgreSQLOptions = new PostgreSQLOptions(); + _configure(PostgreSQLOptions); + + services.AddSingleton(PostgreSQLOptions); + services.AddSingleton(PostgreSQLOptions); + + services.AddDbContext(options => + { + options.UseNpgsql(PostgreSQLOptions.ConnectionString, sqlOpts => + { + sqlOpts.MigrationsHistoryTable( + PostgreSQLOptions.MigrationsHistoryTableName, + PostgreSQLOptions.MigrationsHistoryTableSchema ?? PostgreSQLOptions.Schema); + }); + }); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtensions.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtensions.cs new file mode 100644 index 0000000..46d4456 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLJobsOptionsExtensions.cs @@ -0,0 +1,25 @@ +using System; +using MR.AspNetCore.Jobs; + +namespace Microsoft.Extensions.DependencyInjection +{ + public static class PostgreSQLJobsOptionsExtensions + { + public static JobsOptions UsePostgreSQL(this JobsOptions options, string connectionString) + { + return options.UsePostgreSQL(opts => + { + opts.ConnectionString = connectionString; + }); + } + + public static JobsOptions UsePostgreSQL(this JobsOptions options, Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + options.RegisterExtension(new PostgreSQLJobsOptionsExtension(configure)); + + return options; + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLOptions.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLOptions.cs new file mode 100644 index 0000000..37b8d2f --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLOptions.cs @@ -0,0 +1,6 @@ +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLOptions : EFCoreOptions + { + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorage.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorage.cs new file mode 100644 index 0000000..636cfc6 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorage.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.Logging; +using MR.AspNetCore.Jobs.Models; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLStorage : EFCoreStorage + { + public PostgreSQLStorage( + IServiceProvider provider, + ILogger logger) + : base(provider, logger) + { + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs new file mode 100644 index 0000000..b29e9c8 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageConnection.cs @@ -0,0 +1,58 @@ +using System; +using MR.AspNetCore.Jobs.Models; +using MR.AspNetCore.Jobs.Server; +using MR.AspNetCore.Jobs.Server.States; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLStorageConnection : EFCoreStorageConnection + { + public PostgreSQLStorageConnection( + JobsDbContext context, + PostgreSQLOptions options, + IServiceProvider services) + : base(context, options, services) + { + } + + protected override bool UseTransactionFetchedJob => false; + + public override IStorageTransaction CreateTransaction() + { + return new PostgreSQLStorageTransaction(this); + } + + protected override string CreateFetchNextJobQuery() + { + var table = nameof(EFCoreJobsDbContext.JobQueue); + var timeoutSeconds = TimeSpan.FromMinutes(1).Negate().TotalSeconds; + + return $@" +UPDATE ""{Options.Schema}"".""{table}"" +SET ""FetchedAt"" = NOW() AT TIME ZONE 'UTC' +WHERE CTID IN ( + SELECT CTID FROM ""{Options.Schema}"".""{table}"" + WHERE ""FetchedAt"" IS NULL OR ""FetchedAt"" < NOW() AT TIME ZONE 'UTC' + INTERVAL '{timeoutSeconds} SECONDS' + LIMIT 1 +) +RETURNING ""{table}"".""JobId"""; + } + + protected override string CreateGetNextJobToBeEnqueuedQuery() + { + return $@" +SELECT * +FROM ""{Options.Schema}"".""{nameof(EFCoreJobsDbContext.Jobs)}"" +WHERE (""Due"" IS NULL OR ""Due"" < NOW() AT TIME ZONE 'UTC') AND ""StateName"" = '{ScheduledState.StateName}' +LIMIT 1"; + } + + protected override IFetchedJob CreateSqlTimeoutFetchedJob(FetchedJob fetchedJob) + { + return new PostgresSQLTimeoutFetchedJob( + Services, + fetchedJob.Id, + fetchedJob.JobId); + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageTransaction.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageTransaction.cs new file mode 100644 index 0000000..b47e643 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/PostgreSQLStorageTransaction.cs @@ -0,0 +1,12 @@ +using MR.AspNetCore.Jobs.Models; + +namespace MR.AspNetCore.Jobs +{ + public class PostgreSQLStorageTransaction : EFCoreStorageTransaction + { + public PostgreSQLStorageTransaction(PostgreSQLStorageConnection connection) + : base(connection) + { + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs new file mode 100644 index 0000000..7b089e6 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/ExpirationManager.cs @@ -0,0 +1,31 @@ +using System; +using Microsoft.Extensions.Logging; +using MR.AspNetCore.Jobs.Models; + +namespace MR.AspNetCore.Jobs.Server +{ + public class PostgreSQLExpirationManager : EFCoreExpirationManager + { + public PostgreSQLExpirationManager( + ILogger logger, + PostgreSQLOptions options, + IServiceProvider provider) + : base(logger, options, provider) + { + } + + protected override string CreateDeleteTopQuery(string schema, string table) + { + return $@" +DELETE +FROM ""{schema}"".""{table}"" +WHERE ""ExpiresAt"" < @now +AND ctid IN ( + SELECT ctid + FROM ""{schema}"".""{table}"" + ORDER BY ""ExpiresAt"" + LIMIT @count +)"; + } + } +} diff --git a/src/MR.AspNetCore.Jobs.PostgreSQL/Server/PostgresSQLTimeoutFetchedJob.cs b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/PostgresSQLTimeoutFetchedJob.cs new file mode 100644 index 0000000..8eae751 --- /dev/null +++ b/src/MR.AspNetCore.Jobs.PostgreSQL/Server/PostgresSQLTimeoutFetchedJob.cs @@ -0,0 +1,25 @@ +using System; + +namespace MR.AspNetCore.Jobs.Server +{ + public class PostgresSQLTimeoutFetchedJob : SqlTimeoutFetchedJob + { + public PostgresSQLTimeoutFetchedJob( + IServiceProvider services, + int id, + int jobId) + : base(services, id, jobId) + { + } + + protected override string CreateRemoveFromQueueQuery(EFCoreStorageConnection storageConnection) + { + return $@"DELETE FROM ""{storageConnection.BaseOptions.Schema}"".""JobQueue"" WHERE ""Id"" = @id"; + } + + protected override string CreateRequeueQuery(EFCoreStorageConnection storageConnection) + { + return $@"UPDATE ""{storageConnection.BaseOptions.Schema}"".""JobQueue"" SET ""FetchedAt"" = NULL WHERE ""Id"" = @id"; + } + } +}