Add DatabaseRegistry for locally managing databases managed by GeoIpDownloader #69540
Conversation
Add DatabaseRegistry for locally managing databases managed by GeoIpDownloader

This component is responsible for making the databases maintained by GeoIpDownloader available for ingest processors. It also provides a lookup mechanism for geoip processors, with fallback to {@link LocalDatabases}. All databases are downloaded into a geoip tmp directory, which is created at node startup.

The following high-level steps are executed after each cluster state update:

1) Check which databases are available in {@link GeoIpTaskState}, which is part of the geoip downloader persistent task.
2) For each database, check whether it has changed, by comparing the local and remote md5 hash, or whether it is locally missing.
3) For each database identified in step 2, start downloading the database chunks. Each chunk is appended to a tmp file (inside the geoip tmp dir), and after all chunks have been downloaded, the database is uncompressed and renamed to the final filename. After this the database is loaded, and if there is an old instance of this database then that is closed.
4) Clean up locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.

Relates to elastic#68920
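As a rough illustration of step 3, here is a minimal sketch, assuming a plain gzip stream and a hypothetical `retrieveChunk` helper; the actual implementation in this PR differs in detail:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.zip.GZIPInputStream;

class ChunkedDownloadSketch {

    // Hypothetical helper: fetching one chunk's bytes is out of scope for this sketch.
    byte[] retrieveChunk(String databaseName, int chunk) {
        throw new UnsupportedOperationException("fetch chunk [" + databaseName + "_" + chunk + "]");
    }

    void downloadDatabase(String databaseName, int firstChunk, int lastChunk, Path geoipTmpDir) throws IOException {
        // step 3a: append each chunk in order to a tmp file inside the geoip tmp dir
        Path gzFile = geoipTmpDir.resolve(databaseName + ".tmp.gz");
        try (OutputStream out = Files.newOutputStream(gzFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (int chunk = firstChunk; chunk <= lastChunk; chunk++) {
                out.write(retrieveChunk(databaseName, chunk));
            }
        }
        // step 3b: uncompress the assembled file (a plain gzip stream is assumed here)
        Path tmpDb = geoipTmpDir.resolve(databaseName + ".tmp");
        try (InputStream in = new GZIPInputStream(Files.newInputStream(gzFile))) {
            Files.copy(in, tmpDb, StandardCopyOption.REPLACE_EXISTING);
        }
        // step 3c: rename to the final filename; the database only becomes visible
        // under its final name now, so readers never see a half-written file
        Files.move(tmpDb, geoipTmpDir.resolve(databaseName), StandardCopyOption.ATOMIC_MOVE);
    }
}
```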
Pinging @elastic/es-core-features (Team:Core/Features)
Overall looks good; there are several simplifications we can do.
```java
private final ConcurrentMap<String, DatabaseReference> databases = new ConcurrentHashMap<>();

DatabaseRegistry(Environment environment,
```
nit: I'm not a fan of the argument-per-line style, especially if they all fit nicely on a single line
Right, I think for at least some of those statements I assumed that they were longer than 140 chars.
```java
    return;
}

PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
```
we can simplify here with `PersistentTasksCustomMetadata.getTaskWithId`:

```java
PersistentTask<?> task = PersistentTasksCustomMetadata.getTaskWithId(state, GeoIpDownloader.GEOIP_DOWNLOADER);
GeoIpTaskState taskState = task == null || task.getState() == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) task.getState();
```
```java
for (var entry : taskState.getDatabases().entrySet()) {
    String name = entry.getKey();
    GeoIpTaskState.Metadata metadata = entry.getValue();
    DatabaseReference reference = databases.get(entry.getKey());
```
```diff
- DatabaseReference reference = databases.get(entry.getKey());
+ DatabaseReference reference = databases.get(name);
```
```java
String name = entry.getKey();
GeoIpTaskState.Metadata metadata = entry.getValue();
DatabaseReference reference = databases.get(entry.getKey());
String remoteMd5 = entry.getValue().getMd5();
```
```diff
- String remoteMd5 = entry.getValue().getMd5();
+ String remoteMd5 = metadata.getMd5();
```
```java
for (var entry : taskState.getDatabases().entrySet()) {
    String name = entry.getKey();
    GeoIpTaskState.Metadata metadata = entry.getValue();
```
we can use `Map.forEach` here:

```diff
- for (var entry : taskState.getDatabases().entrySet()) {
-     String name = entry.getKey();
-     GeoIpTaskState.Metadata metadata = entry.getValue();
+ taskState.getDatabases().forEach((name, metadata) -> {
```
```java
for (DatabaseReference reference : references) {
    reference.close();
    Files.delete(reference.databaseFile);
```
Will it mess something up if we delete the file during a concurrent lookup? `reference.close()` may not actually close anything if there are any ongoing lookups.
I think *nix-like systems handle this gracefully (a file deleted while in use is only actually removed after its file descriptor has been closed), so I think the current code will not mess anything up, but on Windows I'm not so sure. Anyway, perhaps it makes more sense to move the actual deletion of the db file elsewhere (maybe to `DatabaseReaderLazyLoader#doClose()`, or at least initiate it from there).
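For illustration, a minimal sketch of the deferred-deletion idea, assuming a simple reference count; the names here are illustrative, not the actual `DatabaseReaderLazyLoader` code, and this naive version still has a small window if a lookup starts concurrently with the final close:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

// Counts in-flight lookups and deletes the backing file only once the loader
// is both closed and no longer in use.
class LazyLoaderSketch {
    private final Path databaseFile;
    private final AtomicInteger usages = new AtomicInteger(0);
    private volatile boolean closed = false;

    LazyLoaderSketch(Path databaseFile) {
        this.databaseFile = databaseFile;
    }

    void beginLookup() {
        usages.incrementAndGet();
    }

    void endLookup() throws IOException {
        if (usages.decrementAndGet() == 0 && closed) {
            doClose(); // last in-flight lookup finished after close() was called
        }
    }

    void close() throws IOException {
        closed = true;
        if (usages.get() == 0) {
            doClose();
        }
    }

    private void doClose() throws IOException {
        // deleting here (instead of in the registry) avoids removing a file
        // that a concurrent lookup still has open
        Files.deleteIfExists(databaseFile);
    }
}
```

Tying the deletion to the last reader closing sidesteps the platform differences mentioned above.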
```java
void removeStaleEntries(Collection<String> staleEntries) {
    try {
        List<DatabaseReference> references = new ArrayList<>();
```
do we need this list? can we just merge these two `for` loops?
Yeah, I think merging is possible.
I think this is a direct copy from the PoC, which did something silly.
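For illustration, the merged loop might look something like this (a sketch assuming the fields from the snippet above; the logging call is illustrative):

```java
// Merged version: close and delete each stale database in one pass,
// without collecting references into an intermediate list first.
void removeStaleEntries(Collection<String> staleEntries) {
    for (String staleEntry : staleEntries) {
        try {
            DatabaseReference reference = databases.remove(staleEntry);
            reference.close();
            Files.delete(reference.databaseFile);
        } catch (Exception e) {
            logger.error("failed to clean up stale database [" + staleEntry + "]", e);
        }
    }
}
```

Handling each entry in its own try/catch also means one failing delete doesn't stop cleanup of the remaining entries.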
```java
if (lastSortValue != null) {
    lastSortValue = null;
}
```
```diff
- if (lastSortValue != null) {
-     lastSortValue = null;
- }
+ lastSortValue = null;
```

(we always end up with `null` here anyway)
I think that is a leftover.
(the if check)
```java
    builder.endObject();
}
builder.endObject();
// TODO: change geoip fixture to not return city db content for country and asn databases:
```
I'd rather not have big TODOs like this here; I can fix the geoip fixture if you want before we merge this PR.
I wonder how stable/fragile this will be considering the third-party test (where we hit the real service and data may change eventually).
> I can fix geoip fixture if you want before we merge this PR.

That would be great; then the other databases can be tested here too, instead of just the city db.
> I wonder how stable/fragile this will be considering third-party test (where we hit real service and data may change eventually)

Not sure how to enforce that the test fixture stays in line with the actual API that infra is building. The best thing I can come up with is that we share a spec, like we do with our REST APIs. If any of us makes a change, then we at least know about it.
```java
DatabaseRegistry registry = new DatabaseRegistry(
    parameters.env,
    parameters.client,
    geoIpCache,
    parameters.genericExecutor
);
```
nit:

```diff
- DatabaseRegistry registry = new DatabaseRegistry(
-     parameters.env,
-     parameters.client,
-     geoIpCache,
-     parameters.genericExecutor
- );
+ DatabaseRegistry registry = new DatabaseRegistry(parameters.env, parameters.client, geoIpCache, parameters.genericExecutor);
```
Also, test failure in …
…Registry instead of LocalDatabases. Remove the backing file when the loader instance is no longer used. Also removed the DatabaseReference class and merged the md5 field into the DatabaseReaderLazyLoader class.
LGTM, thanks @martijnvg for adding this!
I left 2 super minor nits (optional)
```java
try {
    do {
        SearchRequest searchRequest =
            createSearchRequest(databaseName, metadata.getFirstChunk(), metadata.getLastChunk(), lastSortValue);
```
we can extract `metadata.getFirstChunk()` and `metadata.getLastChunk()` to local variables (even outside of the loop) and move this to a single line
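Applied to the snippet above, that would look something like this (a sketch; the loop condition is assumed, since the hunk only shows the top of the loop):

```java
final int firstChunk = metadata.getFirstChunk();  // hoisted out of the loop
final int lastChunk = metadata.getLastChunk();
do {
    SearchRequest searchRequest = createSearchRequest(databaseName, firstChunk, lastChunk, lastSortValue);
    // ... issue the search and process hits as before ...
} while (lastSortValue != null); // assumed termination condition
```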
```java
List<String> files = list.map(Path::toString)
    .collect(Collectors.toList());
```
```diff
- List<String> files = list.map(Path::toString)
-     .collect(Collectors.toList());
+ List<String> files = list.map(Path::toString).collect(Collectors.toList());
```
```java
// so it is ok if this happens in a blocking manner on a thread from the generic thread pool.
// This makes the code easier to understand and maintain.
SearchResponse searchResponse = client.search(searchRequest).actionGet();
if (searchResponse.getHits().getHits().length == 0) {
```
Another idea: we know exactly which chunks we need by id, so we can do

```java
for (int i = firstChunk; i <= lastChunk; i++) {
    Map<String, Object> source = client.prepareGet(DATABASES_INDEX, name + "_" + i).get().getSource();
    ...
}
```

It should be simpler than a search with a last sort value.
I like that idea. This should make the code here much easier. A search by id would be fine too.
I pushed: acf2a4b
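For illustration, the get-by-id loop might look something like this; the `data` field name and the base64 decoding of the source are assumptions of this sketch, not confirmed by the PR:

```java
// Fetch each chunk by its deterministic id (name + "_" + chunkNumber) instead
// of paging through a search with a sort value; client, gzFile, firstChunk and
// lastChunk come from the surrounding method.
try (OutputStream out = Files.newOutputStream(gzFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
    for (int i = firstChunk; i <= lastChunk; i++) {
        Map<String, Object> source = client.prepareGet(DATABASES_INDEX, name + "_" + i).get().getSource();
        // binary fields come back base64-encoded in the JSON source (assumption)
        out.write(Base64.getDecoder().decode((String) source.get("data")));
    }
}
```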
Add DatabaseRegistry for locally managing databases managed by GeoIpDownloader (#69971)

Backport of #69540 to 7.x branch.

This component is responsible for making the databases maintained by GeoIpDownloader available for ingest processors. It also provides a lookup mechanism for geoip processors, with fallback to {@link LocalDatabases}. All databases are downloaded into a geoip tmp directory, which is created at node startup.

The following high-level steps are executed after each cluster state update:

1) Check which databases are available in {@link GeoIpTaskState}, which is part of the geoip downloader persistent task.
2) For each database, check whether it has changed, by comparing the local and remote md5 hash, or whether it is locally missing.
3) For each database identified in step 2, start downloading the database chunks. Each chunk is appended to a tmp file (inside the geoip tmp dir), and after all chunks have been downloaded, the database is uncompressed and renamed to the final filename. After this the database is loaded, and if there is an old instance of this database then that is closed.
4) Clean up locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.

Relates to #68920

Other cherry-picked commits:

* Fix ReloadingDatabasesWhilePerformingGeoLookupsIT (#70163): Wait for ingest threads to stop using the DatabaseReaderLazyLoader, so that during the next run the db update thread doesn't try to remove the db again (because the file hasn't yet been deleted). Also delete the tmp dirs this test creates at the end of the test, so that when repeating this test many times, it doesn't accumulate many directories with database files. Closes #69980
* Fix clean up of old entries in DatabaseRegistry.initialize (#70135): This change switches clean up in DatabaseRegistry.initialize from using Files.walk and stream operations to Files.walkFileTree, which can be made more robust in case of errors.
* Fix DatabaseRegistryTests (#70180): This test predefined expected md5 hashes in constants that were expected with java15. However, java16 creates different md5 hashes, so the expected md5 hashes don't match the actual md5 hashes, which caused tests in this suite to fail (running with java16 only). The tests now generate the expected md5 hash during the test instead of using predefined constants. Closes #69986
* Fix GeoIpDownloaderIT#testUseGeoIpProcessorWithDownloadedDBs(...) test (#70215): The test failure looks legit, because there is a possibility that the same database was downloaded twice. See the added comment in the DatabaseRegistry class. Relates to #69972
* Muted GeoIpDownloaderIT#testUseGeoIpProcessorWithDownloadedDBs(...) test, see #69972

Co-authored-by: Przemko Robakowski <[email protected]>