Skip to content

Commit

Permalink
Merge remote-tracking branch 'elastic/main' into resolve-cluster/clus…
Browse files Browse the repository at this point in the history
…ter-info-only
  • Loading branch information
quux00 committed Jan 14, 2025
2 parents bd10f6e + f9b9007 commit 05284eb
Show file tree
Hide file tree
Showing 21 changed files with 788 additions and 112 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/119772.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 119772
summary: ESQL Support IN operator for Date nanos
area: ES|QL
type: enhancement
issues:
- 118578
5 changes: 5 additions & 0 deletions docs/changelog/120084.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120084
summary: Improve how reindex data stream index action handles api blocks
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;

Expand All @@ -49,10 +50,6 @@
* threads that wait on a phaser. This lets us verify that operations on system indices
* are being directed to other thread pools.</p>
*/
@TestLogging(
reason = "investigate",
value = "org.elasticsearch.kibana.KibanaThreadPoolIT:DEBUG,org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor:TRACE"
)
public class KibanaThreadPoolIT extends ESIntegTestCase {
private static final Logger logger = LogManager.getLogger(KibanaThreadPoolIT.class);

Expand All @@ -68,6 +65,8 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.put("thread_pool.write.queue_size", 1)
.put("thread_pool.get.size", 1)
.put("thread_pool.get.queue_size", 1)
// a rejected GET may retry on an INITIALIZING shard (the target of a relocation) and unexpectedly succeed, so block rebalancing
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
.build();
}

Expand Down Expand Up @@ -112,7 +111,12 @@ public void testKibanaThreadPoolByPassesBlockedThreadPools() throws Exception {
}

public void testBlockedThreadPoolsRejectUserRequests() throws Exception {
assertAcked(client().admin().indices().prepareCreate(USER_INDEX));
assertAcked(
client().admin()
.indices()
.prepareCreate(USER_INDEX)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) // avoid retrying rejected actions
);

runWithBlockedThreadPools(this::assertThreadPoolsBlocked);

Expand Down
24 changes: 3 additions & 21 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ tests:
- class: org.elasticsearch.xpack.transform.integration.TransformIT
method: testStopWaitForCheckpoint
issue: https://github.com/elastic/elasticsearch/issues/106113
- class: org.elasticsearch.kibana.KibanaThreadPoolIT
method: testBlockedThreadPoolsRejectUserRequests
issue: https://github.com/elastic/elasticsearch/issues/113939
- class: org.elasticsearch.xpack.inference.TextEmbeddingCrudIT
method: testPutE5Small_withPlatformAgnosticVariant
issue: https://github.com/elastic/elasticsearch/issues/113983
Expand Down Expand Up @@ -230,26 +227,8 @@ tests:
- class: org.elasticsearch.xpack.inference.InferenceCrudIT
method: testGetServicesWithCompletionTaskType
issue: https://github.com/elastic/elasticsearch/issues/119959
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testSearchableSnapshotUpgrade {p0=[9.0.0, 8.18.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119978
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testSearchableSnapshotUpgrade {p0=[9.0.0, 9.0.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119979
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testMountSearchableSnapshot {p0=[9.0.0, 8.18.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119550
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testMountSearchableSnapshot {p0=[9.0.0, 9.0.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119980
- class: org.elasticsearch.multi_cluster.MultiClusterYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/119983
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testMountSearchableSnapshot {p0=[9.0.0, 9.0.0, 9.0.0]}
issue: https://github.com/elastic/elasticsearch/issues/119989
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testSearchableSnapshotUpgrade {p0=[9.0.0, 9.0.0, 9.0.0]}
issue: https://github.com/elastic/elasticsearch/issues/119990
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_unattended/Test unattended put and start}
issue: https://github.com/elastic/elasticsearch/issues/120019
Expand All @@ -275,6 +254,9 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/120117
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.MinioRepositoryAnalysisRestIT
issue: https://github.com/elastic/elasticsearch/issues/118548
- class: org.elasticsearch.xpack.security.QueryableReservedRolesIT
method: testConfiguredReservedRolesAfterClosingAndOpeningIndex
issue: https://github.com/elastic/elasticsearch/issues/120127

# Examples:
#
Expand Down
2 changes: 2 additions & 0 deletions rest-api-spec/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,6 @@ tasks.named("yamlRestCompatTestTransform").configure ({ task ->
task.skipTest("search.vectors/110_knn_query_with_filter/PRE_FILTER: pre-filter across multiple aliases", "waiting for #118774 backport")
task.skipTest("search.vectors/160_knn_query_missing_params/kNN search in a dis_max query - missing num_candidates", "waiting for #118774 backport")
task.skipTest("search.highlight/30_max_analyzed_offset/Plain highlighter with max_analyzed_offset < 0 should FAIL", "semantics of test has changed")
task.skipTest("indices.create/10_basic/Create lookup index", "default auto_expand_replicas was removed")
task.skipTest("indices.create/10_basic/Create lookup index with one shard", "default auto_expand_replicas was removed")
})
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@
index: test_lookup

- match: { test_lookup.settings.index.number_of_shards: "1"}
- match: { test_lookup.settings.index.auto_expand_replicas: "0-all"}

---
"Create lookup index with one shard":
Expand All @@ -196,7 +195,6 @@
index: test_lookup

- match: { test_lookup.settings.index.number_of_shards: "1"}
- match: { test_lookup.settings.index.auto_expand_replicas: "0-all"}

---
"Create lookup index with two shards":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testBasic() {
assertAcked(client().admin().indices().execute(TransportCreateIndexAction.TYPE, createRequest));
Settings settings = client().admin().indices().prepareGetSettings("hosts").get().getIndexToSettings().get("hosts");
assertThat(settings.get("index.mode"), equalTo("lookup"));
assertThat(settings.get("index.auto_expand_replicas"), equalTo("0-all"));
assertNull(settings.get("index.auto_expand_replicas"));
Map<String, String> allHosts = Map.of(
"192.168.1.2",
"Windows",
Expand Down Expand Up @@ -141,7 +141,6 @@ public void testResizeLookupIndex() {
Settings settings = client().admin().indices().prepareGetSettings("lookup-2").get().getIndexToSettings().get("lookup-2");
assertThat(settings.get("index.mode"), equalTo("lookup"));
assertThat(settings.get("index.number_of_shards"), equalTo("1"));
assertThat(settings.get("index.auto_expand_replicas"), equalTo("0-all"));

ResizeRequest split = new ResizeRequest("lookup-3", "lookup-1");
split.setResizeType(ResizeType.SPLIT);
Expand Down
5 changes: 1 addition & 4 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -623,10 +623,7 @@ public Settings getAdditionalIndexSettings(
}
}
if (indexMode == LOOKUP) {
return Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
.build();
return Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build();
} else {
return Settings.EMPTY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ final class TransportHandshaker {
* ignores the body of the request. After the handshake, the OutboundHandler uses the min(local,remote) protocol version for all later
* messages.
*
* This version supports two handshake protocols, v6080099 and v7170099, which respectively have the same message structure as the
* transport protocols of v6.8.0 and v7.17.0. This node only sends v7170099 requests, but it can send a valid response to any v6080099
* requests that it receives.
* This version supports three handshake protocols, v6080099, v7170099 and v8800000, which respectively have the same message structure
* as the transport protocols of v6.8.0, v7.17.0, and v8.18.0. This node only sends v7170099 requests, but it can send a valid response
* to any v6080099 or v8800000 requests that it receives.
*
* Note that these are not really TransportVersion constants as used elsewhere in ES, they're independent things that just happen to be
* stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did
* rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
* true.
*
* Here are some example messages, broken down to show their structure:
*
Expand Down Expand Up @@ -79,7 +84,7 @@ final class TransportHandshaker {
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
*
*
* ## v7170099 Request:
* ## v7170099 and v8800000 Requests:
*
* 45 53 -- 'ES' marker
* 00 00 00 31 -- total message length
Expand All @@ -98,7 +103,7 @@ final class TransportHandshaker {
* 04 -- payload length
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
*
* ## v7170099 Response:
* ## v7170099 and v8800000 Responses:
*
* 45 53 -- 'ES' marker
* 00 00 00 17 -- total message length
Expand All @@ -118,7 +123,12 @@ final class TransportHandshaker {

static final TransportVersion EARLIEST_HANDSHAKE_VERSION = TransportVersion.fromId(6080099);
static final TransportVersion REQUEST_HANDSHAKE_VERSION = TransportVersions.MINIMUM_COMPATIBLE;
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(EARLIEST_HANDSHAKE_VERSION, REQUEST_HANDSHAKE_VERSION);
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(
EARLIEST_HANDSHAKE_VERSION,
REQUEST_HANDSHAKE_VERSION,
V9_HANDSHAKE_VERSION
);

static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static org.elasticsearch.common.bytes.ReleasableBytesReferenceStreamInputTests.wrapAsReleasable;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -182,7 +183,7 @@ public void testDecodePreHeaderSizeVariableInt() throws IOException {
}
}

public void testDecodeHandshakeCompatibility() throws IOException {
public void testDecodeHandshakeV7Compatibility() throws IOException {
String action = "test-request";
long requestId = randomNonNegativeLong();
final String headerKey = randomAlphaOfLength(10);
Expand Down Expand Up @@ -223,6 +224,55 @@ public void testDecodeHandshakeCompatibility() throws IOException {

}

public void testDecodeHandshakeV8Compatibility() throws IOException {
doHandshakeCompatibilityTest(TransportHandshaker.REQUEST_HANDSHAKE_VERSION, null);
doHandshakeCompatibilityTest(TransportHandshaker.REQUEST_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
}

public void testDecodeHandshakeV9Compatibility() throws IOException {
doHandshakeCompatibilityTest(TransportHandshaker.V9_HANDSHAKE_VERSION, null);
doHandshakeCompatibilityTest(TransportHandshaker.V9_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
}

private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Compression.Scheme compressionScheme) throws IOException {
String action = "test-request";
long requestId = randomNonNegativeLong();
final String headerKey = randomAlphaOfLength(10);
final String headerValue = randomAlphaOfLength(20);
threadContext.putHeader(headerKey, headerValue);
OutboundMessage message = new OutboundMessage.Request(
threadContext,
new TestRequest(randomAlphaOfLength(100)),
transportVersion,
action,
requestId,
true,
compressionScheme
);

try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
final BytesReference bytes = message.serialize(os);
int totalHeaderSize = TcpHeader.headerSize(transportVersion);

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertThat(bytesConsumed, greaterThan(totalHeaderSize));
assertTrue(releasable1.hasReferences());

final Header header = (Header) fragments.get(0);
assertEquals(requestId, header.getRequestId());
assertEquals(transportVersion, header.getVersion());
assertEquals(compressionScheme == Compression.Scheme.DEFLATE, header.isCompressed());
assertTrue(header.isHandshake());
assertTrue(header.isRequest());
assertFalse(header.needsToReadVariableHeader());
assertEquals(headerValue, header.getRequestHeaders().get(headerKey));
fragments.clear();
}
}

public void testClientChannelTypeFailsDecodingRequests() throws Exception {
String action = "test-request";
long requestId = randomNonNegativeLong();
Expand Down Expand Up @@ -488,23 +538,16 @@ public void testCheckVersionCompatibility() {
}

public void testCheckHandshakeCompatibility() {
try {
InboundDecoder.checkHandshakeVersionCompatibility(randomFrom(TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS));
} catch (IllegalStateException e) {
throw new AssertionError(e);
}
for (final var allowedHandshakeVersion : TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS) {
InboundDecoder.checkHandshakeVersionCompatibility(allowedHandshakeVersion); // should not throw

var invalid = TransportVersion.fromId(TransportHandshaker.EARLIEST_HANDSHAKE_VERSION.id() - 1);
try {
InboundDecoder.checkHandshakeVersionCompatibility(invalid);
fail();
} catch (IllegalStateException expected) {
var invalid = TransportVersion.fromId(allowedHandshakeVersion.id() + randomFrom(-1, +1));
assertEquals(
"Received message from unsupported version: ["
+ invalid
+ "] allowed versions are: "
+ TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS,
expected.getMessage()
expectThrows(IllegalStateException.class, () -> InboundDecoder.checkHandshakeVersionCompatibility(invalid)).getMessage()
);
}
}
Expand Down
27 changes: 21 additions & 6 deletions x-pack/plugin/esql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,9 @@ tasks.named('checkstyleMain').configure {
exclude { normalize(it.file.toString()).contains("src/main/generated") }
}

def prop(Type, type, TYPE, BYTES, Array) {
def prop(Name, Type, type, TYPE, BYTES, Array) {
return [
"Name" : Name,
"Type" : Type,
"type" : type,
"TYPE" : TYPE,
Expand All @@ -296,15 +297,19 @@ def prop(Type, type, TYPE, BYTES, Array) {
"double" : type == "double" ? "true" : "",
"BytesRef" : type == "BytesRef" ? "true" : "",
"boolean" : type == "boolean" ? "true" : "",
"nanosMillis" : Name == "NanosMillis" ? "true" : "",
"millisNanos" : Name == "MillisNanos" ? "true" : "",
]
}

tasks.named('stringTemplates').configure {
var intProperties = prop("Int", "int", "INT", "Integer.BYTES", "IntArray")
var longProperties = prop("Long", "long", "LONG", "Long.BYTES", "LongArray")
var doubleProperties = prop("Double", "double", "DOUBLE", "Double.BYTES", "DoubleArray")
var bytesRefProperties = prop("BytesRef", "BytesRef", "BYTES_REF", "org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF", "")
var booleanProperties = prop("Boolean", "boolean", "BOOLEAN", "Byte.BYTES", "BitArray")
var intProperties = prop("Int", "Int", "int", "INT", "Integer.BYTES", "IntArray")
var longProperties = prop("Long", "Long", "long", "LONG", "Long.BYTES", "LongArray")
var nanosMillisProperties = prop("NanosMillis", "Long", "long", "LONG", "Long.BYTES", "LongArray")
var millisNanosProperties = prop("MillisNanos", "Long", "long", "LONG", "Long.BYTES", "LongArray")
var doubleProperties = prop("Double", "Double", "double", "DOUBLE", "Double.BYTES", "DoubleArray")
var bytesRefProperties = prop("BytesRef", "BytesRef", "BytesRef", "BYTES_REF", "org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF", "")
var booleanProperties = prop("Boolean", "Boolean", "boolean", "BOOLEAN", "Byte.BYTES", "BitArray")

File inInputFile = file("src/main/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/X-InEvaluator.java.st")
template {
Expand All @@ -322,6 +327,16 @@ tasks.named('stringTemplates').configure {
it.inputFile = inInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/InLongEvaluator.java"
}
template {
it.properties = nanosMillisProperties
it.inputFile = inInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/InNanosMillisEvaluator.java"
}
template {
it.properties = millisNanosProperties
it.inputFile = inInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/InMillisNanosEvaluator.java"
}
template {
it.properties = doubleProperties
it.inputFile = inInputFile
Expand Down
Loading

0 comments on commit 05284eb

Please sign in to comment.