Skip to content

Commit

Permalink
[Fleet] Kafka preconfiguration (#163601)
Browse files Browse the repository at this point in the history
  • Loading branch information
szwarckonrad and juliaElastic authored Aug 10, 2023
1 parent 1ed94da commit e5a591c
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ describe('output preconfiguration', () => {
hosts: ['http://es.co:80'],
is_preconfigured: true,
},
{
id: 'existing-kafka-output-1',
is_default: false,
is_default_monitoring: false,
name: 'Kafka Output 1',
// @ts-ignore
type: 'kafka',
hosts: ['kafka.co:80'],
is_preconfigured: true,
},
];
});
});
Expand Down Expand Up @@ -112,6 +122,25 @@ describe('output preconfiguration', () => {
expect(spyAgentPolicyServicBumpAllAgentPoliciesForOutput).not.toBeCalled();
});

it('should create preconfigured kafka output that does not exists', async () => {
const soClient = savedObjectsClientMock.create();
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
await createOrUpdatePreconfiguredOutputs(soClient, esClient, [
{
id: 'non-existing-kafka-output-1',
name: 'Output 1',
type: 'kafka',
is_default: false,
is_default_monitoring: false,
hosts: ['test.fr:2000'],
},
]);

expect(mockedOutputService.create).toBeCalled();
expect(mockedOutputService.update).not.toBeCalled();
expect(spyAgentPolicyServicBumpAllAgentPoliciesForOutput).not.toBeCalled();
});

it('should create a preconfigured output with ca_trusted_fingerprint that does not exists', async () => {
const soClient = savedObjectsClientMock.create();
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
Expand Down Expand Up @@ -238,6 +267,26 @@ describe('output preconfiguration', () => {
expect(spyAgentPolicyServicBumpAllAgentPoliciesForOutput).toBeCalled();
});

it('should update output if preconfigured kafka output exists and changed', async () => {
const soClient = savedObjectsClientMock.create();
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
soClient.find.mockResolvedValue({ saved_objects: [], page: 0, per_page: 0, total: 0 });
await createOrUpdatePreconfiguredOutputs(soClient, esClient, [
{
id: 'existing-kafka-output-1',
is_default: false,
is_default_monitoring: false,
name: 'Kafka Output 1',
type: 'kafka',
hosts: ['kafka.co:8080'],
},
]);

expect(mockedOutputService.create).not.toBeCalled();
expect(mockedOutputService.update).toBeCalled();
expect(spyAgentPolicyServicBumpAllAgentPoliciesForOutput).toBeCalled();
});

it('should not update output if preconfigured output exists and did not changed', async () => {
const soClient = savedObjectsClientMock.create();
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
Expand All @@ -258,6 +307,26 @@ describe('output preconfiguration', () => {
expect(spyAgentPolicyServicBumpAllAgentPoliciesForOutput).toBeCalled();
});

it('should not update output if preconfigured kafka output exists and did not change', async () => {
const soClient = savedObjectsClientMock.create();
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
soClient.find.mockResolvedValue({ saved_objects: [], page: 0, per_page: 0, total: 0 });
await createOrUpdatePreconfiguredOutputs(soClient, esClient, [
{
id: 'existing-kafka-output-1',
is_default: false,
is_default_monitoring: false,
name: 'Kafka Output 1',
type: 'kafka',
hosts: ['kafka.co:8080'],
},
]);

expect(mockedOutputService.create).not.toBeCalled();
expect(mockedOutputService.update).toBeCalled();
expect(spyAgentPolicyServicBumpAllAgentPoliciesForOutput).toBeCalled();
});

const SCENARIOS: Array<{ name: string; data: PreconfiguredOutput }> = [
{
name: 'no changes',
Expand Down
31 changes: 30 additions & 1 deletion x-pack/plugins/fleet/server/services/preconfiguration/outputs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,34 @@ function isPreconfiguredOutputDifferentFromCurrent(
existingOutput: Output,
preconfiguredOutput: Partial<Output>
): boolean {
const kafkaFieldsAreDifferent = (): boolean => {
if (existingOutput.type !== 'kafka' || preconfiguredOutput.type !== 'kafka') {
return false;
}

return (
isDifferent(existingOutput.client_id, preconfiguredOutput.client_id) ||
isDifferent(existingOutput.version, preconfiguredOutput.version) ||
isDifferent(existingOutput.key, preconfiguredOutput.key) ||
isDifferent(existingOutput.compression, preconfiguredOutput.compression) ||
isDifferent(existingOutput.compression_level, preconfiguredOutput.compression_level) ||
isDifferent(existingOutput.auth_type, preconfiguredOutput.auth_type) ||
isDifferent(existingOutput.connection_type, preconfiguredOutput.connection_type) ||
isDifferent(existingOutput.username, preconfiguredOutput.username) ||
isDifferent(existingOutput.password, preconfiguredOutput.password) ||
isDifferent(existingOutput.sasl, preconfiguredOutput.sasl) ||
isDifferent(existingOutput.partition, preconfiguredOutput.partition) ||
isDifferent(existingOutput.random, preconfiguredOutput.random) ||
isDifferent(existingOutput.round_robin, preconfiguredOutput.round_robin) ||
isDifferent(existingOutput.hash, preconfiguredOutput.hash) ||
isDifferent(existingOutput.topics, preconfiguredOutput.topics) ||
isDifferent(existingOutput.headers, preconfiguredOutput.headers) ||
isDifferent(existingOutput.timeout, preconfiguredOutput.timeout) ||
isDifferent(existingOutput.broker_timeout, preconfiguredOutput.broker_timeout) ||
isDifferent(existingOutput.required_acks, preconfiguredOutput.required_acks)
);
};

return (
!existingOutput.is_preconfigured ||
isDifferent(existingOutput.is_default, preconfiguredOutput.is_default) ||
Expand All @@ -191,6 +219,7 @@ function isPreconfiguredOutputDifferentFromCurrent(
) ||
isDifferent(existingOutput.config_yaml, preconfiguredOutput.config_yaml) ||
isDifferent(existingOutput.proxy_id, preconfiguredOutput.proxy_id) ||
isDifferent(existingOutput.allow_edit ?? [], preconfiguredOutput.allow_edit ?? [])
isDifferent(existingOutput.allow_edit ?? [], preconfiguredOutput.allow_edit ?? []) ||
kafkaFieldsAreDifferent()
);
}

0 comments on commit e5a591c

Please sign in to comment.