diff --git a/cmd/gapit/status.go b/cmd/gapit/status.go index 721ad19418..8bab21bc17 100644 --- a/cmd/gapit/status.go +++ b/cmd/gapit/status.go @@ -27,6 +27,7 @@ import ( "time" "github.com/google/gapid/core/app" + "github.com/google/gapid/core/app/crash" "github.com/google/gapid/core/event/task" "github.com/google/gapid/core/log" "github.com/google/gapid/gapis/service" @@ -75,6 +76,26 @@ func readableBytes(nBytes uint64) string { } } +func newTask(id, parent uint64, name string, background bool) *tsk { + return &tsk{ + id: id, + parentID: parent, + name: name, + background: background, + children: map[uint64]*tsk{}, + } +} + +type tsk struct { + id uint64 + parentID uint64 + name string + background bool + progress int32 + blocked bool + children map[uint64]*tsk +} + func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error { client, err := getGapis(ctx, verb.Gapis, GapirFlags{}) if err != nil { @@ -88,16 +109,6 @@ func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error { statusMutex := sync.Mutex{} - type tsk struct { - id uint64 - parentID uint64 - name string - background bool - progress int32 - blocked bool - children map[uint64]*tsk - } - ancestors := make(map[uint64][]uint64) activeTasks := make(map[uint64]*tsk) totalBlocked := 0 @@ -171,163 +182,138 @@ func (verb *statusVerb) Run(ctx context.Context, flags flag.FlagSet) error { }) defer stopPolling() - endStat, err := client.Status(ctx, - time.Duration(verb.MemoryUpdateInterval/2)*time.Millisecond, - time.Duration(verb.StatusUpdateInterval/2)*time.Millisecond, - func(tu *service.TaskUpdate) { - statusMutex.Lock() - defer statusMutex.Unlock() + ec := make(chan error) + ctx, cancel := context.WithCancel(ctx) + crash.Go(func() { + err := client.Status(ctx, + time.Duration(verb.MemoryUpdateInterval/2)*time.Millisecond, + time.Duration(verb.StatusUpdateInterval/2)*time.Millisecond, + func(tu *service.TaskUpdate) { + statusMutex.Lock() + defer statusMutex.Unlock() - if tu.Status == service.TaskStatus_STARTING { - // If this is a top-level task, add it to our list of top-level tasks. - if tu.Parent == 0 { - activeTasks[tu.Id] = &tsk{ - tu.Id, - 0, - tu.Name, - tu.Background, - 0, - false, - make(map[uint64]*tsk), - } - } else { - if p, ok := ancestors[tu.Parent]; ok { - // If we can find this tasks parent, then add it in the tree - if a := findTask(activeTasks, append(p, tu.Parent)); a != nil { - a.children[tu.Id] = &tsk{ + if tu.Status == service.TaskStatus_STARTING { + // If this is a top-level task, add it to our list of top-level tasks. + if tu.Parent == 0 { + activeTasks[tu.Id] = newTask(tu.Id, 0, tu.Name, tu.Background) + } else { + if p, ok := ancestors[tu.Parent]; ok { + // If we can find this tasks parent, then add it in the tree + if a := findTask(activeTasks, append(p, tu.Parent)); a != nil { + a.children[tu.Id] = newTask(tu.Id, tu.Parent, tu.Name, tu.Background || a.background) + ans := append([]uint64{}, ancestors[tu.Parent]...) + ancestors[tu.Id] = append(ans, tu.Parent) + } else { + // If we don't have the parent for whatever reason, + // treat this as a top-level task. + activeTasks[tu.Id] = newTask( + tu.Id, + 0, + tu.Name, + tu.Background) + } + } else if a, ok := activeTasks[tu.Parent]; ok { + // If the parent of this task is a top level task, + // then add it there. + a.children[tu.Id] = newTask( tu.Id, tu.Parent, tu.Name, - tu.Background || a.background, - 0, - false, - make(map[uint64]*tsk), - } + tu.Background || a.background) ans := append([]uint64{}, ancestors[tu.Parent]...) ancestors[tu.Id] = append(ans, tu.Parent) } else { - // If we don't have the parent for whatever reason, - // treat this as a top-level task. - activeTasks[tu.Id] = &tsk{ + // Fallback to adding this as its own top-level task. + activeTasks[tu.Id] = newTask( tu.Id, 0, tu.Name, - tu.Background, - 0, - false, - make(map[uint64]*tsk), + tu.Background) + } + } + } else if tu.Status == service.TaskStatus_FINISHED { + // Remove this from all parents. + // Make sure to fix up our "totalBlocked" if our + // blocked task finished. + loc := []uint64{} + if a, ok := ancestors[tu.Id]; ok { + loc = a + } + loc = append(loc, tu.Id) + forLineage(activeTasks, loc, func(t *tsk) { + if t.blocked { + if totalBlocked > 0 { + totalBlocked-- } } - } else if a, ok := activeTasks[tu.Parent]; ok { - // If the parent of this task is a top level task, - // then add it there. - a.children[tu.Id] = &tsk{ - tu.Id, - tu.Parent, - tu.Name, - tu.Background || a.background, - 0, - false, - make(map[uint64]*tsk), + }) + if len(loc) > 1 { + // Find the parent, and remove us + if t := findTask(activeTasks, loc[:len(loc)-1]); t != nil { + delete(t.children, tu.Id) } - ans := append([]uint64{}, ancestors[tu.Parent]...) - ancestors[tu.Id] = append(ans, tu.Parent) } else { - // Fallback to adding this as its own top-level task. - activeTasks[tu.Id] = &tsk{ - tu.Id, - 0, - tu.Name, - tu.Background, - 0, - false, - make(map[uint64]*tsk), - } + delete(activeTasks, tu.Id) } - } - } else if tu.Status == service.TaskStatus_FINISHED { - // Remove this from all parents. - // Make sure to fix up our "totalBlocked" if our - // blocked task finished. - loc := []uint64{} - if a, ok := ancestors[tu.Id]; ok { - loc = a - } - loc = append(loc, tu.Id) - forLineage(activeTasks, loc, func(t *tsk) { - if t.blocked { + } else if tu.Status == service.TaskStatus_PROGRESS { + // Simply update the progress for our task + loc := []uint64{} + if a, ok := ancestors[tu.Id]; ok { + loc = a + } + loc = append(loc, tu.Id) + if a := findTask(activeTasks, loc); a != nil { + a.progress = tu.CompletePercent + } + } else if tu.Status == service.TaskStatus_BLOCKED { + // If a task becomes blocked, then we should block + // it and all of its ancestors. + loc := []uint64{} + if a, ok := ancestors[tu.Id]; ok { + loc = a + } + loc = append(loc, tu.Id) + forLineage(activeTasks, loc, func(t *tsk) { + totalBlocked++ + t.blocked = true + }) + } else if tu.Status == service.TaskStatus_UNBLOCKED { + // If a task becomes unblocked, then we should unblock + // it and all of its ancestors. + loc := []uint64{} + if a, ok := ancestors[tu.Id]; ok { + loc = a + } + loc = append(loc, tu.Id) + forLineage(activeTasks, loc, func(t *tsk) { if totalBlocked > 0 { totalBlocked-- } - } - }) - if len(loc) > 1 { - // Find the parent, and remove us - if t := findTask(activeTasks, loc[:len(loc)-1]); t != nil { - delete(t.children, tu.Id) - } - } else { - delete(activeTasks, tu.Id) - } - } else if tu.Status == service.TaskStatus_PROGRESS { - // Simply update the progress for our task - loc := []uint64{} - if a, ok := ancestors[tu.Id]; ok { - loc = a - } - loc = append(loc, tu.Id) - if a := findTask(activeTasks, loc); a != nil { - a.progress = tu.CompletePercent + t.blocked = false + }) + } else if tu.Status == service.TaskStatus_EVENT { + fmt.Printf("EVENT--> %+v\n", tu.Event) } - } else if tu.Status == service.TaskStatus_BLOCKED { - // If a task becomes blocked, then we should block - // it and all of its ancestors. - loc := []uint64{} - if a, ok := ancestors[tu.Id]; ok { - loc = a + }, func(tu *service.MemoryStatus) { + if tu.TotalHeap > maxMemoryUsage { + maxMemoryUsage = tu.TotalHeap } - loc = append(loc, tu.Id) - forLineage(activeTasks, loc, func(t *tsk) { - totalBlocked++ - t.blocked = true - }) - } else if tu.Status == service.TaskStatus_UNBLOCKED { - // If a task becomes unblocked, then we should unblock - // it and all of its ancestors. - loc := []uint64{} - if a, ok := ancestors[tu.Id]; ok { - loc = a - } - loc = append(loc, tu.Id) - forLineage(activeTasks, loc, func(t *tsk) { - if totalBlocked > 0 { - totalBlocked-- - } - t.blocked = false - }) - } else if tu.Status == service.TaskStatus_EVENT { - fmt.Printf("EVENT--> %+v\n", tu.Event) - } - }, func(tu *service.MemoryStatus) { - if tu.TotalHeap > maxMemoryUsage { - maxMemoryUsage = tu.TotalHeap - } - currentMemoryUsage = tu.TotalHeap - }) - if err != nil { - return log.Err(ctx, err, "Failed to connect to the GAPIS status stream") - } - defer endStat() + currentMemoryUsage = tu.TotalHeap + }) + ec <- err + }) - var wait sync.WaitGroup - wait.Add(1) var sigChan chan os.Signal sigChan = make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt) - go func() { - <-sigChan - wait.Done() - }() - wait.Wait() + + select { + case <-sigChan: + cancel() + case err := <-ec: + if err != nil { + return log.Err(ctx, err, "Failed to connect to the GAPIS status stream") + } + } return nil } diff --git a/gapic/src/main/com/google/gapid/MainWindow.java b/gapic/src/main/com/google/gapid/MainWindow.java index 566b7ccead..e0de5d2e27 100644 --- a/gapic/src/main/com/google/gapid/MainWindow.java +++ b/gapic/src/main/com/google/gapid/MainWindow.java @@ -43,6 +43,7 @@ import com.google.gapid.util.MacApplication; import com.google.gapid.util.Messages; import com.google.gapid.util.OS; +import com.google.gapid.util.StatusWatcher; import com.google.gapid.util.UpdateWatcher; import com.google.gapid.views.StatusBar; import com.google.gapid.widgets.CopyPaste; @@ -76,7 +77,7 @@ public class MainWindow extends ApplicationWindow { private final Theme theme; private Composite mainArea; private LoadingScreen loadingScreen; - private StatusBar statusBar; + protected StatusBar statusBar; public MainWindow(Settings settings, Theme theme) { super(null); @@ -94,6 +95,7 @@ public void showLoadingMessage(String status) { public void initMainUi(Client client, Models models, Widgets widgets) { Shell shell = getShell(); + showLoadingMessage("Setting up UI..."); initMenus(client, models, widgets); LoadablePanel mainUi = new LoadablePanel( @@ -125,8 +127,16 @@ public void onCaptureLoaded(Message error) { file -> models.capture.loadCapture(new File(file))); } + if (settings.autoCheckForUpdates) { + // Only show the status message if we're actually checking for updates. watchForUpdates only + //schedules a periodic check to see if we should check for updates and if so, checks. + showLoadingMessage("Watching for updates..."); + } watchForUpdates(client, models); + showLoadingMessage("Tracking server status..."); + trackServerStatus(client); + showLoadingMessage("Ready! Please open or capture a trace file."); } @@ -140,6 +150,20 @@ private void watchForUpdates(Client client, Models models) { }); } + private void trackServerStatus(Client client) { + new StatusWatcher(client, new StatusWatcher.Listener() { + @Override + public void onStatus(String status) { + scheduleIfNotDisposed(statusBar, () -> statusBar.setServerStatus(status)); + } + + @Override + public void onHeap(long heap) { + scheduleIfNotDisposed(statusBar, () -> statusBar.setServerHeapSize(heap)); + } + }); + } + @Override protected void configureShell(Shell shell) { shell.setText(Messages.WINDOW_TITLE); @@ -160,7 +184,7 @@ protected Control createContents(Composite shell) { loadingScreen = new LoadingScreen(mainArea, theme); setTopControl(loadingScreen); - statusBar = new StatusBar(parent); + statusBar = new StatusBar(parent, theme); statusBar.setLayoutData(new GridData(SWT.FILL, SWT.BOTTOM, true, false)); return parent; } diff --git a/gapic/src/main/com/google/gapid/server/Client.java b/gapic/src/main/com/google/gapid/server/Client.java index 032bd2e1ce..6f723f8a27 100644 --- a/gapic/src/main/com/google/gapid/server/Client.java +++ b/gapic/src/main/com/google/gapid/server/Client.java @@ -251,6 +251,15 @@ public ListenableFuture streamLog(Consumer onLogMessage) { return client.streamLog(onLogMessage); } + public ListenableFuture streamStatus( + float memoryS, float statusS, Consumer onStatus) { + LOG.log(FINE, "RPC->streamStatus({}, {})", new Object[] { memoryS, statusS }); + return client.streamStatus(Service.ServerStatusRequest.newBuilder() + .setMemorySnapshotInterval(memoryS) + .setStatusUpdateFrequency(statusS) + .build(), onStatus); + } + public ListenableFuture streamSearch( Service.FindRequest request, Consumer onResult) { LOG.log(FINE, "RPC->find({0})", request); diff --git a/gapic/src/main/com/google/gapid/server/GapidClient.java b/gapic/src/main/com/google/gapid/server/GapidClient.java index a2adc02d87..24215428d0 100644 --- a/gapic/src/main/com/google/gapid/server/GapidClient.java +++ b/gapic/src/main/com/google/gapid/server/GapidClient.java @@ -61,6 +61,8 @@ public ListenableFuture updateSettings( Service.UpdateSettingsRequest request); public ListenableFuture streamLog(Consumer onLogMessage); + public ListenableFuture streamStatus( + Service.ServerStatusRequest request, Consumer onStatus); public ListenableFuture streamSearch( Service.FindRequest request, Consumer onResult); public StreamSender streamTrace( diff --git a/gapic/src/main/com/google/gapid/server/GapidClientGrpc.java b/gapic/src/main/com/google/gapid/server/GapidClientGrpc.java index e97f0ef781..a2668de9a6 100644 --- a/gapic/src/main/com/google/gapid/server/GapidClientGrpc.java +++ b/gapic/src/main/com/google/gapid/server/GapidClientGrpc.java @@ -169,6 +169,15 @@ public ListenableFuture streamLog(Consumer onLogMessage) { return handler.future; } + @Override + public ListenableFuture streamStatus( + Service.ServerStatusRequest request, Consumer onStatus) { + StreamHandler handler = StreamHandler.wrap(onStatus); + stub.status(request, handler); + return handler.future; + } + + @Override public ListenableFuture streamSearch( Service.FindRequest request, Consumer onResult) { diff --git a/gapic/src/main/com/google/gapid/util/StatusWatcher.java b/gapic/src/main/com/google/gapid/util/StatusWatcher.java new file mode 100644 index 0000000000..a1311fe475 --- /dev/null +++ b/gapic/src/main/com/google/gapid/util/StatusWatcher.java @@ -0,0 +1,197 @@ +/* + * Copyright (C) 2017 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.gapid.util; + +import com.google.common.collect.Maps; +import com.google.gapid.proto.service.Service; +import com.google.gapid.server.Client; + +import java.util.HashMap; +import java.util.LinkedHashMap; + +/** + * Utility class for monitoring the server status. + */ +public class StatusWatcher { + private static final float STATUS_UPDATE_INTERVAL_S = 0.5f; + private static final float MEMORY_UPDATE_INTERVAL_S = 1.0f; + + private final Listener listener; + private final Task root = Task.newRoot(); + private final HashMap tasks = Maps.newHashMap(); + private String shownSummary = ""; + + public StatusWatcher(Client client, Listener listener) { + this.listener = listener; + + client.streamStatus(MEMORY_UPDATE_INTERVAL_S, STATUS_UPDATE_INTERVAL_S, update -> { + switch (update.getResCase()) { + case TASK: + onTaskUpdate(update.getTask()); + break; + case MEMORY: + onMemoryUpdate(update.getMemory()); + break; + default: + // Ignore. + } + }); + } + + private void onTaskUpdate(Service.TaskUpdate update) { + String summary; + synchronized (this) { + switch(update.getStatus()) { + case STARTING: + Task parent = tasks.getOrDefault(update.getParent(), root); + Task child = new Task(update, parent); + tasks.put(update.getId(), child); + parent.addChild(child); + break; + case FINISHED: + tasks.getOrDefault(update.getId(), root).remove(); + break; + case PROGRESS: + tasks.getOrDefault(update.getId(), root).setProgress(update.getCompletePercent()); + break; + case BLOCKED: + tasks.getOrDefault(update.getId(), root).setBlocked(true); + break; + case UNBLOCKED: + tasks.getOrDefault(update.getId(), root).setBlocked(false); + break; + default: + // Ignore. + return; + } + + summary = root.getFirstChild(Task.State.RUNNING).getStatusLabel(); + if (shownSummary.equals(summary)) { + return; + } + shownSummary = summary; + } + + listener.onStatus(summary); + } + + private void onMemoryUpdate(Service.MemoryStatus update) { + listener.onHeap(update.getTotalHeap()); + } + + public static interface Listener { + public void onStatus(String status); + public void onHeap(long heap); + } + + private static class Task { + private final long id; + private final Task parent; + private final String name; + private final LinkedHashMap children = Maps.newLinkedHashMap(); + private State state; + private int progress; + + public Task(long id, Task parent, String name, State state, int progress) { + this.id = id; + this.parent = parent; + this.name = name; + this.state = state; + this.progress = progress; + } + + public Task(Service.TaskUpdate update, Task parent) { + this(update.getId(), parent, update.getName(), getState(update), update.getCompletePercent()); + } + + private static State getState(Service.TaskUpdate update) { + return (update.getBackground()) ? State.BACKGROUND : State.RUNNING; + } + + public static Task newRoot() { + return new Task(-1, null, null, null, 0) { + @Override + public void setBlocked(boolean newVal) { + // Don't do anything. + } + + @Override + public void remove() { + // Don't do anything. + } + + @Override + public String getLabel() { + return ""; + } + }; + } + + public void setBlocked(boolean blocked){ + if (state != State.BACKGROUND) { + state = blocked ? State.BLOCKED: State.RUNNING; + parent.setBlocked(blocked); + } + } + + public void setProgress(int progress) { + this.progress = progress; + } + + public void addChild(Task child) { + children.put(child.id, child); + } + + public void remove() { + parent.children.remove(id); + } + + public Task getFirstChild(State inState) { + for (Task child : children.values()) { + if (child.state == inState) { + return child; + } + } + return this; + } + + public Task getLeftMostDecendant(State inState) { + for (Task child : children.values()) { + if (child.state == inState) { + return child.getLeftMostDecendant(inState); + } + } + return this; + } + + public String getLabel() { + return (progress == 0) ? name : name + "<" + progress + "%>"; + } + + public String getStatusLabel() { + Task leaf = getLeftMostDecendant(State.RUNNING); + if (leaf == this) { + return getLabel(); + } else { + return getLabel() + " ... " + leaf.getLabel(); + } + } + + private static enum State { + BACKGROUND, RUNNING, BLOCKED, + } + } +} diff --git a/gapic/src/main/com/google/gapid/views/StatusBar.java b/gapic/src/main/com/google/gapid/views/StatusBar.java index 7e315e9619..1b918345f5 100644 --- a/gapic/src/main/com/google/gapid/views/StatusBar.java +++ b/gapic/src/main/com/google/gapid/views/StatusBar.java @@ -15,13 +15,24 @@ */ package com.google.gapid.views; +import static com.google.gapid.widgets.Widgets.createComposite; +import static com.google.gapid.widgets.Widgets.createLabel; +import static com.google.gapid.widgets.Widgets.createLink; +import static com.google.gapid.widgets.Widgets.filling; +import static com.google.gapid.widgets.Widgets.withLayoutData; import static com.google.gapid.widgets.Widgets.withMargin; -import com.google.gapid.widgets.Widgets; +import com.google.gapid.widgets.Theme; import org.eclipse.swt.SWT; +import org.eclipse.swt.graphics.GC; +import org.eclipse.swt.graphics.Point; +import org.eclipse.swt.graphics.Rectangle; import org.eclipse.swt.layout.GridData; import org.eclipse.swt.layout.GridLayout; +import org.eclipse.swt.layout.RowData; +import org.eclipse.swt.layout.RowLayout; +import org.eclipse.swt.widgets.Canvas; import org.eclipse.swt.widgets.Composite; import org.eclipse.swt.widgets.Label; import org.eclipse.swt.widgets.Link; @@ -30,24 +41,31 @@ * Displays status information at the bottom of the main window. */ public class StatusBar extends Composite { - private final Label status; + private final Composite serverStatus; + private final HeapStatus heap; + private final Label server; private final Link notification; - private Runnable onNotificationClick; + private Runnable onNotificationClick = null; - public StatusBar(Composite parent) { + public StatusBar(Composite parent, Theme theme) { super(parent, SWT.NONE); setLayout(withMargin(new GridLayout(2, false), 0, 0)); - status = Widgets.createLabel(this, ""); - status.setLayoutData(new GridData(SWT.LEFT, SWT.FILL, true, false)); + serverStatus = withLayoutData( + createComposite(this, filling(new RowLayout(SWT.HORIZONTAL), true, false)), + new GridData(SWT.LEFT, SWT.FILL, true, false)); + createLabel(serverStatus, "Server:"); + heap = new HeapStatus(serverStatus, theme); + withLayoutData(new Label(serverStatus, SWT.SEPARATOR | SWT.VERTICAL), new RowData(SWT.DEFAULT, 1)); + server = createLabel(serverStatus, ""); + serverStatus.setVisible(false); - notification = Widgets.createLink(this, "", (e) -> { + notification = withLayoutData(createLink(this, "", $ -> { if (onNotificationClick != null) { onNotificationClick.run(); } - }); - notification.setLayoutData(new GridData(SWT.RIGHT, SWT.FILL, false, false)); + }), new GridData(SWT.RIGHT, SWT.FILL, false, false)); } /** @@ -61,4 +79,79 @@ public void setNotification(String text, Runnable onClick) { onNotificationClick = onClick; layout(); } + + public void setServerStatus(String text) { + serverStatus.setVisible(true); + server.setText(text); + layout(); + } + + public void setServerHeapSize(long heapSize) { + serverStatus.setVisible(true); + heap.setHeap(heapSize); + layout(); + } + + private static class HeapStatus extends Canvas { + private static final int PADDING = 2; + + private long heap = 0; + private long max = 1; + private String label = ""; + private int maxMeasuredWidth; + + public HeapStatus(Composite parent, Theme theme) { + super(parent, SWT.NONE); + + addListener(SWT.Paint, e -> { + Rectangle ca = getClientArea(); + e.gc.setBackground(theme.statusBarMemoryBar()); + e.gc.fillRectangle(0, 0, (int)(ca.width * heap / max), ca.height); + + Point ts = e.gc.stringExtent(label); + e.gc.drawText( + label, ca.width - PADDING - ts.x, (ca.height - ts.y) / 2, SWT.DRAW_TRANSPARENT); + }); + } + + public void setHeap(long newHeap) { + heap = newHeap; + max = Math.max(max, newHeap); + label = bytesToHuman(newHeap) + " of " + bytesToHuman(max); + redraw(); + } + + @Override + public Point computeSize(int wHint, int hHint, boolean changed) { + Point result; + if (label.isEmpty()) { + result = new Point(0, 0); + } else { + GC gc = new GC(this); + result = gc.stringExtent(label); + gc.dispose(); + maxMeasuredWidth = result.x = Math.max(maxMeasuredWidth, result.x); + } + + if (wHint != SWT.DEFAULT) { + result.x = wHint; + } else { + result.x += 2 * PADDING; + } + if (hHint != SWT.DEFAULT) { + result.y = hHint; + } + return result; + } + + private static String bytesToHuman(long bytes) { + long mb = bytes >> 20; // The heap is never smaller than 4MB. + if (mb > 1024) { + // Show GBs with a decimal. + return String.format("%.1fGB", mb / 1024.0); + } else { + return mb + "MB"; + } + } + } } diff --git a/gapic/src/main/com/google/gapid/widgets/Theme.java b/gapic/src/main/com/google/gapid/widgets/Theme.java index 8ef62e1e5a..d128222e31 100644 --- a/gapic/src/main/com/google/gapid/widgets/Theme.java +++ b/gapic/src/main/com/google/gapid/widgets/Theme.java @@ -132,6 +132,8 @@ public interface Theme { @RGB(argb = 0x80000000) public Color histogramCurtain(); @RGB(argb = 0xc0202020) public Color histogramArrow(); + @RGB(argb = 0xffcccccc) public Color statusBarMemoryBar(); + @TextStyle(foreground = 0xa9a9a9) public Styler structureStyler(); @TextStyle(foreground = 0x0000ee) public Styler identifierStyler(); @TextStyle(bold = true) public Styler labelStyler(); diff --git a/gapis/client/client.go b/gapis/client/client.go index c4745a4d58..8e2eb3a89c 100644 --- a/gapis/client/client.go +++ b/gapis/client/client.go @@ -182,49 +182,35 @@ func (c *client) Profile( } func (c *client) Status( - ctx context.Context, snapshotInterval time.Duration, statusUpdateFrequency time.Duration, f func(*service.TaskUpdate), m func(*service.MemoryStatus)) (stop func() error, err error) { + ctx context.Context, snapshotInterval time.Duration, statusUpdateFrequency time.Duration, f func(*service.TaskUpdate), m func(*service.MemoryStatus)) error { - stream, err := c.client.Status(ctx) - if err != nil { - return nil, err - } - - req := &service.ServerStatusRequest{Enable: true, MemorySnapshotInterval: float32(snapshotInterval.Seconds()), StatusUpdateFrequency: float32(statusUpdateFrequency.Seconds())} + req := &service.ServerStatusRequest{MemorySnapshotInterval: float32(snapshotInterval.Seconds()), StatusUpdateFrequency: float32(statusUpdateFrequency.Seconds())} - if err := stream.Send(req); err != nil { - return nil, err + stream, err := c.client.Status(ctx, req) + if err != nil { + return err } - waitForEOF := task.Async(ctx, func(ctx context.Context) error { - for { - r, err := stream.Recv() - if err != nil { - if errors.Cause(err) == io.EOF { - return nil - } - return err - } - if _, ok := r.Res.(*service.ServerStatusResponse_Task); ok { - if f != nil { - f(r.GetTask()) - } - } else if _, ok := r.Res.(*service.ServerStatusResponse_Memory); ok { - if m != nil { - m(r.GetMemory()) - } + for { + r, err := stream.Recv() + if err != nil { + if errors.Cause(err) == io.EOF { + return nil } - } - }) - - stop = func() error { - // Tell the server we want to stop profiling. - if err := stream.Send(&service.ServerStatusRequest{}); err != nil { return err } - return waitForEOF() + if _, ok := r.Res.(*service.ServerStatusResponse_Task); ok { + if f != nil { + f(r.GetTask()) + } + } else if _, ok := r.Res.(*service.ServerStatusResponse_Memory); ok { + if m != nil { + m(r.GetMemory()) + } + } } - return stop, nil + return nil } func (c *client) GetPerformanceCounters(ctx context.Context) (string, error) { diff --git a/gapis/replay/scheduler/scheduler.go b/gapis/replay/scheduler/scheduler.go index 1e3de7416d..814e5de073 100644 --- a/gapis/replay/scheduler/scheduler.go +++ b/gapis/replay/scheduler/scheduler.go @@ -105,7 +105,7 @@ func (s *Scheduler) Schedule(ctx context.Context, t Task, b Batch) (val interfac } func (s *Scheduler) run(ctx context.Context) { - ctx = status.Start(ctx, "Replay Scheduler") + ctx = status.StartBackground(ctx, "Replay Scheduler") defer status.Finish(ctx) bins := map[Batch]*bin{} diff --git a/gapis/server/grpc.go b/gapis/server/grpc.go index 5131c6597f..e735446d22 100644 --- a/gapis/server/grpc.go +++ b/gapis/server/grpc.go @@ -330,51 +330,39 @@ func (s *grpcServer) Profile(stream service.Gapid_ProfileServer) error { } } -func (s *grpcServer) Status(stream service.Gapid_StatusServer) error { - defer s.inRPC()() - ctx := s.bindCtx(stream.Context()) - - // stop stops any running status requests - stop := func() error { return nil } - defer stop() - for { - // Grab an incoming request. - req, err := stream.Recv() - if err != nil { - return err - } - - // If there are no profile modes in the request, then the RPC can finish. - if !req.Enable { - break - } - if stop != nil { - stop() - } +func (s *grpcServer) Status(req *service.ServerStatusRequest, stream service.Gapid_StatusServer) error { + // defer s.inRPC()() -- don't consider the log stream an inflight RPC. + ctx, cancel := task.WithCancel(stream.Context()) + defer s.addInterrupter(cancel)() - f := func(t *service.TaskUpdate) { - stream.Send(&service.ServerStatusResponse{ - Res: &service.ServerStatusResponse_Task{t}, - }) + c := make(chan error) + f := func(t *service.TaskUpdate) { + if err := stream.Send(&service.ServerStatusResponse{ + Res: &service.ServerStatusResponse_Task{t}, + }); err != nil { + c <- err + cancel() } - m := func(t *service.MemoryStatus) { - stream.Send(&service.ServerStatusResponse{ - Res: &service.ServerStatusResponse_Memory{t}, - }) + } + m := func(t *service.MemoryStatus) { + if err := stream.Send(&service.ServerStatusResponse{ + Res: &service.ServerStatusResponse_Memory{t}, + }); err != nil { + c <- err + cancel() } + } + err := s.handler.Status(s.bindCtx(ctx), + time.Duration(float32(time.Second)*req.MemorySnapshotInterval), + time.Duration(float32(time.Second)*req.StatusUpdateFrequency), + f, m) - // Start the profile. - stop, err = s.handler.Status(ctx, - time.Duration(float32(time.Second)*req.MemorySnapshotInterval), - time.Duration(float32(time.Second)*req.StatusUpdateFrequency), - f, - m, - ) - if err != nil { - return err + if err == nil { + select { + case err = <-c: } } - return nil + return err } func (s *grpcServer) GetPerformanceCounters(ctx xctx.Context, req *service.GetPerformanceCountersRequest) (*service.GetPerformanceCountersResponse, error) { diff --git a/gapis/server/server.go b/gapis/server/server.go index 62b0887736..f77d29d158 100644 --- a/gapis/server/server.go +++ b/gapis/server/server.go @@ -486,6 +486,9 @@ type statusListener struct { } func (l *statusListener) OnTaskStart(ctx context.Context, task *status.Task) { + l.progressMutex.Lock() + defer l.progressMutex.Unlock() + l.f(&service.TaskUpdate{ Status: service.TaskStatus_STARTING, Id: task.ID(), @@ -494,8 +497,6 @@ func (l *statusListener) OnTaskStart(ctx context.Context, task *status.Task) { CompletePercent: 0, Background: task.Background(), }) - l.progressMutex.Lock() - defer l.progressMutex.Unlock() l.lastProgressUpdate[task] = time.Now() } @@ -518,6 +519,9 @@ func (l *statusListener) OnTaskProgress(ctx context.Context, task *status.Task) } func (l *statusListener) OnTaskBlock(ctx context.Context, task *status.Task) { + l.progressMutex.Lock() + defer l.progressMutex.Unlock() + l.f(&service.TaskUpdate{ Status: service.TaskStatus_BLOCKED, Id: task.ID(), @@ -529,6 +533,9 @@ func (l *statusListener) OnTaskBlock(ctx context.Context, task *status.Task) { } func (l *statusListener) OnTaskUnblock(ctx context.Context, task *status.Task) { + l.progressMutex.Lock() + defer l.progressMutex.Unlock() + l.f(&service.TaskUpdate{ Status: service.TaskStatus_UNBLOCKED, Id: task.ID(), @@ -540,6 +547,8 @@ func (l *statusListener) OnTaskUnblock(ctx context.Context, task *status.Task) { } func (l *statusListener) OnTaskFinish(ctx context.Context, task *status.Task) { + l.progressMutex.Lock() + defer l.progressMutex.Unlock() l.f(&service.TaskUpdate{ Status: service.TaskStatus_FINISHED, @@ -549,12 +558,14 @@ func (l *statusListener) OnTaskFinish(ctx context.Context, task *status.Task) { CompletePercent: 100, Background: task.Background(), }) - l.progressMutex.Lock() - defer l.progressMutex.Unlock() delete(l.lastProgressUpdate, task) + } func (l *statusListener) OnEvent(ctx context.Context, task *status.Task, event string, scope status.EventScope) { + l.progressMutex.Lock() + defer l.progressMutex.Unlock() + if task != nil { l.f(&service.TaskUpdate{ Status: service.TaskStatus_EVENT, @@ -579,41 +590,37 @@ func (l *statusListener) OnEvent(ctx context.Context, task *status.Task, event s } func (l *statusListener) OnMemorySnapshot(ctx context.Context, stats runtime.MemStats) { + l.progressMutex.Lock() + defer l.progressMutex.Unlock() + l.m(&service.MemoryStatus{ TotalHeap: stats.Alloc, }) } -func (s *server) Status( - ctx context.Context, - snapshotInterval time.Duration, - statusUpdateFrequency time.Duration, - f func(*service.TaskUpdate), - m func(*service.MemoryStatus)) (func() error, error) { - l := &statusListener{f, m, make(map[*status.Task]time.Time), statusUpdateFrequency, sync.Mutex{}} +func (s *server) Status(ctx context.Context, snapshotInterval, statusInterval time.Duration, f func(*service.TaskUpdate), m func(*service.MemoryStatus)) error { + ctx = status.StartBackground(ctx, "RPC Status") + defer status.Finish(ctx) + ctx = log.Enter(ctx, "Status") + l := &statusListener{f, m, make(map[*status.Task]time.Time), statusInterval, sync.Mutex{}} unregister := status.RegisterListener(l) - stop := func() error { - unregister() - return nil - } + defer unregister() if snapshotInterval > 0 { // Poll the memory. This will block until the context is cancelled. - msi := time.Second * time.Duration(snapshotInterval) stopSnapshot := task.Async(ctx, func(ctx context.Context) error { - return task.Poll(ctx, msi, func(ctx context.Context) error { + return task.Poll(ctx, snapshotInterval, func(ctx context.Context) error { status.SnapshotMemory(ctx) return nil }) }) - oldStop := stop - stop = func() error { - oldStop() - stopSnapshot() - return nil - } + defer stopSnapshot() } - return stop, nil + + select { + case <-task.ShouldStop(ctx): + } + return task.StopReason(ctx) } func (s *server) GetPerformanceCounters(ctx context.Context) (string, error) { diff --git a/gapis/service/service.go b/gapis/service/service.go index 99e5ba9c90..c8375ff463 100644 --- a/gapis/service/service.go +++ b/gapis/service/service.go @@ -140,7 +140,7 @@ type Service interface { snapshotInterval time.Duration, statusUpdateFrequency time.Duration, f func(*TaskUpdate), - m func(*MemoryStatus)) (stop func() error, err error) + m func(*MemoryStatus)) error // GetPerformanceCounters returns the values of all global counters as // a string. diff --git a/gapis/service/service.proto b/gapis/service/service.proto index 6ff7ca92d3..1078485357 100644 --- a/gapis/service/service.proto +++ b/gapis/service/service.proto @@ -245,9 +245,8 @@ message MemoryStatus { } message ServerStatusRequest { - bool enable = 1; - float memory_snapshot_interval = 2; - float status_update_frequency = 3; + float memory_snapshot_interval = 1; + float status_update_frequency = 2; } message ServerStatusResponse { @@ -669,7 +668,7 @@ service Gapid { } // Status returns a stream of Status events that are occuring on the server - rpc Status(stream ServerStatusRequest) returns (stream ServerStatusResponse) { + rpc Status(ServerStatusRequest) returns (stream ServerStatusResponse) { } ///////////////////////////////////////////////////////////////