SKYEDEN-3234 detect unused topics #1922

Merged
merged 45 commits into from
Dec 20, 2024
45 commits
22dd762
SKYEDEN-3234 | Add marking and unmarking commands
questras Nov 3, 2024
1e28093
SKYEDEN-3234 | Change to batch upserting
questras Nov 3, 2024
0f05e49
SKYEDEN-3234 | Implement zk unusted topics repo
questras Nov 3, 2024
c255ef5
SKYEDEN-3234 | Implement unused topics service
questras Nov 3, 2024
b82f3ac
SKYEDEN-3234 | Add repo for last published message timestamp
questras Nov 4, 2024
3f89e44
SKYEDEN-3234 | Rename from read to get
questras Nov 4, 2024
5f888d4
SKYEDEN-3234 | Change last notified to list with timestamps
questras Nov 4, 2024
c5e8ad0
SKYEDEN-3234 | Implement unused topics detection job
questras Nov 4, 2024
a850566
SKYEDEN-3234 | Add tests for unused topics detection
questras Nov 5, 2024
a6779b9
SKYEDEN-3234 | Add detection job test
questras Nov 6, 2024
299cf80
SKYEDEN-3234 | Refactor detection job test
questras Nov 7, 2024
da49bac
SKYEDEN-3234 | Make unused topics notifier bean optional
questras Nov 7, 2024
3ffe9f8
SKYEDEN-3234 | Add scheduling
questras Nov 12, 2024
ad86f37
SKYEDEN-3234 | Add properties with default values
questras Nov 12, 2024
f207433
SKYEDEN-3234 | Add more logging and refactor
questras Nov 12, 2024
719b705
SKYEDEN-3234 | Rename from unused to inactive
questras Nov 12, 2024
edbec33
SKYEDEN-3234 | Handle lack of last published message metrics
questras Nov 12, 2024
bfd9acf
SKYEDEN-3234 | Do not call notifier when no inactive topics
questras Nov 12, 2024
655e508
Merge branch 'master' into SKYEDEN-3234-detect-unused-topics
questras Nov 12, 2024
e0af3ee
SKYEDEN-3234 | Fix style
questras Nov 12, 2024
c9f3bac
SKYEDEN-3234 | Add log when detection starts
questras Nov 12, 2024
ebc1541
SKYEDEN-3234 | Fix style
questras Nov 12, 2024
a2d47ca
SKYEDEN-3234 | Add more logging
questras Nov 13, 2024
26e70f4
SKYEDEN-3234 | Move leader and config to separate classes
questras Nov 13, 2024
972c488
SKYEDEN-3234 | Change log message
questras Nov 13, 2024
caf1ca5
SKYEDEN-3234 | Remove unnecessary annotation
questras Nov 13, 2024
2227be7
SKYEDEN-3234 | Do not update notification timestamps when notificatio…
questras Nov 13, 2024
4d52b3e
SKYEDEN-3234 | Add docs for inactive topics detection
questras Nov 14, 2024
15a22d1
SKYEDEN-3234 | Add notification result as return type of notifier
questras Nov 18, 2024
7dd593a
Merge branch 'master' into SKYEDEN-3234-detect-unused-topics
questras Nov 18, 2024
4627546
Merge branch 'master' into SKYEDEN-3234-detect-unused-topics
questras Nov 20, 2024
7a13491
Merge branch 'master' into SKYEDEN-3234-detect-unused-topics
questras Nov 25, 2024
fd2b590
SKYEDEN-3234 | Add owner info to be used by notifier
questras Nov 26, 2024
9841f44
SKYEDEN-3234 | Limit number of notification timestamps in history
questras Nov 26, 2024
ca4216f
SKYEDEN-3234 | Update docs
questras Nov 26, 2024
8194321
SKYEDEN-3234 | Specify shorter names to be saved in json
questras Nov 27, 2024
9545569
Merge branch 'master' into SKYEDEN-3234-detect-unused-topics
questras Dec 3, 2024
894f8bd
SKYEDEN-3271 |hermes management leader (#1934)
MarcinBobinski Dec 3, 2024
7726994
SKYEDEN-3234 | Remove unused property
questras Dec 3, 2024
12c809e
SKYEDEN-3271 | fix leader path
MarcinBobinski Dec 3, 2024
cf17940
Revert "Merge branch 'master' into SKYEDEN-3234-detect-unused-topics"
questras Dec 3, 2024
e55faa7
Revert "Revert "Merge branch 'master' into SKYEDEN-3234-detect-unused…
questras Dec 5, 2024
65e54ec
SKYEDEN-3234 | Add metrics for number of inactive topics
questras Dec 11, 2024
f33ee4d
SKYEDEN-3234 | Remove * import
questras Dec 19, 2024
db8e9c8
Merge branch 'master' into SKYEDEN-3234-detect-unused-topics
questras Dec 19, 2024
24 changes: 24 additions & 0 deletions docs/docs/configuration/inactive-topics-detection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Inactive Topics Detection

Hermes Management provides an optional feature to detect inactive topics and
notify about them. This feature is **disabled by default**. You can enable it
and configure other options in the Hermes Management configuration.

Option | Description | Default value
-------------------------------------------------------------|----------------------------------------------------------------------------|---------------
detection.inactive-topics.enabled | enable inactive topics detection | false
detection.inactive-topics.inactivity-threshold | duration after which a topic is considered inactive and first notified | 60d
detection.inactive-topics.next-notification-threshold | duration after previous notification after which a topic is notified again | 14d
detection.inactive-topics.whitelisted-qualified-topic-names | list of qualified topic names that will not be notified even if inactive | []
detection.inactive-topics.cron | cron expression for the detection job | 0 0 8 * * *
detection.inactive-topics.notifications-history-limit | how many notification timestamps will be kept in history | 5
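
The two duration options above can be read as a pair of comparisons against the current time. The sketch below is a minimal illustration under stated assumptions: `InactivityThresholdSketch` and its methods are hypothetical names, the constants mirror the documented defaults, and the inclusive boundary (`>=`) is an assumption, since the actual detection service is not shown here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper, not part of Hermes: shows one way the two thresholds could be applied.
final class InactivityThresholdSketch {
  // Mirror the documented defaults (60d and 14d).
  static final Duration INACTIVITY_THRESHOLD = Duration.ofDays(60);
  static final Duration NEXT_NOTIFICATION_THRESHOLD = Duration.ofDays(14);

  // Assumption: a topic is inactive once nothing was published for at least the threshold.
  static boolean isInactive(Instant lastPublished, Instant now) {
    return Duration.between(lastPublished, now).compareTo(INACTIVITY_THRESHOLD) >= 0;
  }

  // Assumption: notify when never notified before, or when the last notification is old enough.
  static boolean shouldNotify(Optional<Instant> lastNotification, Instant now) {
    return lastNotification
        .map(ts -> Duration.between(ts, now).compareTo(NEXT_NOTIFICATION_THRESHOLD) >= 0)
        .orElse(true);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-12-20T08:00:00Z");
    System.out.println(isInactive(now.minus(Duration.ofDays(61)), now));
    System.out.println(shouldNotify(Optional.of(now.minus(Duration.ofDays(3))), now));
  }
}
```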

The detection job runs on a single Hermes Management instance: the one that is
currently the leader, as elected through the Zookeeper instance used for leader election.

Option | Description | Default Value
------------------------------------|-----------------------------------------------------------------------------|---------------
management.leadership.zookeeper-dc | Specifies the datacenter of the Zookeeper instance used for leader election | dc

For notifications to be sent, you need to provide an implementation of
`pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsNotifier` as a bean.
Without one, detection still runs, but notifications are skipped.
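
A rough idea of what such a notifier could look like is sketched below. All types here (`OwnerRef`, `InactiveTopicInfo`, `NotificationOutcome`, `TopicsNotifier`, `LoggingTopicsNotifier`) are simplified, hypothetical stand-ins so that the example compiles on its own. The method shape follows how the detection job calls the notifier (a list of topics with owners in, a per-topic success result out), but the real Hermes interface and result type may differ.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the Hermes types; the real ones may differ.
record OwnerRef(String source, String id) {}

record InactiveTopicInfo(String qualifiedTopicName, OwnerRef owner) {}

// Assumed result shape: a per-topic success flag, unknown topics count as failed.
record NotificationOutcome(Map<String, Boolean> successByTopic) {
  boolean isSuccess(String qualifiedTopicName) {
    return successByTopic.getOrDefault(qualifiedTopicName, false);
  }
}

interface TopicsNotifier {
  NotificationOutcome notify(List<InactiveTopicInfo> inactiveTopics);
}

// Example notifier that only prints; a real one might send an e-mail or a chat message.
final class LoggingTopicsNotifier implements TopicsNotifier {
  @Override
  public NotificationOutcome notify(List<InactiveTopicInfo> inactiveTopics) {
    Map<String, Boolean> outcome = new LinkedHashMap<>();
    for (InactiveTopicInfo topic : inactiveTopics) {
      // Treat a topic without a known owner as a failed notification.
      boolean hasOwner = topic.owner() != null;
      if (hasOwner) {
        System.out.printf(
            "Topic %s is inactive, notifying owner %s%n",
            topic.qualifiedTopicName(), topic.owner().id());
      }
      outcome.put(topic.qualifiedTopicName(), hasOwner);
    }
    return new NotificationOutcome(outcome);
  }

  public static void main(String[] args) {
    NotificationOutcome outcome =
        new LoggingTopicsNotifier()
            .notify(List.of(new InactiveTopicInfo("group.topic", new OwnerRef("Plaintext", "team-a"))));
    System.out.println(outcome.isSuccess("group.topic"));
  }
}
```

Returning a per-topic result lets the caller record a notification timestamp only for topics whose notification actually went out.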
@@ -30,6 +30,9 @@ public class ZookeeperPaths {
public static final String DATACENTER_READINESS_PATH = "datacenter-readiness";
public static final String OFFLINE_RETRANSMISSION_PATH = "offline-retransmission";
public static final String OFFLINE_RETRANSMISSION_TASKS_PATH = "tasks";
public static final String INACTIVE_TOPICS_PATH = "inactive-topics";
public static final String MANAGEMENT_PATH = "management";
public static final String MANAGEMENT_PATH_LEADER = "leader";

private final String basePath;

@@ -182,6 +185,14 @@ public String offlineRetransmissionPath(String taskId) {
.join(basePath, OFFLINE_RETRANSMISSION_PATH, OFFLINE_RETRANSMISSION_TASKS_PATH, taskId);
}

public String inactiveTopicsPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, INACTIVE_TOPICS_PATH);
}

public String managementLeaderPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, MANAGEMENT_PATH, MANAGEMENT_PATH_LEADER);
}

public String join(String... parts) {
return Joiner.on(URL_SEPARATOR).join(parts);
}
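
To make the resulting Zookeeper paths concrete, here is a minimal stand-alone sketch of the two new methods. It uses `String.join` in place of Guava's `Joiner`, and it assumes that the separator is `/` and that the base path is `/hermes`; neither value is shown in this diff, and `ZookeeperPathsSketch` is a hypothetical name.

```java
// Hypothetical stand-in for ZookeeperPaths, using String.join instead of Guava's Joiner.
final class ZookeeperPathsSketch {
  // Assumption: the separator used by the real class is "/".
  private static final String SEPARATOR = "/";

  private final String basePath;

  ZookeeperPathsSketch(String basePath) {
    this.basePath = basePath;
  }

  // Node under which inactive topics are stored.
  String inactiveTopicsPath() {
    return String.join(SEPARATOR, basePath, "inactive-topics");
  }

  // Node used for leader election among management instances.
  String managementLeaderPath() {
    return String.join(SEPARATOR, basePath, "management", "leader");
  }

  public static void main(String[] args) {
    // "/hermes" is an assumed base path, used here only for illustration.
    ZookeeperPathsSketch paths = new ZookeeperPathsSketch("/hermes");
    System.out.println(paths.inactiveTopicsPath()); // prints /hermes/inactive-topics
    System.out.println(paths.managementLeaderPath()); // prints /hermes/management/leader
  }
}
```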
@@ -10,6 +10,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Clock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -19,8 +20,11 @@
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.util.InetAddressInstanceIdResolver;
import pl.allegro.tech.hermes.common.util.InstanceIdResolver;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionLagSource;
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership;
import pl.allegro.tech.hermes.management.infrastructure.metrics.NoOpSubscriptionLagSource;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager;
import pl.allegro.tech.hermes.metrics.PathsCompiler;

@Configuration
@@ -29,7 +33,7 @@
HttpClientProperties.class,
ConsistencyCheckerProperties.class,
PrometheusProperties.class,
- MicrometerRegistryProperties.class
+ MicrometerRegistryProperties.class,
})
public class ManagementConfiguration {

@@ -85,4 +89,12 @@ public SubscriptionLagSource consumerLagSource() {
public Clock clock() {
return new ClockFactory().provide();
}

@Bean
ManagementLeadership managementLeadership(
ZookeeperClientManager zookeeperClientManager,
@Value("${management.leadership.zookeeper-dc}") String leaderElectionDc,
ZookeeperPaths zookeeperPaths) {
return new ManagementLeadership(zookeeperClientManager, leaderElectionDc, zookeeperPaths);
}
}
@@ -0,0 +1,25 @@
package pl.allegro.tech.hermes.management.config.detection;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsDetectionJob;
import pl.allegro.tech.hermes.management.infrastructure.detection.InactiveTopicsDetectionScheduler;
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership;

@Configuration
@EnableConfigurationProperties(InactiveTopicsDetectionProperties.class)
@EnableScheduling
public class InactiveTopicsDetectionConfig {
@ConditionalOnProperty(
prefix = "detection.inactive-topics",
value = "enabled",
havingValue = "true")
@Bean
InactiveTopicsDetectionScheduler inactiveTopicsDetectionScheduler(
InactiveTopicsDetectionJob job, ManagementLeadership leader) {
return new InactiveTopicsDetectionScheduler(job, leader);
}
}
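
The scheduler class itself is not part of this excerpt. A plausible reading of its constructor (a job plus a leadership handle) is that it runs the job only on the leader instance. The sketch below illustrates that idea with plain Java types; `LeaderGatedJobSketch` and `runIfLeader` are hypothetical names, and the cron trigger is replaced by a direct call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: run a job only when this instance currently holds leadership.
final class LeaderGatedJobSketch {
  private final Runnable job;
  private final BooleanSupplier isLeader;

  LeaderGatedJobSketch(Runnable job, BooleanSupplier isLeader) {
    this.job = job;
    this.isLeader = isLeader;
  }

  // In a real setup this would be invoked by the scheduler on every cron tick.
  // Returns true when the job was actually run.
  boolean runIfLeader() {
    if (!isLeader.getAsBoolean()) {
      return false;
    }
    job.run();
    return true;
  }

  public static void main(String[] args) {
    AtomicInteger runs = new AtomicInteger();
    new LeaderGatedJobSketch(runs::incrementAndGet, () -> false).runIfLeader();
    new LeaderGatedJobSketch(runs::incrementAndGet, () -> true).runIfLeader();
    System.out.println(runs.get()); // prints 1
  }
}
```

Checking leadership at each tick, rather than once at startup, keeps the job on a single instance even if leadership moves between instances.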
@@ -0,0 +1,12 @@
package pl.allegro.tech.hermes.management.config.detection;

import java.time.Duration;
import java.util.Set;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "detection.inactive-topics")
public record InactiveTopicsDetectionProperties(
Duration inactivityThreshold,
Duration nextNotificationThreshold,
Set<String> whitelistedQualifiedTopicNames,
int notificationsHistoryLimit) {}
@@ -31,10 +31,12 @@
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperWorkloadConstraintsRepository;
import pl.allegro.tech.hermes.management.domain.blacklist.TopicBlacklistRepository;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsRepository;
import pl.allegro.tech.hermes.management.domain.mode.ModeService;
import pl.allegro.tech.hermes.management.domain.readiness.DatacenterReadinessRepository;
import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionRepository;
import pl.allegro.tech.hermes.management.infrastructure.blacklist.ZookeeperTopicBlacklistRepository;
import pl.allegro.tech.hermes.management.infrastructure.detection.ZookeeperInactiveTopicsRepository;
import pl.allegro.tech.hermes.management.infrastructure.metrics.SummedSharedCounter;
import pl.allegro.tech.hermes.management.infrastructure.readiness.ZookeeperDatacenterReadinessRepository;
import pl.allegro.tech.hermes.management.infrastructure.retransmit.ZookeeperOfflineRetransmissionRepository;
@@ -177,4 +179,11 @@ DatacenterReadinessRepository readinessRepository() {
return new ZookeeperDatacenterReadinessRepository(
localClient.getCuratorFramework(), objectMapper, zookeeperPaths());
}

@Bean
InactiveTopicsRepository inactiveTopicsRepository() {
ZookeeperClient localClient = clientManager().getLocalClient();
return new ZookeeperInactiveTopicsRepository(
localClient.getCuratorFramework(), objectMapper, zookeeperPaths());
}
}
@@ -0,0 +1,36 @@
package pl.allegro.tech.hermes.management.domain.detection;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public record InactiveTopic(
@JsonProperty("topic") String qualifiedTopicName,
@JsonProperty("lastPublishedTsMs") long lastPublishedMessageTimestampMs,
@JsonProperty("notificationTsMs") List<Long> notificationTimestampsMs,
@JsonProperty("whitelisted") boolean whitelisted) {

InactiveTopic notificationSent(Instant timestamp) {
List<Long> newNotificationTimestampsMs = new ArrayList<>(notificationTimestampsMs);
newNotificationTimestampsMs.add(timestamp.toEpochMilli());
return new InactiveTopic(
this.qualifiedTopicName,
this.lastPublishedMessageTimestampMs,
newNotificationTimestampsMs,
this.whitelisted);
}

InactiveTopic limitNotificationsHistory(int limit) {
List<Long> newNotificationTimestampsMs =
notificationTimestampsMs.stream()
.sorted((a, b) -> Long.compare(b, a))

[Review comment, Collaborator]
nit: do we really need to sort this list? The timestamps should already be in order, so we could just take the last `limit` elements.

[Reply, Collaborator Author]
I don't want to rely on the current implementation keeping the timestamps sorted. If we ever change something that alters their order, we might get unexpected behavior; sorting here keeps us safe regardless.

.limit(limit)
.toList();
return new InactiveTopic(
this.qualifiedTopicName,
this.lastPublishedMessageTimestampMs,
newNotificationTimestampsMs,
this.whitelisted);
}
}
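
The interplay of `notificationSent` and `limitNotificationsHistory` can be tried out in isolation. The sketch below uses a trimmed-down, hypothetical copy of the record (`InactiveTopicSketch`, without the Jackson annotations and the other fields) and `Comparator.reverseOrder()` in place of the explicit lambda.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, trimmed-down copy of the record, kept only to show the history handling.
record InactiveTopicSketch(String qualifiedTopicName, List<Long> notificationTimestampsMs) {

  // Returns a copy with the new timestamp appended at the end.
  InactiveTopicSketch notificationSent(long timestampMs) {
    List<Long> updated = new ArrayList<>(notificationTimestampsMs);
    updated.add(timestampMs);
    return new InactiveTopicSketch(qualifiedTopicName, updated);
  }

  // Keeps the `limit` most recent timestamps, newest first.
  InactiveTopicSketch limitNotificationsHistory(int limit) {
    List<Long> newest =
        notificationTimestampsMs.stream().sorted(Comparator.reverseOrder()).limit(limit).toList();
    return new InactiveTopicSketch(qualifiedTopicName, newest);
  }

  public static void main(String[] args) {
    InactiveTopicSketch topic = new InactiveTopicSketch("group.topic", List.of(100L, 300L, 200L));
    InactiveTopicSketch limited = topic.notificationSent(400L).limitNotificationsHistory(2);
    System.out.println(limited.notificationTimestampsMs()); // prints [400, 300]
    // Appending after limiting yields a list that is no longer in a single order.
    System.out.println(limited.notificationSent(500L).notificationTimestampsMs()); // prints [400, 300, 500]
  }
}
```

In this sketch the limited list comes back newest first while new timestamps are appended at the end, so the stored list is not consistently ordered between runs. Taking the last `limit` elements without sorting would then drop the wrong entries.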
@@ -0,0 +1,5 @@
package pl.allegro.tech.hermes.management.domain.detection;

import pl.allegro.tech.hermes.api.OwnerId;

public record InactiveTopicWithOwner(InactiveTopic topic, OwnerId ownerId) {}
@@ -0,0 +1,153 @@
package pl.allegro.tech.hermes.management.domain.detection;

import static java.util.stream.Collectors.groupingBy;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.OwnerId;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.management.config.detection.InactiveTopicsDetectionProperties;
import pl.allegro.tech.hermes.management.domain.topic.TopicService;

@Component
public class InactiveTopicsDetectionJob {
private final TopicService topicService;
private final InactiveTopicsStorageService inactiveTopicsStorageService;
private final InactiveTopicsDetectionService inactiveTopicsDetectionService;
private final Optional<InactiveTopicsNotifier> notifier;
private final InactiveTopicsDetectionProperties properties;
private final Clock clock;
private final MeterRegistry meterRegistry;

private static final Logger logger = LoggerFactory.getLogger(InactiveTopicsDetectionJob.class);

public InactiveTopicsDetectionJob(
TopicService topicService,
InactiveTopicsStorageService inactiveTopicsStorageService,
InactiveTopicsDetectionService inactiveTopicsDetectionService,
Optional<InactiveTopicsNotifier> notifier,
InactiveTopicsDetectionProperties properties,
Clock clock,
MeterRegistry meterRegistry) {
this.topicService = topicService;
this.inactiveTopicsStorageService = inactiveTopicsStorageService;
this.inactiveTopicsDetectionService = inactiveTopicsDetectionService;
this.properties = properties;
this.clock = clock;
this.meterRegistry = meterRegistry;
if (notifier.isEmpty()) {
logger.info("Inactive topics notifier bean is absent");
}
this.notifier = notifier;
}

public void detectAndNotify() {
List<Topic> topics = topicService.getAllTopics();
List<String> qualifiedTopicNames = topics.stream().map(Topic::getQualifiedName).toList();
List<InactiveTopic> historicalInactiveTopics = inactiveTopicsStorageService.getInactiveTopics();
List<InactiveTopic> foundInactiveTopics =
detectInactiveTopics(qualifiedTopicNames, historicalInactiveTopics);

Map<Boolean, List<InactiveTopic>> groupedByNeedOfNotification =
foundInactiveTopics.stream()
.collect(groupingBy(inactiveTopicsDetectionService::shouldBeNotified));

List<InactiveTopic> topicsToNotify = groupedByNeedOfNotification.getOrDefault(true, List.of());
List<InactiveTopic> topicsToSkipNotification =
groupedByNeedOfNotification.getOrDefault(false, List.of());
List<InactiveTopic> notifiedTopics = notify(enrichWithOwner(topicsToNotify, topics));

List<InactiveTopic> processedTopics =
limitHistory(
Stream.concat(notifiedTopics.stream(), topicsToSkipNotification.stream()).toList());
measureInactiveTopics(processedTopics);
inactiveTopicsStorageService.markAsInactive(processedTopics);
}

private List<InactiveTopic> detectInactiveTopics(
List<String> qualifiedTopicNames, List<InactiveTopic> historicalInactiveTopics) {
Map<String, InactiveTopic> historicalInactiveTopicsByName =
groupByName(historicalInactiveTopics);
return qualifiedTopicNames.stream()
.map(
qualifiedTopicName ->
inactiveTopicsDetectionService.detectInactiveTopic(
TopicName.fromQualifiedName(qualifiedTopicName),
Optional.ofNullable(historicalInactiveTopicsByName.get(qualifiedTopicName))))
.map(opt -> opt.orElse(null))
.filter(Objects::nonNull)
.toList();
}

private Map<String, InactiveTopic> groupByName(List<InactiveTopic> inactiveTopics) {
return inactiveTopics.stream()
.collect(Collectors.toMap(InactiveTopic::qualifiedTopicName, v -> v, (v1, v2) -> v1));
}

private List<InactiveTopicWithOwner> enrichWithOwner(
List<InactiveTopic> inactiveTopics, List<Topic> topics) {
Map<String, OwnerId> ownerByTopicName = new HashMap<>();
topics.forEach(topic -> ownerByTopicName.put(topic.getQualifiedName(), topic.getOwner()));

return inactiveTopics.stream()
.map(
inactiveTopic ->
new InactiveTopicWithOwner(
inactiveTopic, ownerByTopicName.get(inactiveTopic.qualifiedTopicName())))
.toList();
}

private List<InactiveTopic> notify(List<InactiveTopicWithOwner> inactiveTopics) {
if (inactiveTopics.isEmpty()) {
logger.info("No inactive topics to notify");
return List.of();
} else if (notifier.isPresent()) {
logger.info("Notifying {} inactive topics", inactiveTopics.size());
NotificationResult result = notifier.get().notify(inactiveTopics);
Instant now = clock.instant();

return inactiveTopics.stream()
.map(InactiveTopicWithOwner::topic)
.map(
topic ->
result.isSuccess(topic.qualifiedTopicName())
? topic.notificationSent(now)
: topic)
.toList();
} else {
logger.info("Skipping notification of {} inactive topics", inactiveTopics.size());
return inactiveTopics.stream().map(InactiveTopicWithOwner::topic).toList();
}
}

private List<InactiveTopic> limitHistory(List<InactiveTopic> inactiveTopics) {
return inactiveTopics.stream()
.map(topic -> topic.limitNotificationsHistory(properties.notificationsHistoryLimit()))
.toList();
}

private void measureInactiveTopics(List<InactiveTopic> processedTopics) {
processedTopics.stream()
.collect(
Collectors.groupingBy(
topic -> topic.notificationTimestampsMs().size(), Collectors.counting()))
.forEach(
(notificationsCount, topicsCount) -> {
Tags tags = Tags.of("notifications", notificationsCount.toString());
meterRegistry.gauge("inactive-topics", tags, topicsCount);
});
}
}
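
Two steps of the job above are easy to check in isolation: recording a notification timestamp only for topics whose notification succeeded, and counting topics by the number of notifications they have received. The sketch below is a simplified illustration of both. `NotifiedTopicSketch` and `PartialNotificationSketch` are hypothetical names, and a plain `Set<String>` of succeeded topic names stands in for the notification result type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified topic record used only for this illustration.
record NotifiedTopicSketch(String name, List<Long> notificationTimestampsMs) {
  NotifiedTopicSketch notificationSent(long nowMs) {
    List<Long> updated = new ArrayList<>(notificationTimestampsMs);
    updated.add(nowMs);
    return new NotifiedTopicSketch(name, updated);
  }
}

final class PartialNotificationSketch {
  // Adds a timestamp only to topics whose notification succeeded; others stay unchanged.
  static List<NotifiedTopicSketch> applyResult(
      List<NotifiedTopicSketch> topics, Set<String> succeeded, long nowMs) {
    return topics.stream()
        .map(topic -> succeeded.contains(topic.name()) ? topic.notificationSent(nowMs) : topic)
        .toList();
  }

  // Counts topics per number of notifications, the grouping used for the metric tags.
  static Map<Integer, Long> countByNotifications(List<NotifiedTopicSketch> topics) {
    return topics.stream()
        .collect(
            Collectors.groupingBy(
                topic -> topic.notificationTimestampsMs().size(), Collectors.counting()));
  }

  public static void main(String[] args) {
    List<NotifiedTopicSketch> topics =
        List.of(
            new NotifiedTopicSketch("group.a", List.of(1L)),
            new NotifiedTopicSketch("group.b", List.of()));
    List<NotifiedTopicSketch> processed = applyResult(topics, Set.of("group.a"), 2L);
    System.out.println(countByNotifications(processed));
  }
}
```

Leaving failed topics untouched means they qualify for notification again on the next run instead of waiting for the next-notification threshold.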