diff --git a/docs/reference/ingest/processors/user-agent.asciidoc b/docs/reference/ingest/processors/user-agent.asciidoc index 201e3beab8313..f6b6d46fe7b9d 100644 --- a/docs/reference/ingest/processors/user-agent.asciidoc +++ b/docs/reference/ingest/processors/user-agent.asciidoc @@ -60,13 +60,13 @@ Which returns "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36", "user_agent": { "name": "Chrome", - "major": "51", - "minor": "0", - "patch": "2704", - "os_name": "Mac OS X", - "os": "Mac OS X 10.10.5", - "os_major": "10", - "os_minor": "10", + "original": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36", + "version": "51.0.2704", + "os": { + "name": "Mac OS X", + "version": "10.10.5", + "full": "Mac OS X 10.10.5" + }, "device": "Other" } } diff --git a/docs/reference/migration/migrate_7_0/settings.asciidoc b/docs/reference/migration/migrate_7_0/settings.asciidoc index 6e9f7451e094f..c6874856011ce 100644 --- a/docs/reference/migration/migrate_7_0/settings.asciidoc +++ b/docs/reference/migration/migrate_7_0/settings.asciidoc @@ -182,3 +182,9 @@ could have lead to dropping audit events while the operations on the system were allowed to continue as usual. The recommended replacement is the use of the `logfile` audit output type and using other components from the Elastic Stack to handle the indexing part. + +[float] +[[ingest-user-agent-ecs-always]] +==== Ingest User Agent processor always uses `ecs` output format +The deprecated `ecs` setting for the user agent ingest processor has been +removed. https://github.com/elastic/ecs[ECS] format is now the default. diff --git a/modules/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java b/modules/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java index 6e7f588f0bd8a..6f2518eede673 100644 --- a/modules/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java +++ b/modules/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java @@ -19,6 +19,8 @@ package org.elasticsearch.ingest.useragent; +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -40,6 +42,8 @@ public class UserAgentProcessor extends AbstractProcessor { + private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(UserAgentProcessor.class)); + public static final String TYPE = "user_agent"; private final String field; @@ -63,7 +67,7 @@ boolean isIgnoreMissing() { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) { String userAgent = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (userAgent == null && ignoreMissing) { @@ -75,68 +79,64 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { Details uaClient = parser.parse(userAgent); Map uaDetails = new HashMap<>(); + + // Parse the user agent in the ECS (Elastic Common Schema) format for (Property property : this.properties) { switch (property) { + case ORIGINAL: + uaDetails.put("original", userAgent); + break; case NAME: if (uaClient.userAgent != null && uaClient.userAgent.name != null) { uaDetails.put("name", uaClient.userAgent.name); - } - else { + } else { uaDetails.put("name", "Other"); } break; - case MAJOR: + case VERSION: + StringBuilder version = new StringBuilder(); if (uaClient.userAgent != null && uaClient.userAgent.major != null) { - uaDetails.put("major", uaClient.userAgent.major); - } - break; - case MINOR: - if (uaClient.userAgent != null && uaClient.userAgent.minor != null) { - uaDetails.put("minor", uaClient.userAgent.minor); - } - break; - case PATCH: - if (uaClient.userAgent != null && uaClient.userAgent.patch != null) { - uaDetails.put("patch", uaClient.userAgent.patch); - } - break; - case BUILD: - if (uaClient.userAgent != null && uaClient.userAgent.build != null) { - uaDetails.put("build", uaClient.userAgent.build); + version.append(uaClient.userAgent.major); + if (uaClient.userAgent.minor != null) { + version.append(".").append(uaClient.userAgent.minor); + if (uaClient.userAgent.patch != null) { + version.append(".").append(uaClient.userAgent.patch); + if (uaClient.userAgent.build != null) { + version.append(".").append(uaClient.userAgent.build); + } + } + } + uaDetails.put("version", version.toString()); } break; case OS: if (uaClient.operatingSystem != null) { - uaDetails.put("os", buildFullOSName(uaClient.operatingSystem)); - } - else { - uaDetails.put("os", "Other"); - } - - break; - case OS_NAME: - if (uaClient.operatingSystem != null && uaClient.operatingSystem.name != null) { - uaDetails.put("os_name", uaClient.operatingSystem.name); - } - else { - uaDetails.put("os_name", "Other"); - } - break; - case OS_MAJOR: - if (uaClient.operatingSystem != null && uaClient.operatingSystem.major != null) { - uaDetails.put("os_major", uaClient.operatingSystem.major); - } - break; - case OS_MINOR: - if (uaClient.operatingSystem != null && uaClient.operatingSystem.minor != null) { - uaDetails.put("os_minor", uaClient.operatingSystem.minor); + Map osDetails = new HashMap<>(3); + if (uaClient.operatingSystem.name != null) { + osDetails.put("name", uaClient.operatingSystem.name); + StringBuilder sb = new StringBuilder(); + if (uaClient.operatingSystem.major != null) { + sb.append(uaClient.operatingSystem.major); + if (uaClient.operatingSystem.minor != null) { + sb.append(".").append(uaClient.operatingSystem.minor); + if (uaClient.operatingSystem.patch != null) { + sb.append(".").append(uaClient.operatingSystem.patch); + if (uaClient.operatingSystem.build != null) { + sb.append(".").append(uaClient.operatingSystem.build); + } + } + } + osDetails.put("version", sb.toString()); + osDetails.put("full", uaClient.operatingSystem.name + " " + sb.toString()); + } + uaDetails.put("os", osDetails); + } } break; case DEVICE: if (uaClient.device != null && uaClient.device.name != null) { uaDetails.put("device", uaClient.device.name); - } - else { + } else { uaDetails.put("device", "Other"); } break; @@ -215,6 +215,10 @@ public UserAgentProcessor create(Map factories, Strin String regexFilename = readStringProperty(TYPE, processorTag, config, "regex_file", IngestUserAgentPlugin.DEFAULT_PARSER_NAME); List propertyNames = readOptionalList(TYPE, processorTag, config, "properties"); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + Object ecsValue = config.remove("ecs"); + if (ecsValue != null) { + deprecationLogger.deprecated("setting [ecs] is deprecated as ECS format is the default and only option"); + } UserAgentParser parser = userAgentParsers.get(regexFilename); if (parser == null) { @@ -242,13 +246,16 @@ public UserAgentProcessor create(Map factories, Strin enum Property { - NAME, MAJOR, MINOR, PATCH, OS, OS_NAME, OS_MAJOR, OS_MINOR, DEVICE, BUILD; + NAME, + OS, + DEVICE, + ORIGINAL, + VERSION; public static Property parseProperty(String propertyName) { try { return valueOf(propertyName.toUpperCase(Locale.ROOT)); - } - catch (IllegalArgumentException e) { + } catch (IllegalArgumentException e) { throw new IllegalArgumentException("illegal property value [" + propertyName + "]. valid values are " + Arrays.toString(EnumSet.allOf(Property.class).toArray())); } diff --git a/modules/ingest-user-agent/src/test/java/org/elasticsearch/ingest/useragent/UserAgentProcessorFactoryTests.java b/modules/ingest-user-agent/src/test/java/org/elasticsearch/ingest/useragent/UserAgentProcessorFactoryTests.java index d9c6fc17620da..f723c13f23022 100644 --- a/modules/ingest-user-agent/src/test/java/org/elasticsearch/ingest/useragent/UserAgentProcessorFactoryTests.java +++ b/modules/ingest-user-agent/src/test/java/org/elasticsearch/ingest/useragent/UserAgentProcessorFactoryTests.java @@ -178,8 +178,8 @@ public void testInvalidProperty() throws Exception { config.put("properties", Collections.singletonList("invalid")); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); - assertThat(e.getMessage(), equalTo("[properties] illegal property value [invalid]. valid values are [NAME, MAJOR, MINOR, " - + "PATCH, OS, OS_NAME, OS_MAJOR, OS_MINOR, DEVICE, BUILD]")); + assertThat(e.getMessage(), equalTo("[properties] illegal property value [invalid]. valid values are [NAME, OS, DEVICE, " + + "ORIGINAL, VERSION]")); } public void testInvalidPropertiesType() throws Exception { diff --git a/modules/ingest-user-agent/src/test/java/org/elasticsearch/ingest/useragent/UserAgentProcessorTests.java b/modules/ingest-user-agent/src/test/java/org/elasticsearch/ingest/useragent/UserAgentProcessorTests.java index 0a8b453724c90..3938fccd832a3 100644 --- a/modules/ingest-user-agent/src/test/java/org/elasticsearch/ingest/useragent/UserAgentProcessorTests.java +++ b/modules/ingest-user-agent/src/test/java/org/elasticsearch/ingest/useragent/UserAgentProcessorTests.java @@ -103,16 +103,13 @@ public void testCommonBrowser() throws Exception { Map target = (Map) data.get("target_field"); assertThat(target.get("name"), is("Chrome")); - assertThat(target.get("major"), is("33")); - assertThat(target.get("minor"), is("0")); - assertThat(target.get("patch"), is("1750")); - assertNull(target.get("build")); - - assertThat(target.get("os"), is("Mac OS X 10.9.2")); - assertThat(target.get("os_name"), is("Mac OS X")); - assertThat(target.get("os_major"), is("10")); - assertThat(target.get("os_minor"), is("9")); + assertThat(target.get("version"), is("33.0.1750")); + Map os = new HashMap<>(); + os.put("name", "Mac OS X"); + os.put("version", "10.9.2"); + os.put("full", "Mac OS X 10.9.2"); + assertThat(target.get("os"), is(os)); assertThat(target.get("device"), is("Other")); } @@ -131,15 +128,13 @@ public void testUncommonDevice() throws Exception { Map target = (Map) data.get("target_field"); assertThat(target.get("name"), is("Android")); - assertThat(target.get("major"), is("3")); - assertThat(target.get("minor"), is("0")); - assertNull(target.get("patch")); - assertNull(target.get("build")); + assertThat(target.get("version"), is("3.0")); - assertThat(target.get("os"), is("Android 3.0")); - assertThat(target.get("os_name"), is("Android")); - assertThat(target.get("os_major"), is("3")); - assertThat(target.get("os_minor"), is("0")); + Map os = new HashMap<>(); + os.put("name", "Android"); + os.put("version", "3.0"); + os.put("full", "Android 3.0"); + assertThat(target.get("os"), is(os)); assertThat(target.get("device"), is("Motorola Xoom")); } @@ -158,15 +153,9 @@ public void testSpider() throws Exception { Map target = (Map) data.get("target_field"); assertThat(target.get("name"), is("EasouSpider")); - assertNull(target.get("major")); - assertNull(target.get("minor")); - assertNull(target.get("patch")); - assertNull(target.get("build")); - assertThat(target.get("os"), is("Other")); - assertThat(target.get("os_name"), is("Other")); - assertNull(target.get("os_major")); - assertNull(target.get("os_minor")); + assertNull(target.get("version")); + assertNull(target.get("os")); assertThat(target.get("device"), is("Spider")); } @@ -190,10 +179,7 @@ public void testUnknown() throws Exception { assertNull(target.get("patch")); assertNull(target.get("build")); - assertThat(target.get("os"), is("Other")); - assertThat(target.get("os_name"), is("Other")); - assertNull(target.get("os_major")); - assertNull(target.get("os_minor")); + assertNull(target.get("os")); assertThat(target.get("device"), is("Other")); } diff --git a/modules/ingest-user-agent/src/test/resources/rest-api-spec/test/ingest-useragent/20_useragent_processor.yml b/modules/ingest-user-agent/src/test/resources/rest-api-spec/test/ingest-useragent/20_useragent_processor.yml index 28c218edd6935..fc44d7261e80f 100644 --- a/modules/ingest-user-agent/src/test/resources/rest-api-spec/test/ingest-useragent/20_useragent_processor.yml +++ b/modules/ingest-user-agent/src/test/resources/rest-api-spec/test/ingest-useragent/20_useragent_processor.yml @@ -29,13 +29,9 @@ id: 1 - match: { _source.field1: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.149 Safari/537.36" } - match: { _source.user_agent.name: "Chrome" } - - match: { _source.user_agent.os: "Mac OS X 10.9.2" } - - match: { _source.user_agent.os_name: "Mac OS X" } - - match: { _source.user_agent.os_major: "10" } - - match: { _source.user_agent.os_minor: "9" } - - match: { _source.user_agent.major: "33" } - - match: { _source.user_agent.minor: "0" } - - match: { _source.user_agent.patch: "1750" } + - match: { _source.user_agent.original: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.149 Safari/537.36" } + - match: { _source.user_agent.os: {"name":"Mac OS X", "version":"10.9.2", "full":"Mac OS X 10.9.2"} } + - match: { _source.user_agent.version: "33.0.1750" } - match: { _source.user_agent.device: "Other" } --- @@ -70,13 +66,8 @@ index: test id: 1 - match: { _source.field1: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.149 Safari/537.36" } - - match: { _source.field2.os: "Mac OS X 10.9.2" } + - match: { _source.field2.os.full: "Mac OS X 10.9.2" } - is_false: _source.user_agent - is_false: _source.field2.name - - is_false: _source.field2.os_name - - is_false: _source.field2.os_major - - is_false: _source.field2.os_minor - - is_false: _source.field2.major - - is_false: _source.field2.minor - - is_false: _source.field2.patch - is_false: _source.field2.device + - is_false: _source.field2.original diff --git a/modules/ingest-user-agent/src/test/resources/rest-api-spec/test/ingest-useragent/30_custom_regex.yml b/modules/ingest-user-agent/src/test/resources/rest-api-spec/test/ingest-useragent/30_custom_regex.yml index 22df584e13166..ac90a3457fa65 100644 --- a/modules/ingest-user-agent/src/test/resources/rest-api-spec/test/ingest-useragent/30_custom_regex.yml +++ b/modules/ingest-user-agent/src/test/resources/rest-api-spec/test/ingest-useragent/30_custom_regex.yml @@ -30,11 +30,6 @@ id: 1 - match: { _source.field1: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.149 Safari/537.36" } - match: { _source.user_agent.name: "Test" } - - match: { _source.user_agent.os: "Other" } - - match: { _source.user_agent.os_name: "Other" } - match: { _source.user_agent.device: "Other" } - - is_false: _source.user_agent.os_major - - is_false: _source.user_agent.os_minor - - is_false: _source.user_agent.major - - is_false: _source.user_agent.minor - - is_false: _source.user_agent.patch + - is_false: _source.user_agent.os + - is_false: _source.user_agent.version diff --git a/modules/lang-painless/src/test/java/org/elasticsearch/painless/StandardCastTests.java b/modules/lang-painless/src/test/java/org/elasticsearch/painless/StandardCastTests.java index 739d9d021a427..d19ba0847b2de 100644 --- a/modules/lang-painless/src/test/java/org/elasticsearch/painless/StandardCastTests.java +++ b/modules/lang-painless/src/test/java/org/elasticsearch/painless/StandardCastTests.java @@ -641,4 +641,158 @@ public void testStringCasts() { expectScriptThrows(ClassCastException.class, () -> exec("String o = 'string'; ArrayList b = o;")); expectScriptThrows(ClassCastException.class, () -> exec("String o = 'string'; ArrayList b = (ArrayList)o;")); } + + public void testPrimitiveBooleanCasts() { + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Object n = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Object n = (Object)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Number n = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Number n = (boolean)o;")); + + exec("boolean o = true; boolean b = o;"); + exec("boolean o = true; boolean b = (boolean)o;"); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; byte b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; byte b = (byte)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; short b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; short b = (short)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; char b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; char b = (char)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; int b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; int b = (int)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; long b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; long b = (long)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; float b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; float b = (float)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; double b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; double b = (double)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Boolean b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Boolean b = (Boolean)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Byte b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Byte b = (Byte)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Short b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Short b = (Short)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Character b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Character b = (Character)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Integer b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Integer b = (Integer)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Long b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Long b = (Long)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Float b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Float b = (Float)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Double b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; Double b = (Double)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; ArrayList b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("boolean o = true; ArrayList b = (ArrayList)o;")); + } + + public void testBoxedBooleanCasts() { + exec("Boolean o = Boolean.valueOf(true); Object n = o;"); + exec("Boolean o = null; Object n = o;"); + exec("Boolean o = Boolean.valueOf(true); Object n = (Object)o;"); + exec("Boolean o = null; Object n = (Object)o;"); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Number n = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Number n = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Number n = (Boolean)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Number n = (Boolean)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); boolean b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; boolean b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); boolean b = (boolean)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; boolean b = (boolean)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); byte b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; byte b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); byte b = (byte)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; byte b = (byte)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); short b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; short b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); short b = (short)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; short b = (short)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); char b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; char b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); char b = (char)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; char b = (char)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); int b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; int b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); int b = (int)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; int b = (int)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); long b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; long b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); long b = (long)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; long b = (long)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); float b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; float b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); float b = (float)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; float b = (float)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); double b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; double b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); double b = (double)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; double b = (double)o;")); + + exec("Boolean o = Boolean.valueOf(true); Boolean b = o;"); + exec("Boolean o = null; Boolean b = o;"); + exec("Boolean o = Boolean.valueOf(true); Boolean b = (Boolean)o;"); + exec("Boolean o = null; Boolean b = (Boolean)o;"); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Byte b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Byte b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Byte b = (Byte)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Byte b = (Byte)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Short b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Short b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Short b = (Short)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Short b = (Short)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Character b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Character b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Character b = (Character)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Character b = (Character)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Integer b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Integer b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Integer b = (Integer)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Integer b = (Integer)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Long b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Long b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Long b = (Long)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Long b = (Long)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Float b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Float b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Float b = (Float)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Float b = (Float)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Double b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Double b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); Double b = (Double)o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = null; Double b = (Double)o;")); + + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); ArrayList b = o;")); + expectScriptThrows(ClassCastException.class, () -> exec("Boolean o = Boolean.valueOf(true); ArrayList b = (ArrayList)o;")); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index a5c4adc53c42a..4cf81c24fbf1a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -101,21 +102,25 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq // only report on fully started shards CommitStats commitStats; SeqNoStats seqNoStats; + RetentionLeaseStats retentionLeaseStats; try { commitStats = indexShard.commitStats(); seqNoStats = indexShard.seqNoStats(); - } catch (AlreadyClosedException e) { + retentionLeaseStats = indexShard.getRetentionLeaseStats(); + } catch (final AlreadyClosedException e) { // shard is closed - no stats is fine commitStats = null; seqNoStats = null; + retentionLeaseStats = null; } shardsStats.add( - new ShardStats( - indexShard.routingEntry(), - indexShard.shardPath(), - new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), - commitStats, - seqNoStats)); + new ShardStats( + indexShard.routingEntry(), + indexShard.shardPath(), + new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), + commitStats, + seqNoStats, + retentionLeaseStats)); } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 898f3d69456b0..9297447695a61 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -26,22 +26,36 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.ShardPath; import java.io.IOException; public class ShardStats implements Streamable, Writeable, ToXContentFragment { + private ShardRouting shardRouting; private CommonStats commonStats; @Nullable private CommitStats commitStats; @Nullable private SeqNoStats seqNoStats; + + @Nullable + private RetentionLeaseStats retentionLeaseStats; + + /** + * Gets the current retention lease stats. + * + * @return the current retention lease stats + */ + public RetentionLeaseStats getRetentionLeaseStats() { + return retentionLeaseStats; + } + private String dataPath; private String statePath; private boolean isCustomDataPath; @@ -49,7 +63,13 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment { ShardStats() { } - public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats, SeqNoStats seqNoStats) { + public ShardStats( + final ShardRouting routing, + final ShardPath shardPath, + final CommonStats commonStats, + final CommitStats commitStats, + final SeqNoStats seqNoStats, + final RetentionLeaseStats retentionLeaseStats) { this.shardRouting = routing; this.dataPath = shardPath.getRootDataPath().toString(); this.statePath = shardPath.getRootStatePath().toString(); @@ -57,6 +77,7 @@ public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonS this.commitStats = commitStats; this.commonStats = commonStats; this.seqNoStats = seqNoStats; + this.retentionLeaseStats = retentionLeaseStats; } /** @@ -109,6 +130,9 @@ public void readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { seqNoStats = in.readOptionalWriteable(SeqNoStats::new); } + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new); + } } @Override @@ -122,6 +146,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { out.writeOptionalWriteable(seqNoStats); } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalWriteable(retentionLeaseStats); + } } @Override @@ -140,6 +167,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (seqNoStats != null) { seqNoStats.toXContent(builder, params); } + if (retentionLeaseStats != null) { + retentionLeaseStats.toXContent(builder, params); + } builder.startObject(Fields.SHARD_PATH); builder.field(Fields.STATE_PATH, statePath); builder.field(Fields.DATA_PATH, dataPath); @@ -159,4 +189,5 @@ static final class Fields { static final String NODE = "node"; static final String RELOCATING_NODE = "relocating_node"; } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index d339184c5f814..8371023738b3b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -106,15 +107,23 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh CommonStats commonStats = new CommonStats(indicesService.getIndicesQueryCache(), indexShard, request.flags()); CommitStats commitStats; SeqNoStats seqNoStats; + RetentionLeaseStats retentionLeaseStats; try { commitStats = indexShard.commitStats(); seqNoStats = indexShard.seqNoStats(); - } catch (AlreadyClosedException e) { + retentionLeaseStats = indexShard.getRetentionLeaseStats(); + } catch (final AlreadyClosedException e) { // shard is closed - no stats is fine commitStats = null; seqNoStats = null; + retentionLeaseStats = null; } - return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats, - commitStats, seqNoStats); + return new ShardStats( + indexShard.routingEntry(), + indexShard.shardPath(), + commonStats, + commitStats, + seqNoStats, + retentionLeaseStats); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 4894ca1f772c4..c0f0278479a0c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1192,12 +1192,12 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R onSuccess.run(); } - protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess, + protected final ActionListener createShardActionListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, final Consumer onIgnoredFailure) { - return new ShardStateAction.Listener() { + return new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { onSuccess.run(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 4419d921a3b4a..071885202c458 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -48,18 +49,17 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -71,7 +71,6 @@ import java.util.Locale; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.function.Predicate; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -89,7 +88,7 @@ public class ShardStateAction { // a list of shards that failed during replication // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. - private final ConcurrentMap remoteFailedShardsCache = ConcurrentCollections.newConcurrentMap(); + private final TransportRequestDeduplicator remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>(); @Inject public ShardStateAction(ClusterService clusterService, TransportService transportService, @@ -106,7 +105,7 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor } private void sendShardAction(final String actionName, final ClusterState currentState, - final TransportRequest request, final Listener listener) { + final TransportRequest request, final ActionListener listener) { ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext()); DiscoveryNode masterNode = currentState.nodes().getMasterNode(); @@ -120,7 +119,7 @@ private void sendShardAction(final String actionName, final ClusterState current actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) { - listener.onSuccess(); + listener.onResponse(null); } @Override @@ -163,44 +162,22 @@ private static boolean isMasterChannelException(TransportException exp) { * @param listener callback upon completion of the request */ public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message, - @Nullable final Exception failure, Listener listener) { + @Nullable final Exception failure, ActionListener listener) { assert primaryTerm > 0L : "primary term should be strictly positive"; - final FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); - final CompositeListener compositeListener = new CompositeListener(listener); - final CompositeListener existingListener = remoteFailedShardsCache.putIfAbsent(shardEntry, compositeListener); - if (existingListener == null) { - sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, new Listener() { - @Override - public void onSuccess() { - try { - compositeListener.onSuccess(); - } finally { - remoteFailedShardsCache.remove(shardEntry); - } - } - @Override - public void onFailure(Exception e) { - try { - compositeListener.onFailure(e); - } finally { - remoteFailedShardsCache.remove(shardEntry); - } - } - }); - } else { - existingListener.addListener(listener); - } + remoteFailedShardsDeduplicator.executeOnce( + new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale), listener, + (req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener)); } int remoteShardFailedCacheSize() { - return remoteFailedShardsCache.size(); + return remoteFailedShardsDeduplicator.size(); } /** * Send a shard failed request to the master node to update the cluster state when a shard on the local node failed. */ public void localShardFailed(final ShardRouting shardRouting, final String message, - @Nullable final Exception failure, Listener listener) { + @Nullable final Exception failure, ActionListener listener) { localShardFailed(shardRouting, message, failure, listener, clusterService.state()); } @@ -208,7 +185,7 @@ public void localShardFailed(final ShardRouting shardRouting, final String messa * Send a shard failed request to the master node to update the cluster state when a shard on the local node failed. */ public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, - Listener listener, final ClusterState currentState) { + ActionListener listener, final ClusterState currentState) { FailedShardEntry shardEntry = new FailedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, true); sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener); @@ -216,7 +193,8 @@ public void localShardFailed(final ShardRouting shardRouting, final String messa // visible for testing protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, - TransportRequest request, Listener listener, Predicate changePredicate) { + TransportRequest request, ActionListener listener, + Predicate changePredicate) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { @@ -497,14 +475,14 @@ public int hashCode() { public void shardStarted(final ShardRouting shardRouting, final long primaryTerm, final String message, - final Listener listener) { + final ActionListener listener) { shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state()); } public void shardStarted(final ShardRouting shardRouting, final long primaryTerm, final String message, - final Listener listener, + final ActionListener listener, final ClusterState currentState) { StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message); sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener); @@ -670,97 +648,6 @@ public String toString() { } } - public interface Listener { - - default void onSuccess() { - } - - /** - * Notification for non-channel exceptions that are not handled - * by {@link ShardStateAction}. - * - * The exceptions that are handled by {@link ShardStateAction} - * are: - * - {@link NotMasterException} - * - {@link NodeDisconnectedException} - * - {@link FailedToCommitClusterStateException} - * - * Any other exception is communicated to the requester via - * this notification. - * - * @param e the unexpected cause of the failure on the master - */ - default void onFailure(final Exception e) { - } - - } - - /** - * A composite listener that allows registering multiple listeners dynamically. - */ - static final class CompositeListener implements Listener { - private boolean isNotified = false; - private Exception failure = null; - private final List listeners = new ArrayList<>(); - - CompositeListener(Listener listener) { - listeners.add(listener); - } - - void addListener(Listener listener) { - final boolean ready; - synchronized (this) { - ready = this.isNotified; - if (ready == false) { - listeners.add(listener); - } - } - if (ready) { - if (failure != null) { - listener.onFailure(failure); - } else { - listener.onSuccess(); - } - } - } - - private void onCompleted(Exception failure) { - synchronized (this) { - this.failure = failure; - this.isNotified = true; - } - RuntimeException firstException = null; - for (Listener listener : listeners) { - try { - if (failure != null) { - listener.onFailure(failure); - } else { - listener.onSuccess(); - } - } catch (RuntimeException innerEx) { - if (firstException == null) { - firstException = innerEx; - } else { - firstException.addSuppressed(innerEx); - } - } - } - if (firstException != null) { - throw firstException; - } - } - - @Override - public void onSuccess() { - onCompleted(null); - } - - @Override - public void onFailure(Exception failure) { - onCompleted(failure); - } - } - public static class NoLongerPrimaryShardException extends ElasticsearchException { public NoLongerPrimaryShardException(ShardId shardId, String msg) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index 24d144d810d9c..362d068f45e2d 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -28,7 +28,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -242,4 +244,14 @@ public String toString() { '}'; } + /** + * A utility method to convert a collection of retention leases to a map from retention lease ID to retention lease. + * + * @param leases the leases + * @return the map from retention lease ID to retention lease + */ + static Map toMap(final Collection leases) { + return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); + } + } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java new file mode 100644 index 0000000000000..b8f1454a12c2b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java @@ -0,0 +1,124 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; + +/** + * Represents retention lease stats. + */ +public final class RetentionLeaseStats implements ToXContentFragment, Writeable { + + private final Collection leases; + + /** + * The underlying retention leases backing this stats object. + * + * @return the leases + */ + public Collection leases() { + return leases; + } + + /** + * Constructs a new retention lease stats object from the specified leases. + * + * @param leases the leases + */ + public RetentionLeaseStats(final Collection leases) { + this.leases = Objects.requireNonNull(leases); + } + + /** + * Constructs a new retention lease stats object from a stream. The retention lease stats should have been written via + * {@link #writeTo(StreamOutput)}. + * + * @param in the stream to construct the retention lease stats from + * @throws IOException if an I/O exception occurs reading from the stream + */ + public RetentionLeaseStats(final StreamInput in) throws IOException { + leases = in.readList(RetentionLease::new); + } + + /** + * Writes a retention lease stats object to a stream in a manner suitable for later reconstruction via + * {@link #RetentionLeaseStats(StreamInput)} (StreamInput)}. + * + * @param out the stream to write the retention lease stats to + * @throws IOException if an I/O exception occurs writing to the stream + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeCollection(leases); + } + + /** + * Converts the retention lease stats to {@link org.elasticsearch.common.xcontent.XContent} using the specified builder and pararms. + * + * @param builder the builder + * @param params the params + * @return the builder that these retention leases were converted to {@link org.elasticsearch.common.xcontent.XContent} into + * @throws IOException if an I/O exception occurs writing to the builder + */ + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject("retention_leases"); + { + builder.startArray("leases"); + { + for (final RetentionLease retentionLease : leases) { + builder.startObject(); + { + builder.field("id", retentionLease.id()); + builder.field("retaining_seq_no", retentionLease.retainingSequenceNumber()); + builder.field("timestamp", retentionLease.timestamp()); + builder.field("source", retentionLease.source()); + } + builder.endObject(); + } + } + builder.endArray(); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final RetentionLeaseStats that = (RetentionLeaseStats) o; + return Objects.equals(leases, that.leases); + } + + @Override + public int hashCode() { + return Objects.hash(leases); + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 031499153fdc3..61b7917ff501e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -108,6 +108,7 @@ import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; @@ -1896,6 +1897,11 @@ public Collection getRetentionLeases() { return replicationTracker.getRetentionLeases(); } + public RetentionLeaseStats getRetentionLeaseStats() { + verifyNotClosed(); + return new RetentionLeaseStats(getRetentionLeases()); + } + /** * Adds a new retention lease. * diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index cca63c015f1c7..9113ba3f23b14 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -95,6 +95,7 @@ import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IllegalIndexShardStateException; @@ -367,23 +368,29 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index CommitStats commitStats; SeqNoStats seqNoStats; + RetentionLeaseStats retentionLeaseStats; try { commitStats = indexShard.commitStats(); seqNoStats = indexShard.seqNoStats(); + retentionLeaseStats = indexShard.getRetentionLeaseStats(); } catch (AlreadyClosedException e) { // shard is closed - no stats is fine commitStats = null; seqNoStats = null; + retentionLeaseStats = null; } - return new IndexShardStats(indexShard.shardId(), - new ShardStats[] { - new ShardStats(indexShard.routingEntry(), - indexShard.shardPath(), - new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), - commitStats, - seqNoStats) - }); + return new IndexShardStats( + indexShard.shardId(), + new ShardStats[]{ + new ShardStats( + indexShard.routingEntry(), + indexShard.shardPath(), + new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), + commitStats, + seqNoStats, + retentionLeaseStats) + }); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 5955a749fea34..57ec87d1c6493 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -109,8 +109,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final ShardStateAction shardStateAction; private final NodeMappingRefreshAction nodeMappingRefreshAction; - private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() { - }; + private static final ActionListener SHARD_STATE_ACTION_LISTENER = ActionListener.wrap(() -> {}); private final Settings settings; // a list of shards that failed during recovery diff --git a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java b/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java new file mode 100644 index 0000000000000..d929ef34ce2c3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; + +/** + * Deduplicator for {@link TransportRequest}s that keeps track of {@link TransportRequest}s that should + * not be sent in parallel. + * @param Transport Request Class + */ +public final class TransportRequestDeduplicator { + + private final ConcurrentMap requests = ConcurrentCollections.newConcurrentMap(); + + /** + * Ensures a given request not executed multiple times when another equal request is already in-flight. + * If the request is not yet known to the deduplicator it will invoke the passed callback with an {@link ActionListener} + * that must be completed by the caller when the request completes. Once that listener is completed the request will be removed from + * the deduplicator's internal state. If the request is already known to the deduplicator it will keep + * track of the given listener and invoke it when the listener passed to the callback on first invocation is completed. + * @param request Request to deduplicate + * @param listener Listener to invoke on request completion + * @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator + */ + public void executeOnce(T request, ActionListener listener, BiConsumer> callback) { + ActionListener completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); + if (completionListener != null) { + callback.accept(request, completionListener); + } + } + + public int size() { + return requests.size(); + } + + private final class CompositeListener implements ActionListener { + + private final List> listeners = new ArrayList<>(); + + private final T request; + + private boolean isNotified; + private Exception failure; + + CompositeListener(T request) { + this.request = request; + } + + CompositeListener addListener(ActionListener listener) { + synchronized (this) { + if (this.isNotified == false) { + listeners.add(listener); + return listeners.size() == 1 ? this : null; + } + } + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onResponse(null); + } + return null; + } + + private void onCompleted(Exception failure) { + synchronized (this) { + this.failure = failure; + this.isNotified = true; + } + try { + if (failure == null) { + ActionListener.onResponse(listeners, null); + } else { + ActionListener.onFailure(listeners, failure); + } + } finally { + requests.remove(request); + } + } + + @Override + public void onResponse(final Void aVoid) { + onCompleted(null); + } + + @Override + public void onFailure(Exception failure) { + onCompleted(failure); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index c4fcb9bdb53e2..fcccaf6c0f021 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -113,8 +113,8 @@ public void testFillShardLevelInfo() { CommonStats commonStats1 = new CommonStats(); commonStats1.store = new StoreStats(1000); ShardStats[] stats = new ShardStats[] { - new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0 , null, null), - new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1 , null, null) + new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0 , null, null, null), + new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1 , null, null, null) }; ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder routingToPath = ImmutableOpenMap.builder(); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index a800c0c79929c..69743c101ee10 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -75,7 +76,6 @@ import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -110,7 +110,7 @@ public void setOnAfterWaitForNewMasterAndRetry(Runnable onAfterWaitForNewMasterA @Override protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, TransportRequest request, - Listener listener, Predicate changePredicate) { + ActionListener listener, Predicate changePredicate) { onBeforeWaitForNewMasterAndRetry.run(); super.waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); onAfterWaitForNewMasterAndRetry.run(); @@ -197,9 +197,9 @@ public void testNoMaster() throws InterruptedException { }); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { success.set(true); latch.countDown(); } @@ -246,9 +246,9 @@ public void testMasterChannelException() throws InterruptedException { setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { success.set(true); latch.countDown(); } @@ -343,9 +343,9 @@ public void testCacheRemoteShardFailed() throws Exception { long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); for (int i = 0; i < numListeners; i++) { shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), - primaryTerm, markAsStale, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + primaryTerm, markAsStale, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { latch.countDown(); } @Override @@ -394,9 +394,9 @@ public void testRemoteShardFailedConcurrently() throws Exception { ShardRouting failedShard = randomFrom(failedShards); shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), randomLongBetween(1, Long.MAX_VALUE), randomBoolean(), "test", getSimulatedFailure(), - new ShardStateAction.Listener() { + new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { notifiedResponses.incrementAndGet(); } @Override @@ -561,70 +561,13 @@ BytesReference serialize(Writeable writeable, Version version) throws IOExceptio } } - public void testCompositeListener() throws Exception { - AtomicInteger successCount = new AtomicInteger(); - AtomicInteger failureCount = new AtomicInteger(); - Exception failure = randomBoolean() ? getSimulatedFailure() : null; - ShardStateAction.CompositeListener compositeListener = new ShardStateAction.CompositeListener(new ShardStateAction.Listener() { - @Override - public void onSuccess() { - successCount.incrementAndGet(); - } - @Override - public void onFailure(Exception e) { - assertThat(e, sameInstance(failure)); - failureCount.incrementAndGet(); - } - }); - int iterationsPerThread = scaledRandomIntBetween(100, 1000); - Thread[] threads = new Thread[between(1, 4)]; - Phaser barrier = new Phaser(threads.length + 1); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(() -> { - barrier.arriveAndAwaitAdvance(); - for (int n = 0; n < iterationsPerThread; n++) { - compositeListener.addListener(new ShardStateAction.Listener() { - @Override - public void onSuccess() { - successCount.incrementAndGet(); - } - @Override - public void onFailure(Exception e) { - assertThat(e, sameInstance(failure)); - failureCount.incrementAndGet(); - } - }); - } - }); - threads[i].start(); - } - barrier.arriveAndAwaitAdvance(); - if (failure != null) { - compositeListener.onFailure(failure); - } else { - compositeListener.onSuccess(); - } - for (Thread t : threads) { - t.join(); - } - assertBusy(() -> { - if (failure != null) { - assertThat(successCount.get(), equalTo(0)); - assertThat(failureCount.get(), equalTo(threads.length*iterationsPerThread + 1)); - } else { - assertThat(successCount.get(), equalTo(threads.length*iterationsPerThread + 1)); - assertThat(failureCount.get(), equalTo(0)); - } - }); - } - - private static class TestListener implements ShardStateAction.Listener { + private static class TestListener implements ActionListener { private final SetOnce failure = new SetOnce<>(); private final CountDownLatch latch = new CountDownLatch(1); @Override - public void onSuccess() { + public void onResponse(Void aVoid) { try { failure.set(null); } finally { diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 330c73b9c02c5..0a9016c20111b 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; @@ -317,10 +318,10 @@ public void testSendingShardFailure() throws Exception { setDisruptionScheme(networkDisruption); networkDisruption.startDisrupting(); - service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new - ShardStateAction.Listener() { + service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), + new ActionListener() { @Override - public void onSuccess() { + public void onResponse(final Void aVoid) { success.set(true); latch.countDown(); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java new file mode 100644 index 0000000000000..d77acc53b247e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; + +public class RetentionLeaseStatsTests extends ESSingleNodeTestCase { + + public void testRetentionLeaseStats() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .build(); + createIndex("index", settings); + ensureGreen("index"); + final IndexShard primary = + node().injector().getInstance(IndicesService.class).getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final int length = randomIntBetween(0, 8); + final Map currentRetentionLeases = new HashMap<>(); + for (int i = 0; i < length; i++) { + final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); + final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); + latch.await(); + } + + final IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("index").execute().actionGet(); + assertThat(indicesStats.getShards(), arrayWithSize(1)); + final RetentionLeaseStats retentionLeaseStats = indicesStats.getShards()[0].getRetentionLeaseStats(); + assertThat(RetentionLease.toMap(retentionLeaseStats.leases()), equalTo(currentRetentionLeases)); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java new file mode 100644 index 0000000000000..fe5dee782c4fe --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +public class RetentionLeaseStatsWireSerializingTests extends AbstractWireSerializingTestCase { + + @Override + protected RetentionLeaseStats createTestInstance() { + final int length = randomIntBetween(0, 8); + final Collection leases; + if (length == 0) { + leases = Collections.emptyList(); + } else { + leases = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source)); + } + } + return new RetentionLeaseStats(leases); + } + + @Override + protected Writeable.Reader instanceReader() { + return RetentionLeaseStats::new; + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index a99d0caea8e6c..f2819bfe161eb 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -37,8 +37,6 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; @@ -46,6 +44,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) public class RetentionLeaseSyncIT extends ESIntegTestCase { public void testRetentionLeasesSyncedOnAdd() throws Exception { @@ -77,7 +76,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { // check retention leases have been committed on the primary final Collection primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( primary.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(toMap(primaryCommittedRetentionLeases))); + assertThat(currentRetentionLeases, equalTo(RetentionLease.toMap(primaryCommittedRetentionLeases))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { @@ -86,13 +85,13 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - final Map retentionLeasesOnReplica = toMap(replica.getRetentionLeases()); + final Map retentionLeasesOnReplica = RetentionLease.toMap(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); // check retention leases have been committed on the replica final Collection replicaCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(toMap(replicaCommittedRetentionLeases))); + assertThat(currentRetentionLeases, equalTo(RetentionLease.toMap(replicaCommittedRetentionLeases))); } } } @@ -164,8 +163,4 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { } } - private static Map toMap(final Collection replicaCommittedRetentionLeases) { - return replicaCommittedRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); - } - } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index 26e67fb6dd264..f66b383c2799c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.threadpool.ThreadPool; @@ -81,7 +82,8 @@ public void testAddOrRenewRetentionLease() throws IOException { for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); indexShard.addRetentionLease( - Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { + })); assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true); } @@ -115,7 +117,8 @@ private void runExpirationTest(final boolean primary) throws IOException { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE); if (primary) { - indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> { + })); } else { indexShard.updateRetentionLeasesOnReplica( Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); @@ -176,7 +179,8 @@ public void testCommit() throws IOException { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong())); indexShard.addRetentionLease( - Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { + })); } currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE)); @@ -215,13 +219,52 @@ public void testCommit() throws IOException { } } + public void testRetentionLeaseStats() throws IOException { + final IndexShard indexShard = newStartedShard(true); + try { + final int length = randomIntBetween(0, 8); + final long[] minimumRetainingSequenceNumbers = new long[length]; + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + indexShard.addRetentionLease( + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { + })); + } + final RetentionLeaseStats stats = indexShard.getRetentionLeaseStats(); + assertRetentionLeases( + stats.leases(), + indexShard.indexSettings().getRetentionLeaseMillis(), + length, + minimumRetainingSequenceNumbers, + () -> 0L, + true); + } finally { + closeShards(indexShard); + } + } + private void assertRetentionLeases( final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers, final LongSupplier currentTimeMillisSupplier, final boolean primary) { - final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); + assertRetentionLeases( + indexShard.getEngine().config().retentionLeasesSupplier().get(), + indexShard.indexSettings().getRetentionLeaseMillis(), + size, + minimumRetainingSequenceNumbers, + currentTimeMillisSupplier, + primary); + } + + private void assertRetentionLeases( + final Collection retentionLeases, + final long retentionLeaseMillis, + final int size, + final long[] minimumRetainingSequenceNumbers, + final LongSupplier currentTimeMillisSupplier, + final boolean primary) { final Map idToRetentionLease = new HashMap<>(); for (final RetentionLease retentionLease : retentionLeases) { idToRetentionLease.put(retentionLease.id(), retentionLease); @@ -234,9 +277,7 @@ private void assertRetentionLeases( assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); if (primary) { // retention leases can be expired on replicas, so we can only assert on primaries here - assertThat( - currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), - lessThanOrEqualTo(indexShard.indexSettings().getRetentionLeaseMillis())); + assertThat(currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), lessThanOrEqualTo(retentionLeaseMillis)); } assertThat(retentionLease.source(), equalTo("test-" + i)); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 26bc4c964eec2..12a7fad466e29 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1385,8 +1385,13 @@ public void testMinimumCompatVersion() throws IOException { public void testShardStats() throws IOException { IndexShard shard = newStartedShard(); - ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), - new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), shard.commitStats(), shard.seqNoStats()); + ShardStats stats = new ShardStats( + shard.routingEntry(), + shard.shardPath(), + new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), + shard.commitStats(), + shard.seqNoStats(), + shard.getRetentionLeaseStats()); assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath()); assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath()); assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath()); diff --git a/server/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java index 13e94f7fe5368..83bb8b309a7cb 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java @@ -163,7 +163,7 @@ private IndicesStatsResponse randomIndicesStatsResponse(final Index[] indices) { stats.get = new GetStats(); stats.flush = new FlushStats(); stats.warmer = new WarmerStats(); - shardStats.add(new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null)); + shardStats.add(new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null)); } } return IndicesStatsTests.newIndicesStatsResponse( diff --git a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java new file mode 100644 index 0000000000000..ab178134995a7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.sameInstance; + +public class TransportRequestDeduplicatorTests extends ESTestCase { + + public void testRequestDeduplication() throws Exception { + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failureCount = new AtomicInteger(); + Exception failure = randomBoolean() ? new TransportException("simulated") : null; + final TransportRequest request = new TransportRequest() { + @Override + public void setParentTask(final TaskId taskId) { + } + }; + final TransportRequestDeduplicator deduplicator = new TransportRequestDeduplicator<>(); + final SetOnce> listenerHolder = new SetOnce<>(); + int iterationsPerThread = scaledRandomIntBetween(100, 1000); + Thread[] threads = new Thread[between(1, 4)]; + Phaser barrier = new Phaser(threads.length + 1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + barrier.arriveAndAwaitAdvance(); + for (int n = 0; n < iterationsPerThread; n++) { + deduplicator.executeOnce(request, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + successCount.incrementAndGet(); + } + + @Override + public void onFailure(Exception e) { + assertThat(e, sameInstance(failure)); + failureCount.incrementAndGet(); + } + }, (req, reqListener) -> listenerHolder.set(reqListener)); + } + }); + threads[i].start(); + } + barrier.arriveAndAwaitAdvance(); + for (Thread t : threads) { + t.join(); + } + final ActionListener listener = listenerHolder.get(); + assertThat(deduplicator.size(), equalTo(1)); + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onResponse(null); + } + assertThat(deduplicator.size(), equalTo(0)); + assertBusy(() -> { + if (failure != null) { + assertThat(successCount.get(), equalTo(0)); + assertThat(failureCount.get(), equalTo(threads.length * iterationsPerThread)); + } else { + assertThat(successCount.get(), equalTo(threads.length * iterationsPerThread)); + assertThat(failureCount.get(), equalTo(0)); + } + }); + } + +} diff --git a/x-pack/docs/build.gradle b/x-pack/docs/build.gradle index de2400c0e85f0..27f815b1637f1 100644 --- a/x-pack/docs/build.gradle +++ b/x-pack/docs/build.gradle @@ -122,8 +122,7 @@ setups['my_inactive_watch'] = ''' "actions": { "test_index": { "index": { - "index": "test", - "doc_type": "test2" + "index": "test" } } } diff --git a/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc index a1704e9acc329..f1e281819eec3 100644 --- a/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc @@ -68,8 +68,7 @@ PUT _watcher/watch/my_watch "test_index": { "throttle_period": "15m", "index": { - "index": "test", - "doc_type": "test2" + "index": "test" } } } diff --git a/x-pack/docs/en/rest-api/watcher/execute-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/execute-watch.asciidoc index ca3a0de0af781..8c7f747969373 100644 --- a/x-pack/docs/en/rest-api/watcher/execute-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/execute-watch.asciidoc @@ -255,7 +255,7 @@ This is an example of the output: "index": { "response": { "index": "test", - "type": "test2", + "type": "_doc", "version": 1, "created": true, "result": "created", diff --git a/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc index 7d62b5c76c41d..87a13d0829f9b 100644 --- a/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc @@ -81,8 +81,7 @@ Response: "actions": { "test_index": { "index": { - "index": "test", - "doc_type": "test2" + "index": "test" } } } diff --git a/x-pack/docs/en/watcher/actions/index.asciidoc b/x-pack/docs/en/watcher/actions/index.asciidoc index 8a31b150f22cb..34fdad7c50d0b 100644 --- a/x-pack/docs/en/watcher/actions/index.asciidoc +++ b/x-pack/docs/en/watcher/actions/index.asciidoc @@ -16,8 +16,7 @@ The following snippet shows a simple `index` action definition: "transform": { ... }, <3> "index" : { "index" : "my-index", <4> - "doc_type" : "my-type", <5> - "doc_id": "my-id" <6> + "doc_id": "my-id" <5> } } } @@ -27,8 +26,7 @@ The following snippet shows a simple `index` action definition: <2> An optional <> to restrict action execution <3> An optional <> to transform the payload and prepare the data that should be indexed <4> The elasticsearch index to store the data to -<5> The document type to store the data as -<6> An optional `_id` for the document, if it should always be the same document. +<5> An optional `_id` for the document, if it should always be the same document. [[index-action-attributes]] @@ -40,7 +38,6 @@ The following snippet shows a simple `index` action definition: | `index` | yes | - | The Elasticsearch index to index into. -| `doc_type` | yes | - | The type of the document the data will be indexed as. | `doc_id` | no | - | The optional `_id` of the document. @@ -75,5 +72,5 @@ When a `_doc` field exists, if the field holds an object, it is extracted and in as a single document. If the field holds an array of objects, each object is treated as a document and the index action indexes all of them in a bulk. -An `_index`, `_type` or `_id` value can be added per document to dynamically set the ID +An `_index`, or `_id` value can be added per document to dynamically set the ID of the indexed document. diff --git a/x-pack/docs/en/watcher/input/search.asciidoc b/x-pack/docs/en/watcher/input/search.asciidoc index 3b50b22081b60..d4548a159a640 100644 --- a/x-pack/docs/en/watcher/input/search.asciidoc +++ b/x-pack/docs/en/watcher/input/search.asciidoc @@ -24,7 +24,6 @@ documents from the `logs` index: "search" : { "request" : { "indices" : [ "logs" ], - "types" : [ "event" ], "body" : { "query" : { "match_all" : {}} } @@ -172,9 +171,6 @@ accurately. | `request.indices` | no | - | The indices to search. If omitted, all indices are searched, which is the default behaviour in Elasticsearch. -| `request.types` | no | - | The document types to search for. If omitted, all document types are are - searched, which is the default behaviour in Elasticsearch. - | `request.body` | no | - | The body of the request. The {ref}/search-request-body.html[request body] follows the same structure you normally send in the body of a REST `_search` request. The body can be static text or include `mustache` <>. diff --git a/x-pack/docs/en/watcher/transform/search.asciidoc b/x-pack/docs/en/watcher/transform/search.asciidoc index 56f9304d986ce..d7f468f183182 100644 --- a/x-pack/docs/en/watcher/transform/search.asciidoc +++ b/x-pack/docs/en/watcher/transform/search.asciidoc @@ -56,10 +56,6 @@ The following table lists all available settings for the search transform: | `request.indices` | no | all indices | One or more indices to search on. -| `request.types` | no | all types | One or more document types to search on (may be a - comma-delimited string or an array of document types - names) - | `request.body` | no | `match_all` query | The body of the request. The {ref}/search-request-body.html[request body] follows the same structure you normally send in the body of @@ -105,7 +101,6 @@ time of the watch: "search" : { "request" : { "indices" : [ "logstash-*" ], - "types" : [ "event" ], "body" : { "size" : 0, "query" : { @@ -145,7 +140,6 @@ The following is an example of using templates that refer to provided parameters "search" : { "request" : { "indices" : [ "logstash-*" ], - "types" : [ "event" ], "template" : { "source" : { "size" : 0, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRFeatureSetTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRFeatureSetTests.java index b95e8fc7c4008..f2fb475816ec2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRFeatureSetTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRFeatureSetTests.java @@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -36,7 +37,7 @@ public class CCRFeatureSetTests extends ESTestCase { private ClusterService clusterService; @Before - public void init() throws Exception { + public void init() { licenseState = mock(XPackLicenseState.class); clusterService = mock(ClusterService.class); } @@ -116,7 +117,11 @@ public void testUsageStats() throws Exception { assertThat(ccrUsage.available(), equalTo(ccrFeatureSet.available())); assertThat(ccrUsage.getNumberOfFollowerIndices(), equalTo(numFollowerIndices)); - assertThat(ccrUsage.getLastFollowTimeInMillis(), greaterThanOrEqualTo(0L)); + if (numFollowerIndices != 0) { + assertThat(ccrUsage.getLastFollowTimeInMillis(), greaterThanOrEqualTo(0L)); + } else { + assertThat(ccrUsage.getLastFollowTimeInMillis(), nullValue()); + } assertThat(ccrUsage.getNumberOfAutoFollowPatterns(), equalTo(numAutoFollowPatterns)); } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index 66b41d40943d0..be2eb529d86be 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -47,10 +47,10 @@ public void setUp() throws Exception { super.setUp(); indicesStats = Collections.singletonList(new IndexStats("index-0", "dcvO5uZATE-EhIKc3tk9Bg", new ShardStats[] { // Primaries - new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null), - new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null), + new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null, null), + new ShardStats(mockShardRouting(true), mockShardPath(), mockCommonStats(), null, null, null), // Replica - new ShardStats(mockShardRouting(false), mockShardPath(), mockCommonStats(), null, null) + new ShardStats(mockShardRouting(false), mockShardPath(), mockCommonStats(), null, null, null) })); }