From defa6ba3f1b2d606998eb403eb79085378a9e202 Mon Sep 17 00:00:00 2001 From: Ying Date: Tue, 3 Sep 2024 14:30:32 -0400 Subject: [PATCH] Using esClient bulk for true partial update --- x-pack/plugins/task_manager/server/task.ts | 8 + .../task_claimers/strategy_mget.test.ts | 845 +++++++++--------- .../server/task_claimers/strategy_mget.ts | 65 +- .../task_manager/server/task_store.mock.ts | 1 + .../task_manager/server/task_store.test.ts | 382 +++++++- .../plugins/task_manager/server/task_store.ts | 99 +- 6 files changed, 911 insertions(+), 489 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index 772d2615ce84a..bbe2935bdfc6d 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -456,6 +456,10 @@ export interface ConcreteTaskInstance extends TaskInstance { partition?: number; } +export type PartialConcreteTaskInstance = Partial & { + id: ConcreteTaskInstance['id']; +}; + export interface ConcreteTaskInstanceVersion { /** The _id of the the document (not the SO id) */ esId: string; @@ -490,3 +494,7 @@ export type SerializedConcreteTaskInstance = Omit< runAt: string; partition?: number; }; + +export type PartialSerializedConcreteTaskInstance = Partial & { + id: SerializedConcreteTaskInstance['id']; +}; diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index 767c84e37ca21..36c49c1b3ddb0 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -19,6 +19,7 @@ import { ConcreteTaskInstanceVersion, TaskPriority, TaskCost, + PartialConcreteTaskInstance, } from '../task'; import { SearchOpts, StoreOpts } from '../task_store'; import { asTaskClaimEvent, TaskEvent } from '../task_events'; @@ -177,8 +178,8 @@ describe('TaskClaiming', () => { for (let i = 0; i < hits.length; i++) { store.msearch.mockResolvedValueOnce({ docs: hits[i], versionMap: versionMaps[i] }); store.getDocVersions.mockResolvedValueOnce(versionMaps[i]); - const oneBulkResult = hits[i].map((hit) => asOk(hit)); - store.bulkUpdate.mockResolvedValueOnce(oneBulkResult); + const oneBulkResult = hits[i].map((hit) => getPartialUpdateResult(hit)); + store.bulkPartialUpdate.mockResolvedValueOnce(oneBulkResult); const oneBulkGetResult = hits[i].map((hit) => asOk(hit)); store.bulkGet.mockResolvedValueOnce(oneBulkGetResult); } @@ -352,8 +353,8 @@ describe('TaskClaiming', () => { store.bulkGet.mockResolvedValueOnce( [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2]].map(asOk) ); - store.bulkUpdate.mockResolvedValueOnce( - [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2]].map(asOk) + store.bulkPartialUpdate.mockResolvedValueOnce( + [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2]].map(getPartialUpdateResult) ); const taskClaiming = new TaskClaiming({ @@ -401,36 +402,39 @@ describe('TaskClaiming', () => { 'task:id-5', 'task:id-6', ]); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[0], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + scheduledAt: fetchedTasks[0].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3']); expect(result.stats).toEqual({ @@ -458,8 +462,10 @@ describe('TaskClaiming', () => { store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockResolvedValueOnce([fetchedTasks[0], fetchedTasks[1]].map(asOk)); + store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult)); + store.bulkPartialUpdate.mockResolvedValueOnce( + [fetchedTasks[0], fetchedTasks[1]].map(getPartialUpdateResult) + ); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, @@ -499,35 +505,31 @@ describe('TaskClaiming', () => { seq_no_primary_term: true, }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); - expect(store.bulkUpdate).toHaveBeenCalledTimes(2); - expect(store.bulkUpdate).toHaveBeenNthCalledWith( - 1, - [ - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); - expect(store.bulkUpdate).toHaveBeenNthCalledWith( - 2, - [ - { - ...fetchedTasks[0], - status: 'unrecognized', - }, - { - ...fetchedTasks[1], - status: 'unrecognized', - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2); + expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [ + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); + expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + status: 'unrecognized', + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + status: 'unrecognized', + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-3']); expect(result.stats).toEqual({ @@ -555,8 +557,8 @@ describe('TaskClaiming', () => { store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockResolvedValueOnce([ + store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult)); + store.bulkPartialUpdate.mockResolvedValueOnce([ asOk(fetchedTasks[0]), // @ts-expect-error asErr({ @@ -608,35 +610,31 @@ describe('TaskClaiming', () => { seq_no_primary_term: true, }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); - expect(store.bulkUpdate).toHaveBeenCalledTimes(2); - expect(store.bulkUpdate).toHaveBeenNthCalledWith( - 1, - [ - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); - expect(store.bulkUpdate).toHaveBeenNthCalledWith( - 2, - [ - { - ...fetchedTasks[0], - status: 'unrecognized', - }, - { - ...fetchedTasks[1], - status: 'unrecognized', - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2); + expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [ + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); + expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + status: 'unrecognized', + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + status: 'unrecognized', + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-3']); expect(result.stats).toEqual({ @@ -664,8 +662,8 @@ describe('TaskClaiming', () => { store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockRejectedValueOnce(new Error('Oh no')); + store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult)); + store.bulkPartialUpdate.mockRejectedValueOnce(new Error('Oh no')); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, @@ -710,35 +708,31 @@ describe('TaskClaiming', () => { }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); expect(store.bulkGet).toHaveBeenCalledWith(['id-3']); - expect(store.bulkUpdate).toHaveBeenCalledTimes(2); - expect(store.bulkUpdate).toHaveBeenNthCalledWith( - 1, - [ - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); - expect(store.bulkUpdate).toHaveBeenNthCalledWith( - 2, - [ - { - ...fetchedTasks[0], - status: 'unrecognized', - }, - { - ...fetchedTasks[1], - status: 'unrecognized', - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2); + expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [ + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); + expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + status: 'unrecognized', + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + status: 'unrecognized', + }, + ]); expect(result.stats).toEqual({ tasksClaimed: 1, @@ -795,7 +789,7 @@ describe('TaskClaiming', () => { }); expect(store.getDocVersions).not.toHaveBeenCalled(); expect(store.bulkGet).not.toHaveBeenCalled(); - expect(store.bulkUpdate).not.toHaveBeenCalled(); + expect(store.bulkPartialUpdate).not.toHaveBeenCalled(); expect(result.stats).toEqual({ tasksClaimed: 0, @@ -821,7 +815,9 @@ describe('TaskClaiming', () => { store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); + store.bulkPartialUpdate.mockResolvedValueOnce( + [fetchedTasks[1], fetchedTasks[2]].map(getPartialUpdateResult) + ); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, @@ -861,28 +857,29 @@ describe('TaskClaiming', () => { seq_no_primary_term: true, }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-2', 'id-3']); expect(result.stats).toEqual({ @@ -911,7 +908,9 @@ describe('TaskClaiming', () => { store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); + store.bulkPartialUpdate.mockResolvedValueOnce( + [fetchedTasks[1], fetchedTasks[2]].map(getPartialUpdateResult) + ); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, @@ -951,28 +950,29 @@ describe('TaskClaiming', () => { seq_no_primary_term: true, }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-2', 'id-3']); expect(result.stats).toEqual({ @@ -1001,7 +1001,9 @@ describe('TaskClaiming', () => { store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); - store.bulkUpdate.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); + store.bulkPartialUpdate.mockResolvedValueOnce( + [fetchedTasks[1], fetchedTasks[2]].map(getPartialUpdateResult) + ); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, @@ -1041,28 +1043,29 @@ describe('TaskClaiming', () => { seq_no_primary_term: true, }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-2', 'id-3']); expect(result.stats).toEqual({ @@ -1095,8 +1098,10 @@ describe('TaskClaiming', () => { store.bulkGet.mockResolvedValueOnce( [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[4]].map(asOk) ); - store.bulkUpdate.mockResolvedValueOnce( - [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[4]].map(asOk) + store.bulkPartialUpdate.mockResolvedValueOnce( + [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[4]].map( + getPartialUpdateResult + ) ); const taskClaiming = new TaskClaiming({ @@ -1144,44 +1149,49 @@ describe('TaskClaiming', () => { 'task:id-5', 'task:id-6', ]); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[0], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[4], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + scheduledAt: fetchedTasks[0].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[4].id, + version: fetchedTasks[4].version, + scheduledAt: fetchedTasks[4].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-5']); expect(result.stats).toEqual({ @@ -1208,8 +1218,10 @@ describe('TaskClaiming', () => { const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); - store.bulkUpdate.mockResolvedValueOnce( - [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[3]].map(asOk) + store.bulkPartialUpdate.mockResolvedValueOnce( + [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[3]].map( + getPartialUpdateResult + ) ); store.bulkGet.mockResolvedValueOnce([ asOk(fetchedTasks[0]), @@ -1270,44 +1282,49 @@ describe('TaskClaiming', () => { 'task:id-3', 'task:id-4', ]); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[0], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[3], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + scheduledAt: fetchedTasks[0].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[3].id, + version: fetchedTasks[3].version, + scheduledAt: fetchedTasks[3].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']); expect(result.stats).toEqual({ @@ -1334,8 +1351,10 @@ describe('TaskClaiming', () => { const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); - store.bulkUpdate.mockResolvedValueOnce( - [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[3]].map(asOk) + store.bulkPartialUpdate.mockResolvedValueOnce( + [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[3]].map( + getPartialUpdateResult + ) ); store.bulkGet.mockRejectedValueOnce(new Error('oh no')); @@ -1373,44 +1392,49 @@ describe('TaskClaiming', () => { 'task:id-3', 'task:id-4', ]); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[0], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[3], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + scheduledAt: fetchedTasks[0].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[3].id, + version: fetchedTasks[3].version, + scheduledAt: fetchedTasks[3].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']); }); @@ -1428,16 +1452,16 @@ describe('TaskClaiming', () => { const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); - store.bulkUpdate.mockResolvedValueOnce([ - asOk(fetchedTasks[0]), + store.bulkPartialUpdate.mockResolvedValueOnce([ + getPartialUpdateResult(fetchedTasks[0]), // @ts-expect-error asErr({ type: 'task', id: fetchedTasks[1].id, error: new Error('Oh no'), }), - asOk(fetchedTasks[2]), - asOk(fetchedTasks[3]), + getPartialUpdateResult(fetchedTasks[2]), + getPartialUpdateResult(fetchedTasks[3]), ]); store.bulkGet.mockResolvedValueOnce([ asOk(fetchedTasks[0]), @@ -1492,44 +1516,49 @@ describe('TaskClaiming', () => { 'task:id-3', 'task:id-4', ]); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[0], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[3], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + scheduledAt: fetchedTasks[0].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[3].id, + version: fetchedTasks[3].version, + scheduledAt: fetchedTasks[3].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-3', 'id-4']); expect(result.stats).toEqual({ @@ -1556,7 +1585,7 @@ describe('TaskClaiming', () => { const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); - store.bulkUpdate.mockRejectedValueOnce(new Error('oh no')); + store.bulkPartialUpdate.mockRejectedValueOnce(new Error('oh no')); store.bulkGet.mockResolvedValueOnce([]); const taskClaiming = new TaskClaiming({ @@ -1593,44 +1622,49 @@ describe('TaskClaiming', () => { 'task:id-3', 'task:id-4', ]); - expect(store.bulkUpdate).toHaveBeenCalledTimes(1); - expect(store.bulkUpdate).toHaveBeenCalledWith( - [ - { - ...fetchedTasks[0], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[1], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[2], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - { - ...fetchedTasks[3], - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ], - { validate: false, excludeLargeFields: true } - ); + expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1); + expect(store.bulkPartialUpdate).toHaveBeenCalledWith([ + { + id: fetchedTasks[0].id, + version: fetchedTasks[0].version, + scheduledAt: fetchedTasks[0].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[1].id, + version: fetchedTasks[1].version, + scheduledAt: fetchedTasks[1].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[2].id, + version: fetchedTasks[2].version, + scheduledAt: fetchedTasks[2].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + { + id: fetchedTasks[3].id, + version: fetchedTasks[3].version, + scheduledAt: fetchedTasks[3].runAt, + attempts: 1, + ownerId: 'test-test', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), + }, + ]); expect(store.bulkGet).not.toHaveBeenCalled(); }); @@ -1880,13 +1914,13 @@ describe('TaskClaiming', () => { doc = { ...doc, retryAt: null }; return asOk(doc); }); - taskStore.bulkUpdate.mockResolvedValueOnce(updatedDocs); + taskStore.bulkPartialUpdate.mockResolvedValueOnce(updatedDocs); taskStore.bulkGet.mockResolvedValueOnce(updatedDocs); } taskStore.msearch.mockResolvedValue({ docs: [], versionMap: new Map() }); taskStore.getDocVersions.mockResolvedValue(new Map()); - taskStore.bulkUpdate.mockResolvedValue([]); + taskStore.bulkPartialUpdate.mockResolvedValue([]); taskStore.bulkGet.mockResolvedValue([]); const taskClaiming = new TaskClaiming({ @@ -1968,6 +2002,17 @@ function generateFakeTasks(count: number = 1) { return _.times(count, (index) => mockInstance({ id: `task:id-${index}` })); } +function getPartialUpdateResult(task: ConcreteTaskInstance) { + return asOk({ + id: task.id, + version: task.version, + scheduledAt: task.runAt, + ownerId: 'test-test', + retryAt: task.runAt, + status: 'claiming', + } as PartialConcreteTaskInstance); +} + function mockInstance(instance: Partial = {}) { return Object.assign( { diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 260f398571570..eff90bffb7434 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -18,7 +18,6 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import apm from 'elastic-apm-node'; import { Subject, Observable } from 'rxjs'; -import { omit } from 'lodash'; import { TaskTypeDictionary } from '../task_type_dictionary'; import { TaskClaimerOpts, @@ -26,7 +25,13 @@ import { getEmptyClaimOwnershipResult, getExcludedTaskTypes, } from '.'; -import { ConcreteTaskInstance, TaskStatus, ConcreteTaskInstanceVersion, TaskCost } from '../task'; +import { + ConcreteTaskInstance, + TaskStatus, + ConcreteTaskInstanceVersion, + TaskCost, + PartialConcreteTaskInstance, +} from '../task'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; import { TASK_MANAGER_MARK_AS_CLAIMED } from '../queries/task_claiming'; import { TaskClaim, asTaskClaimEvent, startTaskTimer } from '../task_events'; @@ -188,12 +193,11 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise task.id))).reduce< - ConcreteTaskInstance[] - >((acc, task) => { - if (isOk(task)) { - acc.push(task.value); - } else { - const { id, type, error } = task.error; - logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`, logMeta); - bulkGetErrors++; - } - return acc; - }, []); + const fullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce( + (acc, task) => { + if (isOk(task)) { + acc.push(task.value); + } else { + const { id, type, error } = task.error; + logger.error( + `Error getting full task ${id}:${type} during claim: ${error.message}`, + logMeta + ); + bulkGetErrors++; + } + return acc; + }, + [] + ); // separate update for removed tasks; shouldn't happen often, so unlikely // a performance concern, and keeps the rest of the logic simpler let removedCount = 0; if (removedTasks.length > 0) { const tasksToRemove = Array.from(removedTasks); + const tasksToRemoveUpdates: PartialConcreteTaskInstance[] = []; for (const task of tasksToRemove) { - task.status = TaskStatus.Unrecognized; + tasksToRemoveUpdates.push({ + id: task.id, + version: task.version, + status: TaskStatus.Unrecognized, + }); } // don't worry too much about errors, we'll get them next time try { - const removeResults = await taskStore.bulkUpdate(tasksToRemove, { - validate: false, - excludeLargeFields: true, - }); + const removeResults = await taskStore.bulkPartialUpdate(tasksToRemoveUpdates); for (const removeResult of removeResults) { if (isOk(removeResult)) { removedCount++; diff --git a/x-pack/plugins/task_manager/server/task_store.mock.ts b/x-pack/plugins/task_manager/server/task_store.mock.ts index 7cf051f406532..2872286ba861e 100644 --- a/x-pack/plugins/task_manager/server/task_store.mock.ts +++ b/x-pack/plugins/task_manager/server/task_store.mock.ts @@ -24,6 +24,7 @@ export const taskStoreMock = { schedule: jest.fn(), bulkSchedule: jest.fn(), bulkUpdate: jest.fn(), + bulkPartialUpdate: jest.fn(), bulkRemove: jest.fn(), get: jest.fn(), getLifecycle: jest.fn(), diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 19f2861b0ed16..3d9399130f7b4 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -8,7 +8,7 @@ import { schema } from '@kbn/config-schema'; import { Client } from '@elastic/elasticsearch'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; -import _, { omit } from 'lodash'; +import _ from 'lodash'; import { first } from 'rxjs'; import { @@ -17,14 +17,18 @@ import { TaskLifecycleResult, SerializedConcreteTaskInstance, } from './task'; -import { elasticsearchServiceMock, savedObjectsServiceMock } from '@kbn/core/server/mocks'; +import { + ElasticsearchClientMock, + elasticsearchServiceMock, + savedObjectsServiceMock, +} from '@kbn/core/server/mocks'; import { TaskStore, SearchOpts, AggregationOpts, taskInstanceToAttributes } from './task_store'; import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks'; import { SavedObjectAttributes, SavedObjectsErrorHelpers } from '@kbn/core/server'; import { TaskTypeDictionary } from './task_type_dictionary'; import { mockLogger } from './test_utils'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; -import { asErr } from './lib/result_type'; +import { asErr, asOk } from './lib/result_type'; import { UpdateByQueryResponse } from '@elastic/elasticsearch/lib/api/types'; const mockGetValidatedTaskInstanceFromReading = jest.fn(); @@ -347,7 +351,7 @@ describe('TaskStore', () => { expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); }); - test('excludes state and params from source when excludeState is true', async () => { + test('excludes state and params from source when limitResponse is true', async () => { const { args } = await testFetch({}, [], true); expect(args).toMatchObject({ index: 'tasky', @@ -889,7 +893,7 @@ describe('TaskStore', () => { ); }); - test(`logs warning and doesn't validate whenever excludeLargeFields option is passed-in`, async () => { + test('pushes error from saved objects client to errors$', async () => { const task = { runAt: mockedDate, scheduledAt: mockedDate, @@ -906,66 +910,354 @@ describe('TaskStore', () => { traceparent: '', }; - savedObjectsClient.bulkUpdate.mockResolvedValue({ - saved_objects: [ - { - id: '324242', - type: 'task', - attributes: { - ...task, - state: '{"foo":"bar"}', - params: '{"hello":"world"}', - }, - references: [], - version: '123', - }, - ], - }); + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + savedObjectsClient.bulkUpdate.mockRejectedValue(new Error('Failure')); + await expect( + store.bulkUpdate([task], { validate: true }) + ).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); + }); + }); - await store.bulkUpdate([task], { validate: true, excludeLargeFields: true }); + describe('bulkPartialUpdate', () => { + let store: TaskStore; + let esClient: ElasticsearchClientMock; + const logger = mockLogger(); - expect(logger.warn).toHaveBeenCalledWith( - `Skipping validation for bulk update because excludeLargeFields=true.` - ); - expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(task, { - validate: false, + beforeAll(() => { + esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + store = new TaskStore({ + logger, + index: 'tasky', + taskManagerId: '', + serializer, + esClient, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, + allowReadingInvalidState: false, + requestTimeouts: { + update_by_query: 1000, + }, }); + }); - expect(savedObjectsClient.bulkUpdate).toHaveBeenCalledWith( - [ - { - id: task.id, - type: 'task', - version: task.version, - attributes: omit(taskInstanceToAttributes(task, task.id), ['state', 'params']), - }, - ], - { refresh: false } - ); + test(`should return immediately if no docs to update`, async () => { + await store.bulkPartialUpdate([]); + + expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled(); + expect(esClient.bulk).not.toHaveBeenCalled(); }); - test('pushes error from saved objects client to errors$', async () => { + test(`should perform partial update using esClient`, async () => { const task = { + id: '324242', + version: 'WzQsMV0=', runAt: mockedDate, scheduledAt: mockedDate, startedAt: null, retryAt: null, - id: 'task:324242', params: { hello: 'world' }, state: { foo: 'bar' }, taskType: 'report', attempts: 3, status: 'idle' as TaskStatus, - version: '123', - ownerId: null, + ownerId: 'testtest', traceparent: '', }; + esClient.bulk.mockResolvedValue({ + errors: false, + took: 0, + items: [ + { + update: { + _index: '.kibana_task_manager_8.16.0_001', + _id: 'task:324242', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + ], + }); + + const result = await store.bulkPartialUpdate([task]); + + expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled(); + + expect(esClient.bulk).toHaveBeenCalledWith({ + body: [ + { update: { _id: 'task:324242', if_primary_term: 1, if_seq_no: 4 } }, + { + doc: { + task: { + attempts: 3, + ownerId: 'testtest', + params: '{"hello":"world"}', + retryAt: null, + runAt: '2019-02-12T21:01:22.479Z', + scheduledAt: '2019-02-12T21:01:22.479Z', + startedAt: null, + state: '{"foo":"bar"}', + status: 'idle', + taskType: 'report', + traceparent: '', + }, + }, + }, + ], + index: 'tasky', + refresh: false, + }); + + expect(result).toEqual([asOk(task)]); + }); + + test(`should perform partial update with minimal fields`, async () => { + const task = { + id: '324242', + version: 'WzQsMV0=', + attempts: 3, + }; + + esClient.bulk.mockResolvedValue({ + errors: false, + took: 0, + items: [ + { + update: { + _index: '.kibana_task_manager_8.16.0_001', + _id: 'task:324242', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + ], + }); + + const result = await store.bulkPartialUpdate([task]); + + expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled(); + + expect(esClient.bulk).toHaveBeenCalledWith({ + body: [ + { update: { _id: 'task:324242', if_primary_term: 1, if_seq_no: 4 } }, + { doc: { task: { attempts: 3 } } }, + ], + index: 'tasky', + refresh: false, + }); + + expect(result).toEqual([asOk(task)]); + }); + + test(`should perform partial update with no version`, async () => { + const task = { + id: '324242', + attempts: 3, + }; + + esClient.bulk.mockResolvedValue({ + errors: false, + took: 0, + items: [ + { + update: { + _index: '.kibana_task_manager_8.16.0_001', + _id: 'task:324242', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + ], + }); + + const result = await store.bulkPartialUpdate([task]); + + expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled(); + + expect(esClient.bulk).toHaveBeenCalledWith({ + body: [{ update: { _id: 'task:324242' } }, { doc: { task: { attempts: 3 } } }], + index: 'tasky', + refresh: false, + }); + + expect(result).toEqual([asOk(task)]); + }); + + test(`should gracefully handle errors within the response`, async () => { + const task1 = { + id: '324242', + version: 'WzQsMV0=', + attempts: 3, + }; + + const task2 = { + id: '45343254', + version: 'WzQsMV0=', + status: 'running' as TaskStatus, + }; + + esClient.bulk.mockResolvedValue({ + errors: false, + took: 0, + items: [ + { + update: { + _index: '.kibana_task_manager_8.16.0_001', + _id: 'task:324242', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + { + update: { + _index: '.kibana_task_manager_8.16.0_001', + _id: 'task:45343254', + _version: 2, + error: { + type: 'document_missing_exception', + reason: '[5]: document missing', + index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA', + shard: '0', + index: '.kibana_task_manager_8.16.0_001', + }, + status: 404, + }, + }, + ], + }); + + const result = await store.bulkPartialUpdate([task1, task2]); + + expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled(); + + expect(esClient.bulk).toHaveBeenCalledWith({ + body: [ + { update: { _id: 'task:324242', if_primary_term: 1, if_seq_no: 4 } }, + { doc: { task: { attempts: 3 } } }, + { update: { _id: 'task:45343254', if_primary_term: 1, if_seq_no: 4 } }, + { doc: { task: { status: 'running' } } }, + ], + index: 'tasky', + refresh: false, + }); + + expect(result).toEqual([ + asOk(task1), + asErr({ + type: 'task', + id: '45343254', + error: { + type: 'document_missing_exception', + reason: '[5]: document missing', + index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA', + shard: '0', + index: '.kibana_task_manager_8.16.0_001', + }, + }), + ]); + }); + + test(`should gracefully handle malformed errors within the response`, async () => { + const task1 = { + id: '324242', + version: 'WzQsMV0=', + attempts: 3, + }; + + const task2 = { + id: '45343254', + version: 'WzQsMV0=', + status: 'running' as TaskStatus, + }; + + esClient.bulk.mockResolvedValue({ + errors: false, + took: 0, + items: [ + { + update: { + _index: '.kibana_task_manager_8.16.0_001', + _id: 'task:324242', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + { + update: { + _index: '.kibana_task_manager_8.16.0_001', + _version: 2, + error: { + type: 'document_missing_exception', + reason: '[5]: document missing', + index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA', + shard: '0', + index: '.kibana_task_manager_8.16.0_001', + }, + status: 404, + }, + }, + ], + }); + + const result = await store.bulkPartialUpdate([task1, task2]); + + expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled(); + + expect(esClient.bulk).toHaveBeenCalledWith({ + body: [ + { update: { _id: 'task:324242', if_primary_term: 1, if_seq_no: 4 } }, + { doc: { task: { attempts: 3 } } }, + { update: { _id: 'task:45343254', if_primary_term: 1, if_seq_no: 4 } }, + { doc: { task: { status: 'running' } } }, + ], + index: 'tasky', + refresh: false, + }); + + expect(result).toEqual([ + asOk(task1), + asErr({ + type: 'task', + id: 'unknown', + error: { type: 'malformed response' }, + }), + ]); + }); + + test('pushes error from saved objects client to errors$', async () => { + const task = { + id: '324242', + version: 'WzQsMV0=', + attempts: 3, + }; + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); - savedObjectsClient.bulkUpdate.mockRejectedValue(new Error('Failure')); - await expect( - store.bulkUpdate([task], { validate: true }) - ).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + esClient.bulk.mockRejectedValue(new Error('Failure')); + await expect(store.bulkPartialUpdate([task])).rejects.toThrowErrorMatchingInlineSnapshot( + `"Failure"` + ); expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); }); }); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 12a1f256c585b..4450d30ae3cf9 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -26,6 +26,7 @@ import { ElasticsearchClient, } from '@kbn/core/server'; +import { decodeRequestVersion } from '@kbn/core-saved-objects-base-server-internal'; import { RequestTimeoutsConfig } from './config'; import { asOk, asErr, Result } from './lib/result_type'; @@ -36,6 +37,8 @@ import { TaskLifecycle, TaskLifecycleResult, SerializedConcreteTaskInstance, + PartialConcreteTaskInstance, + PartialSerializedConcreteTaskInstance, } from './task'; import { TaskTypeDictionary } from './task_type_dictionary'; @@ -87,7 +90,6 @@ export interface FetchResult { export interface BulkUpdateOpts { validate: boolean; - excludeLargeFields?: boolean; } export type BulkUpdateResult = Result< @@ -95,6 +97,11 @@ export type BulkUpdateResult = Result< { type: string; id: string; error: SavedObjectError } >; +export type PartialBulkUpdateResult = Result< + PartialConcreteTaskInstance, + { type: string; id: string; error: estypes.ErrorCause } +>; + export type BulkGetResult = Array< Result >; @@ -114,7 +121,6 @@ export class TaskStore { public readonly taskManagerId: string; public readonly errors$ = new Subject(); public readonly taskValidator: TaskValidator; - private readonly logger: Logger; private esClient: ElasticsearchClient; private esClientWithoutRetries: ElasticsearchClient; @@ -141,7 +147,6 @@ export class TaskStore { this.serializer = opts.serializer; this.savedObjectsRepository = opts.savedObjectsRepository; this.adHocTaskCounter = opts.adHocTaskCounter; - this.logger = opts.logger; this.taskValidator = new TaskValidator({ logger: opts.logger, definitions: opts.definitions, @@ -302,23 +307,13 @@ export class TaskStore { */ public async bulkUpdate( docs: ConcreteTaskInstance[], - { validate, excludeLargeFields = false }: BulkUpdateOpts + { validate }: BulkUpdateOpts ): Promise { - // if we're excluding large fields (state and params), we cannot apply validation so log a warning - if (validate && excludeLargeFields) { - validate = false; - this.logger.warn(`Skipping validation for bulk update because excludeLargeFields=true.`); - } - const attributesByDocId = docs.reduce((attrsById, doc) => { const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, { validate, }); - const taskAttributes = taskInstanceToAttributes(taskInstance, doc.id); - attrsById.set( - doc.id, - excludeLargeFields ? omit(taskAttributes, 'state', 'params') : taskAttributes - ); + attrsById.set(doc.id, taskInstanceToAttributes(taskInstance, doc.id)); return attrsById; }, new Map()); @@ -364,6 +359,66 @@ export class TaskStore { }); } + public async bulkPartialUpdate( + docs: PartialConcreteTaskInstance[] + ): Promise { + if (docs.length === 0) { + return []; + } + + const bulkBody = []; + for (const doc of docs) { + bulkBody.push({ + update: { + _id: `task:${doc.id}`, + ...(doc.version ? decodeRequestVersion(doc.version) : {}), + }, + }); + bulkBody.push({ + doc: { + task: partialTaskInstanceToAttributes(doc), + }, + }); + } + + let result: estypes.BulkResponse; + try { + result = await this.esClient.bulk({ index: this.index, refresh: false, body: bulkBody }); + } catch (e) { + this.errors$.next(e); + throw e; + } + + return result.items.map((item) => { + if (!item.update || !item.update._id) { + return asErr({ + type: 'task', + id: 'unknown', + error: { type: 'malformed response' }, + }); + } + + const docId = item.update._id.startsWith('task:') + ? item.update._id.slice(5) + : item.update._id; + + if (item.update?.error) { + return asErr({ + type: 'task', + id: docId, + error: item.update.error, + }); + } + + const doc = docs.find((d) => d.id === docId); + + return asOk({ + ...doc, + id: docId, + }); + }); + } + /** * Removes the specified task from the index. * @@ -719,6 +774,20 @@ export function taskInstanceToAttributes( } as SerializedConcreteTaskInstance; } +export function partialTaskInstanceToAttributes( + doc: PartialConcreteTaskInstance +): PartialSerializedConcreteTaskInstance { + return { + ...omit(doc, 'id', 'version'), + ...(doc.params ? { params: JSON.stringify(doc.params) } : {}), + ...(doc.state ? { state: JSON.stringify(doc.state) } : {}), + ...(doc.scheduledAt ? { scheduledAt: doc.scheduledAt.toISOString() } : {}), + ...(doc.startedAt ? { startedAt: doc.startedAt.toISOString() } : {}), + ...(doc.retryAt ? { retryAt: doc.retryAt.toISOString() } : {}), + ...(doc.runAt ? { runAt: doc.runAt.toISOString() } : {}), + } as PartialSerializedConcreteTaskInstance; +} + export function savedObjectToConcreteTaskInstance( savedObject: Omit, 'references'> ): ConcreteTaskInstance {