Merge branch 'master' into mandatory-meta-plugins
* master:
  Enable selecting adaptive selection stats
  Remove leftover mention of file-based scripts
  Fix threading issue on listener notification (elastic#28730)
  Revisit deletion policy after release the last snapshot (elastic#28627)
  Remove unused method
  Track deletes only in the tombstone map instead of maintaining as copy (elastic#27868)
  [Docs] Correct typo in README.textile (elastic#28716)
  Fix AdaptiveSelectionStats serialization bug (elastic#28718)
  TEST: Fix InternalEngine#testAcquireIndexCommit
  Add note on temporary directory for Windows service
  Added coming annotation and breaking changes link to release notes script
  Remove leftover PR link for previously disabled bwc tests
  Separate acquiring safe commit and last commit (elastic#28271)
  Fix BWC issue of the translog last modified age stats
jasontedor committed Feb 19, 2018
2 parents 368f5f7 + 105dcb5 commit ba4d0a5
Showing 26 changed files with 420 additions and 186 deletions.
2 changes: 1 addition & 1 deletion README.textile
@@ -192,7 +192,7 @@ h3. Distributed, Highly Available

Let's face it, things will fail....

Elasticsearch is a highly available and distributed search engine. Each index is broken down into shards, and each shard can have one or more replica. By default, an index is created with 5 shards and 1 replica per shard (5/1). There are many topologies that can be used, including 1/10 (improve search performance), or 20/1 (improve indexing performance, with search executed in a map reduce fashion across shards).
Elasticsearch is a highly available and distributed search engine. Each index is broken down into shards, and each shard can have one or more replicas. By default, an index is created with 5 shards and 1 replica per shard (5/1). There are many topologies that can be used, including 1/10 (improve search performance), or 20/1 (improve indexing performance, with search executed in a map reduce fashion across shards).

In order to play with the distributed nature of Elasticsearch, simply bring more nodes up and shut down nodes. The system will continue to serve requests (make sure you use the correct http port) with the latest data indexed.
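
As a rough illustration of the 20/1 topology mentioned above (a minimal sketch, assuming the 6.x Java client API; the index name "logs" is made up), an index can be created with explicit shard and replica counts:

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;

class CreateIndexSketch {
    // 20 primaries and 1 replica per shard favours indexing throughput,
    // with searches executed map-reduce style across the shards.
    static CreateIndexRequest twentyOneTopology() {
        return new CreateIndexRequest("logs")
                .settings(Settings.builder()
                        .put("index.number_of_shards", 20)
                        .put("index.number_of_replicas", 1));
    }
}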

2 changes: 1 addition & 1 deletion build.gradle
@@ -145,7 +145,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
final boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/28673" /* place a PR link here when commiting bwc changes */
final String bwc_tests_disabled_issue = "" /* place a PR link here when commiting bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
21 changes: 14 additions & 7 deletions dev-tools/es_release_notes.pl
@@ -31,9 +31,8 @@
">breaking", ">breaking-java", ">deprecation", ">feature",
">enhancement", ">bug", ">regression", ">upgrade"
);
my %Ignore = map { $_ => 1 } (
">non-issue", ">refactoring", ">docs", ">test", ":Core/Build"
);
my %Ignore = map { $_ => 1 }
( ">non-issue", ">refactoring", ">docs", ">test", ":Core/Build" );

my %Group_Labels = (
'>breaking' => 'Breaking changes',
@@ -70,6 +69,9 @@ sub dump_issues {
my $issues = shift;

$version =~ s/v//;
my $branch = $version;
$branch =~ s/\.\d+$//;

my ( $day, $month, $year ) = (gmtime)[ 3 .. 5 ];
$month++;
$year += 1900;
@@ -81,12 +83,16 @@
[[release-notes-$version]]
== $version Release Notes
coming[$version]
Also see <<breaking-changes-$branch>>.
ASCIIDOC

for my $group ( @Groups, 'other' ) {
my $group_issues = $issues->{$group} or next;
my $group_id = $group;
$group_id=~s/^>//;
$group_id =~ s/^>//;
print "[[$group_id-$version]]\n"
. "[float]\n"
. "=== $Group_Labels{$group}\n\n";
@@ -162,14 +168,15 @@ sub fetch_issues {
for my $issue (@issues) {
next if $seen{ $issue->{number} } && !$issue->{pull_request};

for (@{ $issue->{labels} }) {
next ISSUE if $Ignore{$_->{name}};
for ( @{ $issue->{labels} } ) {
next ISSUE if $Ignore{ $_->{name} };
}

# uncomment for including/excluding PRs already issued in other versions
# next if grep {$_->{name}=~/^v2/} @{$issue->{labels}};
my %labels = map { $_->{name} => 1 } @{ $issue->{labels} };
my ($header) = map { /:.+\/(.+)/ && $1 } grep {/^:/} sort keys %labels;
my ($header) = map { m{:[^/]+/(.+)} && $1 }
grep {/^:/} sort keys %labels;
$header ||= 'NOT CLASSIFIED';
for (@Groups) {
if ( $labels{$_} ) {
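The header-extraction change above, from the greedy /:.+\/(.+)/ to m{:[^/]+/(.+)}, only matters for category labels that contain more than one slash. A small sketch in Java (the label string is made up) shows the difference:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LabelHeaderSketch {
    public static void main(String[] args) {
        String label = ":Core/Infra/Build"; // hypothetical multi-segment label
        Matcher greedy = Pattern.compile(":.+/(.+)").matcher(label);
        Matcher firstSegmentOnly = Pattern.compile(":[^/]+/(.+)").matcher(label);
        System.out.println(greedy.find() ? greedy.group(1) : "no match");                     // Build
        System.out.println(firstSegmentOnly.find() ? firstSegmentOnly.group(1) : "no match"); // Infra/Build
    }
}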
42 changes: 37 additions & 5 deletions docs/reference/cluster/nodes-stats.asciidoc
@@ -62,7 +62,11 @@ of `indices`, `os`, `process`, `jvm`, `transport`, `http`,
Statistics about the discovery

`ingest`::
Statistics about ingest preprocessing
Statistics about ingest preprocessing

`adaptive_selection`::
Statistics about <<search-adaptive-replica,adaptive replica selection>>. See
<<adaptive-selection-stats,adaptive selection statistics>>.

[source,js]
--------------------------------------------------
@@ -370,15 +374,43 @@ GET /_nodes/stats/indices?groups=foo,bar
The `ingest` flag can be set to retrieve statistics that concern ingest:

`ingest.total.count`::
The total number of documents ingested during the lifetime of this node
The total number of documents ingested during the lifetime of this node

`ingest.total.time_in_millis`::
The total time spent on ingest preprocessing documents during the lifetime of this node
The total time spent on ingest preprocessing documents during the lifetime of this node

`ingest.total.current`::
The total number of documents currently being ingested.
The total number of documents currently being ingested.

`ingest.total.failed`::
The total number of ingest preprocessing operations that failed during the lifetime of this node
The total number of ingest preprocessing operations that failed during the lifetime of this node

On top of these overall ingest statistics, these statistics are also provided on a per pipeline basis.

[float]
[[adaptive-selection-stats]]
=== Adaptive selection statistics

The `adaptive_selection` flag can be set to retrieve statistics that concern
<<search-adaptive-replica,adaptive replica selection>>. These statistics are
keyed by node. For each node:

`adaptive_selection.outgoing_searches`::
The number of outstanding search requests from the node these stats are for to
the keyed node.

`avg_queue_size`::
The exponentially weighted moving average queue size of search requests on the
keyed node.

`avg_service_time_ns`::
The exponentially weighted moving average service time of search requests on
the keyed node.

`avg_response_time_ns`::
The exponentially weighted moving average response time of search requests on
the keyed node.

`rank`::
The rank of this node; used for shard selection when routing search requests.
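
The avg_* values above are exponentially weighted moving averages: each new sample updates the running value as avg = alpha * sample + (1 - alpha) * avg. A minimal sketch of requesting both the ingest and adaptive selection sections through the Java builder touched later in this commit, assuming an already-constructed client:

import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.client.Client;

class NodesStatsSketch {
    static NodesStatsResponse fetchStats(Client client) {
        // setAdaptiveSelection(true) is the new flag this commit adds to the builder;
        // setIngest(true) is the renamed ingest(boolean) setter.
        return client.admin().cluster().prepareNodesStats()
                .setIngest(true)
                .setAdaptiveSelection(true)
                .get();
    }
}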

13 changes: 8 additions & 5 deletions docs/reference/setup/install/zip-windows.asciidoc
@@ -196,6 +196,14 @@ command line, or when installing Elasticsearch as a service for the
first time. To adjust the heap size for an already installed service,
use the service manager: `bin\elasticsearch-service.bat manager`.

NOTE: The service automatically configures a private temporary directory for use
by Elasticsearch when it is running. This private temporary directory is
configured as a sub-directory of the private temporary directory for the user
running the installation. If the service will run under a different user, you
can configure the location of the temporary directory that the service should
use by setting the environment variable `ES_TMPDIR` to the preferred location
before you execute the service installation.

Using the Manager GUI::

It is also possible to configure the service after it's been installed using the manager GUI (`elasticsearch-service-mgr.exe`), which offers insight into the installed service, including its status, startup type, JVM, start and stop settings amongst other things. Simply invoking `elasticsearch-service.bat manager` from the command-line will open up the manager window:
@@ -258,11 +266,6 @@ directory so that you do not delete important data later on.
d| Not configured
| path.repo

| script
| Location of script files.
| %ES_HOME%\scripts
| path.scripts

|=======================================================================

include::next-steps.asciidoc[]
@@ -106,7 +106,7 @@ public void queueWrite(WriteOperation writeOperation) {
if (isOpen() == false) {
boolean wasRemoved = queuedWrites.remove(writeOperation);
if (wasRemoved) {
executeFailedListener(writeOperation.getListener(), new ClosedSelectorException());
writeOperation.getListener().accept(null, new ClosedSelectorException());
}
} else {
wakeup();
@@ -33,7 +33,6 @@ setup:
# non empty generation with one op may be smaller or larger than that.
# - gt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 1 }
- gte: { indices.test.primaries.translog.earliest_last_modified_age: 0 }

- do:
indices.flush:
Expand All @@ -47,7 +46,6 @@ setup:
## creation translog size has some overhead due to an initial empty generation that will be trimmed later
- lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }
- gte: { indices.test.primaries.translog.earliest_last_modified_age: 0 }

- do:
indices.put_settings:
@@ -69,4 +67,20 @@ setup:
- match: { indices.test.primaries.translog.operations: 0 }
- lte: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

---
"Translog last modified age stats":
- skip:
version: " - 6.2.99"
reason: translog last modified age stats was added in 6.3.0
- do:
index:
index: test
type: bar
id: 1
body: { "foo": "bar" }

- do:
indices.stats:
metric: [ translog ]
- gte: { indices.test.primaries.translog.earliest_last_modified_age: 0 }
@@ -138,8 +138,14 @@ public NodesStatsRequestBuilder setDiscovery(boolean discovery) {
/**
* Should ingest statistics be returned.
*/
public NodesStatsRequestBuilder ingest(boolean ingest) {
public NodesStatsRequestBuilder setIngest(boolean ingest) {
request.ingest(ingest);
return this;
}

public NodesStatsRequestBuilder setAdaptiveSelection(boolean adaptiveSelection) {
request.adaptiveSelection(adaptiveSelection);
return this;
}

}
@@ -95,10 +95,6 @@ public SearchTransportService(Settings settings, TransportService transportServi
this.responseWrapper = responseWrapper;
}

public Map<String, Long> getClientConnections() {
return Collections.unmodifiableMap(clientConnections);
}

public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(originalIndices, contextId),
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
@@ -168,8 +168,10 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {

/**
* Releases an index commit that was acquired by {@link #acquireIndexCommit(boolean)}.
*
* @return true if the snapshotting commit can be cleaned up.
*/
synchronized void releaseCommit(final IndexCommit snapshotCommit) {
synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
@@ -178,6 +180,8 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) {
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
}
// The commit can be cleaned up only if there is no pending snapshot and it is neither the safe commit nor the last commit.
return refCount == 0 && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false;
}

/**
15 changes: 12 additions & 3 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -868,13 +868,17 @@ public void forceMerge(boolean flush) throws IOException {
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;

/**
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
* @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;

/**
* Snapshots the most recent safe index commit from the engine.
*/
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;

/**
* fail engine due to some error. the engine will also be closed.
Expand Down Expand Up @@ -1537,4 +1541,9 @@ public interface Warmer {
public boolean isRecovering() {
return false;
}

/**
* Tries to prune buffered deletes from the version map.
*/
public abstract void maybePruneDeletes();
}
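
A usage sketch of the split API above (a hypothetical call site, not code from this commit; it assumes IndexCommitRef stays closeable and exposes the commit via getIndexCommit(), so that closing the reference releases the commit back to the deletion policy):

import java.io.IOException;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.index.engine.Engine;

class SafeCommitSketch {
    static void copySafeCommit(Engine engine) throws IOException {
        try (Engine.IndexCommitRef commitRef = engine.acquireSafeIndexCommit()) {
            IndexCommit safeCommit = commitRef.getIndexCommit();
            for (String file : safeCommit.getFileNames()) {
                // copy each file of the safe commit, e.g. for a snapshot or peer recovery
            }
        }
        // closing the ref invokes the release callback supplied by the engine
    }
}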
@@ -1182,7 +1182,7 @@ public DeleteResult delete(Delete delete) throws IOException {
}
throw e;
}
maybePruneDeletedTombstones();
maybePruneDeletes();
return deleteResult;
}

@@ -1311,7 +1311,8 @@ public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, lo
}
}

private void maybePruneDeletedTombstones() {
@Override
public void maybePruneDeletes() {
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
// every 1/4 of gcDeletesInMillis:
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
@@ -1401,7 +1402,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
// TODO: maybe we should just put a scheduled job in threadPool?
// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
// for a long time:
maybePruneDeletedTombstones();
maybePruneDeletes();
mergeScheduler.refreshConfig();
}

@@ -1621,32 +1622,15 @@ public void trimTranslog() throws EngineException {
}

private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

// we only need to prune the deletes map; the current/old version maps are cleared on refresh:
for (Map.Entry<BytesRef, DeleteVersionValue> entry : versionMap.getAllTombstones()) {
BytesRef uid = entry.getKey();
try (Releasable ignored = versionMap.acquireLock(uid)) {
// can we do it without this lock on each value? maybe batch to a set and get the lock once per set?

// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
if (versionValue != null) {
if (timeMSec - versionValue.time > getGcDeletesInMillis()) {
versionMap.removeTombstoneUnderLock(uid);
}
}
}
}

final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
versionMap.pruneTombstones(timeMSec, engineConfig.getIndexSettings().getGcDeletesInMillis());
lastDeleteVersionPruneTimeMSec = timeMSec;
}

// testing
void clearDeletedTombstones() {
versionMap.clearTombstones();
// clean with current time Long.MAX_VALUE and interval 0 since we use a greater than relationship here.
versionMap.pruneTombstones(Long.MAX_VALUE, 0);
}

@Override
@@ -1719,16 +1703,30 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// to a write lock when we fail the engine in this operation
if (flushFirst) {
logger.trace("start flush for snapshot");
flush(false, true);
logger.trace("finish flush for snapshot");
}
final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
}

private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// Revisit the deletion policy if we can clean up the snapshotting commit.
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
ensureOpen();
indexWriter.deleteUnusedFiles();
}
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down Expand Up @@ -2175,7 +2173,7 @@ private void ensureCanFlush() {
public void onSettingsChanged() {
mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletedTombstones();
maybePruneDeletes();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
// this is an anti-viral setting; you can only opt out for the entire index
// only if a shard starts up again due to relocation or if the index is closed
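The pruning that pruneDeletedTombstones() now delegates to the version map boils down to an age check against the configured GC-deletes interval, using a strict greater-than comparison as the comment above notes. A generic sketch of that idea (hypothetical types and names, not the actual LiveVersionMap implementation):

import java.util.Map;

class TombstonePruneSketch {
    // Drop delete tombstones whose age strictly exceeds the GC-deletes interval.
    static void pruneTombstones(Map<String, Long> tombstoneInsertTimesMs, long nowMs, long gcDeletesMs) {
        tombstoneInsertTimesMs.values().removeIf(insertedAtMs -> nowMs - insertedAtMs > gcDeletesMs);
    }
}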