From 7cbbcbf6aded9025770c54d419248f868f1a6967 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 19 Oct 2020 12:40:05 +0100 Subject: [PATCH] [ML] Fix job ID in C++ logs for normalize and memory estimation The changes of #54636 and #60395 were incorrect in their assertion that "the job ID passed to the process pipes is only used to make the file names unique". In fact it is also passed to the C++ log handler and gets logged with every message logged by the C++ processes. This PR splits the job ID and unique IDs (added in #54636 and #60395) so that the correct job ID is passed to the log handler. The impact of the problem was purely cosmetic: C++ log messages from normalize and memory estimation processes were tagged with the job ID plus the appended counter value instead of the plain job ID. --- .../dataframe/process/NativeAnalyticsProcessFactory.java | 2 +- .../NativeMemoryUsageEstimationProcessFactory.java | 9 ++++----- .../autodetect/NativeAutodetectProcessFactory.java | 2 +- .../normalizer/NativeNormalizerProcessFactory.java | 8 ++++---- .../elasticsearch/xpack/ml/process/NativeController.java | 2 +- .../org/elasticsearch/xpack/ml/process/ProcessPipes.java | 5 ++++- .../xpack/ml/process/ProcessPipesTests.java | 6 +++--- 7 files changed, 18 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 1ffbe72e33bee..6f48646413851 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -73,7 +73,7 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co String jobId = config.getId(); List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId, - false, 
true, true, hasState, config.getAnalysis().persistsState()); + null, false, true, true, hasState, config.getAnalysis().persistsState()); // The extra 2 are for the checksum and the control field int numberOfFields = analyticsProcessConfig.cols() + 2; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java index 4abce691d41f1..282aaf5275742 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -62,12 +62,11 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess( ExecutorService executorService, Consumer onProcessCrash) { List filesToDelete = new ArrayList<>(); - // The config ID passed to the process pipes is only used to make the file names unique. Since memory estimation can be - // called many times in quick succession for the same config the config ID alone is not sufficient to guarantee that the - // memory estimation process pipe names are unique. Therefore an increasing counter value is appended to the config ID - // to ensure uniqueness between calls. + // Since memory estimation can be called many times in quick succession for the same config the config ID alone is not + // sufficient to guarantee that the memory estimation process pipe names are unique. Therefore an increasing counter + // value is passed as well as the config ID to ensure uniqueness between calls. 
ProcessPipes processPipes = new ProcessPipes( - env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(), + env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId(), counter.incrementAndGet(), false, false, true, false, false); createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 62858d798da56..f022d312cb487 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -77,7 +77,7 @@ public AutodetectProcess createAutodetectProcess(Job job, Consumer onProcessCrash) { List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AutodetectBuilder.AUTODETECT, - job.getId(), false, true, true, params.modelSnapshot() != null, + job.getId(), null, false, true, true, params.modelSnapshot() != null, AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings) == false); createNativeProcess(job, params, processPipes, filesToDelete); boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index 4e3c1b688ed9a..2de2110aa3b06 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -51,11 +51,11 @@ void setProcessConnectTimeout(TimeValue processConnectTimeout) { @Override public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, ExecutorService executorService) { - // The job ID passed to the process pipes is only used to make the file names unique. Since normalize can get run many times - // in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names - // are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls. + // Since normalize can get run many times in quick succession for the same job the job ID alone is not sufficient to + // guarantee that the normalizer process pipe names are unique. Therefore an increasing counter value is passed as + // well as the job ID to ensure uniqueness between calls. 
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, NormalizerBuilder.NORMALIZE, - jobId + "_" + counter.incrementAndGet(), false, true, true, false, false); + jobId, counter.incrementAndGet(), false, true, true, false, false); createNativeProcess(jobId, quantilesState, processPipes, bucketSpan); NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java index 2af4d86998568..d456206d620c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java @@ -69,7 +69,7 @@ public static NativeController makeNativeController(String localNodeName, Enviro NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper, NamedXContentRegistry xContentRegistry) throws IOException { this.localNodeName = localNodeName; - ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, + ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, null, true, false, true, false, false); processPipes.connectLogStream(); this.cppLogHandler = processPipes.getLogStreamHandler(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java index 6b385a1fbba5f..2b86f65e19f47 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java @@ -76,7 +76,7 @@ public class ProcessPipes { * May be null or empty for 
processes not associated with a specific job. */ public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration timeout, String processName, String jobId, - boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe, + Long uniqueId, boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe, boolean wantRestorePipe, boolean wantPersistPipe) { this.namedPipeHelper = namedPipeHelper; this.jobId = jobId; @@ -91,6 +91,9 @@ public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration t if (!Strings.isNullOrEmpty(jobId)) { prefixBuilder.append(jobId).append('_'); } + if (uniqueId != null) { + prefixBuilder.append(uniqueId).append('_'); + } String prefix = prefixBuilder.toString(); String suffix = String.format(Locale.ROOT, "_%d", JvmInfo.jvmInfo().getPid()); logPipeName = String.format(Locale.ROOT, "%slog%s", prefix, suffix); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java index 3d362c90eda4d..3ecf9da1daace 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java @@ -62,7 +62,7 @@ public void testProcessPipes() throws Exception { int timeoutSeconds = randomIntBetween(5, 100); ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(timeoutSeconds), AutodetectBuilder.AUTODETECT, - "my_job", false, true, true, true, true); + "my_job", null, false, true, true, true, true); List command = new ArrayList<>(); processPipes.addArgs(command); @@ -110,7 +110,7 @@ public void testCloseUnusedPipes_notConnected() { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Environment env = TestEnvironment.newEnvironment(settings); - new 
ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", + new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", null, true, true, true, true, true); } @@ -138,7 +138,7 @@ public void testCloseOpenedPipesOnError() throws IOException { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Environment env = TestEnvironment.newEnvironment(settings); ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", - true, true, true, true, true); + null, true, true, true, true, true); processPipes.connectLogStream(); expectThrows(IOException.class, processPipes::connectOtherStreams);