diff --git a/build.gradle b/build.gradle index ac68bc438..2eebcecfd 100644 --- a/build.gradle +++ b/build.gradle @@ -414,7 +414,8 @@ def getTestProjects() { def getStarterProjects() { [project(":spring-cloud-starter-app-broker"), - project(":spring-cloud-starter-app-broker-cloudfoundry")] as Set + project(":spring-cloud-starter-app-broker-cloudfoundry"), + project(":spring-cloud-starter-app-broker-logging")] as Set } def getLibraryProjects() { diff --git a/settings.gradle b/settings.gradle index 660e2c7b0..bb67d5bd5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,16 +1,9 @@ pluginManagement { - resolutionStrategy { - eachPlugin { - if (requested.id.namespace?.startsWith('org.asciidoctor.jvm')) { - useVersion("4.0.1") - } - } - } plugins { id "io.spring.nohttp" version "0.0.11" id 'org.springframework.boot' version "3.3.1" - id 'org.asciidoctor.jvm.pdf' - id 'org.asciidoctor.jvm.convert' + id 'org.asciidoctor.jvm.pdf' version '4.0.2' + id 'org.asciidoctor.jvm.convert' version '4.0.2' } repositories { gradlePluginPortal() @@ -38,5 +31,7 @@ include "spring-cloud-app-broker-core" include "spring-cloud-app-broker-autoconfigure" include "spring-cloud-app-broker-integration-tests" include "spring-cloud-app-broker-acceptance-tests" +include "spring-cloud-app-broker-logging" include "spring-cloud-starter-app-broker" include "spring-cloud-starter-app-broker-cloudfoundry" +include "spring-cloud-starter-app-broker-logging" diff --git a/spring-cloud-app-broker-acceptance-tests/build.gradle b/spring-cloud-app-broker-acceptance-tests/build.gradle index 9aa68b0d3..60e31c37b 100644 --- a/spring-cloud-app-broker-acceptance-tests/build.gradle +++ b/spring-cloud-app-broker-acceptance-tests/build.gradle @@ -25,6 +25,7 @@ description = "Spring Cloud App Broker Acceptance Tests" dependencies { api platform(SpringBootPlugin.BOM_COORDINATES) api project(":spring-cloud-starter-app-broker-cloudfoundry") + api project(":spring-cloud-starter-app-broker-logging") api "org.springframework.boot:spring-boot-starter-webflux" testImplementation "org.springframework.boot:spring-boot-starter-test" diff --git a/spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/logging/BackingApplicationIdsProvider.java b/spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/logging/BackingApplicationIdsProvider.java new file mode 100644 index 000000000..a0ad2cc0b --- /dev/null +++ b/spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/logging/BackingApplicationIdsProvider.java @@ -0,0 +1,57 @@ +/* + * Copyright 2016-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.acceptance.logging; + + +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.client.v3.applications.ApplicationResource; +import org.cloudfoundry.client.v3.applications.ListApplicationsRequest; +import org.cloudfoundry.operations.CloudFoundryOperations; +import org.cloudfoundry.operations.spaces.GetSpaceRequest; +import org.cloudfoundry.operations.spaces.SpaceDetail; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.stereotype.Component; + +@Component +class BackingApplicationIdsProvider implements ApplicationIdsProvider { + + private final CloudFoundryClient cloudFoundryClient; + + private final CloudFoundryOperations cloudFoundryOperations; + + public BackingApplicationIdsProvider(CloudFoundryClient cloudFoundryClient, + CloudFoundryOperations cloudFoundryOperations) { + this.cloudFoundryClient = cloudFoundryClient; + this.cloudFoundryOperations = cloudFoundryOperations; + } + + @Override + public Flux getApplicationIds(String serviceInstanceId) { + return cloudFoundryOperations.spaces().get(GetSpaceRequest.builder().name(serviceInstanceId).build()) + .map(SpaceDetail::getId) + .flatMap(spaceId -> + cloudFoundryClient.applicationsV3() + .list(ListApplicationsRequest.builder().spaceIds(spaceId).build()) + ) + .flatMapMany( + listApplicationsResponse -> Flux.fromIterable(listApplicationsResponse.getResources()) + .map(ApplicationResource::getId)); + } + +} diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java index 754685ff7..d2732e5c6 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/CloudFoundryAcceptanceTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,14 @@ package org.springframework.cloud.appbroker.acceptance; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; import java.net.URI; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -29,6 +34,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.DocumentContext; @@ -64,6 +75,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.appbroker.acceptance.fixtures.cf.CloudFoundryClientConfiguration; +import org.springframework.cloud.appbroker.acceptance.fixtures.cf.CloudFoundryProperties; import org.springframework.cloud.appbroker.acceptance.fixtures.cf.CloudFoundryService; import org.springframework.cloud.appbroker.acceptance.fixtures.cf.UserCloudFoundryService; import org.springframework.cloud.appbroker.acceptance.fixtures.uaa.UaaService; @@ -91,6 +103,7 @@ @ExtendWith(SpringExtension.class) @ExtendWith(BrokerPropertiesParameterResolver.class) @EnableConfigurationProperties(AcceptanceTestProperties.class) +@SuppressWarnings("PMD.GodClass") abstract class CloudFoundryAcceptanceTest { private static final Logger LOG = LoggerFactory.getLogger(CloudFoundryAcceptanceTest.class); @@ -113,12 +126,17 @@ abstract class CloudFoundryAcceptanceTest { @Autowired protected UserCloudFoundryService userCloudFoundryService; + @Autowired + private CloudFoundryProperties cloudFoundryProperties; + @Autowired private UaaService uaaService; @Autowired private AcceptanceTestProperties acceptanceTestProperties; + private String cfHome; + private final WebClient webClient = getSslIgnoringWebClient(); protected abstract String testSuffix(); @@ -144,6 +162,7 @@ void setUp(TestInfo testInfo, BrokerProperties brokerProperties) { List appBrokerProperties = getAppBrokerProperties(brokerProperties); blockingSubscribe(initializeUser()); blockingSubscribe(initializeBroker(appBrokerProperties)); + prepareCLI(); } void setUpForBrokerUpdate(BrokerProperties brokerProperties) { @@ -157,6 +176,8 @@ private List getAppBrokerProperties(BrokerProperties brokerProperties) { "spring.cloud.openservicebroker.catalog.services[0].name=" + appServiceName(), "spring.cloud.openservicebroker.catalog.services[0].description=A service that deploys a backing app", "spring.cloud.openservicebroker.catalog.services[0].bindable=true", + "spring.cloud.openservicebroker.catalog.services[0].metadata.properties.serviceInstanceLogsEndpoint=" + + getServiceInstanceLogsEndpoint(), "spring.cloud.openservicebroker.catalog.services[0].plans[0].id=" + PLAN_ID, "spring.cloud.openservicebroker.catalog.services[0].plans[0].name=standard", "spring.cloud.openservicebroker.catalog.services[0].plans[0].bindable=true", @@ -179,6 +200,10 @@ private List getAppBrokerProperties(BrokerProperties brokerProperties) { return appBrokerProperties; } + private String getServiceInstanceLogsEndpoint() { + return "https://" + testBrokerAppName() + "." + cloudFoundryProperties.getApiHost().substring(4) + "/logs/"; + } + @BeforeEach void configureJsonPath() { Configuration.setDefaults(new Configuration.Defaults() { @@ -332,7 +357,7 @@ protected String getServiceInstanceGuid(String serviceInstanceName) { .block(); } - private Mono getServiceInstanceMono(String serviceInstanceName) { + Mono getServiceInstanceMono(String serviceInstanceName) { return userCloudFoundryService.getServiceInstance(serviceInstanceName); } @@ -423,7 +448,8 @@ protected Mono manageApps(String serviceInstanceName, String serviceName .getApplicationRoute(testBrokerAppName()) .flatMap(appRoute -> webClient.get() - .uri(URI.create(appRoute + "/" + operation + "/" + serviceName + "/" + planName + "/" + serviceInstanceId)) + .uri(URI.create( + appRoute + "/" + operation + "/" + serviceName + "/" + planName + "/" + serviceInstanceId)) .retrieve() .toEntity(String.class) .map(HttpEntity::getBody))); @@ -451,11 +477,81 @@ private WebClient getSslIgnoringWebClient() { protected Mono> getApplications(String app1, String app2) { return Flux.merge(cloudFoundryService.getApplication(app1), - cloudFoundryService.getApplication(app2)) + cloudFoundryService.getApplication(app2)) .parallel() .runOn(Schedulers.parallel()) .sequential() .collectList(); } + private void prepareCLI() { + try { + cfHome = Files.createTempDirectory("app-broker-acceptance-tests").toString(); + + callCLICommand(List.of("cf", "login", "-a", + cloudFoundryProperties.getApiHost(), + "--skip-ssl-validation", "-u", + cloudFoundryProperties.getUsername(), + "-p", + cloudFoundryProperties.getPassword(), + "-o", "test-instances")) + .block(Duration.ofSeconds(60)); + callCLICommand(List.of("cf", "install-plugin", "-f", "-r", "Cf-Community", "Service Instance Logging")) + .block(Duration.ofSeconds(60)); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected Mono callCLICommand(List command) { + return Mono.fromCallable(() -> { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing command: {}", command); + } + ProcessBuilder processBuilder = new ProcessBuilder(command); + processBuilder.environment().put("CF_HOME", cfHome); + processBuilder.redirectErrorStream(true); + Process process = processBuilder.start(); + + return processOutput(process); + }).subscribeOn(Schedulers.boundedElastic()); + } + + private static String processOutput(Process process) throws InterruptedException, ExecutionException { + StringBuffer outputBuilder = new StringBuffer(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = appendLines(executor, process, outputBuilder); + try { + future.get(30, TimeUnit.SECONDS); + } + catch (TimeoutException e) { + LOG.info("Process reading timed out after 30 seconds"); + } + finally { + future.cancel(true); + executor.shutdownNow(); + process.destroyForcibly(); + } + return outputBuilder.toString(); + } + + private static Future appendLines(ExecutorService executor, Process process, StringBuffer outputBuilder) { + return executor.submit(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line = ""; + while (line != null) { + line = reader.readLine(); + if (line != null) { + outputBuilder.append(line).append('\n'); + if (LOG.isDebugEnabled()) { + LOG.debug("Read line: {}", line); + } + } + } + } + return null; + }); + } + } diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java new file mode 100644 index 000000000..9e338239d --- /dev/null +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingRecentAcceptanceTest.java @@ -0,0 +1,82 @@ +/* + * Copyright 2002-2020 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.acceptance; + +import java.time.Duration; +import java.util.List; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class LoggingRecentAcceptanceTest extends CloudFoundryAcceptanceTest { + + private static final String APP_CREATE_1 = "app-logging-recent-1"; + + private static final String SI_NAME = "si-logging-recent"; + + private static final String SUFFIX = "logging-recent-instance"; + + private static final String APP_SERVICE_NAME = "app-service-" + SUFFIX; + + private static final String BACKING_SERVICE_NAME = "backing-service-" + SUFFIX; + + @Override + protected String testSuffix() { + return SUFFIX; + } + + @Override + protected String appServiceName() { + return APP_SERVICE_NAME; + } + + @Override + protected String backingServiceName() { + return BACKING_SERVICE_NAME; + } + + @Test + @AppBrokerTestProperties({ + "spring.cloud.appbroker.services[0].service-name=" + APP_SERVICE_NAME, + "spring.cloud.appbroker.services[0].plan-name=" + PLAN_NAME, + "spring.cloud.appbroker.services[0].apps[0].name=" + APP_CREATE_1, + "spring.cloud.appbroker.services[0].apps[0].path=" + BACKING_APP_PATH, + "spring.cloud.appbroker.services[0].target.name=SpacePerServiceInstance", + "spring.cloud.appbroker.deployer.cloudfoundry.properties.stack=cflinuxfs4" + }) + void shouldReturnBackingApplicationLogs() { + createServiceInstance(SI_NAME); + + String lines = callCLICommand( + List.of("cf", "service-logs", SI_NAME, "--recent", "--skip-ssl-validation")) + .block(Duration.ofSeconds(35)); + + assertThat(lines).isNotEmpty(); + + assertThat(lines).contains("Created app with guid"); + assertThat(lines).contains("Updated app with guid"); + assertThat(lines).contains("APP/PROC/WEB"); + } + + @AfterEach + void tearDown() { + deleteServiceInstance(SI_NAME); + } + +} diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java new file mode 100644 index 000000000..94b77844b --- /dev/null +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/LoggingStreamingAcceptanceTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2002-2020 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.acceptance; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; + +class LoggingStreamingAcceptanceTest extends CloudFoundryAcceptanceTest { + + private static final String APP_CREATE_1 = "app-logging-stream-1"; + + private static final String SI_NAME = "si-logging-stream"; + + private static final String SUFFIX = "logging-stream-instance"; + + private static final String APP_SERVICE_NAME = "app-service-" + SUFFIX; + + private static final String BACKING_SERVICE_NAME = "backing-service-" + SUFFIX; + + @Override + protected String testSuffix() { + return SUFFIX; + } + + @Override + protected String appServiceName() { + return APP_SERVICE_NAME; + } + + @Override + protected String backingServiceName() { + return BACKING_SERVICE_NAME; + } + + @Test + @AppBrokerTestProperties({ + "spring.cloud.appbroker.services[0].service-name=" + APP_SERVICE_NAME, + "spring.cloud.appbroker.services[0].plan-name=" + PLAN_NAME, + "spring.cloud.appbroker.services[0].apps[0].name=" + APP_CREATE_1, + "spring.cloud.appbroker.services[0].apps[0].path=" + BACKING_APP_PATH, + "spring.cloud.appbroker.services[0].target.name=SpacePerServiceInstance", + "spring.cloud.appbroker.deployer.cloudfoundry.properties.stack=cflinuxfs4" + }) + void shouldStreamBackingApplicationLogs() { + Mono createServiceInstanceMono = Mono.fromRunnable(() -> + CompletableFuture.runAsync(() -> + createServiceInstance(SI_NAME))) + .subscribeOn(Schedulers.boundedElastic()); + + Mono logStreamingMono = Mono.defer(() -> callCLICommand( + List.of("cf", "service-logs", SI_NAME, "--skip-ssl-validation") + )).subscribeOn(Schedulers.boundedElastic()); + + StepVerifier.create(createServiceInstanceMono + .then(Mono.delay(Duration.ofSeconds(20)).then()) + .then(getServiceInstanceMono(SI_NAME).retry(5)) + .then(Mono.zip(logStreamingMono, logStreamingMono)) + ) + .assertNext(tuple -> { + assertThat(tuple.getT1()).isNotEmpty(); + assertThat(tuple.getT1()).contains("Connected, tailing logs for service instance"); + assertThat(tuple.getT1()).contains("[STG/0]"); + assertThat(tuple.getT1()).doesNotContain("websocket: close"); + + assertThat(tuple.getT2()).isNotEmpty(); + assertThat(tuple.getT2()).contains("Connected, tailing logs for service instance"); + assertThat(tuple.getT2()).contains("[STG/0]"); + assertThat(tuple.getT2()).doesNotContain("websocket: close"); + }) + .verifyComplete(); + } + + @AfterEach + void tearDown() { + deleteServiceInstance(SI_NAME); + } + +} diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryClientConfiguration.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryClientConfiguration.java index 4f85c9a79..998e34f39 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryClientConfiguration.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryClientConfiguration.java @@ -20,6 +20,7 @@ import org.cloudfoundry.client.CloudFoundryClient; import org.cloudfoundry.doppler.DopplerClient; +import org.cloudfoundry.logcache.v1.LogCacheClient; import org.cloudfoundry.operations.CloudFoundryOperations; import org.cloudfoundry.operations.DefaultCloudFoundryOperations; import org.cloudfoundry.reactor.ConnectionContext; @@ -27,6 +28,7 @@ import org.cloudfoundry.reactor.TokenProvider; import org.cloudfoundry.reactor.client.ReactorCloudFoundryClient; import org.cloudfoundry.reactor.doppler.ReactorDopplerClient; +import org.cloudfoundry.reactor.logcache.v1.ReactorLogCacheClient; import org.cloudfoundry.reactor.tokenprovider.ClientCredentialsGrantTokenProvider; import org.cloudfoundry.reactor.tokenprovider.PasswordGrantTokenProvider; import org.cloudfoundry.reactor.uaa.ReactorUaaClient; @@ -117,6 +119,15 @@ protected DopplerClient dopplerClient(ConnectionContext connectionContext, .build(); } + @Bean + protected LogCacheClient logCacheClient(ConnectionContext connectionContext, + @Qualifier("userCredentials") TokenProvider tokenProvider) { + return ReactorLogCacheClient.builder() + .connectionContext(connectionContext) + .tokenProvider(tokenProvider) + .build(); + } + @Bean @Qualifier("userCredentials") protected UaaClient userCredentialsUaaClient(ConnectionContext connectionContext, diff --git a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java index 5ac66e8fd..b169d202f 100644 --- a/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java +++ b/spring-cloud-app-broker-acceptance-tests/src/test/java/org/springframework/cloud/appbroker/acceptance/fixtures/cf/CloudFoundryService.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,7 +88,9 @@ public class CloudFoundryService { private final CloudFoundryProperties cloudFoundryProperties; - public CloudFoundryService(CloudFoundryClient cloudFoundryClient, CloudFoundryOperations cloudFoundryOperations, + public CloudFoundryService( + CloudFoundryClient cloudFoundryClient, + CloudFoundryOperations cloudFoundryOperations, CloudFoundryProperties cloudFoundryProperties) { this.cloudFoundryClient = cloudFoundryClient; this.cloudFoundryOperations = cloudFoundryOperations; @@ -304,7 +306,7 @@ public Mono getOrCreateDefaultOrg() { public Mono> getSpaces() { return cloudFoundryOperations.spaces().list() .doOnComplete(() -> LOG.info("Success listing spaces")) - .doOnError(e -> LOG.error(String.format("Error listing spaces. error=%s" + e.getMessage()), e)) + .doOnError(e -> LOG.error(String.format("Error listing spaces. error=%s", e.getMessage()), e)) .map(SpaceSummary::getName) .collectList(); } diff --git a/spring-cloud-app-broker-autoconfigure/build.gradle b/spring-cloud-app-broker-autoconfigure/build.gradle index 3c76a6934..ffd9107fa 100644 --- a/spring-cloud-app-broker-autoconfigure/build.gradle +++ b/spring-cloud-app-broker-autoconfigure/build.gradle @@ -22,6 +22,12 @@ plugins { description = "Spring Cloud App Broker Autoconfiguration" +java { + registerFeature("logging") { + usingSourceSet(sourceSets.main) + } +} + dependencies { annotationProcessor platform(SpringBootPlugin.BOM_COORDINATES) annotationProcessor "org.springframework.boot:spring-boot-configuration-processor" @@ -35,6 +41,8 @@ dependencies { api "org.cloudfoundry:cloudfoundry-client-reactor:${cfJavaClientVersion}" api "org.cloudfoundry:cloudfoundry-operations:${cfJavaClientVersion}" + loggingImplementation project(":spring-cloud-app-broker-logging") + testImplementation "org.springframework.boot:spring-boot-starter-test" testImplementation "org.springframework.boot:spring-boot-starter-webflux" testImplementation "io.projectreactor.tools:blockhound-junit-platform:${blockHoundVersion}" diff --git a/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/CloudFoundryAppDeployerAutoConfiguration.java b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/CloudFoundryAppDeployerAutoConfiguration.java index 4c4e72c88..a427f291c 100644 --- a/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/CloudFoundryAppDeployerAutoConfiguration.java +++ b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/CloudFoundryAppDeployerAutoConfiguration.java @@ -25,6 +25,7 @@ import org.cloudfoundry.client.CloudFoundryClient; import org.cloudfoundry.doppler.DopplerClient; +import org.cloudfoundry.logcache.v1.LogCacheClient; import org.cloudfoundry.operations.CloudFoundryOperations; import org.cloudfoundry.operations.DefaultCloudFoundryOperations; import org.cloudfoundry.reactor.ConnectionContext; @@ -32,6 +33,7 @@ import org.cloudfoundry.reactor.TokenProvider; import org.cloudfoundry.reactor.client.ReactorCloudFoundryClient; import org.cloudfoundry.reactor.doppler.ReactorDopplerClient; +import org.cloudfoundry.reactor.logcache.v1.ReactorLogCacheClient; import org.cloudfoundry.reactor.tokenprovider.ClientCredentialsGrantTokenProvider; import org.cloudfoundry.reactor.tokenprovider.PasswordGrantTokenProvider; import org.cloudfoundry.reactor.uaa.ReactorUaaClient; @@ -210,6 +212,22 @@ public ReactorDopplerClient dopplerClient(@ConnectionContextQualifier Connection .build(); } + /** + * Provide a {@link LogCacheClient} bean + * + * @param connectionContext the ConnectionContext bean + * @param tokenProvider the TokenProvider bean + * @return the bean + */ + @Bean + public LogCacheClient logCacheClient(@ConnectionContextQualifier ConnectionContext connectionContext, + @TokenQualifier TokenProvider tokenProvider) { + return ReactorLogCacheClient.builder() + .connectionContext(connectionContext) + .tokenProvider(tokenProvider) + .build(); + } + /** * Provide a {@link TokenProvider} bean * diff --git a/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfiguration.java b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfiguration.java new file mode 100644 index 000000000..6e0e51b36 --- /dev/null +++ b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfiguration.java @@ -0,0 +1,84 @@ +/* + * Copyright 2016-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.autoconfigure; + +import java.util.HashMap; +import java.util.Map; + +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.logcache.v1.LogCacheClient; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.streaming.ApplicationLogStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.LogCacheStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.LogStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.endpoint.StreamingLogWebSocketHandler; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; + +@Configuration +@ConditionalOnClass(ApplicationLogStreamPublisher.class) +@ConditionalOnBean(ApplicationIdsProvider.class) +public class ServiceInstanceLogStreamAutoConfiguration { + + @Bean + public StreamingLogWebSocketHandler streamingLogWebSocketHandler( + ApplicationEventPublisher applicationEventPublisher) { + return new StreamingLogWebSocketHandler(applicationEventPublisher); + } + + @Bean + @ConditionalOnMissingBean + public WebSocketHandlerAdapter handlerAdapter() { + return new WebSocketHandlerAdapter(); + } + + @Bean + public HandlerMapping logsHandlerMapping(StreamingLogWebSocketHandler webSocketHandler) { + Map map = new HashMap<>(); + map.put("/logs/**", webSocketHandler); + + SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping(); + handlerMapping.setOrder(1); + handlerMapping.setUrlMap(map); + return handlerMapping; + } + + @Bean + @ConditionalOnMissingBean + public LogStreamPublisher streamLogsPublisher( + CloudFoundryClient cloudFoundryClient, + LogCacheClient logCacheClient, + ApplicationIdsProvider applicationIdsProvider) { + return new LogCacheStreamPublisher(cloudFoundryClient, logCacheClient, applicationIdsProvider); + } + + @Bean + public ApplicationLogStreamPublisher applicationLogsPublisher(LogStreamPublisher logStreamPublisher, + ApplicationEventPublisher eventPublisher) { + return new ApplicationLogStreamPublisher(logStreamPublisher, eventPublisher); + } + +} diff --git a/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfiguration.java b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfiguration.java new file mode 100644 index 000000000..7c08e691d --- /dev/null +++ b/spring-cloud-app-broker-autoconfigure/src/main/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfiguration.java @@ -0,0 +1,49 @@ +/* + * Copyright 2016-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.autoconfigure; + +import org.cloudfoundry.logcache.v1.LogCacheClient; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.recent.ApplicationRecentLogsProvider; +import org.springframework.cloud.appbroker.logging.recent.RecentLogsProvider; +import org.springframework.cloud.appbroker.logging.recent.endpoint.RecentLogsController; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConditionalOnClass(ApplicationRecentLogsProvider.class) +@ConditionalOnBean(ApplicationIdsProvider.class) +public class ServiceInstanceRecentLogsAutoConfiguration { + + @Bean + public RecentLogsProvider recentLogsProvider( + LogCacheClient logCacheClient, + ApplicationIdsProvider applicationIdsProvider) { + return new ApplicationRecentLogsProvider(logCacheClient, applicationIdsProvider); + } + + @Bean + @ConditionalOnMissingBean + public RecentLogsController recentLogsController(RecentLogsProvider recentLogsProvider) { + return new RecentLogsController(recentLogsProvider); + } + +} diff --git a/spring-cloud-app-broker-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-app-broker-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index e2e718516..ba66ea8ff 100644 --- a/spring-cloud-app-broker-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-app-broker-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1,4 @@ org.springframework.cloud.appbroker.autoconfigure.AppBrokerAutoConfiguration org.springframework.cloud.appbroker.autoconfigure.CloudFoundryAppDeployerAutoConfiguration +org.springframework.cloud.appbroker.autoconfigure.ServiceInstanceRecentLogsAutoConfiguration +org.springframework.cloud.appbroker.autoconfigure.ServiceInstanceLogStreamAutoConfiguration diff --git a/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java new file mode 100644 index 000000000..49cb13110 --- /dev/null +++ b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceLogStreamAutoConfigurationTest.java @@ -0,0 +1,96 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.autoconfigure; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.FilteredClassLoader; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.streaming.ApplicationLogStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.LogStreamPublisher; +import org.springframework.cloud.appbroker.logging.streaming.endpoint.StreamingLogWebSocketHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; + +import static org.assertj.core.api.Assertions.assertThat; + +class ServiceInstanceLogStreamAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of( + AppBrokerAutoConfiguration.class, + CloudFoundryAppDeployerAutoConfiguration.class, + ServiceInstanceLogStreamAutoConfiguration.class + )) + .withPropertyValues( + "spring.cloud.appbroker.deployer.cloudfoundry.api-host=https://api.example.local", + "spring.cloud.appbroker.deployer.cloudfoundry.username=user", + "spring.cloud.appbroker.deployer.cloudfoundry.password=secret" + ); + + @Test + void servicesAreNotCreatedWhenPublisherIsNotConfigured() { + contextRunner + .withClassLoader(new FilteredClassLoader(ApplicationLogStreamPublisher.class)) + .withUserConfiguration(LoggingConfiguration.class) + .run(context -> assertThat(context) + .doesNotHaveBean(StreamingLogWebSocketHandler.class) + .doesNotHaveBean(WebSocketHandlerAdapter.class) + .doesNotHaveBean(HandlerMapping.class) + .doesNotHaveBean(LogStreamPublisher.class) + .doesNotHaveBean(ApplicationLogStreamPublisher.class)); + } + + @Test + void servicesAreNotCreatedWhenLoggingIsNotConfigured() { + contextRunner + .run(context -> assertThat(context) + .doesNotHaveBean(StreamingLogWebSocketHandler.class) + .doesNotHaveBean(WebSocketHandlerAdapter.class) + .doesNotHaveBean(HandlerMapping.class) + .doesNotHaveBean(LogStreamPublisher.class) + .doesNotHaveBean(ApplicationLogStreamPublisher.class)); + } + + @Test + void servicesAreCreatedWithLoggingConfigured() { + contextRunner + .withUserConfiguration(LoggingConfiguration.class) + .run(context -> assertThat(context) + .hasSingleBean(StreamingLogWebSocketHandler.class) + .hasSingleBean(WebSocketHandlerAdapter.class) + .hasSingleBean(HandlerMapping.class) + .hasSingleBean(LogStreamPublisher.class) + .hasSingleBean(ApplicationLogStreamPublisher.class)); + } + + @TestConfiguration + public static class LoggingConfiguration { + + @Bean + public ApplicationIdsProvider applicationIdsProvider() { + return serviceInstanceId -> Flux.just("app1"); + } + + } + +} diff --git a/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfigurationTest.java b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfigurationTest.java new file mode 100644 index 000000000..e523d1a7c --- /dev/null +++ b/spring-cloud-app-broker-autoconfigure/src/test/java/org/springframework/cloud/appbroker/autoconfigure/ServiceInstanceRecentLogsAutoConfigurationTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.autoconfigure; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.FilteredClassLoader; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.recent.ApplicationRecentLogsProvider; +import org.springframework.cloud.appbroker.logging.recent.RecentLogsProvider; +import org.springframework.cloud.appbroker.logging.recent.endpoint.RecentLogsController; +import org.springframework.context.annotation.Bean; + +import static org.assertj.core.api.Assertions.assertThat; + +class ServiceInstanceRecentLogsAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of( + AppBrokerAutoConfiguration.class, + CloudFoundryAppDeployerAutoConfiguration.class, + ServiceInstanceRecentLogsAutoConfiguration.class + )) + .withPropertyValues( + "spring.cloud.appbroker.deployer.cloudfoundry.api-host=https://api.example.local", + "spring.cloud.appbroker.deployer.cloudfoundry.username=user", + "spring.cloud.appbroker.deployer.cloudfoundry.password=secret" + ); + + @Test + void servicesAreNotCreatedWithoutLoggingOnClasspath() { + contextRunner + .withClassLoader(new FilteredClassLoader(ApplicationRecentLogsProvider.class)) + .withUserConfiguration(LoggingConfiguration.class) + .run(context -> assertThat(context) + .doesNotHaveBean(RecentLogsProvider.class) + .doesNotHaveBean(RecentLogsController.class)); + } + + @Test + void servicesAreNotCreatedWithoutRequiredBeansOnClasspath() { + contextRunner + .run(context -> assertThat(context) + .doesNotHaveBean(RecentLogsProvider.class) + .doesNotHaveBean(RecentLogsController.class)); + } + + @Test + void servicesAreCreatedWithLoggingConfigured() { + contextRunner + .withUserConfiguration(LoggingConfiguration.class) + .run(context -> assertThat(context) + .hasSingleBean(RecentLogsProvider.class) + .hasSingleBean(RecentLogsController.class)); + } + + @TestConfiguration + public static class LoggingConfiguration { + + @Bean + public ApplicationIdsProvider applicationIdsProvider() { + return serviceInstanceId -> Flux.just("app1"); + } + + } + +} diff --git a/spring-cloud-app-broker-docs/build.gradle b/spring-cloud-app-broker-docs/build.gradle index 7a31058b7..9542a4437 100644 --- a/spring-cloud-app-broker-docs/build.gradle +++ b/spring-cloud-app-broker-docs/build.gradle @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ dependencies { implementation "org.springframework.boot:spring-boot-starter-tomcat" implementation "io.projectreactor:reactor-core" implementation "io.r2dbc:r2dbc-h2:1.0.0.RELEASE" - asciidoctorExtensions "io.spring.asciidoctor.backends:spring-asciidoctor-backends:0.0.7" + asciidoctorExtensions "io.spring.asciidoctor.backends:spring-asciidoctor-backends:0.0.7" } javadoc { @@ -63,20 +63,20 @@ asciidoctorPdf { } options doctype: 'book' attributes 'icons': 'font', - 'sectanchors': '', - 'sectnums': '', - 'toc': '', - 'source-highlighter' : 'coderay', - revnumber: project.version, - 'project-version': project.version + 'sectanchors': '', + 'sectnums': '', + 'toc': '', + 'source-highlighter': 'coderay', + revnumber: project.version, + 'project-version': project.version } } asciidoctor { dependsOn asciidoctorPdf - baseDirFollowsSourceFile() - configurations "asciidoctorExtensions" - outputOptions { + baseDirFollowsSourceFile() + configurations "asciidoctorExtensions" + outputOptions { backends "spring-html" } sourceDir = file("$buildDir/asciidoc") @@ -91,21 +91,21 @@ asciidoctor { logDocuments = true options doctype: 'book', eruby: 'erubis' attributes 'revnumber': project.version, - 'spring-version': project.version, - 'branch-or-tag': project.version.endsWith('SNAPSHOT') ? 'main' : "v${project.version}", - 'icons': 'font', - 'idprefix': '', - 'idseparator': '-', - docinfo: 'shared', - sectanchors: '', - sectnums: '', - stylesdir: "css/", - stylesheet: 'spring.css', - 'linkcss': true, - 'nofooter': true, - 'allow-uri-read': '', - 'source-highlighter': 'highlight.js', - 'highlightjsdir': 'js/highlight', - 'highlightjs-theme': 'github', - 'project-version': project.version + 'spring-version': project.version, + 'branch-or-tag': project.version.endsWith('SNAPSHOT') ? 'main' : "v${project.version}", + 'icons': 'font', + 'idprefix': '', + 'idseparator': '-', + docinfo: 'shared', + sectanchors: '', + sectnums: '', + stylesdir: "css/", + stylesheet: 'spring.css', + 'linkcss': true, + 'nofooter': true, + 'allow-uri-read': '', + 'source-highlighter': 'highlight.js', + 'highlightjsdir': 'js/highlight', + 'highlightjs-theme': 'github', + 'project-version': project.version } diff --git a/spring-cloud-app-broker-docs/src/docs/asciidoc/index.adoc b/spring-cloud-app-broker-docs/src/docs/asciidoc/index.adoc index 3c7f56dfa..49f03ae6b 100644 --- a/spring-cloud-app-broker-docs/src/docs/asciidoc/index.adoc +++ b/spring-cloud-app-broker-docs/src/docs/asciidoc/index.adoc @@ -31,6 +31,8 @@ include::getting-started.adoc[] include::advertising-services.adoc[] +include::service-instance-logging.adoc[] + include::service-instances.adoc[] include::service-bindings.adoc[] diff --git a/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc b/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc new file mode 100644 index 000000000..f205f2d9d --- /dev/null +++ b/spring-cloud-app-broker-docs/src/docs/asciidoc/service-instance-logging.adoc @@ -0,0 +1,61 @@ +[[service-instance-logging]] +:acceptance-tests-dir: ../../../spring-cloud-app-broker-acceptance-tests/src/main/java/org/springframework/cloud/appbroker/acceptance/ +== Service Instance Logging + +You can configure the service broker to be compatible with the https://github.com/pivotal-cf/service-instance-logs-cli-plugin[Service instance logs CLI plugin] in order to tail and stream the logs of the backing application. + + +If you use Gradle, include the following in your application's `build.gradle` file: + +==== +[source,groovy,subs="attributes+"] +---- +dependencies { + api 'spring-cloud-starter-app-broker-logging:{project-version}' +} +---- +==== + +If you use Maven, include the following in your application's `pom.xml` file: + +==== +[source,xml,subs="attributes+"] +---- + + + org.springframework.cloud + spring-cloud-starter-app-broker-logging + {project-version} + + +---- +==== + +Then, include the `serviceInstanceLogsEndpoint` in the catalog metadata properties: + +==== +[source,yaml,subs="+quotes"] +---- +spring: + cloud: + openservicebroker: + catalog: + services: + - id: "service-id" + name: "service-name" + metadata: + properties: + serviceInstanceLogsEndpoint: https://scg-service-broker.system.domain.com/logs/ +---- +==== + +Finally, provide an implementation for `ApplicationIdsProvider` in order to retrieve the backing application id given a service instance id, for example: + +==== +[source,java,%autofit] +---- +include::{acceptance-tests-dir}logging/BackingApplicationIdsProvider.java[] +---- +==== + + diff --git a/spring-cloud-app-broker-logging/build.gradle b/spring-cloud-app-broker-logging/build.gradle new file mode 100644 index 000000000..65f4351c2 --- /dev/null +++ b/spring-cloud-app-broker-logging/build.gradle @@ -0,0 +1,35 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.springframework.boot.gradle.plugin.SpringBootPlugin + +plugins { + id 'org.springframework.boot' apply false +} + +description = "Spring Cloud App Broker Logging" + +dependencies { + api platform(SpringBootPlugin.BOM_COORDINATES) + api "org.springframework.boot:spring-boot-starter-webflux" + api "org.cloudfoundry:cloudfoundry-client-reactor:${cfJavaClientVersion}" + api "org.cloudfoundry:cloudfoundry-operations:${cfJavaClientVersion}" + + testImplementation project(":spring-cloud-starter-app-broker-logging") + testImplementation "org.springframework.boot:spring-boot-starter-test" + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.awaitility:awaitility" +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/ApplicationIdsProvider.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/ApplicationIdsProvider.java new file mode 100644 index 000000000..500260a5d --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/ApplicationIdsProvider.java @@ -0,0 +1,25 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging; + +import reactor.core.publisher.Flux; + +public interface ApplicationIdsProvider { + + Flux getApplicationIds(String serviceInstanceId); + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/LoggingUtils.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/LoggingUtils.java new file mode 100644 index 000000000..379f80e90 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/LoggingUtils.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging; + + +import java.util.Base64; + +import okio.ByteString; +import org.cloudfoundry.dropsonde.events.Envelope; +import org.cloudfoundry.dropsonde.events.LogMessage; + +public final class LoggingUtils { + + private LoggingUtils() { + } + + public static Envelope convertLogCacheEnvelopeToDropsonde(org.cloudfoundry.logcache.v1.Envelope envelope) { + final Envelope.Builder builder = new Envelope.Builder() + .eventType(Envelope.EventType.LogMessage) + .tags(envelope.getTags()) + .origin(getFromTags(envelope, "rep")) + .timestamp(envelope.getTimestamp()) + .logMessage(new LogMessage.Builder() + .message(ByteString.of(Base64.getDecoder().decode(envelope.getLog().getPayload()))) + .message_type(getMessageType(envelope)) + .timestamp(envelope.getTimestamp()) + .source_instance(envelope.getInstanceId()) + .source_type(getFromTags(envelope, "source_type")) + .build()); + return builder.build(); + } + + private static String getFromTags(org.cloudfoundry.logcache.v1.Envelope envelope, String sourceTypeTag) { + String sourceType = envelope.getTags().get(sourceTypeTag); + if (sourceType == null) { + sourceType = ""; + } + return sourceType; + } + + private static LogMessage.MessageType getMessageType(org.cloudfoundry.logcache.v1.Envelope envelope) { + LogMessage.MessageType messageType; + if ("ERR".equals(envelope.getLog().getType().getValue())) { + messageType = LogMessage.MessageType.ERR; + } + else { + messageType = LogMessage.MessageType.OUT; + } + return messageType; + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/ApplicationRecentLogsProvider.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/ApplicationRecentLogsProvider.java new file mode 100644 index 000000000..954f5366e --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/ApplicationRecentLogsProvider.java @@ -0,0 +1,59 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent; + +import java.time.Instant; + +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeType; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.ReadRequest; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; + +public class ApplicationRecentLogsProvider implements RecentLogsProvider { + + private final LogCacheClient logCacheClient; + + private final ApplicationIdsProvider applicationIdsProvider; + + public ApplicationRecentLogsProvider(LogCacheClient logCacheClient, + ApplicationIdsProvider applicationIdsProvider) { + this.logCacheClient = logCacheClient; + this.applicationIdsProvider = applicationIdsProvider; + } + + @Override + public Flux getLogs(String serviceInstanceId) { + return this.applicationIdsProvider.getApplicationIds(serviceInstanceId) + .flatMap(this::recentLogs); + } + + protected Flux recentLogs(String applicationId) { + return logCacheClient + .read(ReadRequest.builder() + .sourceId(applicationId) + .descending(true) + .envelopeTypes(EnvelopeType.LOG) + .limit(1000) + .startTime(Instant.MIN.getEpochSecond()) + .build()) + .flatMapMany(readResponse -> Flux.fromIterable(readResponse.getEnvelopes().getBatch())); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/RecentLogsProvider.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/RecentLogsProvider.java new file mode 100644 index 000000000..cf908c75b --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/RecentLogsProvider.java @@ -0,0 +1,26 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent; + +import org.cloudfoundry.logcache.v1.Envelope; +import reactor.core.publisher.Flux; + +public interface RecentLogsProvider { + + Flux getLogs(String serviceInstanceId); + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/EncodingException.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/EncodingException.java new file mode 100644 index 000000000..7d0adb1a0 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/EncodingException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent.endpoint; + +class EncodingException extends RuntimeException { + + private static final long serialVersionUID = 1837485200518028161L; + + public EncodingException(Throwable throwable) { + super("Failed to encode: " + throwable.getMessage(), throwable); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/LogMessageComparator.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/LogMessageComparator.java new file mode 100644 index 000000000..08c07a7d6 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/LogMessageComparator.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent.endpoint; + +import java.util.Comparator; + +import org.cloudfoundry.logcache.v1.Envelope; + +class LogMessageComparator implements Comparator { + + @Override + public int compare(Envelope o1, Envelope o2) { + return Long.compare(getTimestamp(o1), getTimestamp(o2)); + } + + private long getTimestamp(Envelope e) { + if (e.getTimestamp() != null) { + return e.getTimestamp(); + } + else { + return 0; + } + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java new file mode 100644 index 000000000..a9ce3b49d --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/MultipartEncoder.java @@ -0,0 +1,72 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent.endpoint; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; + +class MultipartEncoder implements Closeable { + + private final byte[] bytesCRLF = {'\r', '\n'}; + + private final byte[] bytesSEP = {'-', '-'}; + + private final byte[] boundary; + + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + public MultipartEncoder(String boundary) { + this.boundary = boundary.getBytes(); + } + + public void append(byte[] part) { + try { + out.write(bytesCRLF); + out.write(bytesSEP); + out.write(boundary); + out.write(bytesCRLF); + out.write(bytesCRLF); + out.write(part); + } + catch (IOException e) { + throw new EncodingException(e); + } + } + + public byte[] terminateAndGetBytes() { + try { + out.write(bytesCRLF); + out.write(bytesSEP); + out.write(boundary); + out.write(bytesSEP); + out.write(bytesCRLF); + final byte[] bytes = out.toByteArray(); + out.close(); + return bytes; + } + catch (IOException e) { + throw new EncodingException(e); + } + } + + @Override + public void close() throws IOException { + out.close(); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/RecentLogsController.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/RecentLogsController.java new file mode 100644 index 000000000..d219c790b --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/recent/endpoint/RecentLogsController.java @@ -0,0 +1,64 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent.endpoint; + +import java.util.UUID; + +import org.cloudfoundry.logcache.v1.Envelope; +import reactor.core.publisher.Mono; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.appbroker.logging.LoggingUtils; +import org.springframework.cloud.appbroker.logging.recent.RecentLogsProvider; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class RecentLogsController { + + private static final LogMessageComparator LOG_MESSAGE_COMPARATOR = new LogMessageComparator(); + + private final RecentLogsProvider recentLogsProviders; + + public RecentLogsController(@Autowired(required = false) RecentLogsProvider recentLogsProviders) { + this.recentLogsProviders = recentLogsProviders; + } + + @RequestMapping("/logs/{serviceInstanceId}/recentlogs") + public Mono> recentLogs(@PathVariable("serviceInstanceId") String serviceInstanceId) { + final String multipartBoundary = UUID.randomUUID().toString(); + final HttpHeaders headers = new HttpHeaders(); + headers.add(HttpHeaders.CONTENT_TYPE, "multipart/mixed; boundary=" + multipartBoundary); + return recentLogsProviders.getLogs(serviceInstanceId) + .collectList() + .doOnNext(envelopes -> envelopes.sort(LOG_MESSAGE_COMPARATOR)) + .map(envelopes -> { + final MultipartEncoder multipart = new MultipartEncoder(multipartBoundary); + for (Envelope message : envelopes) { + var dropsondeEvent = LoggingUtils.convertLogCacheEnvelopeToDropsonde(message); + multipart.append(org.cloudfoundry.dropsonde.events.Envelope.ADAPTER.encode(dropsondeEvent)); + } + + return new ResponseEntity<>(multipart.terminateAndGetBytes(), headers, HttpStatus.OK); + }); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java new file mode 100644 index 000000000..85406af28 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/ApplicationLogStreamPublisher.java @@ -0,0 +1,147 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent; +import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLoggingEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationListener; + +public class ApplicationLogStreamPublisher implements ApplicationListener { + + private static final Logger LOG = LoggerFactory.getLogger(ApplicationLogStreamPublisher.class); + + private final Map registry = new HashMap<>(); + + private final LogStreamPublisher logStreamPublisher; + + private final ApplicationEventPublisher publisher; + + public ApplicationLogStreamPublisher( + LogStreamPublisher logStreamPublisher, + ApplicationEventPublisher publisher) { + this.logStreamPublisher = logStreamPublisher; + this.publisher = publisher; + } + + @Override + public void onApplicationEvent(ServiceInstanceLoggingEvent event) { + final String serviceInstanceId = event.getServiceInstanceId(); + switch (event.getOperation()) { + case START: + LOG.debug("Received event to begin listening to logs for {}", serviceInstanceId); + this.startPublishing(serviceInstanceId); + return; + case STOP: + LOG.debug("Received event to stop listening to logs for {}", serviceInstanceId); + this.stopPublishing(serviceInstanceId); + return; + + default: + throw new IllegalArgumentException("Unknown operation: " + event.getOperation()); + } + } + + private void startPublishing(String serviceInstanceId) { + registry.compute(serviceInstanceId, (key, existingRegistration) -> { + if (existingRegistration != null) { + LOG.debug("Incrementing registration subscription count for {}", serviceInstanceId); + existingRegistration.increment(); + return existingRegistration; + } + + Flux logStream = this.logStreamPublisher + .getLogStream(serviceInstanceId); + + final Disposable subscription = logStream + .doOnNext( + envelope -> publisher + .publishEvent(new ServiceInstanceLogEvent(this, serviceInstanceId, envelope))) + .subscribe(); + + LOG.debug("Creating new registration for {}", serviceInstanceId); + return new Registration(subscription); + }); + } + + private void stopPublishing(String serviceInstanceId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received event to stop publishing logs for {}", serviceInstanceId); + } + + registry.compute(serviceInstanceId, (key, registration) -> { + if (registration == null) { + if (LOG.isWarnEnabled()) { + LOG.warn("Received deregister event for service instance {} but there no event handler registered", + serviceInstanceId); + } + return null; + } + + if (registration.decrement() == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Disposing of registration since there are no more subscriptions"); + } + registration.getSubscription().dispose(); + return null; + } + + return registration; + }); + } + + private final static class Registration { + + private final Disposable subscription; + + private int count = 1; + + private Registration(Disposable subscription) { + this.subscription = subscription; + } + + public void increment() { + if (LOG.isDebugEnabled()) { + LOG.debug("Incrementing subscription count from {} to {}", count, count + 1); + } + + ++count; + } + + public int decrement() { + if (LOG.isDebugEnabled()) { + LOG.debug("Decrementing subscription count from {} to {}", count, count - 1); + } + + return --count; + } + + public Disposable getSubscription() { + return subscription; + } + + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java new file mode 100644 index 000000000..1646038ff --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogCacheStreamPublisher.java @@ -0,0 +1,137 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.client.v2.applications.GetApplicationRequest; +import org.cloudfoundry.dropsonde.events.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeType; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.ReadResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.LoggingUtils; + +/*** + * Class to Stream LocCache Envelopes using the same pattern as the + * loc-cache-cli + */ +public class LogCacheStreamPublisher implements LogStreamPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(LogCacheStreamPublisher.class); + + private final CloudFoundryClient client; + + private final LogCacheClient logCacheClient; + + private final ApplicationIdsProvider applicationIdsProvider; + + public LogCacheStreamPublisher( + CloudFoundryClient client, + LogCacheClient logCacheClient, + ApplicationIdsProvider applicationIdsProvider) { + this.client = client; + this.logCacheClient = logCacheClient; + this.applicationIdsProvider = applicationIdsProvider; + } + + @Override + public Flux getLogStream(String serviceInstanceId) { + return this.applicationIdsProvider + .getApplicationIds(serviceInstanceId) + .doOnNext(id -> LOG.debug("Starting log streaming for app with ID {}", id)) + .flatMap(this::createApplicationStreamer); + } + + protected Flux createApplicationStreamer(String applicationId) { + return getApplicationName(applicationId) + .flatMapMany(appName -> { + long initialStartTime = Instant.now().minus(5, ChronoUnit.SECONDS).toEpochMilli() * 1_000_000L; + return readLogCache(applicationId, initialStartTime) + .flatMapMany(initialResponse -> { + AtomicLong lastTimestamp = getLastTimestamp(initialResponse, initialStartTime); + Flux initialLogs = convertEnvelopesToDropsonde(initialResponse); + Flux polledLogs = Flux.interval(Duration.ofSeconds(1)) + .flatMap(tick -> readLogCache(applicationId, lastTimestamp.get() + 1) + .flatMapMany(readResponse -> { + updateLastTimestampFromResponse(readResponse, lastTimestamp); + return convertEnvelopesToDropsonde(readResponse); + })) + .onErrorResume(error -> { + LOG.error("Error during log polling", error); + return Flux.empty(); + }); + + return Flux.merge(initialLogs, polledLogs) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))) + .doOnError(error -> LOG.error("Streaming error", error)); + }); + }); + } + + private static AtomicLong getLastTimestamp(ReadResponse initialResponse, long initialStartTime) { + return new AtomicLong( + initialResponse.getEnvelopes().getBatch().stream() + .mapToLong(org.cloudfoundry.logcache.v1.Envelope::getTimestamp) + .max() + .orElse(initialStartTime) + ); + } + + private static Flux convertEnvelopesToDropsonde(ReadResponse readResponse) { + return Flux.fromIterable(readResponse.getEnvelopes().getBatch()) + .map(LoggingUtils::convertLogCacheEnvelopeToDropsonde); + } + + private Mono getApplicationName(String applicationId) { + return client.applicationsV2() + .get(GetApplicationRequest.builder() + .applicationId(applicationId) + .build()) + .map(response -> response.getEntity().getName()); + } + + private Mono readLogCache(String applicationId, long lastTimestamp) { + return logCacheClient.read( + ReadRequest.builder() + .sourceId(applicationId) + .envelopeTypes(EnvelopeType.LOG) + .startTime(lastTimestamp) + .build()); + } + + private static void updateLastTimestampFromResponse(ReadResponse readResponse, AtomicLong lastTimestamp) { + long maxTimestamp = readResponse.getEnvelopes().getBatch().stream() + .mapToLong(org.cloudfoundry.logcache.v1.Envelope::getTimestamp) + .max() + .orElse(lastTimestamp.get()); + + lastTimestamp.set(maxTimestamp); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogStreamPublisher.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogStreamPublisher.java new file mode 100644 index 000000000..8c9229b61 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/LogStreamPublisher.java @@ -0,0 +1,25 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming; + +import reactor.core.publisher.Flux; + +public interface LogStreamPublisher { + + Flux getLogStream(String serviceInstanceId); + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/ServiceInstanceNotFoundException.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/ServiceInstanceNotFoundException.java new file mode 100644 index 000000000..b66552546 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/ServiceInstanceNotFoundException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.endpoint; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(code = HttpStatus.NOT_FOUND) +class ServiceInstanceNotFoundException extends RuntimeException { + + private static final long serialVersionUID = 8672053621926030384L; + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java new file mode 100644 index 000000000..44d7248e5 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/endpoint/StreamingLogWebSocketHandler.java @@ -0,0 +1,123 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.endpoint; + +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.cloudfoundry.dropsonde.events.Envelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent; +import org.springframework.cloud.appbroker.logging.streaming.events.StartServiceInstanceLoggingEvent; +import org.springframework.cloud.appbroker.logging.streaming.events.StopServiceInstanceLoggingEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationListener; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketSession; +import org.springframework.web.util.UriTemplate; + +public class StreamingLogWebSocketHandler implements WebSocketHandler, ApplicationListener { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingLogWebSocketHandler.class); + + private static final UriTemplate LOGGING_URI_TEMPLATE = new UriTemplate("/logs/{serviceInstanceId}/stream"); + + private final ApplicationEventPublisher eventPublisher; + + private final ConcurrentMap> envelopeSinks = new ConcurrentHashMap<>(); + + public StreamingLogWebSocketHandler(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + @Override + public Mono handle(WebSocketSession session) { + String serviceInstanceId = getServiceInstanceId(session); + LOG.info("Connection established [{}], service instance {}", + session.getHandshakeInfo().getRemoteAddress(), + serviceInstanceId); + + Sinks.Many envelopeSink = envelopeSinks + .computeIfAbsent(serviceInstanceId, s -> Sinks.many().multicast().onBackpressureBuffer()); + + eventPublisher.publishEvent(new StartServiceInstanceLoggingEvent(this, serviceInstanceId)); + LOG.info("Published event to start streaming logs for service instance with ID {}", serviceInstanceId); + + return session.send(envelopeSink.asFlux() + .map(envelope -> session.binaryMessage( + dataBufferFactory -> dataBufferFactory.wrap(Envelope.ADAPTER.encode(envelope))))) + .doFinally(signalType -> afterConnectionClosed(session, serviceInstanceId)) + .doOnError(throwable -> LOG.error(String.format("Error handling logging stream for service instance %s", + serviceInstanceId), throwable)); + } + + @Override + public void onApplicationEvent(ServiceInstanceLogEvent event) { + broadcastLogMessage(event); + } + + public void broadcastLogMessage(ServiceInstanceLogEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received event to broadcast log message for {}", event.getServiceInstanceId()); + } + + Sinks.Many envelopeSink = this.envelopeSinks.get(event.getServiceInstanceId()); + if (envelopeSink == null) { + if (LOG.isWarnEnabled()) { + LOG.warn("No sink found for {}, stopping log streaming", event.getServiceInstanceId()); + } + + eventPublisher.publishEvent(new StopServiceInstanceLoggingEvent(this, event.getServiceInstanceId())); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Sending message to client for {}", event.getServiceInstanceId()); + } + + envelopeSink.tryEmitNext(event.getEnvelope()).orThrow(); + } + + private void afterConnectionClosed(WebSocketSession webSocketSession, String serviceInstanceId) { + LOG.info("Connection closed [{}], service instance {}", webSocketSession.getHandshakeInfo().getRemoteAddress(), + serviceInstanceId); + + eventPublisher.publishEvent(new StopServiceInstanceLoggingEvent(this, serviceInstanceId)); + + Sinks.Many sink = envelopeSinks.remove(serviceInstanceId); + if (sink != null) { + sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); + } + } + + private String getServiceInstanceId(WebSocketSession webSocketSession) { + URI uri = webSocketSession.getHandshakeInfo().getUri(); + final Map match = LOGGING_URI_TEMPLATE.match(uri.getPath()); + if (match.isEmpty()) { + throw new ServiceInstanceNotFoundException(); + } + + return match.get("serviceInstanceId"); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLogEvent.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLogEvent.java new file mode 100644 index 000000000..6d450797f --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLogEvent.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.events; + +import org.cloudfoundry.dropsonde.events.Envelope; + +import org.springframework.context.ApplicationEvent; + +public class ServiceInstanceLogEvent extends ApplicationEvent { + + private static final long serialVersionUID = 4391048666734944603L; + + private final String serviceInstanceId; + + private final Envelope envelope; + + public ServiceInstanceLogEvent(Object source, String serviceInstanceId, + Envelope envelope) { + super(source); + this.serviceInstanceId = serviceInstanceId; + this.envelope = envelope; + } + + public String getServiceInstanceId() { + return serviceInstanceId; + } + + public Envelope getEnvelope() { + return envelope; + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLoggingEvent.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLoggingEvent.java new file mode 100644 index 000000000..4a45f3bf2 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/ServiceInstanceLoggingEvent.java @@ -0,0 +1,56 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.events; + +import org.springframework.context.ApplicationEvent; + +public class ServiceInstanceLoggingEvent extends ApplicationEvent { + + private static final long serialVersionUID = 3721553379568462887L; + + public enum Operation { + + /** + * Start publishing log stream for a given service instance id + */ + START, + + /** + * Stop publishing log stream for a given service instance id + */ + STOP + } + + private final String serviceInstanceId; + + private final Operation operation; + + public ServiceInstanceLoggingEvent(Object source, String serviceInstanceId, Operation operation) { + super(source); + this.serviceInstanceId = serviceInstanceId; + this.operation = operation; + } + + public String getServiceInstanceId() { + return serviceInstanceId; + } + + public Operation getOperation() { + return operation; + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StartServiceInstanceLoggingEvent.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StartServiceInstanceLoggingEvent.java new file mode 100644 index 000000000..e4dbf24e9 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StartServiceInstanceLoggingEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.events; + +public class StartServiceInstanceLoggingEvent extends ServiceInstanceLoggingEvent { + + private static final long serialVersionUID = -5940715663862240039L; + + public StartServiceInstanceLoggingEvent(Object source, String serviceInstanceId) { + super(source, serviceInstanceId, Operation.START); + } + +} diff --git a/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StopServiceInstanceLoggingEvent.java b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StopServiceInstanceLoggingEvent.java new file mode 100644 index 000000000..c337bb487 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/main/java/org/springframework/cloud/appbroker/logging/streaming/events/StopServiceInstanceLoggingEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming.events; + +public class StopServiceInstanceLoggingEvent extends ServiceInstanceLoggingEvent { + + private static final long serialVersionUID = 2858399700112202361L; + + public StopServiceInstanceLoggingEvent(Object source, String serviceInstanceId) { + super(source, serviceInstanceId, Operation.STOP); + } + +} diff --git a/spring-cloud-app-broker-logging/src/test/java/com/example/recentlog/RecentLogsTestApp.java b/spring-cloud-app-broker-logging/src/test/java/com/example/recentlog/RecentLogsTestApp.java new file mode 100644 index 000000000..95a30111f --- /dev/null +++ b/spring-cloud-app-broker-logging/src/test/java/com/example/recentlog/RecentLogsTestApp.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.recentlog; + +import java.util.UUID; + +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.servicebroker.autoconfigure.web.ServiceBrokerAutoConfiguration; +import org.springframework.cloud.servicebroker.autoconfigure.web.reactive.ServiceBrokerWebFluxAutoConfiguration; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication(exclude = { + ServiceBrokerAutoConfiguration.class, + ServiceBrokerWebFluxAutoConfiguration.class +}) +public class RecentLogsTestApp { + + static final String APP_ID = UUID.randomUUID().toString(); + + public static String getAppId() { + return APP_ID; + } + + @Bean + ApplicationIdsProvider applicationIdsProvider() { + return serviceInstanceId -> Flux.just(APP_ID); + } + +} diff --git a/spring-cloud-app-broker-logging/src/test/java/com/example/streaming/LogStreamingTestApp.java b/spring-cloud-app-broker-logging/src/test/java/com/example/streaming/LogStreamingTestApp.java new file mode 100644 index 000000000..9045cafe3 --- /dev/null +++ b/spring-cloud-app-broker-logging/src/test/java/com/example/streaming/LogStreamingTestApp.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.streaming; + +import java.util.UUID; + +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.appbroker.logging.ApplicationIdsProvider; +import org.springframework.cloud.appbroker.logging.streaming.events.StopServiceInstanceLoggingEvent; +import org.springframework.cloud.servicebroker.autoconfigure.web.ServiceBrokerAutoConfiguration; +import org.springframework.cloud.servicebroker.autoconfigure.web.reactive.ServiceBrokerWebFluxAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.event.EventListener; + +@SpringBootApplication(exclude = { + ServiceBrokerAutoConfiguration.class, + ServiceBrokerWebFluxAutoConfiguration.class +}) +public class LogStreamingTestApp { + + private static final String APP_ID = UUID.randomUUID().toString(); + + private static boolean receivedStopEvent; + private static String receivedStopEventServiceInstanceId; + + public static String getAppId() { + return APP_ID; + } + + public static boolean isReceivedStopEvent() { + return receivedStopEvent; + } + + public static String getReceivedStopEventServiceInstanceId() { + return receivedStopEventServiceInstanceId; + } + + @Bean + ApplicationIdsProvider applicationIdsProvider() { + return serviceInstanceId -> Flux.just(APP_ID); + } + + @EventListener + public void onStop(StopServiceInstanceLoggingEvent stopServiceInstanceLoggingEvent) { + receivedStopEventServiceInstanceId = stopServiceInstanceLoggingEvent.getServiceInstanceId(); + receivedStopEvent = true; + } + +} diff --git a/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/recent/ServiceInstanceRecentLogsTest.java b/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/recent/ServiceInstanceRecentLogsTest.java new file mode 100644 index 000000000..a78d9108b --- /dev/null +++ b/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/recent/ServiceInstanceRecentLogsTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.recent; + +import java.time.Instant; +import java.util.Base64; +import java.util.UUID; + +import com.example.recentlog.RecentLogsTestApp; +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.client.v2.applications.ApplicationEntity; +import org.cloudfoundry.client.v2.applications.GetApplicationRequest; +import org.cloudfoundry.client.v2.applications.GetApplicationResponse; +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeBatch; +import org.cloudfoundry.logcache.v1.Log; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.LogType; +import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.ReadResponse; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import reactor.core.publisher.Mono; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.test.web.reactive.server.WebTestClient; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RecentLogsTestApp.class) +class ServiceInstanceRecentLogsTest { + + @LocalServerPort + private int port; + + @MockBean + private LogCacheClient logCacheClient; + + @MockBean(answer = Answers.RETURNS_DEEP_STUBS) + private CloudFoundryClient cloudFoundryClient; + + private String expectedTestMessage; + + private static final String SERVICE_INSTANCE_ID = UUID.randomUUID().toString(); + + @BeforeEach + void setUp() { + expectedTestMessage = "test message " + UUID.randomUUID(); + + Envelope testEnvelope = + Envelope.builder() + .timestamp(Instant.now().toEpochMilli()) + .log(Log.builder() + .payload(new String(Base64.getEncoder().encode(expectedTestMessage.getBytes()))) + .type(LogType.OUT) + .build()) + .build(); + + ReadResponse response = + ReadResponse.builder() + .envelopes(EnvelopeBatch.builder().batch(testEnvelope).build()) + .build(); + + given(logCacheClient.read(any(ReadRequest.class))) + .willReturn(Mono.just(response)); + + given(cloudFoundryClient.applicationsV2() + .get(GetApplicationRequest.builder().applicationId(RecentLogsTestApp.getAppId()).build())) + .willReturn(Mono.just( + GetApplicationResponse.builder().entity(ApplicationEntity.builder().name("test-app").build()).build())); + } + + @Test + void shouldFetchLogs() { + WebTestClient client = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build(); + + client.get().uri("/logs/{serviceInstanceId}/recentlogs", SERVICE_INSTANCE_ID) + .exchange() + .expectStatus().isOk() + .expectBody(String.class) + .value(Matchers.containsString(expectedTestMessage)); + } + +} diff --git a/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/streaming/ServiceInstanceLogStreamingTest.java b/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/streaming/ServiceInstanceLogStreamingTest.java new file mode 100644 index 000000000..fa380bace --- /dev/null +++ b/spring-cloud-app-broker-logging/src/test/java/org/springframework/cloud/appbroker/logging/streaming/ServiceInstanceLogStreamingTest.java @@ -0,0 +1,175 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.appbroker.logging.streaming; + +import java.net.URI; +import java.time.Instant; +import java.util.Base64; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import com.example.recentlog.RecentLogsTestApp; +import com.example.streaming.LogStreamingTestApp; +import okio.ByteString; +import org.cloudfoundry.client.CloudFoundryClient; +import org.cloudfoundry.client.v2.applications.ApplicationEntity; +import org.cloudfoundry.client.v2.applications.GetApplicationRequest; +import org.cloudfoundry.client.v2.applications.GetApplicationResponse; +import org.cloudfoundry.dropsonde.events.LogMessage; +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeBatch; +import org.cloudfoundry.logcache.v1.Log; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.LogType; +import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.ReadResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; +import org.springframework.web.reactive.socket.client.WebSocketClient; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = LogStreamingTestApp.class) +class ServiceInstanceLogStreamingTest { + + private static final String SERVICE_INSTANCE_ID = UUID.randomUUID().toString(); + + @LocalServerPort + private int port; + + @MockBean + private LogCacheClient logCacheClient; + + @MockBean(answer = Answers.RETURNS_DEEP_STUBS) + private CloudFoundryClient cloudFoundryClient; + + @Autowired + private ApplicationEventPublisher applicationEventPublisher; + + private org.cloudfoundry.dropsonde.events.Envelope expectedEnvelope; + + private final AtomicReference actualEnvelope = new AtomicReference<>(); + + @BeforeEach + void setUp() { + String expectedTestMessage = "test message " + UUID.randomUUID(); + + Envelope testEnvelope = + Envelope.builder() + .timestamp(Instant.now().toEpochMilli()) + .log(Log.builder() + .payload(new String(Base64.getEncoder().encode(expectedTestMessage.getBytes()))) + .type(LogType.OUT) + .build()) + .build(); + expectedEnvelope = new org.cloudfoundry.dropsonde.events.Envelope.Builder() + .logMessage(new LogMessage.Builder() + .message(ByteString.of(Base64.getEncoder().encode(expectedTestMessage.getBytes()))) + .message_type(LogMessage.MessageType.OUT) + .timestamp(0L) + .build()) + .origin("") + .eventType(org.cloudfoundry.dropsonde.events.Envelope.EventType.LogMessage) + .build(); + actualEnvelope.set(testEnvelope); + + ReadResponse response = + ReadResponse.builder() + .envelopes(EnvelopeBatch.builder().batch(testEnvelope).build()) + .build(); + + given(logCacheClient.read(any(ReadRequest.class))) + .willReturn(Mono.just(response)); + + given(cloudFoundryClient.applicationsV2() + .get(GetApplicationRequest.builder().applicationId(RecentLogsTestApp.getAppId()).build())) + .willReturn(Mono.just( + GetApplicationResponse.builder().entity(ApplicationEntity.builder().name("test-app").build()).build())); + } + + @Test + void shouldPublishWebSocketEndpoint() { + Disposable subscription = connectToLogsStreamEndpoint(); + + await().untilAsserted(() -> assertThat(Base64.getDecoder().decode(actualEnvelope.get().getLog().getPayload())) + .isEqualTo(Base64.getDecoder().decode(expectedEnvelope.logMessage.message.toByteArray()))); + + subscription.dispose(); + } + + @Test + void shouldPublishEventOnDisconnect() { + Disposable subscription = connectToLogsStreamEndpoint(); + + await().untilAsserted(() -> assertThat(actualEnvelope.get()).isNotNull()); + + subscription.dispose(); + + await().untilAsserted(() -> assertThat(LogStreamingTestApp.isReceivedStopEvent()).isTrue()); + } + + @Test + void shouldStopStreamingIfNoClient() { + // CLI plugin doesn't always handle disconnect gracefully, so sometimes it is possible that log stream is + // being published but there is no listener. In this case log streaming should be stopped. + + Disposable subscription = connectToLogsStreamEndpoint(); + subscription.dispose(); + + applicationEventPublisher.publishEvent( + new ServiceInstanceLogEvent(this, SERVICE_INSTANCE_ID, expectedEnvelope)); + await().untilAsserted( + () -> assertThat(LogStreamingTestApp.getReceivedStopEventServiceInstanceId()).isEqualTo( + SERVICE_INSTANCE_ID)); + } + + private Disposable connectToLogsStreamEndpoint() { + URI uri = URI.create("ws://localhost:" + port + "/logs/" + SERVICE_INSTANCE_ID + "/stream"); + + WebSocketClient client = new ReactorNettyWebSocketClient(); + return client.execute(uri, getWebSocketHandler()).subscribe(); + } + + private WebSocketHandler getWebSocketHandler() { + return session -> session + .receive() + .doOnNext(message -> { + DataBuffer buffer = message.getPayload(); + actualEnvelope.set(Envelope.builder() + .log(Log.builder().payload(new String(Base64.getDecoder().decode(buffer.toString()))).build()) + .build()); + }) + .then(); + } + +} diff --git a/spring-cloud-starter-app-broker-logging/build.gradle b/spring-cloud-starter-app-broker-logging/build.gradle new file mode 100644 index 000000000..128b3468a --- /dev/null +++ b/spring-cloud-starter-app-broker-logging/build.gradle @@ -0,0 +1,22 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +description = "Spring Cloud App Broker Starter for Logging" + +dependencies { + api project(":spring-cloud-starter-app-broker") + api project(":spring-cloud-app-broker-logging") +} diff --git a/src/pmd/pmdTestRuleSet.xml b/src/pmd/pmdTestRuleSet.xml index 8d5ef6221..cd4fe9e9f 100644 --- a/src/pmd/pmdTestRuleSet.xml +++ b/src/pmd/pmdTestRuleSet.xml @@ -73,6 +73,7 @@ +