Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download the geoip databases only when needed #92335

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d8169e5
Only downloading geoip databases if geoip processors exist
masseyke Dec 13, 2022
c57a55d
fixing the logic
masseyke Dec 13, 2022
19ff461
Update docs/changelog/92335.yaml
masseyke Dec 13, 2022
09a7247
Update docs/changelog/92335.yaml
masseyke Dec 13, 2022
ba93dcf
fixing tests
masseyke Dec 13, 2022
ded8717
Merge branch 'fix/download-geoip-databases-only-when-needed' of githu…
masseyke Dec 13, 2022
86fd456
fixing tests
masseyke Dec 13, 2022
565fec0
fixing tests
masseyke Dec 13, 2022
afaad92
fixing tests
masseyke Dec 14, 2022
a9207c8
Merge branch 'main' into fix/download-geoip-databases-only-when-needed
elasticmachine Dec 14, 2022
8ea6e90
Allowing for eager download of geoip databases
masseyke Dec 20, 2022
8753b58
fixing integration test
masseyke Jan 5, 2023
b2ab307
Merge branch 'main' into fix/download-geoip-databases-only-when-needed
elasticmachine Jan 5, 2023
d87e105
minor changes
masseyke Jan 9, 2023
faa846c
Merge branch 'main' into fix/download-geoip-databases-only-when-needed
masseyke Jan 12, 2023
ef517b5
attempting to work around geoip downloader bug
masseyke Jan 12, 2023
9ef49c5
adding a comment
masseyke Jan 12, 2023
2abf42e
cleanup
masseyke Jan 12, 2023
854026c
Merge branch 'main' into fix/download-geoip-databases-only-when-needed
elasticmachine Jan 24, 2023
ecdc61b
Update docs/reference/ingest/processors/geoip.asciidoc
masseyke Jan 25, 2023
74ca199
moving the handling of dynamic settings to a long-lived singleton to …
masseyke Jan 26, 2023
087ea79
simplifying clusterChanged
masseyke Jan 26, 2023
ca42d50
cleanup
masseyke Jan 26, 2023
5b41713
removing code that is no longer needed
masseyke Jan 26, 2023
faa0ffd
moving cluster state update logic out of GeoIpDownloader
masseyke Jan 26, 2023
4579412
Merge branch 'main' into fix/download-geoip-databases-only-when-needed
elasticmachine Jan 26, 2023
af351c2
Recursively checking for geoip processors
masseyke Jan 27, 2023
4ec403c
Apply suggestions from code review
masseyke Jan 27, 2023
f7b551f
code review feedback
masseyke Jan 27, 2023
cb87a0a
commenting a unit test
masseyke Jan 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/92335.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 92335
summary: Download the geoip databases only when needed
area: Ingest Node
type: bug
issues:
- 90673
12 changes: 11 additions & 1 deletion docs/reference/ingest/processors/geoip.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ IPv4 or IPv6 address.
By default, the processor uses the GeoLite2 City, GeoLite2 Country, and GeoLite2
ASN GeoIP2 databases from
http://dev.maxmind.com/geoip/geoip2/geolite2/[MaxMind], shared under the
CC BY-SA 4.0 license. {es} automatically downloads updates for
CC BY-SA 4.0 license. It automatically downloads these databases if either
`ingest.geoip.downloader.eager.download` is set to true, or your cluster
has at least one pipeline with a `geoip` processor. {es}
automatically downloads updates for
these databases from the Elastic GeoIP endpoint:
https://geoip.elastic.co/v1/database. To get download statistics for these
updates, use the <<geoip-stats-api,GeoIP stats API>>.
Expand Down Expand Up @@ -412,6 +415,13 @@ If `true`, {es} automatically downloads and manages updates for GeoIP2 databases
from the `ingest.geoip.downloader.endpoint`. If `false`, {es} does not download
updates and deletes all downloaded databases. Defaults to `true`.

[[ingest-geoip-downloader-eager-download]]
(<<dynamic-cluster-setting,Dynamic>>, Boolean)
If `true`, {es} downloads GeoIP2 databases immediately, regardless of whether a
pipeline exists with a geoip processor. If `false`, {es} only begins downloading
the databases if a pipeline with a geoip processor exists or is added. Defaults
to `false`.

[[ingest-geoip-downloader-endpoint]]
`ingest.geoip.downloader.endpoint`::
(<<static-cluster-setting,Static>>, string)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.search.SearchHit;
Expand All @@ -51,11 +54,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand All @@ -82,7 +87,12 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class);
return Arrays.asList(
ReindexPlugin.class,
IngestGeoIpPlugin.class,
GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class,
NonGeoProcessorsPlugin.class
);
}

@Override
Expand All @@ -104,7 +114,7 @@ public void cleanUp() throws Exception {
.setPersistentSettings(
Settings.builder()
.putNull(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey())
.putNull(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey())
.putNull(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getKey())
.putNull("ingest.geoip.database_validity")
)
.get();
Expand Down Expand Up @@ -149,6 +159,7 @@ public void cleanUp() throws Exception {
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/75221")
public void testInvalidTimestamp() throws Exception {
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
putGeoIpPipeline();
ClusterUpdateSettingsResponse settingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
Expand All @@ -160,7 +171,7 @@ public void testInvalidTimestamp() throws Exception {
assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
}, 2, TimeUnit.MINUTES);

putPipeline();
putGeoIpPipeline();
verifyUpdatedDatabase();

settingsResponse = client().admin()
Expand All @@ -172,7 +183,9 @@ public void testInvalidTimestamp() throws Exception {
settingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2)))
.setPersistentSettings(
Settings.builder().put(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2))
)
.get();
assertTrue(settingsResponse.isAcknowledged());
List<Path> geoIpTmpDirs = getGeoIpTmpDirs();
Expand All @@ -186,7 +199,7 @@ public void testInvalidTimestamp() throws Exception {
}
}
});
putPipeline();
putGeoIpPipeline();
assertBusy(() -> {
SimulateDocumentBaseResult result = simulatePipeline();
assertThat(result.getFailure(), nullValue());
Expand Down Expand Up @@ -221,14 +234,17 @@ public void testUpdatedTimestamp() throws Exception {
ClusterUpdateSettingsResponse settingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2)))
.setPersistentSettings(
Settings.builder().put(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2))
)
.get();
assertTrue(settingsResponse.isAcknowledged());
assertBusy(() -> assertNotEquals(lastCheck, getGeoIpTaskState().getDatabases().get("GeoLite2-ASN.mmdb").lastCheck()));
testGeoIpDatabasesDownload();
}

public void testGeoIpDatabasesDownload() throws Exception {
putGeoIpPipeline();
ClusterUpdateSettingsResponse settingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
Expand Down Expand Up @@ -283,12 +299,34 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

public void testGeoIpDatabasesDownloadNoGeoipProcessors() throws Exception {
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
String pipelineId = randomAlphaOfLength(10);
putGeoIpPipeline(pipelineId);
ClusterUpdateSettingsResponse settingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true))
.get();
assertTrue(settingsResponse.isAcknowledged());
assertBusy(() -> { assertNull(getTask().getState()); });
putNonGeoipPipeline(pipelineId);
assertBusy(() -> { assertNull(getTask().getState()); });
putNonGeoipPipeline(pipelineId);
assertNull(getTask().getState());
putGeoIpPipeline();
assertBusy(() -> {
GeoIpTaskState state = getGeoIpTaskState();
assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
}, 2, TimeUnit.MINUTES);
}

@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
setupDatabasesInConfigDirectory();
// setup:
putPipeline();
putGeoIpPipeline();

// verify before updating dbs
{
Expand Down Expand Up @@ -355,7 +393,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/79074")
public void testStartWithNoDatabases() throws Exception {
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
putPipeline();
putGeoIpPipeline();

// Behaviour without any databases loaded:
{
Expand Down Expand Up @@ -438,7 +476,21 @@ private SimulateDocumentBaseResult simulatePipeline() throws IOException {
return (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
}

private void putPipeline() throws IOException {
/**
* This creates a pipeline with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is enabled).
* @throws IOException
*/
private void putGeoIpPipeline() throws IOException {
putGeoIpPipeline("_id");
}

/**
* This creates a pipeline named pipelineId with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is
* enabled).
* @param pipelineId The name of the new pipeline with a geoip processor
* @throws IOException
*/
private void putGeoIpPipeline(String pipelineId) throws IOException {
BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
Expand Down Expand Up @@ -484,7 +536,45 @@ private void putPipeline() throws IOException {
builder.endObject();
bytes = BytesReference.bytes(builder);
}
assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get());
assertAcked(client().admin().cluster().preparePutPipeline(pipelineId, bytes, XContentType.JSON).get());
}

/**
* This creates a pipeline named pipelineId that does _not_ have a geoip processor.
* @throws IOException
*/
private void putNonGeoipPipeline(String pipelineId) throws IOException {
BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.startArray("processors");
{
builder.startObject();
{
builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE);
builder.endObject();
}
builder.endObject();
builder.startObject();
{
builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE);
builder.endObject();
}
builder.endObject();
builder.startObject();
{
builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE);
builder.endObject();
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();
bytes = BytesReference.bytes(builder);
}
assertAcked(client().admin().cluster().preparePutPipeline(pipelineId, bytes, XContentType.JSON).get());
}

private List<Path> getGeoIpTmpDirs() throws IOException {
Expand Down Expand Up @@ -624,4 +714,32 @@ public int read(byte[] b, int off, int len) throws IOException {
return read;
}
}

/**
* This class defines a processor of type "test".
*/
public static final class NonGeoProcessorsPlugin extends Plugin implements IngestPlugin {
public static final String NON_GEO_PROCESSOR_TYPE = "test";

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Map<String, Processor.Factory> procMap = new HashMap<>();
procMap.put(NON_GEO_PROCESSOR_TYPE, (factories, tag, description, config) -> new AbstractProcessor(tag, description) {
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {}

@Override
public String getType() {
return NON_GEO_PROCESSOR_TYPE;
}

@Override
public boolean isAsync() {
return false;
}

});
return procMap;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.After;

import java.io.IOException;
Expand All @@ -29,6 +31,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -80,7 +83,7 @@ public void testStats() throws Exception {
assertThat(jsonMapView.get("stats.databases_count"), equalTo(0));
assertThat(jsonMapView.get("stats.total_download_time"), equalTo(0));
assertEquals(0, jsonMapView.<Map<String, Object>>get("nodes").size());

putPipeline();
ClusterUpdateSettingsResponse settingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
Expand Down Expand Up @@ -108,6 +111,33 @@ public void testStats() throws Exception {
});
}

private void putPipeline() throws IOException {
BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.startArray("processors");
{
builder.startObject();
{
builder.startObject("geoip");
{
builder.field("field", "ip");
builder.field("target_field", "ip-city");
builder.field("database_file", "GeoLite2-City.mmdb");
}
builder.endObject();
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();
bytes = BytesReference.bytes(builder);
}
assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get());
}

public static Map<String, Object> convertToMap(ToXContent part) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
part.toXContent(builder, EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void cleanUp() throws Exception {
.setPersistentSettings(
Settings.builder()
.putNull(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey())
.putNull(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey())
.putNull(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getKey())
.putNull("ingest.geoip.database_validity")
)
.get()
Expand Down
Loading