Skip to content

Commit

Permalink
Take the node id into account when creating geoip tmp dir.
Browse files Browse the repository at this point in the history
Backport elastic#70462 to 7.x branch.

This change adjust where the geoip tmp directory is created
to avoid issues when running multiple nodes on the same machine.

In the java tmp dir, a 'geoip-databases' directory is created and
directly under this directory a directory with the node id as name is created.
This allows safely running multiple nodes on the same machine (this
happens mainly during tests).

Closes elastic#69972
Relates to elastic#68920
  • Loading branch information
martijnvg committed Mar 17, 2021
1 parent 6ad6711 commit 471bdfd
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Set;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -40,6 +41,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -143,7 +145,6 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/69972")
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
// setup:
Expand Down Expand Up @@ -228,12 +229,24 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));

final List<Path> geoipTmpDirs = StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
final DiscoveryNodes nodes = clusterService().state().nodes();
final java.util.Set<String> ids = StreamSupport.stream(nodes.getDataNodes().values().spliterator(), false)
.map(c -> c.value.getId())
.collect(Collectors.toSet());
final List<Path> geoipTmpDirs = StreamSupport.stream(internalCluster().getDataNodeInstances(Environment.class).spliterator(), false)
.map(env -> {
Path geoipTmpDir = env.tmpFile().resolve("geoip-databases");
assertThat(Files.exists(geoipTmpDir), is(true));
return geoipTmpDir;
}).collect(Collectors.toList());
}).flatMap(path -> {
try {
return Files.list(path);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).filter(path -> ids.contains(path.getFileName().toString()))
.collect(Collectors.toList());
assertThat(geoipTmpDirs.size(), equalTo(internalCluster().numDataNodes()));
assertBusy(() -> {
for (Path geoipTmpDir : geoipTmpDirs) {
try (Stream<Path> list = Files.list(geoipTmpDir)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private static DatabaseRegistry createRegistry(Path geoIpModulesDir, Path geoIpC
LocalDatabases localDatabases = new LocalDatabases(geoIpModulesDir, geoIpConfigDir, cache);
DatabaseRegistry databaseRegistry =
new DatabaseRegistry(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run);
databaseRegistry.initialize(mock(ResourceWatcherService.class), mock(IngestService.class));
databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
return databaseRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ final class DatabaseRegistry implements Closeable {

private final Client client;
private final GeoIpCache cache;
private final Path geoipTmpDirectory;
private final Path geoipTmpBaseDirectory;
private Path geoipTmpDirectory;
private final LocalDatabases localDatabases;
private final Consumer<Runnable> genericExecutor;

Expand All @@ -100,13 +101,14 @@ final class DatabaseRegistry implements Closeable {
Consumer<Runnable> genericExecutor) {
this.client = client;
this.cache = cache;
this.geoipTmpDirectory = tmpDir.resolve("geoip-databases");
this.geoipTmpBaseDirectory = tmpDir.resolve("geoip-databases");
this.localDatabases = localDatabases;
this.genericExecutor = genericExecutor;
}

public void initialize(ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException {
public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException {
localDatabases.initialize(resourceWatcher);
geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
Files.walkFileTree(geoipTmpDirectory, new FileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
Expand Down Expand Up @@ -138,7 +140,7 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
}
});
if (Files.exists(geoipTmpDirectory) == false) {
Files.createDirectory(geoipTmpDirectory);
Files.createDirectories(geoipTmpDirectory);
}
LOGGER.info("initialized database registry, using geoip-databases directory [{}]", geoipTmpDirectory);
ingestService.addIngestClusterStateListener(this::checkDatabases);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public Collection<Object> createComponents(Client client,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
try {
databaseRegistry.get().initialize(resourceWatcherService, ingestService.get());
String nodeId = nodeEnvironment.nodeId();
databaseRegistry.get().initialize(nodeId, resourceWatcherService, ingestService.get());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void setup() throws IOException {
LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache);
geoIpTmpDir = createTempDir();
databaseRegistry = new DatabaseRegistry(geoIpTmpDir, client, cache, localDatabases, Runnable::run);
databaseRegistry.initialize(resourceWatcherService, mock(IngestService.class));
databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
}

@After
Expand Down Expand Up @@ -142,7 +142,7 @@ public void testCheckDatabases() throws Exception {
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), notNullValue());
verify(client, times(10)).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), hasSize(1));
}
}
Expand All @@ -166,7 +166,7 @@ public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Excepti
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand All @@ -188,7 +188,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Ex
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand All @@ -209,7 +209,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws E
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases"))) {
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), empty());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void testLoadingCustomDatabase() throws IOException {
Client client = mock(Client.class);
GeoIpCache cache = new GeoIpCache(1000);
DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run);
databaseRegistry.initialize(resourceWatcherService, mock(IngestService.class));
databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
assertNull(lazyLoader.databaseReader.get());
Expand Down

0 comments on commit 471bdfd

Please sign in to comment.