Subscriber not cleaning resources when subscribing to non-existing subscription (Subscriber.subStub and StreamingSubscriberConnection.messageDispatcher) #315

Closed
rgrebski opened this issue Aug 12, 2020 · 4 comments · Fixed by #539
Labels
api: pubsub Issues related to the googleapis/java-pubsub API. priority: p2 Moderately-important priority. Fix may not be included in next release. 🚨 This issue needs some love. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@rgrebski

rgrebski commented Aug 12, 2020

Some resources (Subscriber.subStub and StreamingSubscriberConnection.messageDispatcher) do not seem to be cleaned up when a Subscriber fails while subscribing to a non-existing subscription and a shared ScheduledThreadPoolExecutor is in use. As a result, the executor's work queue grows over time, consuming memory and CPU.

The code fragment attached below reproduces the issue. Scenario:

  1. Share 2 instances of ScheduledThreadPoolExecutor (one for the system executor provider, one for the executor provider) between subscribers.
  2. Create 200 Subscribers every 70 seconds, each subscribing to a non-existing subscription. (You can lower the 70 s interval to see the effect sooner. I used 70 s because I noticed that some threads are cleaned up 60 s after a Subscriber starts or fails.)
  3. Watch memory and CPU usage grow over time.
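
For illustration only, the queue growth can be shown in isolation with nothing but java.util.concurrent. This is a minimal sketch, not code from java-pubsub: scheduleLeakyTasks is a hypothetical stand-in for any client that schedules periodic work on a shared executor and never cancels it when startup fails.

```kotlin
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a client that schedules periodic work on a shared
// executor and never cancels it. The long delay keeps every task parked in the queue.
fun scheduleLeakyTasks(executor: ScheduledThreadPoolExecutor, count: Int) {
    repeat(count) {
        executor.scheduleAtFixedRate({ /* no-op periodic work */ }, 1L, 1L, TimeUnit.HOURS)
    }
}

// Number of tasks currently waiting in the executor's delay queue.
fun queuedTaskCount(executor: ScheduledThreadPoolExecutor): Int = executor.queue.size

fun main() {
    val shared = ScheduledThreadPoolExecutor(2)
    repeat(3) { round ->
        // Each round plays the role of one batch of 200 failed subscribers.
        scheduleLeakyTasks(shared, 200)
        println("round ${round + 1}: queued tasks = ${queuedTaskCount(shared)}")
    }
    shared.shutdownNow()
}
```

Because nothing cancels the scheduled tasks, the count only goes up from round to round, which matches the pattern described in the scenario above.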

I analysed the leak a bit further and noticed that Watchdog and StreamingSubscriberConnection.messageDispatcher tasks are being queued in the ScheduledThreadPoolExecutor. Here is why:

How to fix it?
I have prepared a sample commit with a fix that does the cleanup, but I suspect it may break some internals, so I am sharing it to start a discussion and find the correct solution :)
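
The sample commit is not reproduced here. As a generic sketch of the kind of cleanup meant, assuming a hypothetical PeriodicWorker class (not a class from java-pubsub, and not the fix that was eventually merged), a component can keep the ScheduledFuture it receives and cancel it on both the shutdown path and the failure path. Enabling removeOnCancelPolicy on the shared pool makes a cancelled task leave the queue immediately instead of waiting for its delay to elapse.

```kotlin
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical component that owns one periodic task on a shared executor.
// Not thread-safe; a real implementation would guard `task` with a lock.
class PeriodicWorker(private val executor: ScheduledThreadPoolExecutor) {
    private var task: ScheduledFuture<*>? = null

    fun start() {
        task = executor.scheduleAtFixedRate({ /* periodic work */ }, 1L, 1L, TimeUnit.HOURS)
    }

    // Intended to be called from the normal shutdown path and from the failure path.
    fun stop() {
        task?.cancel(false)
        task = null
    }
}

fun main() {
    val shared = ScheduledThreadPoolExecutor(2)
    // Remove cancelled tasks from the work queue at cancellation time.
    shared.removeOnCancelPolicy = true

    val workers = List(200) { PeriodicWorker(shared).also { it.start() } }
    println("queued after start: ${shared.queue.size}")

    workers.forEach { it.stop() }
    println("queued after stop: ${shared.queue.size}")

    shared.shutdownNow()
}
```

Whether the Subscriber internals can be stopped this way without side effects is exactly the open question raised above.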

Environment details

  1. PubSub Subscriber
  2. OS type and version: Ubuntu 18.04
  3. Java version:
    openjdk version "11.0.5" 2019-10-15
    OpenJDK Runtime Environment 18.9 (build 11.0.5+10)
    OpenJDK 64-Bit Server VM 18.9 (build 11.0.5+10, mixed mode)
  4. pubsub version(s): pubsub-java 1.108.2-SNAPSHOT (latest master, also present in 1.108.0)

Steps to reproduce

  1. Create lots of subscribers for non-existing subscriptions
  2. Each subscriber fails with io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=nonExistingSubscriptionName)
  3. See how memory and CPU usage grow over time
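
One way to watch step 3 without a profiler is to log the state of the shared pools periodically. The sketch below is an assumption about how that could be done, not part of the original report: poolStats is a hypothetical helper, and the monitor runs on its own small pool so that it adds nothing to the queue being observed.

```kotlin
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: one-line summary of a pool's queue and activity.
fun poolStats(name: String, pool: ScheduledThreadPoolExecutor): String =
    "$name: queued=${pool.queue.size}, active=${pool.activeCount}, completed=${pool.completedTaskCount}"

fun main() {
    // Stand-in for one of the pools shared between subscribers.
    val watched = ScheduledThreadPoolExecutor(2)
    // Separate pool for the monitor itself.
    val monitor = ScheduledThreadPoolExecutor(1)

    monitor.scheduleAtFixedRate(
        { println(poolStats("watched", watched)) },
        0L, 1L, TimeUnit.SECONDS
    )

    Thread.sleep(3_000)
    monitor.shutdownNow()
    watched.shutdownNow()
}
```

In the reproduction, the same call would be pointed at the two ScheduledThreadPoolExecutor instances that back the executor providers, which requires keeping references to them.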

Code example (Kotlin)

import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.batching.FlowController
import com.google.api.gax.core.FixedExecutorProvider
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.ProjectSubscriptionName
import java.util.concurrent.ScheduledThreadPoolExecutor

// ThreadFactoryFactory, MessageReceiverExample and credentialsProvider()
// are not shown in this snippet.

// Executors shared by every Subscriber created below.
val executorProvider = FixedExecutorProvider.create(
    ScheduledThreadPoolExecutor(
        10 /* corePoolSize */,
        ThreadFactoryFactory.executorThreadFactory("PubSub-Sub-Executor")
    )
)
val systemExecutorProvider = FixedExecutorProvider.create(
    ScheduledThreadPoolExecutor(
        10 /* corePoolSize */,
        ThreadFactoryFactory.executorThreadFactory("PubSub-Sub-SystemExecutor")
    )
)

fun main() {
    // Loop forever until the process is killed.
    while (true) {
        // Start 200 subscribers; each one fails because the subscription does not exist.
        (1..200)
            .map { createSubscriber(it) }
            .forEach { it.startAsync() }

        Thread.sleep(70_000)
    }
}

private fun createSubscriber(subscriberNumber: Int): Subscriber {
    return Subscriber
        // Use a non-existing subscription name.
        .newBuilder(
            ProjectSubscriptionName.of("yourProjectId", "nonExistingSubscriptionName"),
            MessageReceiverExample()
        )
        .setCredentialsProvider(credentialsProvider())
        .setSystemExecutorProvider(systemExecutorProvider)
        .setExecutorProvider(executorProvider)
        .setFlowControlSettings(
            FlowControlSettings
                .newBuilder()
                .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
                .setMaxOutstandingElementCount(1000L)
                .setMaxOutstandingRequestBytes(100L * 1024L * 1024L) // 100 MB
                .build()
        )
        .build()
}
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/java-pubsub API. label Aug 12, 2020
@yoshi-automation yoshi-automation added triage me I really want to be triaged. 🚨 This issue needs some love. labels Aug 13, 2020
@hannahrogers-google hannahrogers-google added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. and removed 🚨 This issue needs some love. triage me I really want to be triaged. labels Aug 17, 2020
@rgrebski
Author

Any thoughts on the above?
I would be more than happy to help implement the fix, but I need some input from the project team.

@rgrebski
Author

rgrebski commented Nov 6, 2020

Ping, anyone here? :)

@yoshi-automation yoshi-automation added 🚨 This issue needs some love. and removed 🚨 This issue needs some love. labels Feb 4, 2021
@meredithslota

@rgrebski Hi there! Sorry, this slipped through our triage process. We've assigned the correct person now and hopefully will have a reply soon. Apologies for the wait.

@hannahrogers-google
Contributor

Thank you for reporting this issue! Sorry for the delay, I am looking into this problem right now and will get back to you with my findings.
