Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

77133 check if transform exist before stopping and deleting #77640

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { SavedObjectsClientContract } from 'kibana/server';
import { Logger, SavedObjectsClientContract } from 'kibana/server';

import { saveInstalledEsRefs } from '../../packages/install';
import * as Registry from '../../registry';
Expand Down Expand Up @@ -33,89 +33,100 @@ export const installTransformForDataset = async (
registryPackage: RegistryPackage,
paths: string[],
callCluster: CallESAsCurrentUser,
savedObjectsClient: SavedObjectsClientContract
savedObjectsClient: SavedObjectsClientContract,
logger: Logger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our current pattern is to have functions use the logger like const logger = appContextService.getLogger(); vs adding them to every function signature.

e.g.

) => {
const installation = await getInstallation({ savedObjectsClient, pkgName: registryPackage.name });
let previousInstalledTransformEsAssets: EsAssetReference[] = [];
if (installation) {
previousInstalledTransformEsAssets = installation.installed_es.filter(
({ type, id }) => type === ElasticsearchAssetType.transform
try {
const installation = await getInstallation({
savedObjectsClient,
pkgName: registryPackage.name,
});
let previousInstalledTransformEsAssets: EsAssetReference[] = [];
if (installation) {
previousInstalledTransformEsAssets = installation.installed_es.filter(
({ type, id }) => type === ElasticsearchAssetType.transform
);
}

// delete all previous transform
await deleteTransforms(
callCluster,
previousInstalledTransformEsAssets.map((asset) => asset.id)
);
}

// delete all previous transform
await deleteTransforms(
callCluster,
previousInstalledTransformEsAssets.map((asset) => asset.id)
);
// install the latest dataset
const datasets = registryPackage.datasets;
if (!datasets?.length) return [];
const installNameSuffix = `${registryPackage.version}`;

const transformPaths = paths.filter((path) => isTransform(path));
let installedTransforms: EsAssetReference[] = [];
if (transformPaths.length > 0) {
const transformPathDatasets = datasets.reduce<TransformPathDataset[]>((acc, dataset) => {
transformPaths.forEach((path) => {
if (isDatasetTransform(path, dataset.path)) {
acc.push({ path, dataset });
}
});
return acc;
}, []);

const transformRefs = transformPathDatasets.reduce<EsAssetReference[]>(
(acc, transformPathDataset) => {
if (transformPathDataset) {
acc.push({
id: getTransformNameForInstallation(transformPathDataset, installNameSuffix),
type: ElasticsearchAssetType.transform,
});
}
// install the latest dataset
const datasets = registryPackage.datasets;
if (!datasets?.length) return [];
const installNameSuffix = `${registryPackage.version}`;

const transformPaths = paths.filter((path) => isTransform(path));
let installedTransforms: EsAssetReference[] = [];
if (transformPaths.length > 0) {
const transformPathDatasets = datasets.reduce<TransformPathDataset[]>((acc, dataset) => {
transformPaths.forEach((path) => {
if (isDatasetTransform(path, dataset.path)) {
acc.push({ path, dataset });
}
});
return acc;
},
[]
);

// get and save transform refs before installing transforms
await saveInstalledEsRefs(savedObjectsClient, registryPackage.name, transformRefs);

const transforms: TransformInstallation[] = transformPathDatasets.map(
(transformPathDataset: TransformPathDataset) => {
return {
installationName: getTransformNameForInstallation(
transformPathDataset,
installNameSuffix
),
content: getAsset(transformPathDataset.path).toString('utf-8'),
};
}
);
}, []);

const transformRefs = transformPathDatasets.reduce<EsAssetReference[]>(
(acc, transformPathDataset) => {
if (transformPathDataset) {
acc.push({
id: getTransformNameForInstallation(transformPathDataset, installNameSuffix),
type: ElasticsearchAssetType.transform,
});
}
return acc;
},
[]
);

// get and save transform refs before installing transforms
await saveInstalledEsRefs(savedObjectsClient, registryPackage.name, transformRefs);

const transforms: TransformInstallation[] = transformPathDatasets.map(
(transformPathDataset: TransformPathDataset) => {
return {
installationName: getTransformNameForInstallation(
transformPathDataset,
installNameSuffix
),
content: getAsset(transformPathDataset.path).toString('utf-8'),
};
}
);

const installationPromises = transforms.map(async (transform) => {
return installTransform({ callCluster, transform });
});
const installationPromises = transforms.map(async (transform) => {
return installTransform({ callCluster, transform, logger });
});

installedTransforms = await Promise.all(installationPromises).then((results) => results.flat());
}
installedTransforms = await Promise.all(installationPromises).then((results) =>
results.flat()
);
}

if (previousInstalledTransformEsAssets.length > 0) {
const currentInstallation = await getInstallation({
savedObjectsClient,
pkgName: registryPackage.name,
});
if (previousInstalledTransformEsAssets.length > 0) {
const currentInstallation = await getInstallation({
savedObjectsClient,
pkgName: registryPackage.name,
});

// remove the saved object reference
await deleteTransformRefs(
savedObjectsClient,
currentInstallation?.installed_es || [],
registryPackage.name,
previousInstalledTransformEsAssets.map((asset) => asset.id),
installedTransforms.map((installed) => installed.id)
);
// remove the saved object reference
await deleteTransformRefs(
savedObjectsClient,
currentInstallation?.installed_es || [],
registryPackage.name,
previousInstalledTransformEsAssets.map((asset) => asset.id),
installedTransforms.map((installed) => installed.id)
);
}
return installedTransforms;
} catch (err) {
logger.error(err);
throw err;
}
return installedTransforms;
};

const isTransform = (path: string) => {
Expand All @@ -136,24 +147,31 @@ const isDatasetTransform = (path: string, datasetName: string) => {
async function installTransform({
callCluster,
transform,
logger,
}: {
callCluster: CallESAsCurrentUser;
transform: TransformInstallation;
logger: Logger;
}): Promise<EsAssetReference> {
// defer validation on put if the source index is not available
await callCluster('transport.request', {
method: 'PUT',
path: `_transform/${transform.installationName}`,
query: 'defer_validation=true',
body: transform.content,
});

await callCluster('transport.request', {
method: 'POST',
path: `_transform/${transform.installationName}/_start`,
});

return { id: transform.installationName, type: ElasticsearchAssetType.transform };
try {
// defer validation on put if the source index is not available
await callCluster('transport.request', {
method: 'PUT',
path: `_transform/${transform.installationName}`,
query: 'defer_validation=true',
body: transform.content,
});

await callCluster('transport.request', {
method: 'POST',
path: `_transform/${transform.installationName}/_start`,
});

return { id: transform.installationName, type: ElasticsearchAssetType.transform };
} catch (err) {
logger.error(err);
throw err;
}
}

const getTransformNameForInstallation = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ export const deleteTransforms = async (
) => {
await Promise.all(
transformIds.map(async (transformId) => {
await stopTransforms([transformId], callCluster);
await callCluster('transport.request', {
method: 'DELETE',
query: 'force=true',
const response = await callCluster('transport.request', {
method: 'GET',
path: `_transform/${transformId}`,
ignore: [404],
});
if (response && response.count > 0) {
await stopTransforms([transformId], callCluster);
await callCluster('transport.request', {
method: 'DELETE',
query: 'force=true',
path: `_transform/${transformId}`,
ignore: [404],
});
}
})
);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
* you may not use this file except in compliance with the Elastic License.
*/

// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { loggingSystemMock } from '../../../../../../../../src/core/server/logging/logging_system.mock';

jest.mock('../../packages/get', () => {
return { getInstallation: jest.fn(), getInstallationObject: jest.fn() };
});
Expand All @@ -15,7 +18,12 @@ jest.mock('./common', () => {
});

import { installTransformForDataset } from './install';
import { ILegacyScopedClusterClient, SavedObject, SavedObjectsClientContract } from 'kibana/server';
import {
ILegacyScopedClusterClient,
LoggerFactory,
SavedObject,
SavedObjectsClientContract,
} from 'kibana/server';
import { ElasticsearchAssetType, Installation, RegistryPackage } from '../../../../types';
import { getInstallation, getInstallationObject } from '../../packages';
import { getAsset } from './common';
Expand All @@ -25,6 +33,7 @@ import { savedObjectsClientMock } from '../../../../../../../../src/core/server/
describe('test transform install', () => {
let legacyScopedClusterClient: jest.Mocked<ILegacyScopedClusterClient>;
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
let logger: jest.Mocked<LoggerFactory>;
beforeEach(() => {
legacyScopedClusterClient = {
callAsInternalUser: jest.fn(),
Expand All @@ -33,6 +42,7 @@ describe('test transform install', () => {
(getInstallation as jest.MockedFunction<typeof getInstallation>).mockReset();
(getInstallationObject as jest.MockedFunction<typeof getInstallationObject>).mockReset();
savedObjectsClient = savedObjectsClientMock.create();
logger = loggingSystemMock.create();
});

afterEach(() => {
Expand Down Expand Up @@ -90,7 +100,7 @@ describe('test transform install', () => {
},
} as unknown) as SavedObject<Installation>)
);

legacyScopedClusterClient.callAsCurrentUser.mockReturnValueOnce({ count: 1 });
await installTransformForDataset(
({
name: 'endpoint',
Expand Down Expand Up @@ -132,10 +142,18 @@ describe('test transform install', () => {
'endpoint-0.16.0-dev.0/dataset/metadata_current/elasticsearch/transform/default.json',
],
legacyScopedClusterClient.callAsCurrentUser,
savedObjectsClient
savedObjectsClient,
logger.get('ingest')
);

expect(legacyScopedClusterClient.callAsCurrentUser.mock.calls).toEqual([
[
'transport.request',
{
method: 'GET',
path: '_transform/metrics-endpoint.metadata_current-default-0.15.0-dev.0',
ignore: [404],
},
],
[
'transport.request',
{
Expand Down Expand Up @@ -287,7 +305,8 @@ describe('test transform install', () => {
} as unknown) as RegistryPackage,
['endpoint-0.16.0-dev.0/dataset/metadata_current/elasticsearch/transform/default.json'],
legacyScopedClusterClient.callAsCurrentUser,
savedObjectsClient
savedObjectsClient,
logger.get('ingest')
);

expect(legacyScopedClusterClient.callAsCurrentUser.mock.calls).toEqual([
Expand Down Expand Up @@ -346,7 +365,7 @@ describe('test transform install', () => {
attributes: { installed_es: currentInstallation.installed_es },
} as unknown) as SavedObject<Installation>)
);

legacyScopedClusterClient.callAsCurrentUser.mockReturnValueOnce({ count: 1 });
await installTransformForDataset(
({
name: 'endpoint',
Expand Down Expand Up @@ -384,26 +403,35 @@ describe('test transform install', () => {
} as unknown) as RegistryPackage,
[],
legacyScopedClusterClient.callAsCurrentUser,
savedObjectsClient
savedObjectsClient,
logger.get('ingest')
);

expect(legacyScopedClusterClient.callAsCurrentUser.mock.calls).toEqual([
[
'transport.request',
{
method: 'GET',
path: '_transform/metrics-endpoint.metadata-current-default-0.15.0-dev.0',
ignore: [404],
},
],
[
'transport.request',
{
method: 'POST',
path: '_transform/metrics-endpoint.metadata-current-default-0.15.0-dev.0/_stop',
query: 'force=true',
ignore: [404],
},
],
[
'transport.request',
{
ignore: [404],
method: 'DELETE',
path: '_transform/metrics-endpoint.metadata-current-default-0.15.0-dev.0',
query: 'force=true',
path: '_transform/metrics-endpoint.metadata-current-default-0.15.0-dev.0',
ignore: [404],
},
],
]);
Expand Down
Loading