Skip to content

Commit

Permalink
Merge branch 'master' into gradle_cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kszapsza authored Jan 9, 2025
2 parents b881307 + da5a91f commit dc16706
Show file tree
Hide file tree
Showing 109 changed files with 3,563 additions and 58 deletions.
24 changes: 24 additions & 0 deletions docs/docs/configuration/inactive-topics-detection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Inactive Topics Detection

Hermes Management provides an optional feature to detect inactive topics and
notify about them. This feature is **disabled by default**. You can enable it
and configure other options in the Hermes Management configuration.

Option | Description | Default value
-------------------------------------------------------------|----------------------------------------------------------------------------|---------------
detection.inactive-topics.enabled | enable inactive topics detection | false
detection.inactive-topics.inactivity-threshold | duration after which a topic is considered inactive and first notified | 60d
detection.inactive-topics.next-notification-threshold | duration after previous notification after which a topic is notified again | 14d
detection.inactive-topics.whitelisted-qualified-topic-names | list of qualified topic names that will not be notified event if inactive | []
detection.inactive-topics.cron | cron expression for the detection job | 0 0 8 * * *
detection.inactive-topics.notifications-history-limit | how many notification timestamps will be kept in history | 5

The detection job runs on a single instance of Hermes Management that is a
leader based on the leader election Zookeeper instance.

Option | Description | Default Value
------------------------------------|-----------------------------------------------------------------------------|---------------
management.leadership.zookeeper-dc | Specifies the datacenter of the Zookeeper instance used for leader election | dc

To make notifying work, you need to provide an implementation of
`pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsNotifier`
5 changes: 5 additions & 0 deletions docs/docs/configuration/message-tracking.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,8 @@ LogRepository logRepository(Client client) {
return new ElasticsearchLogRepository(client);
}
```

### UI configuration
Ui console can be configured to show tracking urls to users for topics and subscriptions.
To enable this, make bean implementing `pl.allegro.tech.hermes.tracker.management.TrackingUrlProvider`
available in Spring context.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public enum ErrorCode {
INVALID_QUERY(BAD_REQUEST),
IMPLEMENTATION_ABSENT(NOT_FOUND),
MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST),
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE);
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE),
CONSUMER_GROUP_DELETION_ERROR(INTERNAL_SERVER_ERROR);

private final int httpCode;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.common.kafka.offset;

import java.util.List;
import java.util.Set;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
Expand All @@ -14,7 +15,7 @@ void setSubscriptionOffset(
PartitionOffset partitionOffset);

PartitionOffsets getSubscriptionOffsets(
TopicName topic, String subscriptionName, String brokersClusterName);
TopicName topic, String subscriptionName, String brokersClusterName, Set<Integer> partitions);

boolean areOffsetsMoved(
TopicName topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public class ZookeeperPaths {
public static final String DATACENTER_READINESS_PATH = "datacenter-readiness";
public static final String OFFLINE_RETRANSMISSION_PATH = "offline-retransmission";
public static final String OFFLINE_RETRANSMISSION_TASKS_PATH = "tasks";
public static final String INACTIVE_TOPICS_PATH = "inactive-topics";
public static final String MANAGEMENT_PATH = "management";
public static final String MANAGEMENT_PATH_LEADER = "leader";
public static final String CONSUMER_GROUP_TO_DELETE = "consumer-group-to-delete";
public static final String CONSUMER_GROUP_TO_DELETE_TASKS = "tasks";

private final String basePath;

Expand Down Expand Up @@ -182,6 +187,24 @@ public String offlineRetransmissionPath(String taskId) {
.join(basePath, OFFLINE_RETRANSMISSION_PATH, OFFLINE_RETRANSMISSION_TASKS_PATH, taskId);
}

public String inactiveTopicsPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, INACTIVE_TOPICS_PATH);
}

public String managementLeaderPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, MANAGEMENT_PATH, MANAGEMENT_PATH_LEADER);
}

public String consumerGroupToDeletePath() {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS);
}

public String consumerGroupToDeletePath(String taskId) {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS, taskId);
}

public String join(String... parts) {
return Joiner.on(URL_SEPARATOR).join(parts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.TopicName;
Expand Down Expand Up @@ -63,7 +65,10 @@ public void setSubscriptionOffset(

@Override
public PartitionOffsets getSubscriptionOffsets(
TopicName topic, String subscriptionName, String brokersClusterName) {
TopicName topic,
String subscriptionName,
String brokersClusterName,
Set<Integer> partitions) {
subscriptionRepository.ensureSubscriptionExists(topic, subscriptionName);
String kafkaTopicsPath = paths.subscribedKafkaTopicsPath(topic, subscriptionName);

Expand All @@ -74,7 +79,7 @@ public PartitionOffsets getSubscriptionOffsets(
kafkaTopic ->
allOffsets.addAll(
getOffsetsForKafkaTopic(
topic, kafkaTopic, subscriptionName, brokersClusterName)));
topic, kafkaTopic, subscriptionName, brokersClusterName, partitions)));
return allOffsets;
}

Expand Down Expand Up @@ -134,19 +139,29 @@ private PartitionOffsets getOffsetsForKafkaTopic(
TopicName topic,
KafkaTopicName kafkaTopicName,
String subscriptionName,
String brokersClusterName) {
String offsetsPath =
paths.offsetsPath(topic, subscriptionName, kafkaTopicName, brokersClusterName);

String brokersClusterName,
Set<Integer> partitions) {
PartitionOffsets offsets = new PartitionOffsets();
for (String partitionAsString : getZookeeperChildrenForPath(offsetsPath)) {
Integer partition = Integer.valueOf(partitionAsString);
offsets.add(
new PartitionOffset(
for (Integer partition : partitions) {
try {
offsets.add(
new PartitionOffset(
kafkaTopicName,
getOffsetForPartition(
topic, kafkaTopicName, subscriptionName, brokersClusterName, partition),
partition));
} catch (InternalProcessingException ex) {
if (ex.getCause() instanceof NoNodeException) {
logger.warn(
"No offset for partition {} in kafka topic {} for topic {} subscription {}",
partition,
kafkaTopicName,
getOffsetForPartition(
topic, kafkaTopicName, subscriptionName, brokersClusterName, partition),
partition));
topic,
subscriptionName);
continue;
}
throw ex;
}
}
return offsets;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper

import com.google.common.primitives.Longs

import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.api.TopicName
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName
Expand All @@ -9,8 +9,6 @@ import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets
import pl.allegro.tech.hermes.domain.subscription.SubscriptionNotExistsException
import pl.allegro.tech.hermes.test.IntegrationTest

import java.nio.charset.StandardCharsets

import static pl.allegro.tech.hermes.test.helper.builder.GroupBuilder.group
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic
Expand Down Expand Up @@ -43,7 +41,7 @@ class ZookeeperSubscriptionOffsetChangeIndicatorTest extends IntegrationTest {
indicator.setSubscriptionOffset(TOPIC.name, 'override', 'primary', new PartitionOffset(primaryKafkaTopicName, 10, 1))

then:
def offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'override', 'primary')
def offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'override', 'primary', [1] as Set)
offsets.find { it.partition == 1 } == new PartitionOffset(primaryKafkaTopicName, 10, 1)
}

Expand All @@ -54,7 +52,7 @@ class ZookeeperSubscriptionOffsetChangeIndicatorTest extends IntegrationTest {
indicator.setSubscriptionOffset(TOPIC.name, 'read', 'primary', new PartitionOffset(primaryKafkaTopicName, 10, 1))

when:
PartitionOffsets offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'read', 'primary')
PartitionOffsets offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'read', 'primary', [1] as Set)

then:
(offsets.find { it.partition == 1 } as PartitionOffset).offset == 10
Expand Down
22 changes: 20 additions & 2 deletions hermes-console/json-server/db.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
{
"inactiveTopics": [
{"topic": "group.topic1", "lastPublishedTsMs": 1732499845200, "notificationTsMs": [1733499835210, 1733499645212], "whitelisted": false},
{"topic": "group.topic2", "lastPublishedTsMs": 1633928665148, "notificationTsMs": [], "whitelisted": true},
{"topic": "pl.allegro.public.group.DummyEvent", "lastPublishedTsMs": 1633928665148, "notificationTsMs": [1733499645212], "whitelisted": false}
],
"groups": [
"pl.allegro.public.offer",
"pl.allegro.public.offer.product",
Expand Down Expand Up @@ -138,6 +143,11 @@
"offlineClientsSource": {
"source": "https://www.openstreetmap.org/export/embed.html?bbox=-0.004017949104309083%2C51.47612752641776%2C0.00030577182769775396%2C51.478569861898606&layer=mapnik"
},
"topicClients": [
"[email protected]",
"[email protected]",
"[email protected]"
],
"inconsistentTopics": [
"pl.allegro.group.Topic1_avro",
"pl.allegro.group.Topic2_avro",
Expand Down Expand Up @@ -203,7 +213,7 @@
},
"jsonToAvroDryRun": false,
"ack": "LEADER",
"trackingEnabled": false,
"trackingEnabled": true,
"migratedFromJsonType": false,
"schemaIdAwareSerializationEnabled": false,
"contentType": "AVRO",
Expand Down Expand Up @@ -241,6 +251,14 @@
"throughput": "0.0"
}
],
"topicsTrackingUrls": [
{"name": "Tracking Link 1", "url": "#"},
{"name": "Tracking Link 2", "url": "#"}
],
"subscriptionsTrackingUrls": [
{"name": "Tracking Link 1", "url": "#"},
{"name": "Tracking Link 2", "url": "#"}
],
"topicsOwners": [
{
"id": "41",
Expand Down Expand Up @@ -383,7 +401,7 @@
"retryClientErrors": true,
"backoffMaxIntervalMillis": 600000
},
"trackingEnabled": false,
"trackingEnabled": true,
"trackingMode": "trackingOff",
"owner": {
"source": "Service Catalog",
Expand Down
6 changes: 5 additions & 1 deletion hermes-console/json-server/routes.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
"/owners/sources/Service%20Catalog/:id": "/topicsOwners/:id",
"/readiness/datacenters": "/readinessDatacenters",
"/topics": "/topicNames",
"/tracking-urls/topics/:topicName": "/topicsTrackingUrls",
"/tracking-urls/topics/:topicName/subscriptions/:subscriptionName": "/subscriptionsTrackingUrls",
"/topics/:id/metrics": "/topicsMetrics/:id",
"/topics/:id/preview": "/topicPreview",
"/topics/:id/offline-clients-source": "/offlineClientsSource",
"/topics/:id/clients": "/topicClients",
"/topics/:id/subscriptions": "/topicSubscriptions",
"/topics/:topicName/subscriptions/:id": "/subscriptions/:id",
"/topics/:topicName/subscriptions/:id/consumer-groups": "/consumerGroups",
Expand All @@ -26,5 +29,6 @@
"/owners/sources": "/ownerSources",
"/owners/sources/*?search=:searchPhrase": "/topicsOwners?name_like=:searchPhrase",
"/dashboards/topics/:topicName": "/topicDashboardUrl",
"/dashboards/topics/:topicName/subscriptions/:id": "/subscriptionDashboardUrl"
"/dashboards/topics/:topicName/subscriptions/:id": "/subscriptionDashboardUrl",
"/inactive-topics": "/inactiveTopics"
}
27 changes: 27 additions & 0 deletions hermes-console/src/api/hermes-client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
Readiness,
} from '@/api/datacenter-readiness';
import type { Group } from '@/api/group';
import type { InactiveTopic } from '@/api/inactive-topics';
import type { InconsistentGroup } from '@/api/inconsistent-group';
import type {
MessageFiltersVerification,
Expand All @@ -44,6 +45,7 @@ import type { Stats } from '@/api/stats';
import type { SubscriptionHealth } from '@/api/subscription-health';
import type { SubscriptionMetrics } from '@/api/subscription-metrics';
import type { TopicForm } from '@/composables/topic/use-form-topic/types';
import type { TrackingUrl } from '@/api/tracking-url';

const acceptHeader = 'Accept';
const contentTypeHeader = 'Content-Type';
Expand Down Expand Up @@ -189,10 +191,35 @@ export function fetchOfflineClientsSource(
);
}

export function getTopicTrackingUrls(
topicName: string,
): ResponsePromise<TrackingUrl[]> {
return axios.get<TrackingUrl[]>(`/tracking-urls/topics/${topicName}`);
}

export function getSubscriptionTrackingUrls(
topicName: string,
subscriptionName: string,
): ResponsePromise<TrackingUrl[]> {
return axios.get<TrackingUrl[]>(
`/tracking-urls/topics/${topicName}/subscriptions/${subscriptionName}`,
);
}

export function fetchTopicClients(
topicName: string,
): ResponsePromise<string[]> {
return axios.get(`/topics/${topicName}/clients`);
}

export function fetchConstraints(): ResponsePromise<ConstraintsConfig> {
return axios.get<ConstraintsConfig>('/workload-constraints');
}

export function fetchInactiveTopics(): ResponsePromise<InactiveTopic[]> {
return axios.get<InactiveTopic[]>('/inactive-topics');
}

export function fetchReadiness(): ResponsePromise<DatacenterReadiness[]> {
return axios.get<DatacenterReadiness[]>('/readiness/datacenters');
}
Expand Down
6 changes: 6 additions & 0 deletions hermes-console/src/api/inactive-topics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface InactiveTopic {
topic: string;
lastPublishedTsMs: number;
notificationTsMs: number[];
whitelisted: boolean;
}
4 changes: 4 additions & 0 deletions hermes-console/src/api/tracking-url.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface TrackingUrl {
name: string;
url: string;
}
43 changes: 43 additions & 0 deletions hermes-console/src/components/tracking-card/TrackingCard.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { expect } from 'vitest';
import { render } from '@/utils/test-utils';
import TrackingCard from '@/components/tracking-card/TrackingCard.vue';

describe('TrackingCard', () => {
const props = {
trackingUrls: [
{ name: 'url1', url: 'https://test-tracking-url1' },
{ name: 'url2', url: 'https://test-tracking-url2' },
],
};

it('should render title properly', () => {
// when
const { getByText } = render(TrackingCard, { props });

// then
const row = getByText('trackingCard.title');
expect(row).toBeVisible();
});

it('should render all tracking urls', () => {
// when
const { container } = render(TrackingCard, { props });

// then
const elements = container.querySelectorAll('a')!!;
expect(elements[0]).toHaveAttribute('href', 'https://test-tracking-url1');
expect(elements[0]).toHaveTextContent('url1');
expect(elements[1]).toHaveAttribute('href', 'https://test-tracking-url2');
expect(elements[1]).toHaveTextContent('url2');
});

it('should render message when no tracking urls', () => {
// given
const emptyProps = { trackingUrls: [] };
const { getByText } = render(TrackingCard, { emptyProps });

// then
const row = getByText('trackingCard.noTrackingUrls');
expect(row).toBeVisible();
});
});
Loading

0 comments on commit dc16706

Please sign in to comment.