diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 33163b1a..90cc0eb9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,10 +30,12 @@ jobs: matrix: os: [ubuntu-latest] scala: [2.13, 3] - java: [temurin@11, temurin@17] + java: [temurin@11, temurin@17, temurin@21] exclude: - scala: 3 java: temurin@17 + - scala: 3 + java: temurin@21 runs-on: ${{ matrix.os }} timeout-minutes: 60 steps: @@ -71,6 +73,19 @@ jobs: if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false' run: sbt +update + - name: Setup Java (temurin@21) + id: setup-java-temurin-21 + if: matrix.java == 'temurin@21' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 21 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@21' && steps.setup-java-temurin-21.outputs.cache-hit == 'false' + run: sbt +update + - name: Check that workflows are up to date run: sbt githubWorkflowCheck @@ -156,6 +171,19 @@ jobs: if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false' run: sbt +update + - name: Setup Java (temurin@21) + id: setup-java-temurin-21 + if: matrix.java == 'temurin@21' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 21 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@21' && steps.setup-java-temurin-21.outputs.cache-hit == 'false' + run: sbt +update + - name: Download target directories (2.13) uses: actions/download-artifact@v4 with: @@ -243,6 +271,19 @@ jobs: if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false' run: sbt +update + - name: Setup Java (temurin@21) + id: setup-java-temurin-21 + if: matrix.java == 'temurin@21' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 21 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@21' && steps.setup-java-temurin-21.outputs.cache-hit == 'false' + run: sbt +update + - name: Submit Dependencies uses: scalacenter/sbt-dependency-submission@v2 with: @@ -316,6 +357,19 @@ jobs: if: matrix.java == 'temurin@17' && steps.setup-java-temurin-17.outputs.cache-hit == 'false' run: sbt +update + - name: Setup Java (temurin@21) + id: setup-java-temurin-21 + if: matrix.java == 'temurin@21' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 21 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@21' && steps.setup-java-temurin-21.outputs.cache-hit == 'false' + run: sbt +update + - name: Generate site run: sbt docs/tlSite diff --git a/.mergify.yml b/.mergify.yml index 3a3cb2e3..3beb8f38 100644 --- a/.mergify.yml +++ b/.mergify.yml @@ -12,6 +12,7 @@ pull_request_rules: - body~=labels:.*early-semver-patch - status-success=Build and Test (ubuntu-latest, 2.13, temurin@11) - status-success=Build and Test (ubuntu-latest, 2.13, temurin@17) + - status-success=Build and Test (ubuntu-latest, 2.13, temurin@21) - status-success=Build and Test (ubuntu-latest, 3, temurin@11) - status-success=Generate Site (ubuntu-latest, temurin@11) actions: diff --git a/build.sbt b/build.sbt index 468a32dc..fe266a62 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,5 @@ import com.typesafe.tools.mima.core._ +import explicitdeps.ExplicitDepsPlugin.autoImport.moduleFilterRemoveValue lazy val root = project .in(file(".")) @@ -11,10 +12,6 @@ lazy val core = project name := "http4s-jdk-http-client", libraryDependencies ++= coreDeps, mimaBinaryIssueFilters ++= Seq( - // package private, due to #641 - ProblemFilters.exclude[IncompatibleMethTypeProblem]( - "org.http4s.jdkhttpclient.JdkHttpClient.defaultHttpClient" - ) ) ) @@ -82,7 +79,7 @@ ThisBuild / developers := List( ) ThisBuild / tlJdkRelease := Some(11) -ThisBuild / githubWorkflowJavaVersions := Seq("11", "17").map(JavaSpec.temurin(_)) +ThisBuild / githubWorkflowJavaVersions := Seq("11", "17", "21").map(JavaSpec.temurin(_)) ThisBuild / tlCiReleaseBranches := Seq("main") ThisBuild / tlSitePublishBranch := Some("main") @@ -97,6 +94,7 @@ lazy val docsSettings = Versions .forCurrentVersion(Version("1.x", "1.x")) .withOlderVersions( + Version("0.10.x", "0.10"), Version("0.9.x", "0.9"), Version("0.8.x", "0.8"), Version("0.7.x", "0.7"), diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala index 1a23c924..89437ec5 100644 --- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala @@ -69,16 +69,23 @@ object JdkHttpClient { case Entity.Strict(bytes) => Resource.pure[F, BodyPublisher](BodyPublishers.ofInputStream(() => bytes.toInputStream)) case Entity.Streamed(body, _) => + def consumeFully = version match { + case HttpClient.Version.HTTP_1_1 => req.isChunked + case HttpClient.Version.HTTP_2 => req.contentLength.isEmpty + } flow .toPublisher(body.chunks.map(_.toByteBuffer)) .map { publisher => - if (req.isChunked) + if (consumeFully) BodyPublishers.fromPublisher(publisher) else req.contentLength match { case Some(length) if length > 0L => BodyPublishers.fromPublisher(publisher, length) - case _ => BodyPublishers.noBody + case _ => + // If we dont do this, we might block finalization + publisher.subscribe(DrainingSubscriber) + BodyPublishers.noBody } } } @@ -252,9 +259,22 @@ object JdkHttpClient { * [[cats.effect.kernel.Async.executor executor]], sets the * [[org.http4s.client.defaults.ConnectTimeout default http4s connect timeout]], and disables * [[https://github.com/http4s/http4s-jdk-http-client/issues/200 TLS 1.3 on JDK 11]]. + * + * On Java 21 and higher, it actively closes the underlying client, releasing its resources + * early. On earlier Java versions, closing the underlying client is not possible, so the release + * is a no-op. On these Java versions (and there only), you can safely use + * [[cats.effect.Resource allocated]] to avoid dealing with resource management. */ - def simple[F[_]](implicit F: Async[F]): F[Client[F]] = - defaultHttpClient[F].map(apply(_)) + def simple[F[_]](implicit F: Async[F]): Resource[F, Client[F]] = + defaultHttpClientResource[F].map(apply(_)) + + private[jdkhttpclient] def defaultHttpClientResource[F[_]](implicit + F: Async[F] + ): Resource[F, HttpClient] = + Resource.make[F, HttpClient](defaultHttpClient[F]) { + case c: AutoCloseable => Sync[F].blocking(c.close()) + case _ => Applicative[F].unit + } private[jdkhttpclient] def defaultHttpClient[F[_]](implicit F: Async[F]): F[HttpClient] = F.executor.flatMap { exec => @@ -296,4 +316,12 @@ object JdkHttpClient { "via", "warning" ).map(CIString(_)) + + private object DrainingSubscriber extends Flow.Subscriber[ByteBuffer] { + override def onSubscribe(subscription: Flow.Subscription): Unit = + subscription.request(Long.MaxValue) + override def onNext(item: ByteBuffer): Unit = () + override def onError(throwable: Throwable): Unit = () + override def onComplete(): Unit = () + } } diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala index f8d128e1..49488e2f 100644 --- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala @@ -131,6 +131,11 @@ object JdkWSClient { }) } yield () } + // If the input side is still open (no close received from server), the JDK will not clean up the connection. + // This also implies the client can't be shutdown on Java 21+ as it waits for all open connections + // to be be closed. As we don't expect/handle anything coming on the input anymore + // at this point, we can safely abort. + _ <- F.delay(webSocket.abort()) } yield () } .map { case (webSocket, queue, closedDef, sendSem) => @@ -164,7 +169,12 @@ object JdkWSClient { * [[cats.effect.kernel.Async.executor executor]], sets the * [[org.http4s.client.defaults.ConnectTimeout default http4s connect timeout]], and disables * [[https://github.com/http4s/http4s-jdk-http-client/issues/200 TLS 1.3 on JDK 11]]. + * + * * On Java 21 and higher, it actively closes the underlying client, releasing its resources + * early. On earlier Java versions, closing the underlying client is not possible, so the release + * is a no-op. On these Java versions (and there only), you can safely use + * [[cats.effect.Resource allocated]] to avoid dealing with resource management. */ - def simple[F[_]](implicit F: Async[F]): F[WSClient[F]] = - JdkHttpClient.defaultHttpClient[F].map(apply(_)) + def simple[F[_]](implicit F: Async[F]): Resource[F, WSClient[F]] = + JdkHttpClient.defaultHttpClientResource[F].map(apply(_)) } diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/BodyLeakExample.scala b/core/src/test/scala/org/http4s/jdkhttpclient/BodyLeakExample.scala index 584f5734..01ff1109 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/BodyLeakExample.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/BodyLeakExample.scala @@ -55,7 +55,7 @@ object BodyLeakExample extends IOApp { .withPort(port"8080") .withHttpApp(app) .build - .product(Resource.eval(JdkHttpClient.simple[IO])) + .product(JdkHttpClient.simple[IO]) .use { case (_, client) => for { counter <- Ref.of[IO, Long](0L) diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala b/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala index e6d903bb..d471ca44 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala @@ -29,7 +29,7 @@ class DeadlockWorkaround extends CatsEffectSuite { test("fail to connect via TLSv1.3 on Java 11") { if (Runtime.version().feature() > 11) IO.pure(true) else - (JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).flatMapN { (http, ws) => + (JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).tupled.use { case (http, ws) => def testSSLFailure(r: IO[Unit]) = r.intercept[SSLHandshakeException] testSSLFailure(http.expect[Unit](uri"https://tls13.1d.pw")) *> testSSLFailure(ws.connectHighLevel(WSRequest(uri"wss://tls13.1d.pw")).use(_ => IO.unit)) diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala b/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala index c7308651..77ef29b7 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala @@ -28,7 +28,7 @@ import org.typelevel.ci._ import scala.concurrent.duration._ class JdkHttpClientSpec extends ClientRouteTestBattery("JdkHttpClient") { - def clientResource: Resource[IO, Client[IO]] = Resource.eval(JdkHttpClient.simple[IO]) + def clientResource: Resource[IO, Client[IO]] = JdkHttpClient.simple[IO] // regression test for https://github.com/http4s/http4s-jdk-http-client/issues/395 test("Don't error with empty body and explicit Content-Length: 0") { diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala b/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala index 0e49af62..34426222 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala @@ -40,7 +40,7 @@ class JdkWSClientSpec extends CatsEffectSuite { implicit val loggerFactory: LoggerFactory[IO] = NoOpFactory[IO] val webSocket: IOFixture[WSClient[IO]] = - ResourceSuiteLocalFixture("webSocket", Resource.eval(JdkWSClient.simple[IO])) + ResourceSuiteLocalFixture("webSocket", JdkWSClient.simple[IO]) val echoServerUri: IOFixture[Uri] = ResourceSuiteLocalFixture( "echoServerUri", diff --git a/docs/README.md b/docs/README.md index 721bcf2e..33417737 100644 --- a/docs/README.md +++ b/docs/README.md @@ -50,7 +50,7 @@ import org.http4s.jdkhttpclient.JdkHttpClient // It comes for free with `cats.effect.IOApp`: import cats.effect.unsafe.implicits.global -val client: IO[Client[IO]] = JdkHttpClient.simple[IO] +val client: Resource[IO, Client[IO]] = JdkHttpClient.simple[IO] ``` #### Custom clients @@ -91,7 +91,7 @@ def fetchStatus[F[_]](c: Client[F], uri: Uri): F[Status] = c.status(Request[F](Method.GET, uri = uri)) client - .flatMap(c => fetchStatus(c, uri"https://http4s.org/")) + .use(c => fetchStatus(c, uri"https://http4s.org/")) .unsafeRunSync() ``` @@ -103,7 +103,7 @@ create a new `HttpClient` instance on every invocation: ```scala mdoc def fetchStatusInefficiently[F[_]: Async](uri: Uri): F[Status] = - JdkHttpClient.simple[F].flatMap(_.status(Request[F](Method.GET, uri = uri))) + JdkHttpClient.simple[F].use(_.status(Request[F](Method.GET, uri = uri))) ``` @:@