From 33fc6884cbd16318cf87ab10f4f705763592489c Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 12 Aug 2024 17:59:34 -0700 Subject: [PATCH 01/33] . --- .../src/mill/main/client/ServerFiles.java | 21 ++++++++++++------- .../src/mill/main/client/lock/Locks.java | 20 ++++++++++++++++++ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/main/client/src/mill/main/client/ServerFiles.java b/main/client/src/mill/main/client/ServerFiles.java index b99e9a13b42..fb41e3fa533 100644 --- a/main/client/src/mill/main/client/ServerFiles.java +++ b/main/client/src/mill/main/client/ServerFiles.java @@ -8,18 +8,25 @@ public class ServerFiles { final public static String sandbox = "sandbox"; /** - * Lock file used to ensure a single server is running in a particular - * mill-worker folder. + * Lock file ensuring a single server is running in a particular mill-worker + * folder. If multiple servers are spawned in the same folder, only one takes + * the lock and the others fail to do so and terminate immediately. */ final public static String processLock = "processLock"; - - final public static String clientLock = "clientLock"; - - + /** + * Lock file indicating a server is currently waiting for or processing + * a command. Used by the client to detect when the server has completed + * processing. + */ final public static String serverLock = "serverLock"; - + /** + * Ensures only a single client is manipulating each mill-worker folder at + * a time, either spawning the server or submitting a command. Also used by + * the server to detect when a client disconnects, so it can terminate execution + */ + final public static String clientLock = "clientLock"; /** * The pipe by which the client snd server exchange IO diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index 22472592d07..9585b662165 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -2,6 +2,26 @@ import mill.main.client.ServerFiles; +/** + * The locks used to manage the relationship of Mill between Mill's clients and servers. + * The protocol is as follows: + * + * - Client: + * - Take clientLock + * - If processLock is not taken, it means we need to spawn the server + * - Spawn the server and wait for processLock to be taken + * - Server: + * - take processLock + * - Server: loop: + * - Take serverLock, + * - Listen for incoming client requests on serverSocket + * - Execute client request + * - If clientLock is released during execution, terminate server + * - Release serverLock + * - Client: + * - Wait for serverLock to be released, indicating server has finished execution + * - Give 50ms grace period for server output to arrive over pipe + */ final public class Locks implements AutoCloseable { final public Lock processLock; From 0933e37cf7a3030d27aac8ca37084a34d16a6bac Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 12 Aug 2024 18:22:43 -0700 Subject: [PATCH 02/33] . --- .../mill/main/client/MillServerLauncher.java | 4 +- .../mill/main/client/ProxyOutputStream.java | 34 ---- .../src/mill/main/client/ProxyStream.java | 153 ++++++++++++++++++ .../mill/main/client/ProxyStreamPumper.java | 63 -------- .../src/mill/main/client/WaitForSilence.java | 23 --- .../src/mill/main/client/ClientTests.java | 6 +- runner/src/mill/runner/MillServerMain.scala | 12 +- 7 files changed, 167 insertions(+), 128 deletions(-) delete mode 100644 main/client/src/mill/main/client/ProxyOutputStream.java create mode 100644 main/client/src/mill/main/client/ProxyStream.java delete mode 100644 main/client/src/mill/main/client/ProxyStreamPumper.java delete mode 100644 main/client/src/mill/main/client/WaitForSilence.java diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java index 2d14c606f34..6ba7183c616 100644 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ b/main/client/src/mill/main/client/MillServerLauncher.java @@ -155,7 +155,7 @@ public static int run( InputStream outErr = ioSocket.getInputStream(); OutputStream in = ioSocket.getOutputStream(); - ProxyStreamPumper outPump = new ProxyStreamPumper(outErr, stdout, stderr); + ProxyStream.Pumper outPump = new ProxyStream.Pumper(outErr, stdout, stderr); InputPumper inPump = new InputPumper(() -> stdin, () -> in, true); Thread outThread = new Thread(outPump, "outPump"); outThread.setDaemon(true); @@ -173,7 +173,7 @@ public static int run( // in the stream (ProxyOutputStream / ProxyStreamPumper) but that would require a new protocol. // So we just wait until there has been X ms with no data - outPump.getLastData().waitForSilence(50); + outPump.waitForSilence(500); try { return Integer.parseInt(Files.readAllLines(Paths.get(lockBase + "/" + ServerFiles.exitCode)).get(0)); diff --git a/main/client/src/mill/main/client/ProxyOutputStream.java b/main/client/src/mill/main/client/ProxyOutputStream.java deleted file mode 100644 index 339e01501b5..00000000000 --- a/main/client/src/mill/main/client/ProxyOutputStream.java +++ /dev/null @@ -1,34 +0,0 @@ -package mill.main.client; - -import java.io.IOException; - -public class ProxyOutputStream extends java.io.OutputStream { - private java.io.OutputStream out; - private int key; - public ProxyOutputStream(java.io.OutputStream out, int key){ - this.out = out; - this.key = key; - } - @Override synchronized public void write(int b) throws IOException { - out.write(key); - out.write(b); - } - @Override synchronized public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - @Override synchronized public void write(byte[] b, int off, int len) throws IOException { - int i = 0; - while(i < len && i + off < b.length){ - int chunkLength = Math.min(len - i, 127); - out.write(chunkLength * key); - out.write(b, off + i, Math.min(b.length - off - i, chunkLength)); - i += chunkLength; - } - } - @Override public void flush() throws IOException { - out.flush(); - } - @Override public void close() throws IOException { - out.close(); - } -} diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java new file mode 100644 index 00000000000..d93c7251afa --- /dev/null +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -0,0 +1,153 @@ +package mill.main.client; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Logic to capture a pair of streams (typically stdout and stderr), combining + * them into a single stream, and splitting it back into two streams later while + * preserving ordering. This is useful for capturing stderr and stdout and forwarding + * them to a terminal while strictly preserving the ordering, i.e. users won't see + * exception stack traces and printlns arriving jumbled up and impossible to debug + * + * This works by converting writes from either of the two streams into packets of + * the form: + * + * 1 byte n bytes + * | header | body | + * + * Where header is a single byte of the form: + * + * - header > 0 indicating that this packet is for the `OUT` stream + * - header < 0 indicating that this packet is for the `ERR` stream + * - abs(header) indicating the length of the packet body, in bytes + * - header == 0 indicating the end of the stream + * + * Writes to either of the two `Output`s are synchronized on the shared + * `destination` stream, ensuring that they always arrive complete and without + * interleaving. On the other side, a `Pumper` reads from the combined + * stream, forwards each packet to its respective destination stream, or terminates + * when it hits a packet with `header == 0` + */ +public class ProxyStream{ + + public static final int OUT = 1; + public static final int ERR = -1; + public static final int END = 0; + + public static class Output extends java.io.OutputStream { + private java.io.OutputStream destination; + private int key; + + public Output(java.io.OutputStream out, int key){ + this.destination = out; + this.key = key; + } + + @Override public void write(int b) throws IOException { + synchronized (destination){ + destination.write(key); + destination.write(b); + } + } + + @Override public void write(byte[] b) throws IOException { + if (b.length > 0) { + synchronized (destination) { + write(b, 0, b.length); + } + } + } + + @Override public void write(byte[] b, int off, int len) throws IOException { + + synchronized (destination) { + int i = 0; + while (i < len && i + off < b.length) { + int chunkLength = Math.min(len - i, 127); + if (chunkLength > 0) { + destination.write(chunkLength * key); + destination.write(b, off + i, Math.min(b.length - off - i, chunkLength)); + i += chunkLength; + } + } + } + } + + @Override public void flush() throws IOException { + synchronized (destination) { + destination.flush(); + } + } + + @Override public void close() throws IOException { + synchronized (destination) { + destination.close(); + } + } + } + + public static class Pumper implements Runnable{ + private InputStream src; + private OutputStream dest1; + private OutputStream dest2; + private long last = System.currentTimeMillis(); + + public Pumper(InputStream src, OutputStream destOut, OutputStream destErr){ + this.src = src; + this.dest1 = destOut; + this.dest2 = destErr; + } + + public void waitForSilence(int millis) throws InterruptedException { + do { + Thread.sleep(10); + } while ((System.currentTimeMillis() - last) < millis); + } + + public void run() { + byte[] buffer = new byte[1024]; + boolean running = true; + while (running) { + try { + int quantity0 = (byte)src.read(); + if (quantity0 != 0) { + int quantity = Math.abs(quantity0); + int offset = 0; + int delta = -1; + while (offset < quantity) { + delta = src.read(buffer, offset, quantity - offset); + if (delta == -1) { + running = false; + break; + } else { + offset += delta; + } + } + + if (delta != -1) { + if (quantity0 > 0) dest1.write(buffer, 0, offset); + else dest2.write(buffer, 0, offset); + flush(); + this.last = System.currentTimeMillis(); + } + } + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } + } + try { + dest1.close(); + dest2.close(); + } catch(IOException e) {} + } + + public void flush() throws IOException { + dest1.flush(); + dest2.flush(); + } + } +} diff --git a/main/client/src/mill/main/client/ProxyStreamPumper.java b/main/client/src/mill/main/client/ProxyStreamPumper.java deleted file mode 100644 index 21ae6c950bb..00000000000 --- a/main/client/src/mill/main/client/ProxyStreamPumper.java +++ /dev/null @@ -1,63 +0,0 @@ -package mill.main.client; - - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -public class ProxyStreamPumper implements Runnable{ - private InputStream src; - private OutputStream dest1; - private OutputStream dest2; - private WaitForSilence lastData = new WaitForSilence(); - - public ProxyStreamPumper(InputStream src, OutputStream dest1, OutputStream dest2){ - this.src = src; - this.dest1 = dest1; - this.dest2 = dest2; - } - - public WaitForSilence getLastData() { - return lastData; - } - public void run() { - byte[] buffer = new byte[1024]; - boolean running = true; - while (running) { - try { - int quantity0 = (byte)src.read(); - int quantity = Math.abs(quantity0); - int offset = 0; - int delta = -1; - while(offset < quantity){ - delta = src.read(buffer, offset, quantity - offset); - if (delta == -1) { - running = false; - break; - }else{ - offset += delta; - } - } - - if (delta != -1){ - if (quantity0 > 0) dest1.write(buffer, 0, offset); - else dest2.write(buffer, 0, offset); - flush(); - lastData.poke(); - } - } catch (IOException e) { - e.printStackTrace(); - System.exit(1); - } - } - try { - dest1.close(); - dest2.close(); - } catch(IOException e) {} - } - - public void flush() throws IOException { - dest1.flush(); - dest2.flush(); - } -} diff --git a/main/client/src/mill/main/client/WaitForSilence.java b/main/client/src/mill/main/client/WaitForSilence.java deleted file mode 100644 index 79a2ca293e3..00000000000 --- a/main/client/src/mill/main/client/WaitForSilence.java +++ /dev/null @@ -1,23 +0,0 @@ -package mill.main.client; - -/** - * Waits for a minimum period of silence after being poked from another thread - */ - -class WaitForSilence { - private long last = System.currentTimeMillis(); - - public synchronized long getLast() { - return last; - } - - public synchronized void poke() { - this.last = System.currentTimeMillis(); - } - - public void waitForSilence(int millis) throws InterruptedException { - do { - Thread.sleep(10); - } while ((System.currentTimeMillis() - getLast()) < millis); - } -} diff --git a/main/client/test/src/mill/main/client/ClientTests.java b/main/client/test/src/mill/main/client/ClientTests.java index 516d482b3f3..b4972c5a863 100644 --- a/main/client/test/src/mill/main/client/ClientTests.java +++ b/main/client/test/src/mill/main/client/ClientTests.java @@ -115,8 +115,8 @@ public void proxyInputOutputStreams(byte[] samples1, int chunkMax) throws Exception{ ByteArrayOutputStream pipe = new ByteArrayOutputStream(); - OutputStream src1 = new ProxyOutputStream(pipe, 1); - OutputStream src2 = new ProxyOutputStream(pipe, -1); + OutputStream src1 = new ProxyStream.Output(pipe, 1); + OutputStream src2 = new ProxyStream.Output(pipe, -1); Random random = new Random(31337); @@ -140,7 +140,7 @@ public void proxyInputOutputStreams(byte[] samples1, ByteArrayOutputStream dest1 = new ByteArrayOutputStream(); ByteArrayOutputStream dest2 = new ByteArrayOutputStream(); - ProxyStreamPumper pumper = new ProxyStreamPumper( + ProxyStream.Pumper pumper = new ProxyStream.Pumper( new ByteArrayInputStream(bytes), dest1, dest2 ); diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index c6f9125f4b3..9751bcb772f 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -10,6 +10,7 @@ import org.newsclub.net.unix.AFUNIXSocketAddress import mill.main.BuildInfo import mill.main.client._ import mill.api.{SystemStreams, internal} +import mill.main.client.ProxyStream.Output import mill.main.client.lock.{Lock, Locks} import scala.util.Try @@ -146,8 +147,9 @@ class Server[T]( def handleRun(clientSocket: Socket, initialSystemProperties: Map[String, String]): Unit = { val currentOutErr = clientSocket.getOutputStream - val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) - val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, -1), true) + val stdout = new PrintStream(new Output(currentOutErr, ProxyStream.OUT), true) + val stderr = new PrintStream(new Output(currentOutErr, ProxyStream.ERR), true) + // Proxy the input stream through a pair of Piped**putStream via a pumper, // as the `UnixDomainSocketInputStream` we get directly from the socket does // not properly implement `available(): Int` and thus messes up polling logic @@ -222,7 +224,11 @@ class Server[T]( // flush before closing the socket System.out.flush() System.err.flush() - + // Send a termination + currentOutErr.synchronized{ + currentOutErr.write(ProxyStream.END) + currentOutErr.flush() + } clientSocket.close() } } From a8c449f3be4d44960a8d72c6a0442e6ccc9ba95a Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 11:36:04 -0700 Subject: [PATCH 03/33] . --- .../bloop/src/mill/contrib/bloop/Bloop.scala | 3 ++- .../mill/main/client/MillServerLauncher.java | 17 +++++++++-------- .../src/mill/main/client/ProxyStream.java | 8 ++++++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/contrib/bloop/src/mill/contrib/bloop/Bloop.scala b/contrib/bloop/src/mill/contrib/bloop/Bloop.scala index 99de7d536ec..7886424a0ce 100644 --- a/contrib/bloop/src/mill/contrib/bloop/Bloop.scala +++ b/contrib/bloop/src/mill/contrib/bloop/Bloop.scala @@ -1,8 +1,9 @@ package mill.contrib.bloop +import mill.api.WorkspaceRoot import mill.eval.Evaluator /** * Usage : `mill mill.contrib.bloop.Bloop/install` */ -object Bloop extends BloopImpl(() => Evaluator.allBootstrapEvaluators.value.value, os.pwd) +object Bloop extends BloopImpl(() => Evaluator.allBootstrapEvaluators.value.value, WorkspaceRoot.workspaceRoot) diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java index 6ba7183c616..3066b28671c 100644 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ b/main/client/src/mill/main/client/MillServerLauncher.java @@ -164,16 +164,17 @@ public static int run( outThread.start(); inThread.start(); + // Fallback mechanism to terminate ProxyStream.Pumper. + // + // We don't expect this to be used much, because the `ProxyStream` protocol + // should provide a `0` packet to terminate the stream and stop the pumper. + // However, in the event that this does not happen, we still want the pumper + // to terminate eventually. So we wait for the `serverLock` to be released, + // indicating the server is done, and wait 0.5 seconds for any data to arrive + // before terminating the pumper. locks.serverLock.await(); - - // Although the process that the server was running has terminated and the server has sent all the stdout/stderr - // over the unix pipe and released its lock we don't know that all the data has arrived at the client - // The outThread of the ProxyStreamPumper will not close until the socket is closed (so we can't join on it) - // but we also can't close the socket until all the data has arrived. Catch 22. We could signal termination - // in the stream (ProxyOutputStream / ProxyStreamPumper) but that would require a new protocol. - // So we just wait until there has been X ms with no data - outPump.waitForSilence(500); + outPump.stop(); try { return Integer.parseInt(Files.readAllLines(Paths.get(lockBase + "/" + ServerFiles.exitCode)).get(0)); diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java index d93c7251afa..501be61036b 100644 --- a/main/client/src/mill/main/client/ProxyStream.java +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -94,7 +94,7 @@ public static class Pumper implements Runnable{ private OutputStream dest1; private OutputStream dest2; private long last = System.currentTimeMillis(); - + private boolean running = true; public Pumper(InputStream src, OutputStream destOut, OutputStream destErr){ this.src = src; this.dest1 = destOut; @@ -108,8 +108,8 @@ public void waitForSilence(int millis) throws InterruptedException { } public void run() { + byte[] buffer = new byte[1024]; - boolean running = true; while (running) { try { int quantity0 = (byte)src.read(); @@ -149,5 +149,9 @@ public void flush() throws IOException { dest1.flush(); dest2.flush(); } + + public void stop() { + running = false; + } } } From 605cedc90492eb60fa0d86a98f75fc181570696d Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 11:37:45 -0700 Subject: [PATCH 04/33] . --- main/client/src/mill/main/client/ProxyStream.java | 4 ++-- main/client/src/mill/main/client/Util.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java index 501be61036b..e8ec6f84e9e 100644 --- a/main/client/src/mill/main/client/ProxyStream.java +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -20,8 +20,8 @@ * * Where header is a single byte of the form: * - * - header > 0 indicating that this packet is for the `OUT` stream - * - header < 0 indicating that this packet is for the `ERR` stream + * - header more than 0 indicating that this packet is for the `OUT` stream + * - header less then 0 indicating that this packet is for the `ERR` stream * - abs(header) indicating the length of the packet body, in bytes * - header == 0 indicating the end of the stream * diff --git a/main/client/src/mill/main/client/Util.java b/main/client/src/mill/main/client/Util.java index 2dcb69c514f..0acff23ba17 100644 --- a/main/client/src/mill/main/client/Util.java +++ b/main/client/src/mill/main/client/Util.java @@ -34,7 +34,7 @@ public static final int ExitServerCodeWhenVersionMismatch() { public static String[] parseArgs(InputStream argStream) throws IOException { int argsLength = readInt(argStream); - String[] args = new String[argsLength]; + String[] args err= new String[argsLength]; for (int i = 0; i < args.length; i++) { args[i] = readString(argStream); } From 199757a80b8cc75d4a0064044239a71df8331f1a Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 11:38:23 -0700 Subject: [PATCH 05/33] . --- main/client/src/mill/main/client/ProxyStream.java | 2 +- main/client/src/mill/main/client/Util.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java index e8ec6f84e9e..1088ca82297 100644 --- a/main/client/src/mill/main/client/ProxyStream.java +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -21,7 +21,7 @@ * Where header is a single byte of the form: * * - header more than 0 indicating that this packet is for the `OUT` stream - * - header less then 0 indicating that this packet is for the `ERR` stream + * - header less than 0 indicating that this packet is for the `ERR` stream * - abs(header) indicating the length of the packet body, in bytes * - header == 0 indicating the end of the stream * diff --git a/main/client/src/mill/main/client/Util.java b/main/client/src/mill/main/client/Util.java index 0acff23ba17..2dcb69c514f 100644 --- a/main/client/src/mill/main/client/Util.java +++ b/main/client/src/mill/main/client/Util.java @@ -34,7 +34,7 @@ public static final int ExitServerCodeWhenVersionMismatch() { public static String[] parseArgs(InputStream argStream) throws IOException { int argsLength = readInt(argStream); - String[] args err= new String[argsLength]; + String[] args = new String[argsLength]; for (int i = 0; i < args.length; i++) { args[i] = readString(argStream); } From 45f2496630c82a8b12ca6d287104823b13976e1c Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 14:06:35 -0700 Subject: [PATCH 06/33] . --- contrib/bloop/src/mill/contrib/bloop/Bloop.scala | 3 +-- runner/src/mill/runner/MillServerMain.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/contrib/bloop/src/mill/contrib/bloop/Bloop.scala b/contrib/bloop/src/mill/contrib/bloop/Bloop.scala index 7886424a0ce..99de7d536ec 100644 --- a/contrib/bloop/src/mill/contrib/bloop/Bloop.scala +++ b/contrib/bloop/src/mill/contrib/bloop/Bloop.scala @@ -1,9 +1,8 @@ package mill.contrib.bloop -import mill.api.WorkspaceRoot import mill.eval.Evaluator /** * Usage : `mill mill.contrib.bloop.Bloop/install` */ -object Bloop extends BloopImpl(() => Evaluator.allBootstrapEvaluators.value.value, WorkspaceRoot.workspaceRoot) +object Bloop extends BloopImpl(() => Evaluator.allBootstrapEvaluators.value.value, os.pwd) diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index 9751bcb772f..a0ae6748b8f 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -225,7 +225,7 @@ class Server[T]( System.out.flush() System.err.flush() // Send a termination - currentOutErr.synchronized{ + currentOutErr.synchronized { currentOutErr.write(ProxyStream.END) currentOutErr.flush() } From bcd9e8512a628b647236bde27f048fe946fbeb0a Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 14:49:58 -0700 Subject: [PATCH 07/33] wip --- main/client/src/mill/main/client/MillClientMain.java | 2 +- main/client/src/mill/main/client/MillServerLauncher.java | 2 +- main/client/src/mill/main/client/ProxyStream.java | 9 ++++++++- main/client/test/src/mill/main/client/ClientTests.java | 5 +++-- runner/src/mill/runner/MillServerMain.scala | 5 ----- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/main/client/src/mill/main/client/MillClientMain.java b/main/client/src/mill/main/client/MillClientMain.java index ea0d2965602..143cdf35bdc 100644 --- a/main/client/src/mill/main/client/MillClientMain.java +++ b/main/client/src/mill/main/client/MillClientMain.java @@ -27,7 +27,7 @@ public static void main(String[] args) throws Exception { runNoServer = true; } } - + System.out.println("MillClientMain.main runNoServer " + runNoServer); if (runNoServer) { // start in no-server mode MillNoServerLauncher.runMain(args); diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java index 3066b28671c..f9156fb9c5f 100644 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ b/main/client/src/mill/main/client/MillServerLauncher.java @@ -173,7 +173,7 @@ public static int run( // indicating the server is done, and wait 0.5 seconds for any data to arrive // before terminating the pumper. locks.serverLock.await(); - outPump.waitForSilence(500); + outThread.join(); outPump.stop(); try { diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java index 1088ca82297..517e527daca 100644 --- a/main/client/src/mill/main/client/ProxyStream.java +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -35,7 +35,6 @@ public class ProxyStream{ public static final int OUT = 1; public static final int ERR = -1; - public static final int END = 0; public static class Output extends java.io.OutputStream { private java.io.OutputStream destination; @@ -107,12 +106,18 @@ public void waitForSilence(int millis) throws InterruptedException { } while ((System.currentTimeMillis() - last) < millis); } + public boolean isRunning() { + return running; + } + public void run() { + System.out.println("Pumper.run"); byte[] buffer = new byte[1024]; while (running) { try { int quantity0 = (byte)src.read(); + System.out.println("Pumper.run quantity0 " + quantity0); if (quantity0 != 0) { int quantity = Math.abs(quantity0); int offset = 0; @@ -133,6 +138,8 @@ public void run() { flush(); this.last = System.currentTimeMillis(); } + }else { + } } catch (IOException e) { e.printStackTrace(); diff --git a/main/client/test/src/mill/main/client/ClientTests.java b/main/client/test/src/mill/main/client/ClientTests.java index b4972c5a863..187664739b7 100644 --- a/main/client/test/src/mill/main/client/ClientTests.java +++ b/main/client/test/src/mill/main/client/ClientTests.java @@ -9,6 +9,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.OutputStream; +import java.lang.reflect.Proxy; import java.util.*; public class ClientTests { @@ -115,8 +116,8 @@ public void proxyInputOutputStreams(byte[] samples1, int chunkMax) throws Exception{ ByteArrayOutputStream pipe = new ByteArrayOutputStream(); - OutputStream src1 = new ProxyStream.Output(pipe, 1); - OutputStream src2 = new ProxyStream.Output(pipe, -1); + OutputStream src1 = new ProxyStream.Output(pipe, ProxyStream.OUT); + OutputStream src2 = new ProxyStream.Output(pipe, ProxyStream.ERR); Random random = new Random(31337); diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index a0ae6748b8f..d866a817571 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -224,11 +224,6 @@ class Server[T]( // flush before closing the socket System.out.flush() System.err.flush() - // Send a termination - currentOutErr.synchronized { - currentOutErr.write(ProxyStream.END) - currentOutErr.flush() - } clientSocket.close() } } From c42220d886e278d3ccd41965646c96a183fec0ce Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:20:29 -0700 Subject: [PATCH 08/33] . --- build.sc | 7 +- .../src/mill/main/client/ProxyStream.java | 5 +- .../mill/main/client/ProxyStreamTests.java | 110 ++++++++++++++++++ runner/src/mill/runner/MillServerMain.scala | 5 + 4 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 main/client/test/src/mill/main/client/ProxyStreamTests.java diff --git a/build.sc b/build.sc index 164b15f8cff..a645d6a5626 100644 --- a/build.sc +++ b/build.sc @@ -154,6 +154,7 @@ object Deps { val jnaPlatform = ivy"net.java.dev.jna:jna-platform:${jnaVersion}" val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3" + val commonsIO = ivy"commons-io:commons-io:2.16.1" val lambdaTest = ivy"de.tototec:de.tobiasroeser.lambdatest:0.8.0" val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.23.1" val osLib = ivy"com.lihaoyi::os-lib:0.10.3" @@ -753,7 +754,11 @@ object main extends MillStableScalaModule with BuildInfo { def ivyDeps = Agg(Deps.junixsocket) object test extends JavaModuleTests with TestModule.Junit4 { - def ivyDeps = Agg(Deps.junitInterface, Deps.lambdaTest) + def ivyDeps = Agg( + Deps.junitInterface, + Deps.lambdaTest, + Deps.commonsIO + ) } } diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java index 517e527daca..6b3ed53052d 100644 --- a/main/client/src/mill/main/client/ProxyStream.java +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -35,6 +35,7 @@ public class ProxyStream{ public static final int OUT = 1; public static final int ERR = -1; + public static final int END = 0; public static class Output extends java.io.OutputStream { private java.io.OutputStream destination; @@ -112,12 +113,10 @@ public boolean isRunning() { public void run() { - System.out.println("Pumper.run"); byte[] buffer = new byte[1024]; while (running) { try { int quantity0 = (byte)src.read(); - System.out.println("Pumper.run quantity0 " + quantity0); if (quantity0 != 0) { int quantity = Math.abs(quantity0); int offset = 0; @@ -139,7 +138,7 @@ public void run() { this.last = System.currentTimeMillis(); } }else { - + running = false; } } catch (IOException e) { e.printStackTrace(); diff --git a/main/client/test/src/mill/main/client/ProxyStreamTests.java b/main/client/test/src/mill/main/client/ProxyStreamTests.java new file mode 100644 index 00000000000..cf6dcd5ab63 --- /dev/null +++ b/main/client/test/src/mill/main/client/ProxyStreamTests.java @@ -0,0 +1,110 @@ +package mill.main.client; + +import org.junit.Test; + +import java.io.*; +import static org.junit.Assert.assertArrayEquals; +import org.apache.commons.io.output.TeeOutputStream; +public class ProxyStreamTests { + /** + * Ad-hoc fuzz tests to try and make sure the stuff we write into the + * `ProxyStreams.Output` and read out of the `ProxyStreams.Pumper` ends up + * being the same + */ + @Test + public void test() throws Exception{ + // Test writes of sizes around 1, around 127, around 255, and much larger. + // These are likely sizes to have bugs since we write data in chunks of size + // 127 + int[] interestingLengths = { + 1, 2, 3, 4, 5, 6, 7, 8, 9, + 10, 20, 30, 40, 50, 100, + 126, 127, 128, 129, 130, + 253, 254, 255, 256, 257, + 1000, 2000, 3000 + }; + byte[] interestingBytes = { + -128, -127, -126, + -120, -100, -80, -60, -40, -20, -10, + -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, + 10, 20, 40, 60, 80, 100, 120, + 125, 126, 127 + }; + + for (int n: interestingLengths){ + System.out.println("ProxyStreamTests fuzzing length " + n); + + for (int r = 0; r < interestingBytes.length; r += 1) { + byte[] outData = new byte[n]; + byte[] errData = new byte[n]; + for(int j = 0; j < n; j++) { + // fill test data blobs with arbitrary bytes from `interestingBytes`, negating + // the bytes we use for `errData` so we can distinguish it from `outData` + // + // offset the start byte we use by `r`, so we exercise writing blobs + // that start with every value listed in `interestingBytes` + outData[j] = interestingBytes[(j + r) % interestingBytes.length]; + errData[j] = (byte)-interestingBytes[(j + r) % interestingBytes.length]; + } + + test0(outData, errData, r); + } + } + } + + public void test0(byte[] outData, byte[] errData, int repeats) throws Exception{ + PipedOutputStream pipedOutputStream = new PipedOutputStream(); + PipedInputStream pipedInputStream = new PipedInputStream(1000000); + + pipedInputStream.connect(pipedOutputStream); + + ProxyStream.Output srcOut = new ProxyStream.Output(pipedOutputStream, ProxyStream.OUT); + ProxyStream.Output srcErr = new ProxyStream.Output(pipedOutputStream, ProxyStream.ERR); + + // Capture both the destOut/destErr from the pumper, as well as the destCombined + // to ensure the individual streams contain the right data and combined stream + // is in the right order + ByteArrayOutputStream destOut = new ByteArrayOutputStream(); + ByteArrayOutputStream destErr = new ByteArrayOutputStream(); + ByteArrayOutputStream destCombined = new ByteArrayOutputStream(); + ProxyStream.Pumper pumper = new ProxyStream.Pumper( + pipedInputStream, + new TeeOutputStream(destOut, destCombined), + new TeeOutputStream(destErr, destCombined) + ); + + for(int i = 0; i < repeats; i++){ + srcOut.write(outData); + srcErr.write(errData); + } + pipedOutputStream.write(0); + + new Thread(pumper).start(); + while(pumper.isRunning()) { + Thread.sleep(1); + } + + // Check that the individual `destOut` and `destErr` contain the correct bytes + assertArrayEquals(repeatArray(outData, repeats), destOut.toByteArray()); + assertArrayEquals(repeatArray(errData, repeats), destErr.toByteArray()); + + // Check that the combined `destCombined` contains the correct bytes in the correct order + byte[] combinedData = new byte[outData.length + errData.length]; + + System.arraycopy(outData, 0, combinedData, 0, outData.length); + System.arraycopy(errData, 0, combinedData, outData.length, errData.length); + + assertArrayEquals(repeatArray(combinedData, repeats), destCombined.toByteArray()); + } + + private static byte[] repeatArray(byte[] original, int n) { + byte[] result = new byte[original.length * n]; + + for (int i = 0; i < n; i++) { + System.arraycopy(original, 0, result, i * original.length, original.length); + } + + return result; + } + +} diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index d866a817571..a0ae6748b8f 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -224,6 +224,11 @@ class Server[T]( // flush before closing the socket System.out.flush() System.err.flush() + // Send a termination + currentOutErr.synchronized { + currentOutErr.write(ProxyStream.END) + currentOutErr.flush() + } clientSocket.close() } } From 64cc3ce5cabeea39155b2de9e9c0ebc77393febb Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:21:50 -0700 Subject: [PATCH 09/33] . --- .../client/test/src/mill/main/client/ProxyStreamTests.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/main/client/test/src/mill/main/client/ProxyStreamTests.java b/main/client/test/src/mill/main/client/ProxyStreamTests.java index cf6dcd5ab63..c81de13b28f 100644 --- a/main/client/test/src/mill/main/client/ProxyStreamTests.java +++ b/main/client/test/src/mill/main/client/ProxyStreamTests.java @@ -13,15 +13,14 @@ public class ProxyStreamTests { */ @Test public void test() throws Exception{ - // Test writes of sizes around 1, around 127, around 255, and much larger. - // These are likely sizes to have bugs since we write data in chunks of size - // 127 + // Test writes of sizes around 1, around 127, around 255, and much larger. These + // are likely sizes to have bugs since we write data in chunks of size 127 int[] interestingLengths = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 100, 126, 127, 128, 129, 130, 253, 254, 255, 256, 257, - 1000, 2000, 3000 + 1000, 2000, 4000, 8000 }; byte[] interestingBytes = { -128, -127, -126, From 685804aed54ef88c874839dfb3ec3c8c5624cab3 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:22:35 -0700 Subject: [PATCH 10/33] . --- main/client/src/mill/main/client/MillClientMain.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main/client/src/mill/main/client/MillClientMain.java b/main/client/src/mill/main/client/MillClientMain.java index 143cdf35bdc..ea0d2965602 100644 --- a/main/client/src/mill/main/client/MillClientMain.java +++ b/main/client/src/mill/main/client/MillClientMain.java @@ -27,7 +27,7 @@ public static void main(String[] args) throws Exception { runNoServer = true; } } - System.out.println("MillClientMain.main runNoServer " + runNoServer); + if (runNoServer) { // start in no-server mode MillNoServerLauncher.runMain(args); From df0279473fd484f8dd2e49e838216b7c660f8f3e Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:24:03 -0700 Subject: [PATCH 11/33] . --- main/client/src/mill/main/client/ProxyStream.java | 7 +++++++ runner/src/mill/runner/MillServerMain.scala | 5 +---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java index 6b3ed53052d..2056c4b5f97 100644 --- a/main/client/src/mill/main/client/ProxyStream.java +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -37,6 +37,13 @@ public class ProxyStream{ public static final int ERR = -1; public static final int END = 0; + public static void sendEnd(OutputStream out) throws IOException { + synchronized(out){ + out.write(ProxyStream.END); + out.flush(); + } + } + public static class Output extends java.io.OutputStream { private java.io.OutputStream destination; private int key; diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index a0ae6748b8f..33d9c3cc680 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -225,10 +225,7 @@ class Server[T]( System.out.flush() System.err.flush() // Send a termination - currentOutErr.synchronized { - currentOutErr.write(ProxyStream.END) - currentOutErr.flush() - } + ProxyStream.sendEnd(currentOutErr) clientSocket.close() } } From 2614a3e616b4f5bdf0524a57bb9077651814bbbb Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:35:54 -0700 Subject: [PATCH 12/33] . --- .../mill/main/client/MillServerLauncher.java | 20 +- .../src/mill/main/client/lock/Locks.java | 3 - runner/src/mill/runner/MillServerMain.scala | 208 +++++++++--------- .../src/mill/runner/ClientServerTests.scala | 5 - 4 files changed, 109 insertions(+), 127 deletions(-) diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java index f9156fb9c5f..4d327f34268 100644 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ b/main/client/src/mill/main/client/MillServerLauncher.java @@ -155,26 +155,16 @@ public static int run( InputStream outErr = ioSocket.getInputStream(); OutputStream in = ioSocket.getOutputStream(); - ProxyStream.Pumper outPump = new ProxyStream.Pumper(outErr, stdout, stderr); + ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr); InputPumper inPump = new InputPumper(() -> stdin, () -> in, true); - Thread outThread = new Thread(outPump, "outPump"); - outThread.setDaemon(true); + Thread outPumperThread = new Thread(outPumper, "outPump"); + outPumperThread.setDaemon(true); Thread inThread = new Thread(inPump, "inPump"); inThread.setDaemon(true); - outThread.start(); + outPumperThread.start(); inThread.start(); - // Fallback mechanism to terminate ProxyStream.Pumper. - // - // We don't expect this to be used much, because the `ProxyStream` protocol - // should provide a `0` packet to terminate the stream and stop the pumper. - // However, in the event that this does not happen, we still want the pumper - // to terminate eventually. So we wait for the `serverLock` to be released, - // indicating the server is done, and wait 0.5 seconds for any data to arrive - // before terminating the pumper. - locks.serverLock.await(); - outThread.join(); - outPump.stop(); + outPumperThread.join(); try { return Integer.parseInt(Files.readAllLines(Paths.get(lockBase + "/" + ServerFiles.exitCode)).get(0)); diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index 9585b662165..7059f496d86 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -25,12 +25,10 @@ final public class Locks implements AutoCloseable { final public Lock processLock; - final public Lock serverLock; final public Lock clientLock; public Locks(Lock processLock, Lock serverLock, Lock clientLock){ this.processLock = processLock; - this.serverLock = serverLock; this.clientLock = clientLock; } @@ -53,7 +51,6 @@ public static Locks memory() { @Override public void close() throws Exception { processLock.close(); - serverLock.close(); clientLock.close(); } } diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index 33d9c3cc680..b1c8c6dbd0b 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -100,33 +100,33 @@ class Server[T]( Server.tryLockBlock(locks.processLock) { var running = true while (running) { - Server.lockBlock(locks.serverLock) { - - val socketName = ServerFiles.pipe(lockBase) - - new File(socketName).delete() - - val addr = - AFUNIXSocketAddress.of(os.Path(new File(socketName)).relativeTo(os.pwd).toNIO.toFile) - val serverSocket = AFUNIXServerSocket.bindOn(addr) - val socketClose = () => serverSocket.close() - - val sockOpt = Server.interruptWith( - "MillSocketTimeoutInterruptThread", - acceptTimeoutMillis, - socketClose(), - serverSocket.accept() - ) - - sockOpt match { - case None => running = false - case Some(sock) => - try { - handleRun(sock, initialSystemProperties) - serverSocket.close() - } catch { case e: Throwable => e.printStackTrace(originalStdout) } - } + + val socketName = ServerFiles.pipe(lockBase) + + new File(socketName).delete() + + val addr = + AFUNIXSocketAddress.of(os.Path(new File(socketName)).relativeTo(os.pwd).toNIO.toFile) + val serverSocket = AFUNIXServerSocket.bindOn(addr) + val socketClose = () => serverSocket.close() + + val sockOpt = Server.interruptWith( + "MillSocketTimeoutInterruptThread", + acceptTimeoutMillis, + socketClose(), + serverSocket.accept() + ) + + sockOpt match { + case None => running = false + case Some(sock) => + try { + try handleRun(sock, initialSystemProperties) + catch { case e: Throwable => e.printStackTrace(originalStdout)} + finally sock.close(); + }finally serverSocket.close() } + // Make sure you give an opportunity for the client to probe the lock // and realize the server has released it to signal completion Thread.sleep(10) @@ -147,86 +147,86 @@ class Server[T]( def handleRun(clientSocket: Socket, initialSystemProperties: Map[String, String]): Unit = { val currentOutErr = clientSocket.getOutputStream - val stdout = new PrintStream(new Output(currentOutErr, ProxyStream.OUT), true) - val stderr = new PrintStream(new Output(currentOutErr, ProxyStream.ERR), true) - - // Proxy the input stream through a pair of Piped**putStream via a pumper, - // as the `UnixDomainSocketInputStream` we get directly from the socket does - // not properly implement `available(): Int` and thus messes up polling logic - // that relies on that method - val proxiedSocketInput = proxyInputStreamThroughPumper(clientSocket.getInputStream) - - val argStream = new FileInputStream(lockBase + "/" + ServerFiles.runArgs) - val interactive = argStream.read() != 0 - val clientMillVersion = Util.readString(argStream) - val serverMillVersion = BuildInfo.millVersion - if (clientMillVersion != serverMillVersion) { - stderr.println( - s"Mill version changed ($serverMillVersion -> $clientMillVersion), re-starting server" - ) - java.nio.file.Files.write( - java.nio.file.Paths.get(lockBase + "/" + ServerFiles.exitCode), - s"${Util.ExitServerCodeWhenVersionMismatch()}".getBytes() + try { + val stdout = new PrintStream(new Output(currentOutErr, ProxyStream.OUT), true) + val stderr = new PrintStream(new Output(currentOutErr, ProxyStream.ERR), true) + + // Proxy the input stream through a pair of Piped**putStream via a pumper, + // as the `UnixDomainSocketInputStream` we get directly from the socket does + // not properly implement `available(): Int` and thus messes up polling logic + // that relies on that method + val proxiedSocketInput = proxyInputStreamThroughPumper(clientSocket.getInputStream) + + val argStream = new FileInputStream(lockBase + "/" + ServerFiles.runArgs) + val interactive = argStream.read() != 0 + val clientMillVersion = Util.readString(argStream) + val serverMillVersion = BuildInfo.millVersion + if (clientMillVersion != serverMillVersion) { + stderr.println( + s"Mill version changed ($serverMillVersion -> $clientMillVersion), re-starting server" + ) + java.nio.file.Files.write( + java.nio.file.Paths.get(lockBase + "/" + ServerFiles.exitCode), + s"${Util.ExitServerCodeWhenVersionMismatch()}".getBytes() + ) + System.exit(Util.ExitServerCodeWhenVersionMismatch()) + } + val args = Util.parseArgs(argStream) + val env = Util.parseMap(argStream) + val userSpecifiedProperties = Util.parseMap(argStream) + argStream.close() + + @volatile var done = false + @volatile var idle = false + val t = new Thread( + () => + try { + val (result, newStateCache) = sm.main0( + args, + sm.stateCache, + interactive, + new SystemStreams(stdout, stderr, proxiedSocketInput), + env.asScala.toMap, + idle = _, + userSpecifiedProperties.asScala.toMap, + initialSystemProperties + ) + + sm.stateCache = newStateCache + java.nio.file.Files.write( + java.nio.file.Paths.get(lockBase + "/exitCode"), + (if (result) 0 else 1).toString.getBytes + ) + } finally { + done = true + idle = true + }, + "MillServerActionRunner" ) - System.exit(Util.ExitServerCodeWhenVersionMismatch()) - } - val args = Util.parseArgs(argStream) - val env = Util.parseMap(argStream) - val userSpecifiedProperties = Util.parseMap(argStream) - argStream.close() - - @volatile var done = false - @volatile var idle = false - val t = new Thread( - () => - try { - val (result, newStateCache) = sm.main0( - args, - sm.stateCache, - interactive, - new SystemStreams(stdout, stderr, proxiedSocketInput), - env.asScala.toMap, - idle = _, - userSpecifiedProperties.asScala.toMap, - initialSystemProperties - ) - - sm.stateCache = newStateCache - java.nio.file.Files.write( - java.nio.file.Paths.get(lockBase + "/exitCode"), - (if (result) 0 else 1).toString.getBytes - ) - } finally { - done = true - idle = true - }, - "MillServerActionRunner" - ) - t.start() - // We cannot simply use Lock#await here, because the filesystem doesn't - // realize the clientLock/serverLock are held by different threads in the - // two processes and gives a spurious deadlock error - while (!done && !locks.clientLock.probe()) Thread.sleep(3) - - if (!idle) interruptServer() - - t.interrupt() - // Try to give thread a moment to stop before we kill it for real - Thread.sleep(5) - try t.stop() - catch { - case e: UnsupportedOperationException => - // nothing we can do about, removed in Java 20 - case e: java.lang.Error if e.getMessage.contains("Cleaner terminated abnormally") => - // ignore this error and do nothing; seems benign - } + t.start() + // We cannot simply use Lock#await here, because the filesystem doesn't + // realize the clientLock/serverLock are held by different threads in the + // two processes and gives a spurious deadlock error + while (!done && !locks.clientLock.probe()) Thread.sleep(3) + + if (!idle) interruptServer() + + t.interrupt() + // Try to give thread a moment to stop before we kill it for real + Thread.sleep(5) + try t.stop() + catch { + case e: UnsupportedOperationException => + // nothing we can do about, removed in Java 20 + case e: java.lang.Error if e.getMessage.contains("Cleaner terminated abnormally") => + // ignore this error and do nothing; seems benign + } + + // flush before closing the socket + System.out.flush() + System.err.flush() - // flush before closing the socket - System.out.flush() - System.err.flush() - // Send a termination - ProxyStream.sendEnd(currentOutErr) - clientSocket.close() + }finally ProxyStream.sendEnd(currentOutErr) // Send a termination } } diff --git a/runner/test/src/mill/runner/ClientServerTests.scala b/runner/test/src/mill/runner/ClientServerTests.scala index 31ba749e097..68999ceef6e 100644 --- a/runner/test/src/mill/runner/ClientServerTests.scala +++ b/runner/test/src/mill/runner/ClientServerTests.scala @@ -101,7 +101,6 @@ object ClientServerTests extends TestSuite { assert( locks.clientLock.probe(), - locks.serverLock.probe(), locks.processLock.probe() ) @@ -118,7 +117,6 @@ object ClientServerTests extends TestSuite { assert( locks.clientLock.probe(), - !locks.serverLock.probe(), !locks.processLock.probe() ) @@ -135,7 +133,6 @@ object ClientServerTests extends TestSuite { Thread.sleep(2000) assert( locks.clientLock.probe(), - locks.serverLock.probe(), locks.processLock.probe() ) @@ -157,7 +154,6 @@ object ClientServerTests extends TestSuite { assert( locks.clientLock.probe(), - locks.serverLock.probe(), locks.processLock.probe() ) @@ -186,7 +182,6 @@ object ClientServerTests extends TestSuite { assert( locks.clientLock.probe(), - !locks.serverLock.probe(), !locks.processLock.probe() ) From 159790c5b2c4631c318ebf231d59716fbe7a49b9 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:36:22 -0700 Subject: [PATCH 13/33] . --- main/client/src/mill/main/client/ServerFiles.java | 7 ------- main/client/src/mill/main/client/lock/Locks.java | 4 +--- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/main/client/src/mill/main/client/ServerFiles.java b/main/client/src/mill/main/client/ServerFiles.java index fb41e3fa533..e01a8411526 100644 --- a/main/client/src/mill/main/client/ServerFiles.java +++ b/main/client/src/mill/main/client/ServerFiles.java @@ -14,13 +14,6 @@ public class ServerFiles { */ final public static String processLock = "processLock"; - /** - * Lock file indicating a server is currently waiting for or processing - * a command. Used by the client to detect when the server has completed - * processing. - */ - final public static String serverLock = "serverLock"; - /** * Ensures only a single client is manipulating each mill-worker folder at * a time, either spawning the server or submitting a command. Also used by diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index 7059f496d86..d4e87b3f7ed 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -27,7 +27,7 @@ final public class Locks implements AutoCloseable { final public Lock processLock; final public Lock clientLock; - public Locks(Lock processLock, Lock serverLock, Lock clientLock){ + public Locks(Lock processLock, Lock clientLock){ this.processLock = processLock; this.clientLock = clientLock; } @@ -35,14 +35,12 @@ public Locks(Lock processLock, Lock serverLock, Lock clientLock){ public static Locks files(String lockBase) throws Exception { return new Locks( new FileLock(lockBase + "/" + ServerFiles.processLock), - new FileLock(lockBase + "/" + ServerFiles.serverLock), new FileLock(lockBase + "/" + ServerFiles.clientLock) ); } public static Locks memory() { return new Locks( - new MemoryLock(), new MemoryLock(), new MemoryLock() ); From cb80f57ebbe1b2863bc46787ad11311e497a0422 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:39:50 -0700 Subject: [PATCH 14/33] . --- main/client/src/mill/main/client/MillServerLauncher.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java index 4d327f34268..ab66e2312db 100644 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ b/main/client/src/mill/main/client/MillServerLauncher.java @@ -126,14 +126,6 @@ public static int run( Util.writeMap(env, f); } - boolean serverInit = false; - if (locks.processLock.probe()) { - serverInit = true; - initServer.run(); - } - - while (locks.processLock.probe()) Thread.sleep(3); - String socketName = ServerFiles.pipe(lockBase); AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName)); From beec32aef45a04eddf2c21c2cf2a112a922eb744 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:45:21 -0700 Subject: [PATCH 15/33] . --- main/client/src/mill/main/client/MillServerLauncher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java index ab66e2312db..8d943562af6 100644 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ b/main/client/src/mill/main/client/MillServerLauncher.java @@ -77,7 +77,6 @@ static int runMillServer(String[] args, throw new RuntimeException(e); } }, - locks, System.in, System.out, System.err, @@ -112,7 +111,6 @@ private static int getServerProcessesLimit(String jvmHomeEncoding) { public static int run( String lockBase, Runnable initServer, - Locks locks, InputStream stdin, OutputStream stdout, OutputStream stderr, @@ -126,6 +124,8 @@ public static int run( Util.writeMap(env, f); } + initServer.run(); + String socketName = ServerFiles.pipe(lockBase); AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName)); From 08b25e46f5f42fae42ea8d05a51d8737ea62bbf4 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:50:46 -0700 Subject: [PATCH 16/33] . --- .../src/mill/main/client/MillServerLauncher.java | 2 ++ main/client/src/mill/main/client/lock/Locks.java | 13 ++++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java index 8d943562af6..042b6e40116 100644 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ b/main/client/src/mill/main/client/MillServerLauncher.java @@ -77,6 +77,7 @@ static int runMillServer(String[] args, throw new RuntimeException(e); } }, + locks, System.in, System.out, System.err, @@ -111,6 +112,7 @@ private static int getServerProcessesLimit(String jvmHomeEncoding) { public static int run( String lockBase, Runnable initServer, + Locks locks, InputStream stdin, OutputStream stdout, OutputStream stderr, diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index d4e87b3f7ed..697b8b7394a 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -8,19 +8,18 @@ * * - Client: * - Take clientLock - * - If processLock is not taken, it means we need to spawn the server - * - Spawn the server and wait for processLock to be taken + * - Wait for server socket to be available for connection * - Server: - * - take processLock + * - Take processLock. If already taken, it means another server was running + * (e.g. spawned by a different client) so exit immediately * - Server: loop: - * - Take serverLock, * - Listen for incoming client requests on serverSocket * - Execute client request * - If clientLock is released during execution, terminate server - * - Release serverLock + * - Send `ProxyStream.END` packet and call `clientSocket.close()` * - Client: - * - Wait for serverLock to be released, indicating server has finished execution - * - Give 50ms grace period for server output to arrive over pipe + * - Wait for `ProxyStream.END` packet or `clientSocket.close()`, + * indicating server has finished execution and all data has been received */ final public class Locks implements AutoCloseable { From d50cfe84a3487633be5c11e915653fdd6749d24e Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:54:49 -0700 Subject: [PATCH 17/33] . --- main/client/src/mill/main/client/MillServerLauncher.java | 4 +++- runner/src/mill/runner/MillServerMain.scala | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java index 042b6e40116..a788844e079 100644 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ b/main/client/src/mill/main/client/MillServerLauncher.java @@ -126,7 +126,9 @@ public static int run( Util.writeMap(env, f); } - initServer.run(); + if (locks.processLock.probe()) initServer.run(); + + while (locks.processLock.probe()) Thread.sleep(3); String socketName = ServerFiles.pipe(lockBase); AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName)); diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index b1c8c6dbd0b..f05c1fa7676 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -122,9 +122,9 @@ class Server[T]( case Some(sock) => try { try handleRun(sock, initialSystemProperties) - catch { case e: Throwable => e.printStackTrace(originalStdout)} + catch { case e: Throwable => e.printStackTrace(originalStdout) } finally sock.close(); - }finally serverSocket.close() + } finally serverSocket.close() } // Make sure you give an opportunity for the client to probe the lock @@ -226,7 +226,7 @@ class Server[T]( System.out.flush() System.err.flush() - }finally ProxyStream.sendEnd(currentOutErr) // Send a termination + } finally ProxyStream.sendEnd(currentOutErr) // Send a termination } } From 987011c96491cb7d3e28dc5c035dc92352c9e684 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:58:27 -0700 Subject: [PATCH 18/33] . --- main/client/src/mill/main/client/lock/Locks.java | 1 + 1 file changed, 1 insertion(+) diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index 697b8b7394a..00a21186d1e 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -8,6 +8,7 @@ * * - Client: * - Take clientLock + * - If processLock is not yet taken, it means server is not running, so spawn a server * - Wait for server socket to be available for connection * - Server: * - Take processLock. If already taken, it means another server was running From d93df57219216c96ae0995b24a327e5870e120e0 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:58:41 -0700 Subject: [PATCH 19/33] . --- main/client/src/mill/main/client/lock/Locks.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index 00a21186d1e..3b98476966c 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -11,8 +11,9 @@ * - If processLock is not yet taken, it means server is not running, so spawn a server * - Wait for server socket to be available for connection * - Server: - * - Take processLock. If already taken, it means another server was running - * (e.g. spawned by a different client) so exit immediately + * - Take processLock. + * - If already taken, it means another server was running + * (e.g. spawned by a different client) so exit immediately * - Server: loop: * - Listen for incoming client requests on serverSocket * - Execute client request From 9ceb52d2bdbfb42bf679e3b09b07d063a99cb88c Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 18:59:26 -0700 Subject: [PATCH 20/33] . --- main/client/src/mill/main/client/lock/Locks.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index 3b98476966c..646158968df 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -17,7 +17,9 @@ * - Server: loop: * - Listen for incoming client requests on serverSocket * - Execute client request - * - If clientLock is released during execution, terminate server + * - If clientLock is released during execution, terminate server (otherwise + * we have no safe way of termianting the in-process request, so the server + * may continue running for arbitrarily long with no client attached) * - Send `ProxyStream.END` packet and call `clientSocket.close()` * - Client: * - Wait for `ProxyStream.END` packet or `clientSocket.close()`, From 962b48d6f254234678bea15cb96ac7a733db487a Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 19:00:18 -0700 Subject: [PATCH 21/33] . --- runner/src/mill/runner/MillServerMain.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index f05c1fa7676..b9cced88e24 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -105,6 +105,7 @@ class Server[T]( new File(socketName).delete() + // Use relative path because otherwise the full path might be too long for the socket API val addr = AFUNIXSocketAddress.of(os.Path(new File(socketName)).relativeTo(os.pwd).toNIO.toFile) val serverSocket = AFUNIXServerSocket.bindOn(addr) From 29b49e4a716206fc7323a2da99ef0a7f13000124 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 19:01:43 -0700 Subject: [PATCH 22/33] . --- main/client/src/mill/main/client/ServerFiles.java | 14 +++++++------- main/client/src/mill/main/client/lock/Locks.java | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/main/client/src/mill/main/client/ServerFiles.java b/main/client/src/mill/main/client/ServerFiles.java index e01a8411526..cbc2b33ab47 100644 --- a/main/client/src/mill/main/client/ServerFiles.java +++ b/main/client/src/mill/main/client/ServerFiles.java @@ -6,6 +6,13 @@ */ public class ServerFiles { final public static String sandbox = "sandbox"; + + /** + * Ensures only a single client is manipulating each mill-worker folder at + * a time, either spawning the server or submitting a command. Also used by + * the server to detect when a client disconnects, so it can terminate execution + */ + final public static String clientLock = "clientLock"; /** * Lock file ensuring a single server is running in a particular mill-worker @@ -14,13 +21,6 @@ public class ServerFiles { */ final public static String processLock = "processLock"; - /** - * Ensures only a single client is manipulating each mill-worker folder at - * a time, either spawning the server or submitting a command. Also used by - * the server to detect when a client disconnects, so it can terminate execution - */ - final public static String clientLock = "clientLock"; - /** * The pipe by which the client snd server exchange IO * diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index 646158968df..86f55ed8496 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -27,18 +27,18 @@ */ final public class Locks implements AutoCloseable { - final public Lock processLock; final public Lock clientLock; + final public Lock processLock; - public Locks(Lock processLock, Lock clientLock){ - this.processLock = processLock; + public Locks(Lock clientLock, Lock processLock){ this.clientLock = clientLock; + this.processLock = processLock; } public static Locks files(String lockBase) throws Exception { return new Locks( - new FileLock(lockBase + "/" + ServerFiles.processLock), - new FileLock(lockBase + "/" + ServerFiles.clientLock) + new FileLock(lockBase + "/" + ServerFiles.clientLock), + new FileLock(lockBase + "/" + ServerFiles.processLock) ); } @@ -51,7 +51,7 @@ public static Locks memory() { @Override public void close() throws Exception { - processLock.close(); clientLock.close(); + processLock.close(); } } From 40417deef7b4286ee16f89e4ef7a44a996ae8a8a Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 19:10:33 -0700 Subject: [PATCH 23/33] . --- .../src/mill/main/client/ServerFiles.java | 4 ++-- runner/src/mill/runner/MillServerMain.scala | 24 +++++++++---------- .../src/mill/runner/ClientServerTests.scala | 8 +++---- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/main/client/src/mill/main/client/ServerFiles.java b/main/client/src/mill/main/client/ServerFiles.java index cbc2b33ab47..b84194c7dee 100644 --- a/main/client/src/mill/main/client/ServerFiles.java +++ b/main/client/src/mill/main/client/ServerFiles.java @@ -6,7 +6,7 @@ */ public class ServerFiles { final public static String sandbox = "sandbox"; - + /** * Ensures only a single client is manipulating each mill-worker folder at * a time, either spawning the server or submitting a command. Also used by @@ -30,7 +30,7 @@ public class ServerFiles { */ public static String pipe(String base) { try { - return base + "/mill-" + Util.md5hex(new java.io.File(base).getCanonicalPath()) + "-io"; + return base + "mill-" + Util.md5hex(new java.io.File(base).getCanonicalPath()) + "-io"; }catch (Exception e){ throw new RuntimeException(e); } diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index b9cced88e24..a966e6b6103 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -4,6 +4,7 @@ import sun.misc.{Signal, SignalHandler} import java.io._ import java.net.Socket +import java.nio.file.{Files, Paths} import scala.jdk.CollectionConverters._ import org.newsclub.net.unix.AFUNIXServerSocket import org.newsclub.net.unix.AFUNIXSocketAddress @@ -53,7 +54,7 @@ object MillServerMain extends MillServerMain[RunnerState] { Try(System.getProperty("mill.server_timeout").toInt).getOrElse(5 * 60 * 1000) // 5 minutes new Server( - lockBase = args0(0), + lockBase = os.Path(args0(0)), this, () => System.exit(Util.ExitServerCodeWhenIdle()), acceptTimeoutMillis = acceptTimeoutMillis, @@ -87,7 +88,7 @@ object MillServerMain extends MillServerMain[RunnerState] { } class Server[T]( - lockBase: String, + lockBase: os.Path, sm: MillServerMain[T], interruptServer: () => Unit, acceptTimeoutMillis: Int, @@ -101,13 +102,13 @@ class Server[T]( var running = true while (running) { - val socketName = ServerFiles.pipe(lockBase) + val socketPath = os.Path(ServerFiles.pipe(lockBase.toString())) - new File(socketName).delete() + os.remove.all(socketPath) // Use relative path because otherwise the full path might be too long for the socket API val addr = - AFUNIXSocketAddress.of(os.Path(new File(socketName)).relativeTo(os.pwd).toNIO.toFile) + AFUNIXSocketAddress.of(socketPath.relativeTo(os.pwd).toNIO.toFile) val serverSocket = AFUNIXServerSocket.bindOn(addr) val socketClose = () => serverSocket.close() @@ -158,7 +159,7 @@ class Server[T]( // that relies on that method val proxiedSocketInput = proxyInputStreamThroughPumper(clientSocket.getInputStream) - val argStream = new FileInputStream(lockBase + "/" + ServerFiles.runArgs) + val argStream = os.read.inputStream(lockBase / ServerFiles.runArgs) val interactive = argStream.read() != 0 val clientMillVersion = Util.readString(argStream) val serverMillVersion = BuildInfo.millVersion @@ -166,9 +167,9 @@ class Server[T]( stderr.println( s"Mill version changed ($serverMillVersion -> $clientMillVersion), re-starting server" ) - java.nio.file.Files.write( - java.nio.file.Paths.get(lockBase + "/" + ServerFiles.exitCode), - s"${Util.ExitServerCodeWhenVersionMismatch()}".getBytes() + os.write( + lockBase / ServerFiles.exitCode, + Util.ExitServerCodeWhenVersionMismatch().toString.getBytes() ) System.exit(Util.ExitServerCodeWhenVersionMismatch()) } @@ -194,10 +195,7 @@ class Server[T]( ) sm.stateCache = newStateCache - java.nio.file.Files.write( - java.nio.file.Paths.get(lockBase + "/exitCode"), - (if (result) 0 else 1).toString.getBytes - ) + os.write(lockBase / ServerFiles.exitCode, (if (result) 0 else 1).toString.getBytes()) } finally { done = true idle = true diff --git a/runner/test/src/mill/runner/ClientServerTests.scala b/runner/test/src/mill/runner/ClientServerTests.scala index 68999ceef6e..5eb24f6a765 100644 --- a/runner/test/src/mill/runner/ClientServerTests.scala +++ b/runner/test/src/mill/runner/ClientServerTests.scala @@ -52,16 +52,16 @@ object ClientServerTests extends TestSuite { (in, out, err) } def init() = { - val tmpDir = java.nio.file.Files.createTempDirectory("") + val tmpDir = os.temp.dir() val locks = Locks.memory() (tmpDir, locks) } - def spawnEchoServer(tmpDir: java.nio.file.Path, locks: Locks): Unit = { + def spawnEchoServer(tmpDir: os.Path, locks: Locks): Unit = { new Thread(() => new Server( - tmpDir.toString, + tmpDir, new EchoServer(), () => (), 1000, @@ -71,7 +71,7 @@ object ClientServerTests extends TestSuite { } def runClientAux( - tmpDir: java.nio.file.Path, + tmpDir: os.Path, locks: Locks )(env: Map[String, String], args: Array[String]) = { val (in, out, err) = initStreams() From a7f286587449453669bc269f3b5f6cd2a96d513c Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 19:33:04 -0700 Subject: [PATCH 24/33] . --- .../src/mill/main/client/ProxyStream.java | 54 ++++++++++--------- .../mill/main/client/ProxyStreamTests.java | 40 +++++++++----- 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java index 2056c4b5f97..c5ae24d27ea 100644 --- a/main/client/src/mill/main/client/ProxyStream.java +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -121,36 +121,40 @@ public boolean isRunning() { public void run() { byte[] buffer = new byte[1024]; - while (running) { - try { - int quantity0 = (byte)src.read(); - if (quantity0 != 0) { - int quantity = Math.abs(quantity0); - int offset = 0; - int delta = -1; - while (offset < quantity) { - delta = src.read(buffer, offset, quantity - offset); - if (delta == -1) { - running = false; - break; - } else { - offset += delta; + try { + while (true) { + try { + int header = src.read(); + // -1 means socket was closed, 0 means a ProxyStream.END was sent + if (header == -1 || header == 0) break; + else { + int quantity0 = (byte) header; + int quantity = Math.abs(quantity0); + int offset = 0; + int delta = -1; + while (offset < quantity) { + delta = src.read(buffer, offset, quantity - offset); + if (delta == -1) { + break; + } else { + offset += delta; + } } - } - if (delta != -1) { - if (quantity0 > 0) dest1.write(buffer, 0, offset); - else dest2.write(buffer, 0, offset); - flush(); - this.last = System.currentTimeMillis(); + if (delta != -1) { + if ((byte) quantity0 > 0) dest1.write(buffer, 0, offset); + else dest2.write(buffer, 0, offset); + flush(); + this.last = System.currentTimeMillis(); + } } - }else { - running = false; + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); } - } catch (IOException e) { - e.printStackTrace(); - System.exit(1); } + }finally { + running = false; } try { dest1.close(); diff --git a/main/client/test/src/mill/main/client/ProxyStreamTests.java b/main/client/test/src/mill/main/client/ProxyStreamTests.java index c81de13b28f..3af807c7a3e 100644 --- a/main/client/test/src/mill/main/client/ProxyStreamTests.java +++ b/main/client/test/src/mill/main/client/ProxyStreamTests.java @@ -23,7 +23,7 @@ public void test() throws Exception{ 1000, 2000, 4000, 8000 }; byte[] interestingBytes = { - -128, -127, -126, + -1, -127, -126, -120, -100, -80, -60, -40, -20, -10, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 10, 20, 40, 60, 80, 100, 120, @@ -31,9 +31,9 @@ public void test() throws Exception{ }; for (int n: interestingLengths){ - System.out.println("ProxyStreamTests fuzzing length " + n); - for (int r = 0; r < interestingBytes.length; r += 1) { + System.out.println("ProxyStreamTests fuzzing length " + n); + for (int r = 1; r < interestingBytes.length + 1; r += 1) { byte[] outData = new byte[n]; byte[] errData = new byte[n]; for(int j = 0; j < n; j++) { @@ -46,12 +46,15 @@ public void test() throws Exception{ errData[j] = (byte)-interestingBytes[(j + r) % interestingBytes.length]; } - test0(outData, errData, r); + // Run all tests both with the format `ProxyStream.END` packet + // being sent as well as when the stream is unceremoniously closed + test0(outData, errData, r, false); + test0(outData, errData, r, true); } } } - public void test0(byte[] outData, byte[] errData, int repeats) throws Exception{ + public void test0(byte[] outData, byte[] errData, int repeats, boolean gracefulEnd) throws Exception{ PipedOutputStream pipedOutputStream = new PipedOutputStream(); PipedInputStream pipedInputStream = new PipedInputStream(1000000); @@ -72,16 +75,25 @@ public void test0(byte[] outData, byte[] errData, int repeats) throws Exception{ new TeeOutputStream(destErr, destCombined) ); - for(int i = 0; i < repeats; i++){ - srcOut.write(outData); - srcErr.write(errData); - } - pipedOutputStream.write(0); - new Thread(pumper).start(); - while(pumper.isRunning()) { - Thread.sleep(1); - } + new Thread(() -> { + try { + for (int i = 0; i < repeats; i++) { + srcOut.write(outData); + srcErr.write(errData); + } + + if (gracefulEnd) ProxyStream.sendEnd(pipedOutputStream); + else { + pipedOutputStream.close(); + } + }catch(Exception e){ e.printStackTrace();} + }).start(); + + Thread pumperThread = new Thread(pumper); + + pumperThread.start(); + pumperThread.join(); // Check that the individual `destOut` and `destErr` contain the correct bytes assertArrayEquals(repeatArray(outData, repeats), destOut.toByteArray()); From 7b55ec1ae14e0185b03deaa577a5478a84846df7 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 19:45:07 -0700 Subject: [PATCH 25/33] . --- .../src/mill/main/client/ProxyStream.java | 91 ++++++++----------- 1 file changed, 39 insertions(+), 52 deletions(-) diff --git a/main/client/src/mill/main/client/ProxyStream.java b/main/client/src/mill/main/client/ProxyStream.java index c5ae24d27ea..2e45ebaa3f8 100644 --- a/main/client/src/mill/main/client/ProxyStream.java +++ b/main/client/src/mill/main/client/ProxyStream.java @@ -98,77 +98,64 @@ public Output(java.io.OutputStream out, int key){ public static class Pumper implements Runnable{ private InputStream src; - private OutputStream dest1; - private OutputStream dest2; - private long last = System.currentTimeMillis(); - private boolean running = true; + private OutputStream destOut; + private OutputStream destErr; public Pumper(InputStream src, OutputStream destOut, OutputStream destErr){ this.src = src; - this.dest1 = destOut; - this.dest2 = destErr; - } - - public void waitForSilence(int millis) throws InterruptedException { - do { - Thread.sleep(10); - } while ((System.currentTimeMillis() - last) < millis); - } - - public boolean isRunning() { - return running; + this.destOut = destOut; + this.destErr = destErr; } public void run() { byte[] buffer = new byte[1024]; - try { - while (true) { - try { - int header = src.read(); - // -1 means socket was closed, 0 means a ProxyStream.END was sent - if (header == -1 || header == 0) break; - else { - int quantity0 = (byte) header; - int quantity = Math.abs(quantity0); - int offset = 0; - int delta = -1; - while (offset < quantity) { - delta = src.read(buffer, offset, quantity - offset); - if (delta == -1) { - break; - } else { - offset += delta; - } + while (true) { + try { + int header = src.read(); + // -1 means socket was closed, 0 means a ProxyStream.END was sent. Note + // that only header values > 0 represent actual data to read: + // - sign((byte)header) represents which stream the data should be sent to + // - abs((byte)header) represents the length of the data to read and send + if (header == -1 || header == 0) break; + else { + int stream = (byte) header > 0 ? 1 : -1; + int quantity0 = (byte) header; + int quantity = Math.abs(quantity0); + int offset = 0; + int delta = -1; + while (offset < quantity) { + delta = src.read(buffer, offset, quantity - offset); + if (delta == -1) { + break; + } else { + offset += delta; } + } - if (delta != -1) { - if ((byte) quantity0 > 0) dest1.write(buffer, 0, offset); - else dest2.write(buffer, 0, offset); - flush(); - this.last = System.currentTimeMillis(); + if (delta != -1) { + switch(stream){ + case ProxyStream.OUT: destOut.write(buffer, 0, offset); break; + case ProxyStream.ERR: destErr.write(buffer, 0, offset); break; } + + flush(); } - } catch (IOException e) { - e.printStackTrace(); - System.exit(1); } + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); } - }finally { - running = false; } + try { - dest1.close(); - dest2.close(); + destOut.close(); + destErr.close(); } catch(IOException e) {} } public void flush() throws IOException { - dest1.flush(); - dest2.flush(); - } - - public void stop() { - running = false; + destOut.flush(); + destErr.flush(); } } } From 216836c0699d53e1105e7e8793ee896e859758b3 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 19:46:39 -0700 Subject: [PATCH 26/33] . --- main/client/src/mill/main/client/ServerFiles.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main/client/src/mill/main/client/ServerFiles.java b/main/client/src/mill/main/client/ServerFiles.java index b84194c7dee..d4ee7d11d48 100644 --- a/main/client/src/mill/main/client/ServerFiles.java +++ b/main/client/src/mill/main/client/ServerFiles.java @@ -30,7 +30,7 @@ public class ServerFiles { */ public static String pipe(String base) { try { - return base + "mill-" + Util.md5hex(new java.io.File(base).getCanonicalPath()) + "-io"; + return base + "/mill-" + Util.md5hex(new java.io.File(base).getCanonicalPath()) + "-io"; }catch (Exception e){ throw new RuntimeException(e); } From 2836d5feffd4934f0faa83e57afd7847ef9fe8d9 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 19:53:18 -0700 Subject: [PATCH 27/33] . --- runner/src/mill/runner/MillServerMain.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index a966e6b6103..ed982e0f244 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -128,10 +128,6 @@ class Server[T]( finally sock.close(); } finally serverSocket.close() } - - // Make sure you give an opportunity for the client to probe the lock - // and realize the server has released it to signal completion - Thread.sleep(10) } }.getOrElse(throw new Exception("PID already present")) } From 59a198be61e374a7c01aad69ae9f3c924b3f93ee Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 19:54:27 -0700 Subject: [PATCH 28/33] . --- main/client/test/src/mill/main/client/ClientTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/main/client/test/src/mill/main/client/ClientTests.java b/main/client/test/src/mill/main/client/ClientTests.java index 187664739b7..42f47180164 100644 --- a/main/client/test/src/mill/main/client/ClientTests.java +++ b/main/client/test/src/mill/main/client/ClientTests.java @@ -9,7 +9,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.OutputStream; -import java.lang.reflect.Proxy; import java.util.*; public class ClientTests { From ca6a18a7268d9a7b5271d987e71254f29aac060a Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 20:00:39 -0700 Subject: [PATCH 29/33] . --- runner/src/mill/runner/MillServerMain.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index ed982e0f244..b9dafc00703 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -191,7 +191,7 @@ class Server[T]( ) sm.stateCache = newStateCache - os.write(lockBase / ServerFiles.exitCode, (if (result) 0 else 1).toString.getBytes()) + os.write.over(lockBase / ServerFiles.exitCode, (if (result) 0 else 1).toString.getBytes()) } finally { done = true idle = true From 1ba6f1987d55a0abbf40b5cdd1bef6264b84bb0c Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 20:05:54 -0700 Subject: [PATCH 30/33] . --- runner/src/mill/runner/MillServerMain.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index b9dafc00703..d1a4c3a3e75 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -4,7 +4,6 @@ import sun.misc.{Signal, SignalHandler} import java.io._ import java.net.Socket -import java.nio.file.{Files, Paths} import scala.jdk.CollectionConverters._ import org.newsclub.net.unix.AFUNIXServerSocket import org.newsclub.net.unix.AFUNIXSocketAddress From 6eb1749aed19264bd04dfd76743226157ce22c07 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 20:16:06 -0700 Subject: [PATCH 31/33] . --- .config/mill-version | 2 +- build.sc | 2 +- runner/src/mill/runner/MillServerMain.scala | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.config/mill-version b/.config/mill-version index 1a775473e23..f8a052f0c60 100644 --- a/.config/mill-version +++ b/.config/mill-version @@ -1 +1 @@ -0.11.11 +0.12.0-M0-43-1ba6f1 \ No newline at end of file diff --git a/build.sc b/build.sc index a645d6a5626..123d17271c0 100644 --- a/build.sc +++ b/build.sc @@ -2101,7 +2101,7 @@ def uploadToGithub(authKey: String) = T.command { private def resolveTasks[T](taskNames: String*): Seq[NamedTask[T]] = { mill.resolve.Resolve.Tasks.resolve( - build, + build.`package`, taskNames, SelectMode.Separated ).map(x => x.asInstanceOf[Seq[mill.define.NamedTask[T]]]).getOrElse(???) diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index d1a4c3a3e75..103558e74eb 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -190,7 +190,10 @@ class Server[T]( ) sm.stateCache = newStateCache - os.write.over(lockBase / ServerFiles.exitCode, (if (result) 0 else 1).toString.getBytes()) + os.write.over( + lockBase / ServerFiles.exitCode, + (if (result) 0 else 1).toString.getBytes() + ) } finally { done = true idle = true From be61df677f824dc60c57806a742d773526b44d51 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 20:16:15 -0700 Subject: [PATCH 32/33] . --- build.sc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sc b/build.sc index 123d17271c0..a645d6a5626 100644 --- a/build.sc +++ b/build.sc @@ -2101,7 +2101,7 @@ def uploadToGithub(authKey: String) = T.command { private def resolveTasks[T](taskNames: String*): Seq[NamedTask[T]] = { mill.resolve.Resolve.Tasks.resolve( - build.`package`, + build, taskNames, SelectMode.Separated ).map(x => x.asInstanceOf[Seq[mill.define.NamedTask[T]]]).getOrElse(???) From eea5f3c64c0d7e07e41bb6d697fee59aea3decbf Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 13 Aug 2024 20:18:47 -0700 Subject: [PATCH 33/33] . --- .config/mill-version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.config/mill-version b/.config/mill-version index f8a052f0c60..1a775473e23 100644 --- a/.config/mill-version +++ b/.config/mill-version @@ -1 +1 @@ -0.12.0-M0-43-1ba6f1 \ No newline at end of file +0.11.11