From 756cd5b8c1c16930c2eee81c7fcd144fd0cd3270 Mon Sep 17 00:00:00 2001 From: Slawomir Jaranowski Date: Wed, 27 Jul 2022 20:00:36 +0200 Subject: [PATCH 1/2] [MSHARED-1072] fix blocking in StreamFeeder If input stream has no more available data StreamFeeder was block forever --- .../cli/CommandLineTimeOutException.java | 14 ++- .../shared/utils/cli/CommandLineUtils.java | 87 ++------------ .../maven/shared/utils/cli/StreamFeeder.java | 106 +++++++----------- .../shared/utils/cli/StreamFeederTest.java | 91 +++++++++++++++ 4 files changed, 153 insertions(+), 145 deletions(-) create mode 100644 src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java index c1f82091..620fb3f8 100644 --- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java +++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java @@ -20,16 +20,15 @@ */ /** + * Report a timeout for executing process. + * * @author Olivier Lamy - * + * */ public class CommandLineTimeOutException extends CommandLineException { - /** - * - */ private static final long serialVersionUID = 7322428741683224481L; /** @@ -41,4 +40,11 @@ public CommandLineTimeOutException( String message, Throwable cause ) super( message, cause ); } + /** + * @param message The message of the exception. + */ + public CommandLineTimeOutException( String message ) + { + super( message ); + } } diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java index 2b17245d..baaf869f 100644 --- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java +++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Properties; import java.util.StringTokenizer; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -49,7 +50,7 @@ public static class StringStreamConsumer implements StreamConsumer { - private final StringBuffer string = new StringBuffer(); + private final StringBuilder string = new StringBuilder(); private static final String LS = System.getProperty( "line.separator", "\n" ); @@ -72,16 +73,6 @@ public String getOutput() } - /** - * Number of milliseconds per second. - */ - private static final long MILLIS_PER_SECOND = 1000L; - - /** - * Number of nanoseconds per second. - */ - private static final long NANOS_PER_SECOND = 1000000000L; - /** * @param cl The command line {@link Commandline} * @param systemOut {@link StreamConsumer} @@ -288,31 +279,13 @@ public Integer call() errorPumper.setName( "StreamPumper-systemErr" ); errorPumper.start(); - int returnValue; - if ( timeoutInSeconds <= 0 ) + if ( timeoutInSeconds > 0 && !p.waitFor( timeoutInSeconds, TimeUnit.SECONDS ) ) { - returnValue = p.waitFor(); + throw new CommandLineTimeOutException( + String.format( "Process timed out after %d seconds.", timeoutInSeconds ) ); } - else - { - final long now = System.nanoTime(); - final long timeout = now + NANOS_PER_SECOND * timeoutInSeconds; - while ( isAlive( p ) && ( System.nanoTime() < timeout ) ) - { - // The timeout is specified in seconds. Therefore we must not sleep longer than one second - // but we should sleep as long as possible to reduce the number of iterations performed. - Thread.sleep( MILLIS_PER_SECOND - 1L ); - } - if ( isAlive( p ) ) - { - throw new InterruptedException( String.format( "Process timed out after %d seconds.", - timeoutInSeconds ) ); - - } - - returnValue = p.exitValue(); - } + int returnValue = p.waitFor(); // TODO Find out if waitUntilDone needs to be called using a try-finally construct. The method may throw an // InterruptedException so that calls to waitUntilDone may be skipped. @@ -342,14 +315,9 @@ public Integer call() outputPumper.waitUntilDone(); errorPumper.waitUntilDone(); - if ( inputFeeder != null ) + if ( inputFeeder != null && inputFeeder.getException() != null ) { - inputFeeder.close(); - - if ( inputFeeder.getException() != null ) - { - throw new CommandLineException( "Failure processing stdin.", inputFeeder.getException() ); - } + throw new CommandLineException( "Failure processing stdin.", inputFeeder.getException() ); } if ( outputPumper.getException() != null ) @@ -366,16 +334,12 @@ public Integer call() } catch ( InterruptedException ex ) { + Thread.currentThread().interrupt(); throw new CommandLineTimeOutException( "Error while executing external command, process killed.", ex ); - } finally { - if ( inputFeeder != null ) - { - inputFeeder.disable(); - } if ( outputPumper != null ) { outputPumper.disable(); @@ -395,18 +359,7 @@ public Integer call() finally { ShutdownHookUtils.removeShutdownHook( processHook ); - - try - { - processHook.run(); - } - finally - { - if ( inputFeeder != null ) - { - inputFeeder.close(); - } - } + processHook.run(); } } } @@ -444,24 +397,6 @@ public static Properties getSystemEnvVars( boolean caseSensitive ) return ensureCaseSensitivity( envs, caseSensitive ); } - private static boolean isAlive( Process p ) - { - if ( p == null ) - { - return false; - } - - try - { - p.exitValue(); - return false; - } - catch ( IllegalThreadStateException e ) - { - return true; - } - } - /** * @param toProcess The command line to translate. * @return The array of translated parts. @@ -482,7 +417,7 @@ public static String[] translateCommandline( String toProcess ) throws CommandLi boolean inEscape = false; int state = normal; final StringTokenizer tok = new StringTokenizer( toProcess, "\"\' \\", true ); - List tokens = new ArrayList(); + List tokens = new ArrayList<>(); StringBuilder current = new StringBuilder(); while ( tok.hasMoreTokens() ) diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java index 6f6723c4..18e15d34 100644 --- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java +++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Objects; /** * Read from an InputStream and write the output to an OutputStream. @@ -30,86 +30,70 @@ * @author Trygve Laugstøl */ class StreamFeeder - extends AbstractStreamHandler + extends Thread { - private final AtomicReference input; + private final InputStream input; - private final AtomicReference output; + private final OutputStream output; - private volatile Throwable exception; + private Throwable exception; + private boolean done; + + private final Object lock = new Object(); /** * Create a new StreamFeeder * - * @param input Stream to read from + * @param input Stream to read from * @param output Stream to write to */ StreamFeeder( InputStream input, OutputStream output ) { - super(); - this.input = new AtomicReference( input ); - this.output = new AtomicReference( output ); + this.input = Objects.requireNonNull( input ); + this.output = Objects.requireNonNull( output ); + this.done = false; } @Override + @SuppressWarnings( "checkstyle:innerassignment" ) public void run() { try { - feed(); - } - catch ( Throwable e ) - { - // Catch everything so the streams will be closed and flagged as done. - if ( this.exception != null ) + for ( int data; !isInterrupted() && ( data = input.read() ) != -1; ) { - this.exception = e; + output.write( data ); } + output.flush(); + } + catch ( IOException e ) + { + exception = e; } finally { close(); + } - synchronized ( this ) - { - notifyAll(); - } + synchronized ( lock ) + { + done = true; + lock.notifyAll(); } } - public void close() + private void close() { - setDone(); - final InputStream is = input.getAndSet( null ); - if ( is != null ) + try { - try - { - is.close(); - } - catch ( IOException ex ) - { - if ( this.exception != null ) - { - this.exception = ex; - } - } + output.close(); } - - final OutputStream os = output.getAndSet( null ); - if ( os != null ) + catch ( IOException e ) { - try - { - os.close(); - } - catch ( IOException ex ) + if ( exception == null ) { - if ( this.exception != null ) - { - this.exception = ex; - } + exception = e; } } } @@ -122,30 +106,22 @@ public Throwable getException() return this.exception; } - @SuppressWarnings( "checkstyle:innerassignment" ) - private void feed() - throws IOException + public void waitUntilDone() { - InputStream is = input.get(); - OutputStream os = output.get(); - boolean flush = false; - - if ( is != null && os != null ) + interrupt(); + synchronized ( lock ) { - for ( int data; !isDone() && ( data = is.read() ) != -1; ) + while ( !done ) { - if ( !isDisabled() ) + try { - os.write( data ); - flush = true; + lock.wait(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); } - } - - if ( flush ) - { - os.flush(); } } } - } diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java new file mode 100644 index 00000000..c0a015ca --- /dev/null +++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java @@ -0,0 +1,91 @@ +package org.apache.maven.shared.utils.cli; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class StreamFeederTest +{ + static class BlockingInputStream extends ByteArrayInputStream + { + boolean endStream = false; + final Object lock = new Object(); + + public BlockingInputStream( byte[] buf ) + { + super( buf ); + } + + @Override + public synchronized int read() + { + int data = super.read(); + if ( data >= 0 ) + { + return data; + } + + // end test data ... block + endStream = true; + + try + { + wait(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + return -1; + } + + public synchronized void waitForEndStream() throws InterruptedException + { + while ( !endStream ) + { + wait( 100 ); + } + } + } + + @Test + public void waitUntilFeederDone() throws InterruptedException + { + + BlockingInputStream inputStream = new BlockingInputStream( "TestData".getBytes() ); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + StreamFeeder streamFeeder = new StreamFeeder( inputStream, outputStream ); + + streamFeeder.start(); + + // wait until input stream will be in block mode + inputStream.waitForEndStream(); + + streamFeeder.waitUntilDone(); // wait until process finish + + assertEquals( "TestData", outputStream.toString() ); + } +} From a0bde683c24a8cf6b925eb117973ccb54e8cf292 Mon Sep 17 00:00:00 2001 From: Slawomir Jaranowski Date: Fri, 21 Apr 2023 19:54:40 +0200 Subject: [PATCH 2/2] Interrupt Feeder thread not current thread --- .../maven/shared/utils/cli/StreamFeeder.java | 2 +- .../shared/utils/cli/StreamFeederTest.java | 23 +++++++------------ 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java index f3b595ef..f26db634 100644 --- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java +++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java @@ -89,7 +89,7 @@ public Throwable getException() { } public void waitUntilDone() { - interrupt(); + this.interrupt(); synchronized (lock) { while (!done) { try { diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java index fb17acb9..8decce03 100644 --- a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java +++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java @@ -46,9 +46,6 @@ public class StreamFeederTest { static class BlockingInputStream extends ByteArrayInputStream { - boolean endStream = false; - final Object lock = new Object(); - public BlockingInputStream(byte[] buf) { super(buf); } @@ -61,8 +58,6 @@ public synchronized int read() { } // end test data ... block - endStream = true; - try { wait(); } catch (InterruptedException e) { @@ -70,29 +65,27 @@ public synchronized int read() { } return -1; } - - public synchronized void waitForEndStream() throws InterruptedException { - while (!endStream) { - wait(100); - } - } } @Test public void waitUntilFeederDone() throws InterruptedException { - BlockingInputStream inputStream = new BlockingInputStream("TestData".getBytes()); + String TEST_DATA = "TestData"; + + BlockingInputStream inputStream = new BlockingInputStream(TEST_DATA.getBytes()); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); StreamFeeder streamFeeder = new StreamFeeder(inputStream, outputStream); streamFeeder.start(); - // wait until input stream will be in block mode - inputStream.waitForEndStream(); + // wait until all data from steam will be read + while (outputStream.size() < TEST_DATA.length()) { + Thread.sleep(10); + } streamFeeder.waitUntilDone(); // wait until process finish - assertEquals("TestData", outputStream.toString()); + assertEquals(TEST_DATA, outputStream.toString()); } }