diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index b393cbfd5ec6b..6e923f736adc3 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -50,6 +50,13 @@
 _RESUME_THRESHOLD = 0.8
 """The load threshold below which to resume the incoming message stream."""
 
+_DEFAULT_STREAM_ACK_DEADLINE = 60
+"""The default message acknowledge deadline in seconds for incoming message stream.
+
+This default deadline is dynamically modified for the messages that are added
+to the lease management.
+"""
+
 
 def _maybe_wrap_exception(exception):
     """Wraps a gRPC exception class, if needed."""
@@ -384,8 +391,17 @@ def open(self, callback, on_callback_error):
         )
 
         # Create the RPC
-        subscription = self._client.api.get_subscription(self._subscription)
-        stream_ack_deadline_seconds = subscription.ack_deadline_seconds
+
+        # We must use a fixed value for the ACK deadline, as we cannot read it
+        # from the subscription. The latter would require `pubsub.subscriptions.get`
+        # permission, which is not granted to the default subscriber role
+        # `roles/pubsub.subscriber`.
+        # See also https://github.com/googleapis/google-cloud-python/issues/9339
+        #
+        # When dynamic lease management is enabled for the "on hold" messages,
+        # the defautl stream ACK deadline should again be set based on the
+        # historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
+        stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE
 
         get_initial_request = functools.partial(
             self._get_initial_request, stream_ack_deadline_seconds
diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py
index c8030a5773db3..cb00a4b91ecd2 100644
--- a/pubsub/tests/system.py
+++ b/pubsub/tests/system.py
@@ -381,6 +381,10 @@ class CallbackError(Exception):
         with pytest.raises(CallbackError):
             future.result(timeout=30)
 
+    @pytest.mark.xfail(
+        reason="The default stream ACK deadline is static and received messages "
+        "exceeding FlowControl.max_messages are currently not lease managed."
+    )
     def test_streaming_pull_ack_deadline(
         self, publisher, subscriber, project, topic_path, subscription_path, cleanup
     ):
@@ -395,7 +399,7 @@ def test_streaming_pull_ack_deadline(
         # Subscribe to the topic. This must happen before the messages
         # are published.
         subscriber.create_subscription(
-            subscription_path, topic_path, ack_deadline_seconds=60
+            subscription_path, topic_path, ack_deadline_seconds=240
         )
 
         # publish some messages and wait for completion
@@ -403,7 +407,7 @@ def test_streaming_pull_ack_deadline(
 
         # subscribe to the topic
         callback = StreamingPullCallback(
-            processing_time=15,  # more than the default ACK deadline of 10 seconds
+            processing_time=70,  # more than the default stream ACK deadline (60s)
             resolve_at_msg_count=3,  # one more than the published messages count
         )
         flow_control = types.FlowControl(max_messages=1)
@@ -411,13 +415,13 @@ def test_streaming_pull_ack_deadline(
             subscription_path, callback, flow_control=flow_control
         )
 
-        # We expect to process the first two messages in 2 * 15 seconds, and
+        # We expect to process the first two messages in 2 * 70 seconds, and
         # any duplicate message that is re-sent by the backend in additional
-        # 15 seconds, totalling 45 seconds (+ overhead) --> if there have been
-        # no duplicates in 60 seconds, we can reasonably assume that there
+        # 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
+        # no duplicates in 240 seconds, we can reasonably assume that there
         # won't be any.
         try:
-            callback.done_future.result(timeout=60)
+            callback.done_future.result(timeout=240)
         except exceptions.TimeoutError:
             # future timed out, because we received no excessive messages
             assert sorted(callback.seen_message_ids) == [1, 2]
diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
index 352b09ba83fc1..2bd20caa04d15 100644
--- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -404,12 +404,9 @@ def test_heartbeat_inactive():
     "google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
 )
 def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
+    stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE
+
     manager = make_manager()
-    manager._client.api.get_subscription.return_value = types.Subscription(
-        name="projects/foo/subscriptions/bar",
-        topic="projects/foo/topics/baz",
-        ack_deadline_seconds=123,
-    )
 
     manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)
 
@@ -437,7 +434,8 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
     )
     initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
     assert initial_request_arg.func == manager._get_initial_request
-    assert initial_request_arg.args[0] == 123
+    assert initial_request_arg.args[0] == stream_ack_deadline
+    assert not manager._client.api.get_subscription.called
 
     resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
         manager._on_rpc_done