From 2a13263311f23e91478202db00cb3c384b3885d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Wed, 5 Jun 2024 17:02:13 +0200 Subject: [PATCH 01/12] Adding logging module using LogCacheClient --- build.gradle | 3 +- settings.gradle | 13 +- .../build.gradle | 1 + .../BackingApplicationIdsProvider.java | 57 ++++++ .../CloudFoundryAcceptanceTest.java | 110 ++++++++++- .../LoggingRecentAcceptanceTest.java | 84 +++++++++ .../LoggingStreamingAcceptanceTest.java | 103 +++++++++++ .../cf/CloudFoundryClientConfiguration.java | 11 ++ .../fixtures/cf/CloudFoundryService.java | 8 +- .../build.gradle | 8 + ...udFoundryAppDeployerAutoConfiguration.java | 18 ++ ...iceInstanceLogStreamAutoConfiguration.java | 84 +++++++++ ...ceInstanceRecentLogsAutoConfiguration.java | 49 +++++ ...ot.autoconfigure.AutoConfiguration.imports | 2 + ...nstanceLogStreamAutoConfigurationTest.java | 96 ++++++++++ ...stanceRecentLogsAutoConfigurationTest.java | 85 +++++++++ spring-cloud-app-broker-logging/build.gradle | 35 ++++ .../logging/ApplicationIdsProvider.java | 25 +++ .../cloud/appbroker/logging/LoggingUtils.java | 66 +++++++ .../recent/ApplicationRecentLogsProvider.java | 59 ++++++ .../logging/recent/RecentLogsProvider.java | 26 +++ .../recent/endpoint/EncodingException.java | 27 +++ .../recent/endpoint/LogMessageComparator.java | 39 ++++ .../recent/endpoint/MultipartEncoder.java | 66 +++++++ .../recent/endpoint/RecentLogsController.java | 64 +++++++ .../ApplicationLogStreamPublisher.java | 147 +++++++++++++++ .../streaming/LogCacheStreamPublisher.java | 120 ++++++++++++ .../logging/streaming/LogStreamPublisher.java | 25 +++ .../ServiceInstanceNotFoundException.java | 27 +++ .../StreamingLogWebSocketHandler.java | 122 ++++++++++++ .../events/ServiceInstanceLogEvent.java | 46 +++++ .../events/ServiceInstanceLoggingEvent.java | 56 ++++++ .../StartServiceInstanceLoggingEvent.java | 27 +++ .../StopServiceInstanceLoggingEvent.java | 27 +++ .../example/recentlog/RecentLogsTestApp.java | 46 +++++ .../streaming/LogStreamingTestApp.java | 65 +++++++ .../recent/ServiceInstanceRecentLogsTest.java | 103 +++++++++++ .../ServiceInstanceLogStreamingTest.java | 175 ++++++++++++++++++ .../build.gradle | 22 +++ src/pmd/pmdTestRuleSet.xml | 1 + 40 files changed, 2131 insertions(+), 17 deletions(-) create mode 100644 spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/logging/BackingApplicationIdsProvider.java create mode 100644 spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java create mode 100644 spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java create mode 100644 spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfiguration.java create mode 100644 spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfiguration.java create mode 100644 spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java create mode 100644 spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfigurationTest.java create mode 100644 spring-cloud-app-broker-logging/build.gradle create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/ApplicationIdsProvider.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/LoggingUtils.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/ApplicationRecentLogsProvider.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/RecentLogsProvider.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/EncodingException.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/LogMessageComparator.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/RecentLogsController.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogStreamPublisher.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/ServiceInstanceNotFoundException.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLogEvent.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLoggingEvent.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StartServiceInstanceLoggingEvent.java create mode 100644 spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StopServiceInstanceLoggingEvent.java create mode 100644 spring-cloud-app-broker-logging/src/test/java/com/example/recentlog/RecentLogsTestApp.java create mode 100644 spring-cloud-app-broker-logging/src/test/java/com/example/streaming/LogStreamingTestApp.java create mode 100644 spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/recent/ServiceInstanceRecentLogsTest.java create mode 100644 spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/streaming/ServiceInstanceLogStreamingTest.java create mode 100644 spring-cloud-starter-app-broker-logging/build.gradle diff --git a/build.gradle b/build.gradle index ac68bc438..2eebcecfd 100644 --- a/build.gradle +++ b/build.gradle @@ -414,7 +414,8 @@ def getTestProjects() { def getStarterProjects() { [project(":spring-cloud-starter-app-broker"), - project(":spring-cloud-starter-app-broker-cloudfoundry")] as Set + project(":spring-cloud-starter-app-broker-cloudfoundry"), + project(":spring-cloud-starter-app-broker-logging")] as Set } def getLibraryProjects() { diff --git a/settings.gradle b/settings.gradle index 660e2c7b0..bb67d5bd5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,16 +1,9 @@ pluginManagement { - resolutionStrategy { - eachPlugin { - if (requested.id.namespace?.startsWith('org.asciidoctor.jvm')) { - useVersion("4.0.1") - } - } - } plugins { id "io.spring.nohttp" version "0.0.11" id 'org.springframework.boot' version "3.3.1" - id 'org.asciidoctor.jvm.pdf' - id 'org.asciidoctor.jvm.convert' + id 'org.asciidoctor.jvm.pdf' version '4.0.2' + id 'org.asciidoctor.jvm.convert' version '4.0.2' } repositories { gradlePluginPortal() @@ -38,5 +31,7 @@ include "spring-cloud-app-broker-core" include "spring-cloud-app-broker-autoconfigure" include "spring-cloud-app-broker-integration-tests" include "spring-cloud-app-broker-acceptance-tests" +include "spring-cloud-app-broker-logging" include "spring-cloud-starter-app-broker" include "spring-cloud-starter-app-broker-cloudfoundry" +include "spring-cloud-starter-app-broker-logging" diff --git a/spring-cloud-app-broker-acceptance-tests/build.gradle b/spring-cloud-app-broker-acceptance-tests/build.gradle index 9aa68b0d3..60e31c37b 100644 --- a/spring-cloud-app-broker-acceptance-tests/build.gradle +++ b/spring-cloud-app-broker-acceptance-tests/build.gradle @@ -25,6 +25,7 @@ description = "Spring Cloud App Broker Acceptance Tests" dependencies { api platform(SpringBootPlugin.BOM_COORDINATES) api project(":spring-cloud-starter-app-broker-cloudfoundry") + api project(":spring-cloud-starter-app-broker-logging") api "org.springframework.boot:spring-boot-starter-webflux" testImplementation "org.springframework.boot:spring-boot-starter-test" diff --git a/spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/logging/BackingApplicationIdsProvider.java b/spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/logging/BackingApplicationIdsProvider.java new file mode 100644 index 000000000..a0ad2cc0b --- /dev/null +++ b/spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/logging/BackingApplicationIdsProvider.java @@ -0,0 +1,57 @@ +/* + * Copyright 2016-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.acceptance.logging; + + +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.client.v3.applications.ApplicationResource; +import org.cloudfoundry.client.v3.applications.ListApplicationsRequest; +import org.cloudfoundry.operations.CloudFoundryOperations; +import org.cloudfoundry.operations.spaces.GetSpaceRequest; +import org.cloudfoundry.operations.spaces.SpaceDetail; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.stereotype.Component; + +@Component +class BackingApplicationIdsProvider implements ApplicationIdsProvider { + + private final CloudFoundryClient cloudFoundryClient; + + private final CloudFoundryOperations cloudFoundryOperations; + + public BackingApplicationIdsProvider(CloudFoundryClient cloudFoundryClient, + CloudFoundryOperations cloudFoundryOperations) { + this.cloudFoundryClient = cloudFoundryClient; + this.cloudFoundryOperations = cloudFoundryOperations; + } + + @Override + public Flux getApplicationIds(String serviceInstanceId) { + return cloudFoundryOperations.spaces().get(GetSpaceRequest.builder().name(serviceInstanceId).build()) + .map(SpaceDetail::getId) + .flatMap(spaceId -> + cloudFoundryClient.applicationsV3() + .list(ListApplicationsRequest.builder().spaceIds(spaceId).build()) + ) + .flatMapMany( + listApplicationsResponse -> Flux.fromIterable(listApplicationsResponse.getResources()) + .map(ApplicationResource::getId)); + } + +} diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java index 754685ff7..b4fc41675 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,14 @@ package org.springframework.cloud.appbroker.acceptance; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; import java.net.URI; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -29,6 +34,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.DocumentContext; @@ -64,6 +75,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.appbroker.acceptance.fixtures.cf.CloudFoundryClientConfiguration; +import org.springframework.cloud.appbroker.acceptance.fixtures.cf.CloudFoundryProperties; import org.springframework.cloud.appbroker.acceptance.fixtures.cf.CloudFoundryService; import org.springframework.cloud.appbroker.acceptance.fixtures.cf.UserCloudFoundryService; import org.springframework.cloud.appbroker.acceptance.fixtures.uaa.UaaService; @@ -91,6 +103,7 @@ @ExtendWith(SpringExtension.class) @ExtendWith(BrokerPropertiesParameterResolver.class) @EnableConfigurationProperties(AcceptanceTestProperties.class) +@SuppressWarnings("PMD.GodClass") abstract class CloudFoundryAcceptanceTest { private static final Logger LOG = LoggerFactory.getLogger(CloudFoundryAcceptanceTest.class); @@ -113,12 +126,17 @@ abstract class CloudFoundryAcceptanceTest { @Autowired protected UserCloudFoundryService userCloudFoundryService; + @Autowired + private CloudFoundryProperties cloudFoundryProperties; + @Autowired private UaaService uaaService; @Autowired private AcceptanceTestProperties acceptanceTestProperties; + private String cfHome; + private final WebClient webClient = getSslIgnoringWebClient(); protected abstract String testSuffix(); @@ -144,6 +162,7 @@ void setUp(TestInfo testInfo, BrokerProperties brokerProperties) { List appBrokerProperties = getAppBrokerProperties(brokerProperties); blockingSubscribe(initializeUser()); blockingSubscribe(initializeBroker(appBrokerProperties)); + prepareCLI(); } void setUpForBrokerUpdate(BrokerProperties brokerProperties) { @@ -157,6 +176,8 @@ private List getAppBrokerProperties(BrokerProperties brokerProperties) { "spring.cloud.openservicebroker.catalog.services[0].name=" + appServiceName(), "spring.cloud.openservicebroker.catalog.services[0].description=A service that deploys a backing app", "spring.cloud.openservicebroker.catalog.services[0].bindable=true", + "spring.cloud.openservicebroker.catalog.services[0].metadata.properties.serviceInstanceLogsEndpoint=" + + getServiceInstanceLogsEndpoint(), "spring.cloud.openservicebroker.catalog.services[0].plans[0].id=" + PLAN_ID, "spring.cloud.openservicebroker.catalog.services[0].plans[0].name=standard", "spring.cloud.openservicebroker.catalog.services[0].plans[0].bindable=true", @@ -179,6 +200,10 @@ private List getAppBrokerProperties(BrokerProperties brokerProperties) { return appBrokerProperties; } + private String getServiceInstanceLogsEndpoint() { + return "https://" + testBrokerAppName() + "." + cloudFoundryProperties.getApiHost().substring(4) + "/logs/"; + } + @BeforeEach void configureJsonPath() { Configuration.setDefaults(new Configuration.Defaults() { @@ -332,7 +357,7 @@ protected String getServiceInstanceGuid(String serviceInstanceName) { .block(); } - private Mono getServiceInstanceMono(String serviceInstanceName) { + Mono getServiceInstanceMono(String serviceInstanceName) { return userCloudFoundryService.getServiceInstance(serviceInstanceName); } @@ -423,7 +448,8 @@ protected Mono manageApps(String serviceInstanceName, String serviceName .getApplicationRoute(testBrokerAppName()) .flatMap(appRoute -> webClient.get() - .uri(URI.create(appRoute + "/" + operation + "/" + serviceName + "/" + planName + "/" + serviceInstanceId)) + .uri(URI.create( + appRoute + "/" + operation + "/" + serviceName + "/" + planName + "/" + serviceInstanceId)) .retrieve() .toEntity(String.class) .map(HttpEntity::getBody))); @@ -451,11 +477,87 @@ private WebClient getSslIgnoringWebClient() { protected Mono> getApplications(String app1, String app2) { return Flux.merge(cloudFoundryService.getApplication(app1), - cloudFoundryService.getApplication(app2)) + cloudFoundryService.getApplication(app2)) .parallel() .runOn(Schedulers.parallel()) .sequential() .collectList(); } + private void prepareCLI() { + try { + cfHome = Files.createTempDirectory("app-broker-acceptance-tests").toString(); + + callCLICommand(List.of("cf", "login", "-a", + cloudFoundryProperties.getApiHost(), + "--skip-ssl-validation", "-u", + cloudFoundryProperties.getUsername(), + "-p", + cloudFoundryProperties.getPassword(), + "-o", "test-instances")) + .block(Duration.ofSeconds(60)); + callCLICommand(List.of("cf", "install-plugin", "-f", "-r", "Cf-Community", "Service Instance Logging")) + .block(Duration.ofSeconds(60)); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected Mono callCLICommand(List command) { + return Mono.fromCallable(() -> { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing command: {}", command); + } + ProcessBuilder processBuilder = new ProcessBuilder(command); + processBuilder.environment().put("CF_HOME", cfHome); + processBuilder.redirectErrorStream(true); + Process process = processBuilder.start(); + + return processOutput(process); + }).subscribeOn(Schedulers.boundedElastic()); + } + + private static String processOutput(Process process) throws InterruptedException, ExecutionException { + StringBuffer outputBuilder = new StringBuffer(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = appendLines(executor, process, outputBuilder); + try { + future.get(30, TimeUnit.SECONDS); + } + catch (TimeoutException e) { + LOG.info("Process reading timed out after 30 seconds"); + } + finally { + future.cancel(true); + executor.shutdownNow(); + process.destroyForcibly(); + try { + process.waitFor(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for process to terminate", e); + } + } + return outputBuilder.toString(); + } + + private static Future appendLines(ExecutorService executor, Process process, StringBuffer outputBuilder) { + return executor.submit(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line = ""; + while (line != null) { + line = reader.readLine(); + if (line != null) { + outputBuilder.append(line).append('\n'); + if (LOG.isDebugEnabled()) { + LOG.debug("Read line: {}", line); + } + } + } + } + return null; + }); + } + } diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java new file mode 100644 index 000000000..b91495fc0 --- /dev/null +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2002-2020 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.acceptance; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; + +import org.cloudfoundry.operations.applications.ApplicationSummary; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class LoggingRecentAcceptanceTest extends CloudFoundryAcceptanceTest { + + private static final String APP_CREATE_1 = "app-logging-recent-1"; + + private static final String SI_NAME = "si-logging-recent"; + + private static final String SUFFIX = "logging-recent-instance"; + + private static final String APP_SERVICE_NAME = "app-service-" + SUFFIX; + + private static final String BACKING_SERVICE_NAME = "backing-service-" + SUFFIX; + + @Override + protected String testSuffix() { + return SUFFIX; + } + + @Override + protected String appServiceName() { + return APP_SERVICE_NAME; + } + + @Override + protected String backingServiceName() { + return BACKING_SERVICE_NAME; + } + + @Test + @AppBrokerTestProperties({ + "spring.cloud.appbroker.services[0].service-name=" + APP_SERVICE_NAME, + "spring.cloud.appbroker.services[0].plan-name=" + PLAN_NAME, + + "spring.cloud.appbroker.services[0].apps[0].name=" + APP_CREATE_1, + "spring.cloud.appbroker.services[0].apps[0].path=" + BACKING_APP_PATH, + "spring.cloud.appbroker.services[0].target.name=SpacePerServiceInstance", + "spring.cloud.appbroker.deployer.cloudfoundry.properties.stack=cflinuxfs4" + }) + void shouldReturnBackingApplicationLogs() { + createServiceInstance(SI_NAME); + + String lines = callCLICommand( + List.of("cf", "service-logs", SI_NAME, "--recent", "--skip-ssl-validation")) + .block(Duration.ofSeconds(35)); + + assertThat(lines).isNotEmpty(); + + assertThat(lines).contains("Created app with guid"); + assertThat(lines).contains("Updated app with guid"); + assertThat(lines).contains("APP/PROC/WEB"); + + deleteServiceInstance(SI_NAME); + + Optional backingApplication1AfterDeletion = getApplicationSummary(APP_CREATE_1); + assertThat(backingApplication1AfterDeletion).isEmpty(); + } + +} diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java new file mode 100644 index 000000000..117884461 --- /dev/null +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2002-2020 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.acceptance; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import org.cloudfoundry.operations.applications.ApplicationSummary; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; + +class LoggingStreamingAcceptanceTest extends CloudFoundryAcceptanceTest { + + private static final String APP_CREATE_1 = "app-logging-stream-1"; + + private static final String SI_NAME = "si-logging-stream"; + + private static final String SUFFIX = "logging-stream-instance"; + + private static final String APP_SERVICE_NAME = "app-service-" + SUFFIX; + + private static final String BACKING_SERVICE_NAME = "backing-service-" + SUFFIX; + + @Override + protected String testSuffix() { + return SUFFIX; + } + + @Override + protected String appServiceName() { + return APP_SERVICE_NAME; + } + + @Override + protected String backingServiceName() { + return BACKING_SERVICE_NAME; + } + + @Test + @AppBrokerTestProperties({ + "spring.cloud.appbroker.services[0].service-name=" + APP_SERVICE_NAME, + "spring.cloud.appbroker.services[0].plan-name=" + PLAN_NAME, + + "spring.cloud.appbroker.services[0].apps[0].name=" + APP_CREATE_1, + "spring.cloud.appbroker.services[0].apps[0].path=" + BACKING_APP_PATH, + "spring.cloud.appbroker.services[0].target.name=SpacePerServiceInstance", + "spring.cloud.appbroker.deployer.cloudfoundry.properties.stack=cflinuxfs4" + }) + void shouldStreamBackingApplicationLogs() { + Mono createServiceInstanceMono = Mono.fromRunnable(() -> + CompletableFuture.runAsync(() -> + createServiceInstance(SI_NAME))) + .subscribeOn(Schedulers.boundedElastic()); + + Mono logStreamingMono = Mono.defer(() -> callCLICommand( + List.of("cf", "service-logs", SI_NAME, "--skip-ssl-validation") + )).subscribeOn(Schedulers.boundedElastic()); + + StepVerifier.create(createServiceInstanceMono + .then(Mono.delay(Duration.ofSeconds(20)).then()) + .then(getServiceInstanceMono(SI_NAME).retry(5)) + .then(Mono.zip(logStreamingMono, logStreamingMono)) + ) + .assertNext(tuple -> { + assertThat(tuple.getT1()).isNotEmpty(); + assertThat(tuple.getT1()).contains("Connected, tailing logs for service instance"); + assertThat(tuple.getT1()).contains("[STG/0]"); + assertThat(tuple.getT1()).doesNotContain("websocket: close"); + + assertThat(tuple.getT2()).isNotEmpty(); + assertThat(tuple.getT2()).contains("Connected, tailing logs for service instance"); + assertThat(tuple.getT2()).contains("[STG/0]"); + assertThat(tuple.getT2()).doesNotContain("websocket: close"); + }) + .verifyComplete(); + + deleteServiceInstance(SI_NAME); + + Optional backingApplication1AfterDeletion = getApplicationSummary(APP_CREATE_1); + assertThat(backingApplication1AfterDeletion).isEmpty(); + } + +} diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryClientConfiguration.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryClientConfiguration.java index 4f85c9a79..998e34f39 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryClientConfiguration.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryClientConfiguration.java @@ -20,6 +20,7 @@ import org.cloudfoundry.client.CloudFoundryClient; import org.cloudfoundry.doppler.DopplerClient; +import org.cloudfoundry.logcache.v1.LogCacheClient; import org.cloudfoundry.operations.CloudFoundryOperations; import org.cloudfoundry.operations.DefaultCloudFoundryOperations; import org.cloudfoundry.reactor.ConnectionContext; @@ -27,6 +28,7 @@ import org.cloudfoundry.reactor.TokenProvider; import org.cloudfoundry.reactor.client.ReactorCloudFoundryClient; import org.cloudfoundry.reactor.doppler.ReactorDopplerClient; +import org.cloudfoundry.reactor.logcache.v1.ReactorLogCacheClient; import org.cloudfoundry.reactor.tokenprovider.ClientCredentialsGrantTokenProvider; import org.cloudfoundry.reactor.tokenprovider.PasswordGrantTokenProvider; import org.cloudfoundry.reactor.uaa.ReactorUaaClient; @@ -117,6 +119,15 @@ protected DopplerClient dopplerClient(ConnectionContext connectionContext, .build(); } + @Bean + protected LogCacheClient logCacheClient(ConnectionContext connectionContext, + @Qualifier("userCredentials") TokenProvider tokenProvider) { + return ReactorLogCacheClient.builder() + .connectionContext(connectionContext) + .tokenProvider(tokenProvider) + .build(); + } + @Bean @Qualifier("userCredentials") protected UaaClient userCredentialsUaaClient(ConnectionContext connectionContext, diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java index 5ac66e8fd..97dd9fd27 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,7 +88,9 @@ public class CloudFoundryService { private final CloudFoundryProperties cloudFoundryProperties; - public CloudFoundryService(CloudFoundryClient cloudFoundryClient, CloudFoundryOperations cloudFoundryOperations, + public CloudFoundryService( + CloudFoundryClient cloudFoundryClient, + CloudFoundryOperations cloudFoundryOperations, CloudFoundryProperties cloudFoundryProperties) { this.cloudFoundryClient = cloudFoundryClient; this.cloudFoundryOperations = cloudFoundryOperations; @@ -304,7 +306,7 @@ public Mono getOrCreateDefaultOrg() { public Mono> getSpaces() { return cloudFoundryOperations.spaces().list() .doOnComplete(() -> LOG.info("Success listing spaces")) - .doOnError(e -> LOG.error(String.format("Error listing spaces. error=%s" + e.getMessage()), e)) + .doOnError(e -> LOG.error(String.format("Error listing spaces. error={}" + e.getMessage()), e)) .map(SpaceSummary::getName) .collectList(); } diff --git a/spring-cloud-app-broker-autoconfigure/build.gradle b/spring-cloud-app-broker-autoconfigure/build.gradle index 3c76a6934..ffd9107fa 100644 --- a/spring-cloud-app-broker-autoconfigure/build.gradle +++ b/spring-cloud-app-broker-autoconfigure/build.gradle @@ -22,6 +22,12 @@ plugins { description = "Spring Cloud App Broker Autoconfiguration" +java { + registerFeature("logging") { + usingSourceSet(sourceSets.main) + } +} + dependencies { annotationProcessor platform(SpringBootPlugin.BOM_COORDINATES) annotationProcessor "org.springframework.boot:spring-boot-configuration-processor" @@ -35,6 +41,8 @@ dependencies { api "org.cloudfoundry:cloudfoundry-client-reactor:${cfJavaClientVersion}" api "org.cloudfoundry:cloudfoundry-operations:${cfJavaClientVersion}" + loggingImplementation project(":spring-cloud-app-broker-logging") + testImplementation "org.springframework.boot:spring-boot-starter-test" testImplementation "org.springframework.boot:spring-boot-starter-webflux" testImplementation "io.projectreactor.tools:blockhound-junit-platform:${blockHoundVersion}" diff --git a/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/CloudFoundryAppDeployerAutoConfiguration.java b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/CloudFoundryAppDeployerAutoConfiguration.java index 4c4e72c88..a427f291c 100644 --- a/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/CloudFoundryAppDeployerAutoConfiguration.java +++ b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/CloudFoundryAppDeployerAutoConfiguration.java @@ -25,6 +25,7 @@ import org.cloudfoundry.client.CloudFoundryClient; import org.cloudfoundry.doppler.DopplerClient; +import org.cloudfoundry.logcache.v1.LogCacheClient; import org.cloudfoundry.operations.CloudFoundryOperations; import org.cloudfoundry.operations.DefaultCloudFoundryOperations; import org.cloudfoundry.reactor.ConnectionContext; @@ -32,6 +33,7 @@ import org.cloudfoundry.reactor.TokenProvider; import org.cloudfoundry.reactor.client.ReactorCloudFoundryClient; import org.cloudfoundry.reactor.doppler.ReactorDopplerClient; +import org.cloudfoundry.reactor.logcache.v1.ReactorLogCacheClient; import org.cloudfoundry.reactor.tokenprovider.ClientCredentialsGrantTokenProvider; import org.cloudfoundry.reactor.tokenprovider.PasswordGrantTokenProvider; import org.cloudfoundry.reactor.uaa.ReactorUaaClient; @@ -210,6 +212,22 @@ public ReactorDopplerClient dopplerClient(@ConnectionContextQualifier Connection .build(); } + /** + * Provide a {@link LogCacheClient} bean + * + * @param connectionContext the ConnectionContext bean + * @param tokenProvider the TokenProvider bean + * @return the bean + */ + @Bean + public LogCacheClient logCacheClient(@ConnectionContextQualifier ConnectionContext connectionContext, + @TokenQualifier TokenProvider tokenProvider) { + return ReactorLogCacheClient.builder() + .connectionContext(connectionContext) + .tokenProvider(tokenProvider) + .build(); + } + /** * Provide a {@link TokenProvider} bean * diff --git a/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfiguration.java b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfiguration.java new file mode 100644 index 000000000..6e0e51b36 --- /dev/null +++ b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfiguration.java @@ -0,0 +1,84 @@ +/* + * Copyright 2016-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.autoconfigure; + +import java.util.HashMap; +import java.util.Map; + +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.logcache.v1.LogCacheClient; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.streaming.ApplicationLogStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.LogCacheStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.LogStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.endpoint.StreamingLogWebSocketHandler; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; + +@Configuration +@ConditionalOnClass(ApplicationLogStreamPublisher.class) +@ConditionalOnBean(ApplicationIdsProvider.class) +public class ServiceInstanceLogStreamAutoConfiguration { + + @Bean + public StreamingLogWebSocketHandler streamingLogWebSocketHandler( + ApplicationEventPublisher applicationEventPublisher) { + return new StreamingLogWebSocketHandler(applicationEventPublisher); + } + + @Bean + @ConditionalOnMissingBean + public WebSocketHandlerAdapter handlerAdapter() { + return new WebSocketHandlerAdapter(); + } + + @Bean + public HandlerMapping logsHandlerMapping(StreamingLogWebSocketHandler webSocketHandler) { + Map map = new HashMap<>(); + map.put("/logs/**", webSocketHandler); + + SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping(); + handlerMapping.setOrder(1); + handlerMapping.setUrlMap(map); + return handlerMapping; + } + + @Bean + @ConditionalOnMissingBean + public LogStreamPublisher streamLogsPublisher( + CloudFoundryClient cloudFoundryClient, + LogCacheClient logCacheClient, + ApplicationIdsProvider applicationIdsProvider) { + return new LogCacheStreamPublisher(cloudFoundryClient, logCacheClient, applicationIdsProvider); + } + + @Bean + public ApplicationLogStreamPublisher applicationLogsPublisher(LogStreamPublisher logStreamPublisher, + ApplicationEventPublisher eventPublisher) { + return new ApplicationLogStreamPublisher(logStreamPublisher, eventPublisher); + } + +} diff --git a/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfiguration.java b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfiguration.java new file mode 100644 index 000000000..7c08e691d --- /dev/null +++ b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfiguration.java @@ -0,0 +1,49 @@ +/* + * Copyright 2016-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.autoconfigure; + +import org.cloudfoundry.logcache.v1.LogCacheClient; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.recent.ApplicationRecentLogsProvider; +import org.springframework.cloud.appbroker.logging.recent.RecentLogsProvider; +import org.springframework.cloud.appbroker.logging.recent.endpoint.RecentLogsController; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConditionalOnClass(ApplicationRecentLogsProvider.class) +@ConditionalOnBean(ApplicationIdsProvider.class) +public class ServiceInstanceRecentLogsAutoConfiguration { + + @Bean + public RecentLogsProvider recentLogsProvider( + LogCacheClient logCacheClient, + ApplicationIdsProvider applicationIdsProvider) { + return new ApplicationRecentLogsProvider(logCacheClient, applicationIdsProvider); + } + + @Bean + @ConditionalOnMissingBean + public RecentLogsController recentLogsController(RecentLogsProvider recentLogsProvider) { + return new RecentLogsController(recentLogsProvider); + } + +} diff --git a/spring-cloud-app-broker-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-app-broker-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index e2e718516..ba66ea8ff 100644 --- a/spring-cloud-app-broker-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-app-broker-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1,4 @@ org.springframework.cloud.appbroker.autoconfigure.AppBrokerAutoConfiguration org.springframework.cloud.appbroker.autoconfigure.CloudFoundryAppDeployerAutoConfiguration +org.springframework.cloud.appbroker.autoconfigure.ServiceInstanceRecentLogsAutoConfiguration +org.springframework.cloud.appbroker.autoconfigure.ServiceInstanceLogStreamAutoConfiguration diff --git a/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java new file mode 100644 index 000000000..7127678ac --- /dev/null +++ b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java @@ -0,0 +1,96 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.autoconfigure; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.FilteredClassLoader; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.streaming.ApplicationLogStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.LogStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.endpoint.StreamingLogWebSocketHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; + +import static org.assertj.core.api.Assertions.assertThat; + +class ServiceInstanceLogStreamAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of( + AppBrokerAutoConfiguration.class, + CloudFoundryAppDeployerAutoConfiguration.class, + ServiceInstanceLogStreamAutoConfiguration.class + )) + .withPropertyValues( + "spring.cloud.appbroker.deployer.cloudfoundry.api-host=https://api.example.local", + "spring.cloud.appbroker.deployer.cloudfoundry.username=user", + "spring.cloud.appbroker.deployer.cloudfoundry.password=secret" + ); + + @Test + void servicesAreNotCreatedWithoutLoggingOnClasspath() { + contextRunner + .withClassLoader(new FilteredClassLoader(ApplicationLogStreamPublisher.class)) + .withUserConfiguration(LoggingConfiguration.class) + .run(context -> assertThat(context) + .doesNotHaveBean(StreamingLogWebSocketHandler.class) + .doesNotHaveBean(WebSocketHandlerAdapter.class) + .doesNotHaveBean(HandlerMapping.class) + .doesNotHaveBean(LogStreamPublisher.class) + .doesNotHaveBean(ApplicationLogStreamPublisher.class)); + } + + @Test + void servicesAreNotCreatedWithoutRequiredBeansOnClasspath() { + contextRunner + .run(context -> assertThat(context) + .doesNotHaveBean(StreamingLogWebSocketHandler.class) + .doesNotHaveBean(WebSocketHandlerAdapter.class) + .doesNotHaveBean(HandlerMapping.class) + .doesNotHaveBean(LogStreamPublisher.class) + .doesNotHaveBean(ApplicationLogStreamPublisher.class)); + } + + @Test + void servicesAreCreatedWithLoggingConfigured() { + contextRunner + .withUserConfiguration(LoggingConfiguration.class) + .run(context -> assertThat(context) + .hasSingleBean(StreamingLogWebSocketHandler.class) + .hasSingleBean(WebSocketHandlerAdapter.class) + .hasSingleBean(HandlerMapping.class) + .hasSingleBean(LogStreamPublisher.class) + .hasSingleBean(ApplicationLogStreamPublisher.class)); + } + + @TestConfiguration + public static class LoggingConfiguration { + + @Bean + public ApplicationIdsProvider applicationIdsProvider() { + return serviceInstanceId -> Flux.just("app1"); + } + + } + +} diff --git a/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfigurationTest.java b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfigurationTest.java new file mode 100644 index 000000000..e523d1a7c --- /dev/null +++ b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfigurationTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.autoconfigure; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.FilteredClassLoader; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.recent.ApplicationRecentLogsProvider; +import org.springframework.cloud.appbroker.logging.recent.RecentLogsProvider; +import org.springframework.cloud.appbroker.logging.recent.endpoint.RecentLogsController; +import org.springframework.context.annotation.Bean; + +import static org.assertj.core.api.Assertions.assertThat; + +class ServiceInstanceRecentLogsAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of( + AppBrokerAutoConfiguration.class, + CloudFoundryAppDeployerAutoConfiguration.class, + ServiceInstanceRecentLogsAutoConfiguration.class + )) + .withPropertyValues( + "spring.cloud.appbroker.deployer.cloudfoundry.api-host=https://api.example.local", + "spring.cloud.appbroker.deployer.cloudfoundry.username=user", + "spring.cloud.appbroker.deployer.cloudfoundry.password=secret" + ); + + @Test + void servicesAreNotCreatedWithoutLoggingOnClasspath() { + contextRunner + .withClassLoader(new FilteredClassLoader(ApplicationRecentLogsProvider.class)) + .withUserConfiguration(LoggingConfiguration.class) + .run(context -> assertThat(context) + .doesNotHaveBean(RecentLogsProvider.class) + .doesNotHaveBean(RecentLogsController.class)); + } + + @Test + void servicesAreNotCreatedWithoutRequiredBeansOnClasspath() { + contextRunner + .run(context -> assertThat(context) + .doesNotHaveBean(RecentLogsProvider.class) + .doesNotHaveBean(RecentLogsController.class)); + } + + @Test + void servicesAreCreatedWithLoggingConfigured() { + contextRunner + .withUserConfiguration(LoggingConfiguration.class) + .run(context -> assertThat(context) + .hasSingleBean(RecentLogsProvider.class) + .hasSingleBean(RecentLogsController.class)); + } + + @TestConfiguration + public static class LoggingConfiguration { + + @Bean + public ApplicationIdsProvider applicationIdsProvider() { + return serviceInstanceId -> Flux.just("app1"); + } + + } + +} diff --git a/spring-cloud-app-broker-logging/build.gradle b/spring-cloud-app-broker-logging/build.gradle new file mode 100644 index 000000000..65f4351c2 --- /dev/null +++ b/spring-cloud-app-broker-logging/build.gradle @@ -0,0 +1,35 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.springframework.boot.gradle.plugin.SpringBootPlugin + +plugins { + id 'org.springframework.boot' apply false +} + +description = "Spring Cloud App Broker Logging" + +dependencies { + api platform(SpringBootPlugin.BOM_COORDINATES) + api "org.springframework.boot:spring-boot-starter-webflux" + api "org.cloudfoundry:cloudfoundry-client-reactor:${cfJavaClientVersion}" + api "org.cloudfoundry:cloudfoundry-operations:${cfJavaClientVersion}" + + testImplementation project(":spring-cloud-starter-app-broker-logging") + testImplementation "org.springframework.boot:spring-boot-starter-test" + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.awaitility:awaitility" +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/ApplicationIdsProvider.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/ApplicationIdsProvider.java new file mode 100644 index 000000000..500260a5d --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/ApplicationIdsProvider.java @@ -0,0 +1,25 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging; + +import reactor.core.publisher.Flux; + +public interface ApplicationIdsProvider { + + Flux getApplicationIds(String serviceInstanceId); + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/LoggingUtils.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/LoggingUtils.java new file mode 100644 index 000000000..379f80e90 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/LoggingUtils.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging; + + +import java.util.Base64; + +import okio.ByteString; +import org.cloudfoundry.dropsonde.events.Envelope; +import org.cloudfoundry.dropsonde.events.LogMessage; + +public final class LoggingUtils { + + private LoggingUtils() { + } + + public static Envelope convertLogCacheEnvelopeToDropsonde(org.cloudfoundry.logcache.v1.Envelope envelope) { + final Envelope.Builder builder = new Envelope.Builder() + .eventType(Envelope.EventType.LogMessage) + .tags(envelope.getTags()) + .origin(getFromTags(envelope, "rep")) + .timestamp(envelope.getTimestamp()) + .logMessage(new LogMessage.Builder() + .message(ByteString.of(Base64.getDecoder().decode(envelope.getLog().getPayload()))) + .message_type(getMessageType(envelope)) + .timestamp(envelope.getTimestamp()) + .source_instance(envelope.getInstanceId()) + .source_type(getFromTags(envelope, "source_type")) + .build()); + return builder.build(); + } + + private static String getFromTags(org.cloudfoundry.logcache.v1.Envelope envelope, String sourceTypeTag) { + String sourceType = envelope.getTags().get(sourceTypeTag); + if (sourceType == null) { + sourceType = ""; + } + return sourceType; + } + + private static LogMessage.MessageType getMessageType(org.cloudfoundry.logcache.v1.Envelope envelope) { + LogMessage.MessageType messageType; + if ("ERR".equals(envelope.getLog().getType().getValue())) { + messageType = LogMessage.MessageType.ERR; + } + else { + messageType = LogMessage.MessageType.OUT; + } + return messageType; + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/ApplicationRecentLogsProvider.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/ApplicationRecentLogsProvider.java new file mode 100644 index 000000000..954f5366e --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/ApplicationRecentLogsProvider.java @@ -0,0 +1,59 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent; + +import java.time.Instant; + +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeType; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.ReadRequest; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; + +public class ApplicationRecentLogsProvider implements RecentLogsProvider { + + private final LogCacheClient logCacheClient; + + private final ApplicationIdsProvider applicationIdsProvider; + + public ApplicationRecentLogsProvider(LogCacheClient logCacheClient, + ApplicationIdsProvider applicationIdsProvider) { + this.logCacheClient = logCacheClient; + this.applicationIdsProvider = applicationIdsProvider; + } + + @Override + public Flux getLogs(String serviceInstanceId) { + return this.applicationIdsProvider.getApplicationIds(serviceInstanceId) + .flatMap(this::recentLogs); + } + + protected Flux recentLogs(String applicationId) { + return logCacheClient + .read(ReadRequest.builder() + .sourceId(applicationId) + .descending(true) + .envelopeTypes(EnvelopeType.LOG) + .limit(1000) + .startTime(Instant.MIN.getEpochSecond()) + .build()) + .flatMapMany(readResponse -> Flux.fromIterable(readResponse.getEnvelopes().getBatch())); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/RecentLogsProvider.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/RecentLogsProvider.java new file mode 100644 index 000000000..cf908c75b --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/RecentLogsProvider.java @@ -0,0 +1,26 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent; + +import org.cloudfoundry.logcache.v1.Envelope; +import reactor.core.publisher.Flux; + +public interface RecentLogsProvider { + + Flux getLogs(String serviceInstanceId); + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/EncodingException.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/EncodingException.java new file mode 100644 index 000000000..7d0adb1a0 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/EncodingException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent.endpoint; + +class EncodingException extends RuntimeException { + + private static final long serialVersionUID = 1837485200518028161L; + + public EncodingException(Throwable throwable) { + super("Failed to encode: " + throwable.getMessage(), throwable); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/LogMessageComparator.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/LogMessageComparator.java new file mode 100644 index 000000000..08c07a7d6 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/LogMessageComparator.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent.endpoint; + +import java.util.Comparator; + +import org.cloudfoundry.logcache.v1.Envelope; + +class LogMessageComparator implements Comparator { + + @Override + public int compare(Envelope o1, Envelope o2) { + return Long.compare(getTimestamp(o1), getTimestamp(o2)); + } + + private long getTimestamp(Envelope e) { + if (e.getTimestamp() != null) { + return e.getTimestamp(); + } + else { + return 0; + } + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java new file mode 100644 index 000000000..ece987a68 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java @@ -0,0 +1,66 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent.endpoint; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +class MultipartEncoder { + + private final byte[] bytesCRLF = {'\r', '\n'}; + + private final byte[] bytesSEP = {'-', '-'}; + + private final byte[] boundary; + + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + public MultipartEncoder(String boundary) { + this.boundary = boundary.getBytes(); + } + + public void append(byte[] part) { + try { + out.write(bytesCRLF); + out.write(bytesSEP); + out.write(boundary); + out.write(bytesCRLF); + out.write(bytesCRLF); + out.write(part); + } + catch (IOException e) { + throw new EncodingException(e); + } + } + + public byte[] terminateAndGetBytes() { + try { + out.write(bytesCRLF); + out.write(bytesSEP); + out.write(boundary); + out.write(bytesSEP); + out.write(bytesCRLF); + final byte[] bytes = out.toByteArray(); + out.close(); + return bytes; + } + catch (IOException e) { + throw new EncodingException(e); + } + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/RecentLogsController.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/RecentLogsController.java new file mode 100644 index 000000000..d219c790b --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/RecentLogsController.java @@ -0,0 +1,64 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent.endpoint; + +import java.util.UUID; + +import org.cloudfoundry.logcache.v1.Envelope; +import reactor.core.publisher.Mono; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.appbroker.logging.LoggingUtils; +import org.springframework.cloud.appbroker.logging.recent.RecentLogsProvider; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class RecentLogsController { + + private static final LogMessageComparator LOG_MESSAGE_COMPARATOR = new LogMessageComparator(); + + private final RecentLogsProvider recentLogsProviders; + + public RecentLogsController(@Autowired(required = false) RecentLogsProvider recentLogsProviders) { + this.recentLogsProviders = recentLogsProviders; + } + + @RequestMapping("/logs/{serviceInstanceId}/recentlogs") + public Mono> recentLogs(@PathVariable("serviceInstanceId") String serviceInstanceId) { + final String multipartBoundary = UUID.randomUUID().toString(); + final HttpHeaders headers = new HttpHeaders(); + headers.add(HttpHeaders.CONTENT_TYPE, "multipart/mixed; boundary=" + multipartBoundary); + return recentLogsProviders.getLogs(serviceInstanceId) + .collectList() + .doOnNext(envelopes -> envelopes.sort(LOG_MESSAGE_COMPARATOR)) + .map(envelopes -> { + final MultipartEncoder multipart = new MultipartEncoder(multipartBoundary); + for (Envelope message : envelopes) { + var dropsondeEvent = LoggingUtils.convertLogCacheEnvelopeToDropsonde(message); + multipart.append(org.cloudfoundry.dropsonde.events.Envelope.ADAPTER.encode(dropsondeEvent)); + } + + return new ResponseEntity<>(multipart.terminateAndGetBytes(), headers, HttpStatus.OK); + }); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java new file mode 100644 index 000000000..9da5c0e89 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java @@ -0,0 +1,147 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent; +import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLoggingEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationListener; + +public class ApplicationLogStreamPublisher implements ApplicationListener { + + private static final Logger LOG = LoggerFactory.getLogger(ApplicationLogStreamPublisher.class); + + private final Map registry = new HashMap<>(); + + private final LogStreamPublisher logStreamPublisher; + + private final ApplicationEventPublisher publisher; + + public ApplicationLogStreamPublisher( + LogStreamPublisher logStreamPublisher, + ApplicationEventPublisher publisher) { + this.logStreamPublisher = logStreamPublisher; + this.publisher = publisher; + } + + @Override + public void onApplicationEvent(ServiceInstanceLoggingEvent event) { + final String serviceInstanceId = event.getServiceInstanceId(); + switch (event.getOperation()) { + case START: + LOG.debug("Received event to begin listening to logs for {}", serviceInstanceId); + this.startPublishing(serviceInstanceId); + return; + case STOP: + LOG.debug("Received event to stop listening to logs for {}", serviceInstanceId); + this.stopPublishing(serviceInstanceId); + return; + + default: + throw new IllegalArgumentException("Unknown operation: " + event.getOperation()); + } + } + + private void startPublishing(String serviceInstanceId) { + synchronized (registry) { + final Registration registration = registry.get(serviceInstanceId); + if (registration != null) { + LOG.debug("Incrementing registration subscription count for {}", serviceInstanceId); + registration.increment(); + + return; + } + + Flux logStream = this.logStreamPublisher + .getLogStream(serviceInstanceId); + + final Disposable subscription = logStream + .doOnNext( + envelope -> publisher + .publishEvent(new ServiceInstanceLogEvent(this, serviceInstanceId, envelope))) + .subscribe(); + + LOG.debug("Creating new registration for {}", serviceInstanceId); + registry.put(serviceInstanceId, new Registration(subscription)); + } + } + + private void stopPublishing(String serviceInstanceId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received event to stop publishing logs for {}", serviceInstanceId); + } + + synchronized (registry) { + final Registration registration = registry.get(serviceInstanceId); + if (registration == null) { + if (LOG.isWarnEnabled()) { + LOG.warn("Received deregister event for service instance {} but there no event handler registered", + serviceInstanceId); + } + } + else if (registration.decrement() == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Disposing of registration since there are no more subscriptions"); + } + + registration.getSubscription().dispose(); + registry.remove(serviceInstanceId); + } + } + } + + private final static class Registration { + + private final Disposable subscription; + + private int count = 1; + + private Registration(Disposable subscription) { + this.subscription = subscription; + } + + public void increment() { + if (LOG.isDebugEnabled()) { + LOG.debug("Incrementing subscription count from {} to {}", count, count + 1); + } + + ++count; + } + + public int decrement() { + if (LOG.isDebugEnabled()) { + LOG.debug("Decrementing subscription count from {} to {}", count, count - 1); + } + + return --count; + } + + public Disposable getSubscription() { + return subscription; + } + + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java new file mode 100644 index 000000000..856b8dcd7 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java @@ -0,0 +1,120 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.client.v2.applications.GetApplicationRequest; +import org.cloudfoundry.dropsonde.events.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeType; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.ReadRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.util.retry.Retry; + +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.LoggingUtils; + +public class LogCacheStreamPublisher implements LogStreamPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(LogCacheStreamPublisher.class); + + private final CloudFoundryClient client; + + private final LogCacheClient logCacheClient; + + private final ApplicationIdsProvider applicationIdsProvider; + + public LogCacheStreamPublisher( + CloudFoundryClient client, + LogCacheClient logCacheClient, + ApplicationIdsProvider applicationIdsProvider) { + this.client = client; + this.logCacheClient = logCacheClient; + this.applicationIdsProvider = applicationIdsProvider; + } + + @Override + public Flux getLogStream(String serviceInstanceId) { + return this.applicationIdsProvider + .getApplicationIds(serviceInstanceId) + .doOnNext(id -> LOG.debug("Starting log streaming for app with ID {}", id)) + .flatMap(this::createApplicationStreamer); + } + + protected Flux createApplicationStreamer(String applicationId) { + return client.applicationsV2() + .get(GetApplicationRequest.builder() + .applicationId(applicationId) + .build()) + .map(response -> response.getEntity().getName()) + .flatMapMany(appName -> { + long initialStartTime = Instant.now().minus(5, ChronoUnit.SECONDS).toEpochMilli() * 1_000_000L; + return logCacheClient.read( + ReadRequest.builder() + .sourceId(applicationId) + .envelopeTypes(EnvelopeType.LOG) + .startTime(initialStartTime) + .build()) + .flatMapMany(initialResponse -> { + AtomicLong lastTimestamp = new AtomicLong( + initialResponse.getEnvelopes().getBatch().stream() + .mapToLong(org.cloudfoundry.logcache.v1.Envelope::getTimestamp) + .max() + .orElse(initialStartTime) + ); + + Flux initialLogs = Flux.fromIterable(initialResponse.getEnvelopes().getBatch()) + .map(LoggingUtils::convertLogCacheEnvelopeToDropsonde); + + Flux polledLogs = Flux.interval(Duration.ofSeconds(1)) + .flatMap(tick -> logCacheClient.read( + ReadRequest.builder() + .sourceId(applicationId) + .envelopeTypes(EnvelopeType.LOG) + .startTime(lastTimestamp.get() + 1) + .build()) + .flatMapMany(readResponse -> { + long maxTimestamp = readResponse.getEnvelopes().getBatch().stream() + .mapToLong(org.cloudfoundry.logcache.v1.Envelope::getTimestamp) + .max() + .orElse(lastTimestamp.get()); + + lastTimestamp.set(maxTimestamp); + + return Flux.fromIterable(readResponse.getEnvelopes().getBatch()) + .map(LoggingUtils::convertLogCacheEnvelopeToDropsonde); + })) + .onErrorResume(error -> { + LOG.error("Error during log polling: ", error); + return Flux.empty(); + }); + + return Flux.merge(initialLogs, polledLogs) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))) + .doOnError(error -> LOG.error("Streaming error: ", error)); + }); + }); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogStreamPublisher.java new file mode 100644 index 000000000..8c9229b61 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogStreamPublisher.java @@ -0,0 +1,25 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming; + +import reactor.core.publisher.Flux; + +public interface LogStreamPublisher { + + Flux getLogStream(String serviceInstanceId); + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/ServiceInstanceNotFoundException.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/ServiceInstanceNotFoundException.java new file mode 100644 index 000000000..b66552546 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/ServiceInstanceNotFoundException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.endpoint; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(code = HttpStatus.NOT_FOUND) +class ServiceInstanceNotFoundException extends RuntimeException { + + private static final long serialVersionUID = 8672053621926030384L; + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java new file mode 100644 index 000000000..5ebb1efc3 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java @@ -0,0 +1,122 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.endpoint; + +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.cloudfoundry.dropsonde.events.Envelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent; +import org.springframework.cloud.appbroker.logging.streaming.events.StartServiceInstanceLoggingEvent; +import org.springframework.cloud.appbroker.logging.streaming.events.StopServiceInstanceLoggingEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationListener; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketSession; +import org.springframework.web.util.UriTemplate; + +public class StreamingLogWebSocketHandler implements WebSocketHandler, ApplicationListener { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingLogWebSocketHandler.class); + + private static final UriTemplate LOGGING_URI_TEMPLATE = new UriTemplate("/logs/{serviceInstanceId}/stream"); + + private final ApplicationEventPublisher eventPublisher; + + private final ConcurrentHashMap> envelopeSinks = new ConcurrentHashMap<>(); + + public StreamingLogWebSocketHandler(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + @Override + public Mono handle(WebSocketSession session) { + String serviceInstanceId = getServiceInstanceId(session); + LOG.info("Connection established [{}], service instance {}", + session.getHandshakeInfo().getRemoteAddress(), + serviceInstanceId); + + Sinks.Many envelopeSink = envelopeSinks + .computeIfAbsent(serviceInstanceId, s -> Sinks.many().multicast().onBackpressureBuffer()); + + eventPublisher.publishEvent(new StartServiceInstanceLoggingEvent(this, serviceInstanceId)); + LOG.info("Published event to start streaming logs for service instance with ID {}", serviceInstanceId); + + return session.send(envelopeSink.asFlux() + .map(envelope -> session.binaryMessage( + dataBufferFactory -> dataBufferFactory.wrap(Envelope.ADAPTER.encode(envelope))))) + .doFinally(signalType -> afterConnectionClosed(session, serviceInstanceId)) + .doOnError(throwable -> LOG.error("Error handling logging stream for service instance {}", + serviceInstanceId, throwable)); + } + + @Override + public void onApplicationEvent(ServiceInstanceLogEvent event) { + broadcastLogMessage(event); + } + + public void broadcastLogMessage(ServiceInstanceLogEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received event to broadcast log message for {}", event.getServiceInstanceId()); + } + + Sinks.Many envelopeSink = this.envelopeSinks.get(event.getServiceInstanceId()); + if (envelopeSink == null) { + if (LOG.isWarnEnabled()) { + LOG.warn("No sink found for {}, stopping log streaming", event.getServiceInstanceId()); + } + + eventPublisher.publishEvent(new StopServiceInstanceLoggingEvent(this, event.getServiceInstanceId())); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Sending message to client for {}", event.getServiceInstanceId()); + } + + envelopeSink.tryEmitNext(event.getEnvelope()).orThrow(); + } + + private void afterConnectionClosed(WebSocketSession webSocketSession, String serviceInstanceId) { + LOG.info("Connection closed [{}], service instance {}", webSocketSession.getHandshakeInfo().getRemoteAddress(), + serviceInstanceId); + + eventPublisher.publishEvent(new StopServiceInstanceLoggingEvent(this, serviceInstanceId)); + + Sinks.Many sink = envelopeSinks.remove(serviceInstanceId); + if (sink != null) { + sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); + } + } + + private String getServiceInstanceId(WebSocketSession webSocketSession) { + URI uri = webSocketSession.getHandshakeInfo().getUri(); + final Map match = LOGGING_URI_TEMPLATE.match(uri.getPath()); + if (match.isEmpty()) { + throw new ServiceInstanceNotFoundException(); + } + + return match.get("serviceInstanceId"); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLogEvent.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLogEvent.java new file mode 100644 index 000000000..6d450797f --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLogEvent.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.events; + +import org.cloudfoundry.dropsonde.events.Envelope; + +import org.springframework.context.ApplicationEvent; + +public class ServiceInstanceLogEvent extends ApplicationEvent { + + private static final long serialVersionUID = 4391048666734944603L; + + private final String serviceInstanceId; + + private final Envelope envelope; + + public ServiceInstanceLogEvent(Object source, String serviceInstanceId, + Envelope envelope) { + super(source); + this.serviceInstanceId = serviceInstanceId; + this.envelope = envelope; + } + + public String getServiceInstanceId() { + return serviceInstanceId; + } + + public Envelope getEnvelope() { + return envelope; + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLoggingEvent.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLoggingEvent.java new file mode 100644 index 000000000..4a45f3bf2 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLoggingEvent.java @@ -0,0 +1,56 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.events; + +import org.springframework.context.ApplicationEvent; + +public class ServiceInstanceLoggingEvent extends ApplicationEvent { + + private static final long serialVersionUID = 3721553379568462887L; + + public enum Operation { + + /** + * Start publishing log stream for a given service instance id + */ + START, + + /** + * Stop publishing log stream for a given service instance id + */ + STOP + } + + private final String serviceInstanceId; + + private final Operation operation; + + public ServiceInstanceLoggingEvent(Object source, String serviceInstanceId, Operation operation) { + super(source); + this.serviceInstanceId = serviceInstanceId; + this.operation = operation; + } + + public String getServiceInstanceId() { + return serviceInstanceId; + } + + public Operation getOperation() { + return operation; + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StartServiceInstanceLoggingEvent.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StartServiceInstanceLoggingEvent.java new file mode 100644 index 000000000..e4dbf24e9 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StartServiceInstanceLoggingEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.events; + +public class StartServiceInstanceLoggingEvent extends ServiceInstanceLoggingEvent { + + private static final long serialVersionUID = -5940715663862240039L; + + public StartServiceInstanceLoggingEvent(Object source, String serviceInstanceId) { + super(source, serviceInstanceId, Operation.START); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StopServiceInstanceLoggingEvent.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StopServiceInstanceLoggingEvent.java new file mode 100644 index 000000000..c337bb487 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StopServiceInstanceLoggingEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.events; + +public class StopServiceInstanceLoggingEvent extends ServiceInstanceLoggingEvent { + + private static final long serialVersionUID = 2858399700112202361L; + + public StopServiceInstanceLoggingEvent(Object source, String serviceInstanceId) { + super(source, serviceInstanceId, Operation.STOP); + } + +} diff --git a/spring-cloud-app-broker-logging/src/test/java/com/example/recentlog/RecentLogsTestApp.java b/spring-cloud-app-broker-logging/src/test/java/com/example/recentlog/RecentLogsTestApp.java new file mode 100644 index 000000000..95a30111f --- /dev/null +++ b/spring-cloud-app-broker-logging/src/test/java/com/example/recentlog/RecentLogsTestApp.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.recentlog; + +import java.util.UUID; + +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.servicebroker.autoconfigure.web.ServiceBrokerAutoConfiguration; +import org.springframework.cloud.servicebroker.autoconfigure.web.reactive.ServiceBrokerWebFluxAutoConfiguration; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication(exclude = { + ServiceBrokerAutoConfiguration.class, + ServiceBrokerWebFluxAutoConfiguration.class +}) +public class RecentLogsTestApp { + + static final String APP_ID = UUID.randomUUID().toString(); + + public static String getAppId() { + return APP_ID; + } + + @Bean + ApplicationIdsProvider applicationIdsProvider() { + return serviceInstanceId -> Flux.just(APP_ID); + } + +} diff --git a/spring-cloud-app-broker-logging/src/test/java/com/example/streaming/LogStreamingTestApp.java b/spring-cloud-app-broker-logging/src/test/java/com/example/streaming/LogStreamingTestApp.java new file mode 100644 index 000000000..9045cafe3 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/test/java/com/example/streaming/LogStreamingTestApp.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.streaming; + +import java.util.UUID; + +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.streaming.events.StopServiceInstanceLoggingEvent; +import org.springframework.cloud.servicebroker.autoconfigure.web.ServiceBrokerAutoConfiguration; +import org.springframework.cloud.servicebroker.autoconfigure.web.reactive.ServiceBrokerWebFluxAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.event.EventListener; + +@SpringBootApplication(exclude = { + ServiceBrokerAutoConfiguration.class, + ServiceBrokerWebFluxAutoConfiguration.class +}) +public class LogStreamingTestApp { + + private static final String APP_ID = UUID.randomUUID().toString(); + + private static boolean receivedStopEvent; + private static String receivedStopEventServiceInstanceId; + + public static String getAppId() { + return APP_ID; + } + + public static boolean isReceivedStopEvent() { + return receivedStopEvent; + } + + public static String getReceivedStopEventServiceInstanceId() { + return receivedStopEventServiceInstanceId; + } + + @Bean + ApplicationIdsProvider applicationIdsProvider() { + return serviceInstanceId -> Flux.just(APP_ID); + } + + @EventListener + public void onStop(StopServiceInstanceLoggingEvent stopServiceInstanceLoggingEvent) { + receivedStopEventServiceInstanceId = stopServiceInstanceLoggingEvent.getServiceInstanceId(); + receivedStopEvent = true; + } + +} diff --git a/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/recent/ServiceInstanceRecentLogsTest.java b/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/recent/ServiceInstanceRecentLogsTest.java new file mode 100644 index 000000000..a78d9108b --- /dev/null +++ b/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/recent/ServiceInstanceRecentLogsTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent; + +import java.time.Instant; +import java.util.Base64; +import java.util.UUID; + +import com.example.recentlog.RecentLogsTestApp; +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.client.v2.applications.ApplicationEntity; +import org.cloudfoundry.client.v2.applications.GetApplicationRequest; +import org.cloudfoundry.client.v2.applications.GetApplicationResponse; +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeBatch; +import org.cloudfoundry.logcache.v1.Log; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.LogType; +import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.ReadResponse; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import reactor.core.publisher.Mono; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.test.web.reactive.server.WebTestClient; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RecentLogsTestApp.class) +class ServiceInstanceRecentLogsTest { + + @LocalServerPort + private int port; + + @MockBean + private LogCacheClient logCacheClient; + + @MockBean(answer = Answers.RETURNS_DEEP_STUBS) + private CloudFoundryClient cloudFoundryClient; + + private String expectedTestMessage; + + private static final String SERVICE_INSTANCE_ID = UUID.randomUUID().toString(); + + @BeforeEach + void setUp() { + expectedTestMessage = "test message " + UUID.randomUUID(); + + Envelope testEnvelope = + Envelope.builder() + .timestamp(Instant.now().toEpochMilli()) + .log(Log.builder() + .payload(new String(Base64.getEncoder().encode(expectedTestMessage.getBytes()))) + .type(LogType.OUT) + .build()) + .build(); + + ReadResponse response = + ReadResponse.builder() + .envelopes(EnvelopeBatch.builder().batch(testEnvelope).build()) + .build(); + + given(logCacheClient.read(any(ReadRequest.class))) + .willReturn(Mono.just(response)); + + given(cloudFoundryClient.applicationsV2() + .get(GetApplicationRequest.builder().applicationId(RecentLogsTestApp.getAppId()).build())) + .willReturn(Mono.just( + GetApplicationResponse.builder().entity(ApplicationEntity.builder().name("test-app").build()).build())); + } + + @Test + void shouldFetchLogs() { + WebTestClient client = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build(); + + client.get().uri("/logs/{serviceInstanceId}/recentlogs", SERVICE_INSTANCE_ID) + .exchange() + .expectStatus().isOk() + .expectBody(String.class) + .value(Matchers.containsString(expectedTestMessage)); + } + +} diff --git a/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/streaming/ServiceInstanceLogStreamingTest.java b/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/streaming/ServiceInstanceLogStreamingTest.java new file mode 100644 index 000000000..fa380bace --- /dev/null +++ b/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/streaming/ServiceInstanceLogStreamingTest.java @@ -0,0 +1,175 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming; + +import java.net.URI; +import java.time.Instant; +import java.util.Base64; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import com.example.recentlog.RecentLogsTestApp; +import com.example.streaming.LogStreamingTestApp; +import okio.ByteString; +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.client.v2.applications.ApplicationEntity; +import org.cloudfoundry.client.v2.applications.GetApplicationRequest; +import org.cloudfoundry.client.v2.applications.GetApplicationResponse; +import org.cloudfoundry.dropsonde.events.LogMessage; +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeBatch; +import org.cloudfoundry.logcache.v1.Log; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.LogType; +import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.ReadResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; +import org.springframework.web.reactive.socket.client.WebSocketClient; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = LogStreamingTestApp.class) +class ServiceInstanceLogStreamingTest { + + private static final String SERVICE_INSTANCE_ID = UUID.randomUUID().toString(); + + @LocalServerPort + private int port; + + @MockBean + private LogCacheClient logCacheClient; + + @MockBean(answer = Answers.RETURNS_DEEP_STUBS) + private CloudFoundryClient cloudFoundryClient; + + @Autowired + private ApplicationEventPublisher applicationEventPublisher; + + private org.cloudfoundry.dropsonde.events.Envelope expectedEnvelope; + + private final AtomicReference actualEnvelope = new AtomicReference<>(); + + @BeforeEach + void setUp() { + String expectedTestMessage = "test message " + UUID.randomUUID(); + + Envelope testEnvelope = + Envelope.builder() + .timestamp(Instant.now().toEpochMilli()) + .log(Log.builder() + .payload(new String(Base64.getEncoder().encode(expectedTestMessage.getBytes()))) + .type(LogType.OUT) + .build()) + .build(); + expectedEnvelope = new org.cloudfoundry.dropsonde.events.Envelope.Builder() + .logMessage(new LogMessage.Builder() + .message(ByteString.of(Base64.getEncoder().encode(expectedTestMessage.getBytes()))) + .message_type(LogMessage.MessageType.OUT) + .timestamp(0L) + .build()) + .origin("") + .eventType(org.cloudfoundry.dropsonde.events.Envelope.EventType.LogMessage) + .build(); + actualEnvelope.set(testEnvelope); + + ReadResponse response = + ReadResponse.builder() + .envelopes(EnvelopeBatch.builder().batch(testEnvelope).build()) + .build(); + + given(logCacheClient.read(any(ReadRequest.class))) + .willReturn(Mono.just(response)); + + given(cloudFoundryClient.applicationsV2() + .get(GetApplicationRequest.builder().applicationId(RecentLogsTestApp.getAppId()).build())) + .willReturn(Mono.just( + GetApplicationResponse.builder().entity(ApplicationEntity.builder().name("test-app").build()).build())); + } + + @Test + void shouldPublishWebSocketEndpoint() { + Disposable subscription = connectToLogsStreamEndpoint(); + + await().untilAsserted(() -> assertThat(Base64.getDecoder().decode(actualEnvelope.get().getLog().getPayload())) + .isEqualTo(Base64.getDecoder().decode(expectedEnvelope.logMessage.message.toByteArray()))); + + subscription.dispose(); + } + + @Test + void shouldPublishEventOnDisconnect() { + Disposable subscription = connectToLogsStreamEndpoint(); + + await().untilAsserted(() -> assertThat(actualEnvelope.get()).isNotNull()); + + subscription.dispose(); + + await().untilAsserted(() -> assertThat(LogStreamingTestApp.isReceivedStopEvent()).isTrue()); + } + + @Test + void shouldStopStreamingIfNoClient() { + // CLI plugin doesn't always handle disconnect gracefully, so sometimes it is possible that log stream is + // being published but there is no listener. In this case log streaming should be stopped. + + Disposable subscription = connectToLogsStreamEndpoint(); + subscription.dispose(); + + applicationEventPublisher.publishEvent( + new ServiceInstanceLogEvent(this, SERVICE_INSTANCE_ID, expectedEnvelope)); + await().untilAsserted( + () -> assertThat(LogStreamingTestApp.getReceivedStopEventServiceInstanceId()).isEqualTo( + SERVICE_INSTANCE_ID)); + } + + private Disposable connectToLogsStreamEndpoint() { + URI uri = URI.create("ws://localhost:" + port + "/logs/" + SERVICE_INSTANCE_ID + "/stream"); + + WebSocketClient client = new ReactorNettyWebSocketClient(); + return client.execute(uri, getWebSocketHandler()).subscribe(); + } + + private WebSocketHandler getWebSocketHandler() { + return session -> session + .receive() + .doOnNext(message -> { + DataBuffer buffer = message.getPayload(); + actualEnvelope.set(Envelope.builder() + .log(Log.builder().payload(new String(Base64.getDecoder().decode(buffer.toString()))).build()) + .build()); + }) + .then(); + } + +} diff --git a/spring-cloud-starter-app-broker-logging/build.gradle b/spring-cloud-starter-app-broker-logging/build.gradle new file mode 100644 index 000000000..128b3468a --- /dev/null +++ b/spring-cloud-starter-app-broker-logging/build.gradle @@ -0,0 +1,22 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +description = "Spring Cloud App Broker Starter for Logging" + +dependencies { + api project(":spring-cloud-starter-app-broker") + api project(":spring-cloud-app-broker-logging") +} diff --git a/src/pmd/pmdTestRuleSet.xml b/src/pmd/pmdTestRuleSet.xml index 8d5ef6221..cd4fe9e9f 100644 --- a/src/pmd/pmdTestRuleSet.xml +++ b/src/pmd/pmdTestRuleSet.xml @@ -73,6 +73,7 @@ + From 330aa8381608cb4be3408a793955bb36926c2237 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Mon, 8 Jul 2024 14:51:22 +0200 Subject: [PATCH 02/12] Documenting logging starter --- spring-cloud-app-broker-docs/build.gradle | 56 +++++++++---------- .../src/docs/asciidoc/index.adoc | 2 + .../asciidoc/service-instance-logging.adoc | 46 +++++++++++++++ 3 files changed, 76 insertions(+), 28 deletions(-) create mode 100644 spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc diff --git a/spring-cloud-app-broker-docs/build.gradle b/spring-cloud-app-broker-docs/build.gradle index 7a31058b7..9542a4437 100644 --- a/spring-cloud-app-broker-docs/build.gradle +++ b/spring-cloud-app-broker-docs/build.gradle @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ dependencies { implementation "org.springframework.boot:spring-boot-starter-tomcat" implementation "io.projectreactor:reactor-core" implementation "io.r2dbc:r2dbc-h2:1.0.0.RELEASE" - asciidoctorExtensions "io.spring.asciidoctor.backends:spring-asciidoctor-backends:0.0.7" + asciidoctorExtensions "io.spring.asciidoctor.backends:spring-asciidoctor-backends:0.0.7" } javadoc { @@ -63,20 +63,20 @@ asciidoctorPdf { } options doctype: 'book' attributes 'icons': 'font', - 'sectanchors': '', - 'sectnums': '', - 'toc': '', - 'source-highlighter' : 'coderay', - revnumber: project.version, - 'project-version': project.version + 'sectanchors': '', + 'sectnums': '', + 'toc': '', + 'source-highlighter': 'coderay', + revnumber: project.version, + 'project-version': project.version } } asciidoctor { dependsOn asciidoctorPdf - baseDirFollowsSourceFile() - configurations "asciidoctorExtensions" - outputOptions { + baseDirFollowsSourceFile() + configurations "asciidoctorExtensions" + outputOptions { backends "spring-html" } sourceDir = file("$buildDir/asciidoc") @@ -91,21 +91,21 @@ asciidoctor { logDocuments = true options doctype: 'book', eruby: 'erubis' attributes 'revnumber': project.version, - 'spring-version': project.version, - 'branch-or-tag': project.version.endsWith('SNAPSHOT') ? 'main' : "v${project.version}", - 'icons': 'font', - 'idprefix': '', - 'idseparator': '-', - docinfo: 'shared', - sectanchors: '', - sectnums: '', - stylesdir: "css/", - stylesheet: 'spring.css', - 'linkcss': true, - 'nofooter': true, - 'allow-uri-read': '', - 'source-highlighter': 'highlight.js', - 'highlightjsdir': 'js/highlight', - 'highlightjs-theme': 'github', - 'project-version': project.version + 'spring-version': project.version, + 'branch-or-tag': project.version.endsWith('SNAPSHOT') ? 'main' : "v${project.version}", + 'icons': 'font', + 'idprefix': '', + 'idseparator': '-', + docinfo: 'shared', + sectanchors: '', + sectnums: '', + stylesdir: "css/", + stylesheet: 'spring.css', + 'linkcss': true, + 'nofooter': true, + 'allow-uri-read': '', + 'source-highlighter': 'highlight.js', + 'highlightjsdir': 'js/highlight', + 'highlightjs-theme': 'github', + 'project-version': project.version } diff --git a/spring-cloud-app-broker-docs/src/docs/asciidoc/index.adoc b/spring-cloud-app-broker-docs/src/docs/asciidoc/index.adoc index 3c7f56dfa..49f03ae6b 100644 --- a/spring-cloud-app-broker-docs/src/docs/asciidoc/index.adoc +++ b/spring-cloud-app-broker-docs/src/docs/asciidoc/index.adoc @@ -31,6 +31,8 @@ include::getting-started.adoc[] include::advertising-services.adoc[] +include::service-instance-logging.adoc[] + include::service-instances.adoc[] include::service-bindings.adoc[] diff --git a/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc b/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc new file mode 100644 index 000000000..eca052741 --- /dev/null +++ b/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc @@ -0,0 +1,46 @@ +[[service-instance-logging]] +:acceptance-tests-dir: ../../../spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/ +== Service Instance Logging + +You can configure the service broker to be compatible with the https://github.com/pivotal-cf/service-instance-logs-cli-plugin[Service instance logs CLI plugin] in order to tail and stream the logs of the backing application. + + +If you use Gradle, include the following in your application's `build.gradle` file: + +==== +[source,groovy,subs="attributes+"] +---- +dependencies { + api 'spring-cloud-starter-app-broker-logging:{project-version}' +} +---- +==== + +Then, include the `serviceInstanceLogsEndpoint` in the catalog metadata properties: + +==== +[source,yaml,subs="+quotes"] +---- +spring: + cloud: + openservicebroker: + catalog: + services: + - id: "service-id" + name: "service-name" + metadata: + properties: + serviceInstanceLogsEndpoint: https://scg-service-broker.system.domain.com/logs/ +---- +==== + +Finally, provide an implementation for `ApplicationIdsProvider` in order to retrieve the backing application id given a service instance id, for example: + +==== +[source,java,%autofit] +---- +include::{acceptance-tests-dir}logging/BackingApplicationIdsProvider.java[] +---- +==== + + From 32d09304035d629adffb1f65a69d8d87a4b9ae2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 13:09:42 +0200 Subject: [PATCH 03/12] Update spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java Co-authored-by: Gareth Clay --- .../cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java index b91495fc0..11eb3c0fb 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java @@ -56,7 +56,6 @@ protected String backingServiceName() { @AppBrokerTestProperties({ "spring.cloud.appbroker.services[0].service-name=" + APP_SERVICE_NAME, "spring.cloud.appbroker.services[0].plan-name=" + PLAN_NAME, - "spring.cloud.appbroker.services[0].apps[0].name=" + APP_CREATE_1, "spring.cloud.appbroker.services[0].apps[0].path=" + BACKING_APP_PATH, "spring.cloud.appbroker.services[0].target.name=SpacePerServiceInstance", From 6eb8c84d0b8cf029c210e67bdc35fb57259fdd7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 13:16:40 +0200 Subject: [PATCH 04/12] Update spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java Co-authored-by: Gareth Clay --- .../appbroker/logging/streaming/LogCacheStreamPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java index 856b8dcd7..e6ba6d48e 100644 --- a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java @@ -112,7 +112,7 @@ protected Flux createApplicationStreamer(String applicationId) { return Flux.merge(initialLogs, polledLogs) .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))) - .doOnError(error -> LOG.error("Streaming error: ", error)); + .doOnError(error -> LOG.error("Streaming error", error)); }); }); } From 16d2ed115cef9ead4a295db118872af47009d523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 13:17:52 +0200 Subject: [PATCH 05/12] Update spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java Co-authored-by: Gareth Clay --- .../appbroker/acceptance/LoggingStreamingAcceptanceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java index 117884461..d039986b6 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java @@ -60,7 +60,6 @@ protected String backingServiceName() { @AppBrokerTestProperties({ "spring.cloud.appbroker.services[0].service-name=" + APP_SERVICE_NAME, "spring.cloud.appbroker.services[0].plan-name=" + PLAN_NAME, - "spring.cloud.appbroker.services[0].apps[0].name=" + APP_CREATE_1, "spring.cloud.appbroker.services[0].apps[0].path=" + BACKING_APP_PATH, "spring.cloud.appbroker.services[0].target.name=SpacePerServiceInstance", From 4cbee2d7bad1e5a150b74d131981e24947d22391 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 13:18:47 +0200 Subject: [PATCH 06/12] Update spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java Co-authored-by: Gareth Clay --- .../streaming/endpoint/StreamingLogWebSocketHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java index 5ebb1efc3..81d0e9cf9 100644 --- a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java @@ -66,8 +66,8 @@ public Mono handle(WebSocketSession session) { .map(envelope -> session.binaryMessage( dataBufferFactory -> dataBufferFactory.wrap(Envelope.ADAPTER.encode(envelope))))) .doFinally(signalType -> afterConnectionClosed(session, serviceInstanceId)) - .doOnError(throwable -> LOG.error("Error handling logging stream for service instance {}", - serviceInstanceId, throwable)); + .doOnError(throwable -> LOG.error(String.format("Error handling logging stream for service instance %s", + serviceInstanceId), throwable)); } @Override From b18896fb9cb692771e78eaef35bb959aeb60df5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 13:26:28 +0200 Subject: [PATCH 07/12] Update spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java Co-authored-by: Gareth Clay --- .../appbroker/acceptance/fixtures/cf/CloudFoundryService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java index 97dd9fd27..b169d202f 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java @@ -306,7 +306,7 @@ public Mono getOrCreateDefaultOrg() { public Mono> getSpaces() { return cloudFoundryOperations.spaces().list() .doOnComplete(() -> LOG.info("Success listing spaces")) - .doOnError(e -> LOG.error(String.format("Error listing spaces. error={}" + e.getMessage()), e)) + .doOnError(e -> LOG.error(String.format("Error listing spaces. error=%s", e.getMessage()), e)) .map(SpaceSummary::getName) .collectList(); } From d0b64c4e8567f7eba7017bf2caabe16e8cddab88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 13:29:12 +0200 Subject: [PATCH 08/12] Update spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java Co-authored-by: Gareth Clay --- .../appbroker/logging/streaming/LogCacheStreamPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java index e6ba6d48e..0aac1068f 100644 --- a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java @@ -106,7 +106,7 @@ protected Flux createApplicationStreamer(String applicationId) { .map(LoggingUtils::convertLogCacheEnvelopeToDropsonde); })) .onErrorResume(error -> { - LOG.error("Error during log polling: ", error); + LOG.error("Error during log polling", error); return Flux.empty(); }); From 9c6af342d341ec3fcc186925590aee1a79ca6e52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 13:43:54 +0200 Subject: [PATCH 09/12] Review comments --- .../LoggingStreamingAcceptanceTest.java | 5 -- ...nstanceLogStreamAutoConfigurationTest.java | 4 +- .../asciidoc/service-instance-logging.adoc | 33 +++++-- .../streaming/LogCacheStreamPublisher.java | 85 +++++++++++-------- .../StreamingLogWebSocketHandler.java | 3 +- 5 files changed, 77 insertions(+), 53 deletions(-) diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java index d039986b6..f7987cd9b 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java @@ -18,10 +18,8 @@ import java.time.Duration; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; -import org.cloudfoundry.operations.applications.ApplicationSummary; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -94,9 +92,6 @@ void shouldStreamBackingApplicationLogs() { .verifyComplete(); deleteServiceInstance(SI_NAME); - - Optional backingApplication1AfterDeletion = getApplicationSummary(APP_CREATE_1); - assertThat(backingApplication1AfterDeletion).isEmpty(); } } diff --git a/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java index 7127678ac..49cb13110 100644 --- a/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java +++ b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java @@ -48,7 +48,7 @@ class ServiceInstanceLogStreamAutoConfigurationTest { ); @Test - void servicesAreNotCreatedWithoutLoggingOnClasspath() { + void servicesAreNotCreatedWhenPublisherIsNotConfigured() { contextRunner .withClassLoader(new FilteredClassLoader(ApplicationLogStreamPublisher.class)) .withUserConfiguration(LoggingConfiguration.class) @@ -61,7 +61,7 @@ void servicesAreNotCreatedWithoutLoggingOnClasspath() { } @Test - void servicesAreNotCreatedWithoutRequiredBeansOnClasspath() { + void servicesAreNotCreatedWhenLoggingIsNotConfigured() { contextRunner .run(context -> assertThat(context) .doesNotHaveBean(StreamingLogWebSocketHandler.class) diff --git a/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc b/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc index eca052741..f205f2d9d 100644 --- a/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc +++ b/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc @@ -16,21 +16,36 @@ dependencies { ---- ==== +If you use Maven, include the following in your application's `pom.xml` file: + +==== +[source,xml,subs="attributes+"] +---- + + + org.springframework.cloud + spring-cloud-starter-app-broker-logging + {project-version} + + +---- +==== + Then, include the `serviceInstanceLogsEndpoint` in the catalog metadata properties: ==== [source,yaml,subs="+quotes"] ---- spring: - cloud: - openservicebroker: - catalog: - services: - - id: "service-id" - name: "service-name" - metadata: - properties: - serviceInstanceLogsEndpoint: https://scg-service-broker.system.domain.com/logs/ + cloud: + openservicebroker: + catalog: + services: + - id: "service-id" + name: "service-name" + metadata: + properties: + serviceInstanceLogsEndpoint: https://scg-service-broker.system.domain.com/logs/ ---- ==== diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java index 0aac1068f..e2f0f7bb7 100644 --- a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java @@ -27,9 +27,11 @@ import org.cloudfoundry.logcache.v1.EnvelopeType; import org.cloudfoundry.logcache.v1.LogCacheClient; import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.ReadResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; @@ -63,47 +65,18 @@ public Flux getLogStream(String serviceInstanceId) { } protected Flux createApplicationStreamer(String applicationId) { - return client.applicationsV2() - .get(GetApplicationRequest.builder() - .applicationId(applicationId) - .build()) - .map(response -> response.getEntity().getName()) + return getApplicationName(applicationId) .flatMapMany(appName -> { long initialStartTime = Instant.now().minus(5, ChronoUnit.SECONDS).toEpochMilli() * 1_000_000L; - return logCacheClient.read( - ReadRequest.builder() - .sourceId(applicationId) - .envelopeTypes(EnvelopeType.LOG) - .startTime(initialStartTime) - .build()) + return readLogCache(applicationId, initialStartTime) .flatMapMany(initialResponse -> { - AtomicLong lastTimestamp = new AtomicLong( - initialResponse.getEnvelopes().getBatch().stream() - .mapToLong(org.cloudfoundry.logcache.v1.Envelope::getTimestamp) - .max() - .orElse(initialStartTime) - ); - - Flux initialLogs = Flux.fromIterable(initialResponse.getEnvelopes().getBatch()) - .map(LoggingUtils::convertLogCacheEnvelopeToDropsonde); - + AtomicLong lastTimestamp = getLastTimestamp(initialResponse, initialStartTime); + Flux initialLogs = convertEnvelopesToDropsonde(initialResponse); Flux polledLogs = Flux.interval(Duration.ofSeconds(1)) - .flatMap(tick -> logCacheClient.read( - ReadRequest.builder() - .sourceId(applicationId) - .envelopeTypes(EnvelopeType.LOG) - .startTime(lastTimestamp.get() + 1) - .build()) + .flatMap(tick -> readLogCache(applicationId, lastTimestamp.get() + 1) .flatMapMany(readResponse -> { - long maxTimestamp = readResponse.getEnvelopes().getBatch().stream() - .mapToLong(org.cloudfoundry.logcache.v1.Envelope::getTimestamp) - .max() - .orElse(lastTimestamp.get()); - - lastTimestamp.set(maxTimestamp); - - return Flux.fromIterable(readResponse.getEnvelopes().getBatch()) - .map(LoggingUtils::convertLogCacheEnvelopeToDropsonde); + updateLastTimestampFromResponse(readResponse, lastTimestamp); + return convertEnvelopesToDropsonde(readResponse); })) .onErrorResume(error -> { LOG.error("Error during log polling", error); @@ -117,4 +90,44 @@ protected Flux createApplicationStreamer(String applicationId) { }); } + private static AtomicLong getLastTimestamp(ReadResponse initialResponse, long initialStartTime) { + return new AtomicLong( + initialResponse.getEnvelopes().getBatch().stream() + .mapToLong(org.cloudfoundry.logcache.v1.Envelope::getTimestamp) + .max() + .orElse(initialStartTime) + ); + } + + private static Flux convertEnvelopesToDropsonde(ReadResponse readResponse) { + return Flux.fromIterable(readResponse.getEnvelopes().getBatch()) + .map(LoggingUtils::convertLogCacheEnvelopeToDropsonde); + } + + private Mono getApplicationName(String applicationId) { + return client.applicationsV2() + .get(GetApplicationRequest.builder() + .applicationId(applicationId) + .build()) + .map(response -> response.getEntity().getName()); + } + + private Mono readLogCache(String applicationId, long lastTimestamp) { + return logCacheClient.read( + ReadRequest.builder() + .sourceId(applicationId) + .envelopeTypes(EnvelopeType.LOG) + .startTime(lastTimestamp) + .build()); + } + + private static void updateLastTimestampFromResponse(ReadResponse readResponse, AtomicLong lastTimestamp) { + long maxTimestamp = readResponse.getEnvelopes().getBatch().stream() + .mapToLong(org.cloudfoundry.logcache.v1.Envelope::getTimestamp) + .max() + .orElse(lastTimestamp.get()); + + lastTimestamp.set(maxTimestamp); + } + } diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java index 81d0e9cf9..44d7248e5 100644 --- a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java @@ -19,6 +19,7 @@ import java.net.URI; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.cloudfoundry.dropsonde.events.Envelope; import org.slf4j.Logger; @@ -43,7 +44,7 @@ public class StreamingLogWebSocketHandler implements WebSocketHandler, Applicati private final ApplicationEventPublisher eventPublisher; - private final ConcurrentHashMap> envelopeSinks = new ConcurrentHashMap<>(); + private final ConcurrentMap> envelopeSinks = new ConcurrentHashMap<>(); public StreamingLogWebSocketHandler(ApplicationEventPublisher eventPublisher) { this.eventPublisher = eventPublisher; From c4b6ce7dd91f25bce8768ffae596746c9c21e479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 14:04:19 +0200 Subject: [PATCH 10/12] Review comments --- .../recent/endpoint/MultipartEncoder.java | 8 +++++- .../ApplicationLogStreamPublisher.java | 28 +++++++++---------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java index ece987a68..a9ce3b49d 100644 --- a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java @@ -17,9 +17,10 @@ package org.springframework.cloud.appbroker.logging.recent.endpoint; import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.IOException; -class MultipartEncoder { +class MultipartEncoder implements Closeable { private final byte[] bytesCRLF = {'\r', '\n'}; @@ -63,4 +64,9 @@ public byte[] terminateAndGetBytes() { } } + @Override + public void close() throws IOException { + out.close(); + } + } diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java index 9da5c0e89..85406af28 100644 --- a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java @@ -65,13 +65,11 @@ public void onApplicationEvent(ServiceInstanceLoggingEvent event) { } private void startPublishing(String serviceInstanceId) { - synchronized (registry) { - final Registration registration = registry.get(serviceInstanceId); - if (registration != null) { + registry.compute(serviceInstanceId, (key, existingRegistration) -> { + if (existingRegistration != null) { LOG.debug("Incrementing registration subscription count for {}", serviceInstanceId); - registration.increment(); - - return; + existingRegistration.increment(); + return existingRegistration; } Flux logStream = this.logStreamPublisher @@ -84,8 +82,8 @@ private void startPublishing(String serviceInstanceId) { .subscribe(); LOG.debug("Creating new registration for {}", serviceInstanceId); - registry.put(serviceInstanceId, new Registration(subscription)); - } + return new Registration(subscription); + }); } private void stopPublishing(String serviceInstanceId) { @@ -93,23 +91,25 @@ private void stopPublishing(String serviceInstanceId) { LOG.debug("Received event to stop publishing logs for {}", serviceInstanceId); } - synchronized (registry) { - final Registration registration = registry.get(serviceInstanceId); + registry.compute(serviceInstanceId, (key, registration) -> { if (registration == null) { if (LOG.isWarnEnabled()) { LOG.warn("Received deregister event for service instance {} but there no event handler registered", serviceInstanceId); } + return null; } - else if (registration.decrement() == 0) { + + if (registration.decrement() == 0) { if (LOG.isDebugEnabled()) { LOG.debug("Disposing of registration since there are no more subscriptions"); } - registration.getSubscription().dispose(); - registry.remove(serviceInstanceId); + return null; } - } + + return registration; + }); } private final static class Registration { From a8a9698d677ef14108688a2b6151227a1b614736 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 14:41:12 +0200 Subject: [PATCH 11/12] Review comments --- .../acceptance/LoggingRecentAcceptanceTest.java | 9 ++++----- .../acceptance/LoggingStreamingAcceptanceTest.java | 4 ++++ .../logging/streaming/LogCacheStreamPublisher.java | 4 ++++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java index 11eb3c0fb..9e338239d 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java @@ -18,9 +18,8 @@ import java.time.Duration; import java.util.List; -import java.util.Optional; -import org.cloudfoundry.operations.applications.ApplicationSummary; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -73,11 +72,11 @@ void shouldReturnBackingApplicationLogs() { assertThat(lines).contains("Created app with guid"); assertThat(lines).contains("Updated app with guid"); assertThat(lines).contains("APP/PROC/WEB"); + } + @AfterEach + void tearDown() { deleteServiceInstance(SI_NAME); - - Optional backingApplication1AfterDeletion = getApplicationSummary(APP_CREATE_1); - assertThat(backingApplication1AfterDeletion).isEmpty(); } } diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java index f7987cd9b..94b77844b 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -90,7 +91,10 @@ void shouldStreamBackingApplicationLogs() { assertThat(tuple.getT2()).doesNotContain("websocket: close"); }) .verifyComplete(); + } + @AfterEach + void tearDown() { deleteServiceInstance(SI_NAME); } diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java index e2f0f7bb7..1646038ff 100644 --- a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java @@ -37,6 +37,10 @@ import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; import org.springframework.cloud.appbroker.logging.LoggingUtils; +/*** + * Class to Stream LocCache Envelopes using the same pattern as the + * loc-cache-cli + */ public class LogCacheStreamPublisher implements LogStreamPublisher { private static final Logger LOG = LoggerFactory.getLogger(LogCacheStreamPublisher.class); From 7e57243f2fb6bdde178d42582a35bbe8bc18892c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20C=2E=20R=C3=ADos?= Date: Tue, 9 Jul 2024 14:48:23 +0200 Subject: [PATCH 12/12] Review comments --- .../appbroker/acceptance/CloudFoundryAcceptanceTest.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java index b4fc41675..d2732e5c6 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java @@ -532,12 +532,6 @@ private static String processOutput(Process process) throws InterruptedException future.cancel(true); executor.shutdownNow(); process.destroyForcibly(); - try { - process.waitFor(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for process to terminate", e); - } } return outputBuilder.toString(); }