Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor handling of ProxyStreams and Client-Server Locks #3368

Merged
merged 33 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ object Deps {
val jnaPlatform = ivy"net.java.dev.jna:jna-platform:${jnaVersion}"

val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3"
val commonsIO = ivy"commons-io:commons-io:2.16.1"
val lambdaTest = ivy"de.tototec:de.tobiasroeser.lambdatest:0.8.0"
val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.23.1"
val osLib = ivy"com.lihaoyi::os-lib:0.10.3"
Expand Down Expand Up @@ -753,7 +754,11 @@ object main extends MillStableScalaModule with BuildInfo {
def ivyDeps = Agg(Deps.junixsocket)

object test extends JavaModuleTests with TestModule.Junit4 {
def ivyDeps = Agg(Deps.junitInterface, Deps.lambdaTest)
def ivyDeps = Agg(
Deps.junitInterface,
Deps.lambdaTest,
Deps.commonsIO
)
}
}

Expand Down
25 changes: 6 additions & 19 deletions main/client/src/mill/main/client/MillServerLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,7 @@ public static int run(
Util.writeMap(env, f);
}

boolean serverInit = false;
if (locks.processLock.probe()) {
serverInit = true;
initServer.run();
}
if (locks.processLock.probe()) initServer.run();

while (locks.processLock.probe()) Thread.sleep(3);

Expand All @@ -155,25 +151,16 @@ public static int run(

InputStream outErr = ioSocket.getInputStream();
OutputStream in = ioSocket.getOutputStream();
ProxyStreamPumper outPump = new ProxyStreamPumper(outErr, stdout, stderr);
ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
Thread outThread = new Thread(outPump, "outPump");
outThread.setDaemon(true);
Thread outPumperThread = new Thread(outPumper, "outPump");
outPumperThread.setDaemon(true);
Thread inThread = new Thread(inPump, "inPump");
inThread.setDaemon(true);
outThread.start();
outPumperThread.start();
inThread.start();

locks.serverLock.await();

// Although the process that the server was running has terminated and the server has sent all the stdout/stderr
// over the unix pipe and released its lock we don't know that all the data has arrived at the client
// The outThread of the ProxyStreamPumper will not close until the socket is closed (so we can't join on it)
// but we also can't close the socket until all the data has arrived. Catch 22. We could signal termination
// in the stream (ProxyOutputStream / ProxyStreamPumper) but that would require a new protocol.
// So we just wait until there has been X ms with no data

outPump.getLastData().waitForSilence(50);
outPumperThread.join();

try {
return Integer.parseInt(Files.readAllLines(Paths.get(lockBase + "/" + ServerFiles.exitCode)).get(0));
Expand Down
34 changes: 0 additions & 34 deletions main/client/src/mill/main/client/ProxyOutputStream.java

This file was deleted.

161 changes: 161 additions & 0 deletions main/client/src/mill/main/client/ProxyStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package mill.main.client;


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Logic to capture a pair of streams (typically stdout and stderr), combining
* them into a single stream, and splitting it back into two streams later while
* preserving ordering. This is useful for capturing stderr and stdout and forwarding
* them to a terminal while strictly preserving the ordering, i.e. users won't see
* exception stack traces and printlns arriving jumbled up and impossible to debug
*
* This works by converting writes from either of the two streams into packets of
* the form:
*
* 1 byte n bytes
* | header | body |
*
* Where header is a single byte of the form:
*
* - header more than 0 indicating that this packet is for the `OUT` stream
* - header less than 0 indicating that this packet is for the `ERR` stream
* - abs(header) indicating the length of the packet body, in bytes
* - header == 0 indicating the end of the stream
*
* Writes to either of the two `Output`s are synchronized on the shared
* `destination` stream, ensuring that they always arrive complete and without
* interleaving. On the other side, a `Pumper` reads from the combined
* stream, forwards each packet to its respective destination stream, or terminates
* when it hits a packet with `header == 0`
*/
public class ProxyStream{

public static final int OUT = 1;
public static final int ERR = -1;
public static final int END = 0;

public static void sendEnd(OutputStream out) throws IOException {
synchronized(out){
out.write(ProxyStream.END);
out.flush();
}
}

public static class Output extends java.io.OutputStream {
private java.io.OutputStream destination;
private int key;

public Output(java.io.OutputStream out, int key){
this.destination = out;
this.key = key;
}

@Override public void write(int b) throws IOException {
synchronized (destination){
destination.write(key);
destination.write(b);
}
}

@Override public void write(byte[] b) throws IOException {
if (b.length > 0) {
synchronized (destination) {
write(b, 0, b.length);
}
}
}

@Override public void write(byte[] b, int off, int len) throws IOException {

synchronized (destination) {
int i = 0;
while (i < len && i + off < b.length) {
int chunkLength = Math.min(len - i, 127);
if (chunkLength > 0) {
destination.write(chunkLength * key);
destination.write(b, off + i, Math.min(b.length - off - i, chunkLength));
i += chunkLength;
}
}
}
}

@Override public void flush() throws IOException {
synchronized (destination) {
destination.flush();
}
}

@Override public void close() throws IOException {
synchronized (destination) {
destination.close();
}
}
}

public static class Pumper implements Runnable{
private InputStream src;
private OutputStream destOut;
private OutputStream destErr;
public Pumper(InputStream src, OutputStream destOut, OutputStream destErr){
this.src = src;
this.destOut = destOut;
this.destErr = destErr;
}

public void run() {

byte[] buffer = new byte[1024];
while (true) {
try {
int header = src.read();
// -1 means socket was closed, 0 means a ProxyStream.END was sent. Note
// that only header values > 0 represent actual data to read:
// - sign((byte)header) represents which stream the data should be sent to
// - abs((byte)header) represents the length of the data to read and send
if (header == -1 || header == 0) break;
else {
int stream = (byte) header > 0 ? 1 : -1;
int quantity0 = (byte) header;
int quantity = Math.abs(quantity0);
int offset = 0;
int delta = -1;
while (offset < quantity) {
delta = src.read(buffer, offset, quantity - offset);
if (delta == -1) {
break;
} else {
offset += delta;
}
}

if (delta != -1) {
switch(stream){
case ProxyStream.OUT: destOut.write(buffer, 0, offset); break;
case ProxyStream.ERR: destErr.write(buffer, 0, offset); break;
}

flush();
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}

try {
destOut.close();
destErr.close();
} catch(IOException e) {}
}

public void flush() throws IOException {
destOut.flush();
destErr.flush();
}
}
}
63 changes: 0 additions & 63 deletions main/client/src/mill/main/client/ProxyStreamPumper.java

This file was deleted.

18 changes: 9 additions & 9 deletions main/client/src/mill/main/client/ServerFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ public class ServerFiles {
final public static String sandbox = "sandbox";

/**
* Lock file used to ensure a single server is running in a particular
* mill-worker folder.
* Ensures only a single client is manipulating each mill-worker folder at
* a time, either spawning the server or submitting a command. Also used by
* the server to detect when a client disconnects, so it can terminate execution
*/
final public static String processLock = "processLock";


final public static String clientLock = "clientLock";


final public static String serverLock = "serverLock";


/**
* Lock file ensuring a single server is running in a particular mill-worker
* folder. If multiple servers are spawned in the same folder, only one takes
* the lock and the others fail to do so and terminate immediately.
*/
final public static String processLock = "processLock";

/**
* The pipe by which the client snd server exchange IO
Expand Down
23 changes: 0 additions & 23 deletions main/client/src/mill/main/client/WaitForSilence.java

This file was deleted.

Loading