Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Fix job ID in C++ logs for normalize and memory estimation #63874

Merged
merged 1 commit into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
String jobId = config.getId();
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId,
false, true, true, hasState, config.getAnalysis().persistsState());
null, false, true, true, hasState, config.getAnalysis().persistsState());

// The extra 2 are for the checksum and the control field
int numberOfFields = analyticsProcessConfig.cols() + 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
ExecutorService executorService,
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
// The config ID passed to the process pipes is only used to make the file names unique. Since memory estimation can be
// called many times in quick succession for the same config the config ID alone is not sufficient to guarantee that the
// memory estimation process pipe names are unique. Therefore an increasing counter value is appended to the config ID
// to ensure uniqueness between calls.
// Since memory estimation can be called many times in quick succession for the same config, the config ID alone is not
// sufficient to guarantee that the memory estimation process pipe names are unique. Therefore, an increasing counter
// value is passed in addition to the config ID to ensure uniqueness between calls.
ProcessPipes processPipes = new ProcessPipes(
env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId(), counter.incrementAndGet(),
false, false, true, false, false);

createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public AutodetectProcess createAutodetectProcess(Job job,
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AutodetectBuilder.AUTODETECT,
job.getId(), false, true, true, params.modelSnapshot() != null,
job.getId(), null, false, true, true, params.modelSnapshot() != null,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings) == false);
createNativeProcess(job, params, processPipes, filesToDelete);
boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ void setProcessConnectTimeout(TimeValue processConnectTimeout) {
@Override
public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan,
ExecutorService executorService) {
// The job ID passed to the process pipes is only used to make the file names unique. Since normalize can get run many times
// in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names
// are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls.
// Since normalize can get run many times in quick succession for the same job, the job ID alone is not sufficient to
// guarantee that the normalizer process pipe names are unique. Therefore, an increasing counter value is passed in
// addition to the job ID to ensure uniqueness between calls.
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, NormalizerBuilder.NORMALIZE,
jobId + "_" + counter.incrementAndGet(), false, true, true, false, false);
jobId, counter.incrementAndGet(), false, true, true, false, false);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);

NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static NativeController makeNativeController(String localNodeName, Enviro
NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper, NamedXContentRegistry xContentRegistry)
throws IOException {
this.localNodeName = localNodeName;
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null,
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, null,
true, false, true, false, false);
processPipes.connectLogStream();
this.cppLogHandler = processPipes.getLogStreamHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class ProcessPipes {
* May be null or empty for processes not associated with a specific job.
*/
public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration timeout, String processName, String jobId,
boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
Long uniqueId, boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
boolean wantRestorePipe, boolean wantPersistPipe) {
this.namedPipeHelper = namedPipeHelper;
this.jobId = jobId;
Expand All @@ -91,6 +91,9 @@ public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration t
if (!Strings.isNullOrEmpty(jobId)) {
prefixBuilder.append(jobId).append('_');
}
if (uniqueId != null) {
prefixBuilder.append(uniqueId).append('_');
}
String prefix = prefixBuilder.toString();
String suffix = String.format(Locale.ROOT, "_%d", JvmInfo.jvmInfo().getPid());
logPipeName = String.format(Locale.ROOT, "%slog%s", prefix, suffix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testProcessPipes() throws Exception {

int timeoutSeconds = randomIntBetween(5, 100);
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(timeoutSeconds), AutodetectBuilder.AUTODETECT,
"my_job", false, true, true, true, true);
"my_job", null, false, true, true, true, true);

List<String> command = new ArrayList<>();
processPipes.addArgs(command);
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testCloseUnusedPipes_notConnected() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);

new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", null,
true, true, true, true, true);
}

Expand Down Expand Up @@ -138,7 +138,7 @@ public void testCloseOpenedPipesOnError() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true);
null, true, true, true, true, true);

processPipes.connectLogStream();
expectThrows(IOException.class, processPipes::connectOtherStreams);
Expand Down