Skip to content

Commit

Permalink
fix #3615: always requesting bookmarks (#3617)
Browse files Browse the repository at this point in the history
* fix #3615: always requesting bookmarks

* updating the tests and adding a changelog

* updating the changelog for possible mock uri changes

* removing errerant changes
  • Loading branch information
shawkins authored Dec 16, 2021
1 parent c1505ba commit f45ed4f
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Fix #3636: ensure proper handling of LogWatch closure wrt its streams

#### Improvements
* Fix #3615: opt into bookmarks by default
* Fix #3600: add owner references support to HasMetadata

#### Dependency Upgrade
Expand All @@ -23,6 +24,7 @@
* Fix #3593: Add support for Istio extension

#### _**Note**_: Breaking changes in the API
* If you do not wish to receive bookmarks, then set ListOptions.allowWatchBookmarks=false - otherwise all Watches will default to requesting bookmarks. If supported by the api-server, bookmarks will avoid 410 exceptions and keep the watch alive longer. If you are using the mock framework with explicit uris, you may need to update your expected watch endpoints to include the parameter allowWatchBookmarks=true
* Refactoring #3547: due to an abstraction layer added over okHttp, the following api changes were made:
* OperationContext withOkHttpClient was removed, it should be needed to be directly called
* PatchType.getMediaType was replaced with PatchType.getContentType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat
private URL requestUrl;

private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

private final boolean receiveBookmarks;

AbstractWatchManager(
Watcher<T> watcher, BaseOperation<T, ?, ?> baseOperation, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, Supplier<HttpClient> clientSupplier
Expand All @@ -77,6 +79,11 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();
this.receiveBookmarks = Boolean.TRUE.equals(listOptions.getAllowWatchBookmarks());
// opt into bookmarks by default
if (listOptions.getAllowWatchBookmarks() == null) {
listOptions.setAllowWatchBookmarks(true);
}
this.baseOperation = baseOperation;
this.requestUrl = baseOperation.getNamespacedUrl();
this.listOptions = listOptions;
Expand Down Expand Up @@ -184,6 +191,10 @@ boolean isForceClosed() {
}

void eventReceived(Watcher.Action action, HasMetadata resource) {
if (!receiveBookmarks && action == Action.BOOKMARK) {
// the user didn't ask for bookmarks, just filter them
return;
}
// the WatchEvent deserialization is not specifically typed
// modify the type here if needed
if (resource != null && !baseOperation.getType().isAssignableFrom(resource.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public void onClose(WatcherException cause) {
// Then
assertThat(eventReceived).hasValue("{\"kind\":\"Hello\",\"metadata\":{\"name\":\"test\"}}");
assertThat(result).isNotNull();
verify(builder, Mockito.times(1)).uri(URI.create("https://localhost:8443/apis/test.fabric8.io/v1alpha1/hellos?watch=true"));
verify(builder, Mockito.times(1)).uri(URI.create("https://localhost:8443/apis/test.fabric8.io/v1alpha1/hellos?allowWatchBookmarks=true&watch=true"));
}

private void mockCallWithResponse(int code) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ void testStatusUpdate() throws IOException {
@DisplayName("Should be able to watch some resource in a namespace with null name, labelSelector and ListOptions")
void testWatchAllResource() throws IOException, InterruptedException {
// Given
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?watch=true")
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_PERIOD)
Expand Down Expand Up @@ -340,7 +340,7 @@ public void onClose(WatcherException cause) { }
@DisplayName("Should be able to watch some resource in a namespace")
void testWatchAllResourceInNamespace() throws IOException, InterruptedException {
// Given
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?watch=true")
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_PERIOD)
Expand Down Expand Up @@ -369,7 +369,7 @@ public void onClose(WatcherException cause) { }
@DisplayName("Should be able to watch a single resource with some name")
void testWatchSingleResource() throws IOException, InterruptedException {
// Given
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos"+ "?fieldSelector=" + Utils.toUrlEncoded("metadata.name=example-hello")+"&watch=true")
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos"+ "?fieldSelector=" + Utils.toUrlEncoded("metadata.name=example-hello")+"&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_PERIOD)
Expand Down Expand Up @@ -397,7 +397,7 @@ public void onClose(WatcherException cause) { }
@DisplayName("Should be able to watch with labelSelectors")
void testWatchWithLabels() throws IOException, InterruptedException {
// Given
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?labelSelector="+ Utils.toUrlEncoded("foo=bar")+ "&watch=true")
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?labelSelector="+ Utils.toUrlEncoded("foo=bar")+ "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_PERIOD)
Expand Down Expand Up @@ -426,7 +426,7 @@ public void onClose(WatcherException cause) { }
void testWatchSomeResourceVersion() throws IOException, InterruptedException {
// Given
String watchResourceVersion = "1001";
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?resourceVersion=" + watchResourceVersion + "&watch=true")
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?resourceVersion=" + watchResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_PERIOD)
Expand Down Expand Up @@ -456,7 +456,7 @@ public void onClose(WatcherException cause) { }
void testWatchNamespaceAndSomeResourceVersion() throws IOException, InterruptedException {
// Given
String watchResourceVersion = "1001";
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?resourceVersion=" + watchResourceVersion + "&watch=true")
server.expect().withPath("/apis/test.fabric8.io/v1alpha1/namespaces/ns1/hellos?resourceVersion=" + watchResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_PERIOD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void testNamespacedPodInformer() throws InterruptedException {

server.expect().withPath("/api/v1/namespaces/test/pods")
.andReturn(200, getList(startResourceVersion, Pod.class)).once();
server.expect().withPath("/api/v1/namespaces/test/pods?resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/namespaces/test/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand Down Expand Up @@ -160,7 +160,7 @@ void testInformerWithNamespaceAndNameConfigured() throws InterruptedException {

server.expect().withPath("/api/v1/namespaces/test/pods?fieldSelector=" + Utils.toUrlEncoded("metadata.name=pod1"))
.andReturn(200, getList(startResourceVersion, Pod.class)).once();
server.expect().withPath("/api/v1/namespaces/test/pods?fieldSelector=" + Utils.toUrlEncoded("metadata.name=pod1") + "&resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/namespaces/test/pods?fieldSelector=" + Utils.toUrlEncoded("metadata.name=pod1") + "&resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand Down Expand Up @@ -204,7 +204,7 @@ void testAllNamespacedInformer() throws InterruptedException {

server.expect().withPath("/api/v1/pods")
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata().withItems(Collections.emptyList()).build()).once();
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand Down Expand Up @@ -248,7 +248,7 @@ void shouldReconnectInCaseOf410() throws InterruptedException {

server.expect().withPath("/api/v1/pods")
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata().withItems(Collections.emptyList()).build()).once();
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand All @@ -259,7 +259,7 @@ void shouldReconnectInCaseOf410() throws InterruptedException {
server.expect().withPath("/api/v1/pods")
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(mid2ResourceVersion).endMetadata().withItems(
new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1").withResourceVersion(endResourceVersion).endMetadata().build()).build()).times(2);
server.expect().withPath("/api/v1/pods?resourceVersion=" + mid2ResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/pods?resourceVersion=" + mid2ResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand Down Expand Up @@ -302,7 +302,7 @@ void shouldDeleteIfMissingOnResync() throws InterruptedException {

server.expect().withPath("/api/v1/pods")
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata().withItems(Collections.emptyList()).build()).once();
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand Down Expand Up @@ -348,7 +348,7 @@ void testHasSynced() {
String startResourceVersion = "1000", endResourceVersion = "1001";
server.expect().withPath("/api/v1/namespaces/test/pods")
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata().withItems(Collections.emptyList()).build()).once();
server.expect().withPath("/api/v1/namespaces/test/pods?resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/namespaces/test/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand Down Expand Up @@ -834,7 +834,7 @@ void testReconnectAfterOnCloseException() throws InterruptedException {
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata().withItems(Collections.emptyList()).build()).once();

// initial watch - terminates with an exception
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand All @@ -844,7 +844,7 @@ void testReconnectAfterOnCloseException() throws InterruptedException {
.done().always();

// should pick this up after the termination
server.expect().withPath("/api/v1/pods?resourceVersion=" + midResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/pods?resourceVersion=" + midResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
Expand Down Expand Up @@ -991,7 +991,7 @@ private <T extends HasMetadata> void setupMockServerExpectationsWithVersion(Clas
watchUrl += "?";
}

watchUrl += "resourceVersion=" + startResourceVersion + "&watch=true";
watchUrl += "resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true";
server.expect().withPath(watchUrl)
.andUpgradeToWebSocket()
.open()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void watch() throws InterruptedException {
.build();

server.expect()
.withPath("/api/v1/namespaces/ns1/events?watch=true")
.withPath("/api/v1/namespaces/ns1/events?allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket().open().waitFor(50)
.andEmit(new WatchEvent(testEvent, "ADDED"))
.done().once();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void testInformPodWithLabel() throws InterruptedException {
.once();

server.expect()
.withPath("/api/v1/namespaces/test/pods?labelSelector=my-label&resourceVersion=1&watch=true")
.withPath("/api/v1/namespaces/test/pods?labelSelector=my-label&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(EVENT_WAIT_PERIOD_MS)
Expand Down Expand Up @@ -114,7 +114,7 @@ void testInformGeneric() throws InterruptedException {
.once();

server.expect()
.withPath("/apis/demos.fabric8.io/v1/namespaces/test/dummies?labelSelector=my-label&resourceVersion=1&watch=true")
.withPath("/apis/demos.fabric8.io/v1/namespaces/test/dummies?labelSelector=my-label&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(EVENT_WAIT_PERIOD_MS)
Expand Down Expand Up @@ -176,7 +176,7 @@ void testGenericWithKnownType() throws InterruptedException {
.once();

server.expect()
.withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true")
.withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(EVENT_WAIT_PERIOD_MS)
Expand Down Expand Up @@ -235,7 +235,7 @@ void testRunnableInformer() throws InterruptedException {
.once();

server.expect()
.withPath("/api/v1/namespaces/test/pods?labelSelector=my-label&resourceVersion=1&watch=true")
.withPath("/api/v1/namespaces/test/pods?labelSelector=my-label&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(EVENT_WAIT_PERIOD_MS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ void testWatch() throws InterruptedException {
.addToItems(pod1)
.build()
).once();
server.expect().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=true")
server.expect().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(50).andEmit(new WatchEvent(pod1, "DELETED"))
Expand Down Expand Up @@ -436,7 +436,7 @@ void testWait() throws InterruptedException {

server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1").andReturn(200, notReady).once();

server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket()
server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket()
.open()
.waitFor(50).andEmit(new WatchEvent(ready, "MODIFIED"))
.done()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ void testSuccessfulWaitUntilCondition() throws InterruptedException {
ResourceTest.list(server, noReady1);
ResourceTest.list(server, noReady2);

server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket()
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready1, "MODIFIED"))
.done()
.once();

server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&watch=true").andUpgradeToWebSocket()
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
Expand Down Expand Up @@ -234,15 +234,15 @@ void testPartialSuccessfulWaitUntilCondition() {
.build();

// This pod has a non-retryable error.
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true")
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(gone, "ERROR"))
.done()
.once();

// This pod succeeds.
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&watch=true")
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
Expand Down Expand Up @@ -284,14 +284,14 @@ void testAllFailedWaitUntilCondition() {
.build();

// Both pods have a non-retryable error.
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true")
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(gone, "ERROR"))
.done()
.once();

server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&watch=true")
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(gone, "ERROR"))
Expand Down
Loading

0 comments on commit f45ed4f

Please sign in to comment.