diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py
index caa784407..8dbbea634 100644
--- a/google/cloud/pubsub_v1/publisher/client.py
+++ b/google/cloud/pubsub_v1/publisher/client.py
@@ -31,9 +31,12 @@
 from google.cloud.pubsub_v1 import types
 from google.cloud.pubsub_v1.gapic import publisher_client
 from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport
+from google.cloud.pubsub_v1.publisher import exceptions
+from google.cloud.pubsub_v1.publisher import futures
 from google.cloud.pubsub_v1.publisher._batch import thread
 from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
 from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
+from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
 
 __version__ = pkg_resources.get_distribution("google-cloud-pubsub").version
 
@@ -93,7 +96,11 @@ class Client(object):
 
             # Optional
             publisher_options = pubsub_v1.types.PublisherOptions(
-                enable_message_ordering=False
+                enable_message_ordering=False,
+                flow_control=pubsub_v1.types.PublishFlowControl(
+                    message_limit=2000,
+                    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
+                ),
             ),
 
             # Optional
@@ -198,6 +205,9 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs):
         # Thread created to commit all sequencers after a timeout.
         self._commit_thread = None
 
+        # The object controlling the message publishing flow
+        self._flow_controller = FlowController(self.publisher_options.flow_control)
+
     @classmethod
     def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
         """Creates an instance of this client using the provided credentials
@@ -364,6 +374,18 @@ def publish(self, topic, data, ordering_key="", **attrs):
             data=data, ordering_key=ordering_key, attributes=attrs
         )
 
+        # Messages should go through flow control to prevent excessive
+        # queuing on the client side (depending on the settings).
+        try:
+            self._flow_controller.add(message)
+        except exceptions.FlowControlLimitError as exc:
+            future = futures.Future()
+            future.set_exception(exc)
+            return future
+
+        def on_publish_done(future):
+            self._flow_controller.release(message)
+
         with self._batch_lock:
             if self._is_stopped:
                 raise RuntimeError("Cannot publish on a stopped publisher.")
@@ -372,6 +394,7 @@ def publish(self, topic, data, ordering_key="", **attrs):
 
             # Delegate the publishing to the sequencer.
             future = sequencer.publish(message)
+            future.add_done_callback(on_publish_done)
 
             # Create a timer thread if necessary to enforce the batching
             # timeout.
diff --git a/google/cloud/pubsub_v1/publisher/exceptions.py b/google/cloud/pubsub_v1/publisher/exceptions.py
index 856be955a..89b3790a0 100644
--- a/google/cloud/pubsub_v1/publisher/exceptions.py
+++ b/google/cloud/pubsub_v1/publisher/exceptions.py
@@ -38,7 +38,12 @@ def __init__(self, ordering_key):
         super(PublishToPausedOrderingKeyException, self).__init__()
 
 
+class FlowControlLimitError(Exception):
+    """An action resulted in exceeding the flow control limits."""
+
+
 __all__ = (
+    "FlowControlLimitError",
     "MessageTooLargeError",
     "PublishError",
     "TimeoutError",
diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py
new file mode 100644
index 000000000..c10fadcef
--- /dev/null
+++ b/google/cloud/pubsub_v1/publisher/flow_controller.py
@@ -0,0 +1,297 @@
+# Copyright 2020, Google LLC All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import deque
+import logging
+import threading
+import warnings
+
+from google.cloud.pubsub_v1 import types
+from google.cloud.pubsub_v1.publisher import exceptions
+
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class _QuantityReservation(object):
+    """A (partial) reservation of a quantifiable resource."""
+
+    def __init__(self, reserved, needed):
+        self.reserved = reserved
+        self.needed = needed
+
+
+class FlowController(object):
+    """A class used to control the flow of messages passing through it.
+
+    Args:
+        settings (~google.cloud.pubsub_v1.types.PublishFlowControl):
+            Desired flow control configuration.
+    """
+
+    def __init__(self, settings):
+        self._settings = settings
+
+        # Load statistics. They represent the number of messages added, but not
+        # yet released (and their total size).
+        self._message_count = 0
+        self._total_bytes = 0
+
+        # A FIFO queue of threads blocked on adding a message, from first to last.
+        # Only relevant if the configured limit exceeded behavior is BLOCK.
+        self._waiting = deque()
+
+        # Reservations of available flow control bytes by the waiting threads.
+        # Each value is a _QuantityReservation instance.
+        self._byte_reservations = dict()
+        self._reserved_bytes = 0
+
+        # The lock is used to protect all internal state (message and byte count,
+        # waiting threads to add, etc.).
+        self._operational_lock = threading.Lock()
+
+        # The condition for blocking the flow if capacity is exceeded.
+        self._has_capacity = threading.Condition(lock=self._operational_lock)
+
+    def add(self, message):
+        """Add a message to flow control.
+
+        Adding a message updates the internal load statistics, and an action is
+        taken if these limits are exceeded (depending on the flow control settings).
+
+        Args:
+            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
+                The message entering the flow control.
+
+        Raises:
+            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
+                Raised when the desired action is
+                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and
+                the message would exceed flow control limits, or when the desired action
+                is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and
+                the message would block forever against the flow control limits.
+        """
+        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
+            return
+
+        with self._operational_lock:
+            if not self._would_overflow(message):
+                self._message_count += 1
+                self._total_bytes += message.ByteSize()
+                return
+
+            # Adding a message would overflow, react.
+            if (
+                self._settings.limit_exceeded_behavior
+                == types.LimitExceededBehavior.ERROR
+            ):
+                # Raising an error means rejecting a message, thus we do not
+                # add anything to the existing load, but we do report the would-be
+                # load if we accepted the message.
+                load_info = self._load_info(
+                    message_count=self._message_count + 1,
+                    total_bytes=self._total_bytes + message.ByteSize(),
+                )
+                error_msg = "Flow control limits would be exceeded - {}.".format(
+                    load_info
+                )
+                raise exceptions.FlowControlLimitError(error_msg)
+
+            assert (
+                self._settings.limit_exceeded_behavior
+                == types.LimitExceededBehavior.BLOCK
+            )
+
+            # Sanity check - if a message exceeds total flow control limits all
+            # by itself, it would block forever, thus raise error.
+            if (
+                message.ByteSize() > self._settings.byte_limit
+                or self._settings.message_limit < 1
+            ):
+                load_info = self._load_info(
+                    message_count=1, total_bytes=message.ByteSize()
+                )
+                error_msg = (
+                    "Total flow control limits too low for the message, "
+                    "would block forever - {}.".format(load_info)
+                )
+                raise exceptions.FlowControlLimitError(error_msg)
+
+            current_thread = threading.current_thread()
+
+            while self._would_overflow(message):
+                if current_thread not in self._byte_reservations:
+                    self._waiting.append(current_thread)
+                    self._byte_reservations[current_thread] = _QuantityReservation(
+                        reserved=0, needed=message.ByteSize()
+                    )
+
+                _LOGGER.debug(
+                    "Blocking until there is enough free capacity in the flow - "
+                    "{}.".format(self._load_info())
+                )
+
+                self._has_capacity.wait()
+
+                _LOGGER.debug(
+                    "Woke up from waiting on free capacity in the flow - "
+                    "{}.".format(self._load_info())
+                )
+
+            # Message accepted, increase the load and remove thread stats.
+            self._message_count += 1
+            self._total_bytes += message.ByteSize()
+            self._reserved_bytes -= self._byte_reservations[current_thread].reserved
+            del self._byte_reservations[current_thread]
+            self._waiting.remove(current_thread)
+
+    def release(self, message):
+        """Release a mesage from flow control.
+
+        Args:
+            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
+                The message entering the flow control.
+        """
+        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
+            return
+
+        with self._operational_lock:
+            # Releasing a message decreases the load.
+            self._message_count -= 1
+            self._total_bytes -= message.ByteSize()
+
+            if self._message_count < 0 or self._total_bytes < 0:
+                warnings.warn(
+                    "Releasing a message that was never added or already released.",
+                    category=RuntimeWarning,
+                    stacklevel=2,
+                )
+                self._message_count = max(0, self._message_count)
+                self._total_bytes = max(0, self._total_bytes)
+
+            self._distribute_available_bytes()
+
+            # If at least one thread waiting to add() can be unblocked, wake them up.
+            if self._ready_to_unblock():
+                _LOGGER.debug("Notifying threads waiting to add messages to flow.")
+                self._has_capacity.notify_all()
+
+    def _distribute_available_bytes(self):
+        """Distribute availalbe free capacity among the waiting threads in FIFO order.
+
+        The method assumes that the caller has obtained ``_operational_lock``.
+        """
+        available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes
+
+        for thread in self._waiting:
+            if available <= 0:
+                break
+
+            reservation = self._byte_reservations[thread]
+            still_needed = reservation.needed - reservation.reserved
+
+            # Sanity check for any internal inconsistencies.
+            if still_needed < 0:
+                msg = "Too many bytes reserved: {} / {}".format(
+                    reservation.reserved, reservation.needed
+                )
+                warnings.warn(msg, category=RuntimeWarning)
+                still_needed = 0
+
+            can_give = min(still_needed, available)
+            reservation.reserved += can_give
+            self._reserved_bytes += can_give
+            available -= can_give
+
+    def _ready_to_unblock(self):
+        """Determine if any of the threads waiting to add a message can proceed.
+
+        The method assumes that the caller has obtained ``_operational_lock``.
+
+        Returns:
+            bool
+        """
+        if self._waiting:
+            # It's enough to only check the head of the queue, because FIFO
+            # distribution of any free capacity.
+            reservation = self._byte_reservations[self._waiting[0]]
+            return (
+                reservation.reserved >= reservation.needed
+                and self._message_count < self._settings.message_limit
+            )
+
+        return False
+
+    def _would_overflow(self, message):
+        """Determine if accepting a message would exceed flow control limits.
+
+        The method assumes that the caller has obtained ``_operational_lock``.
+
+        Args:
+            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
+                The message entering the flow control.
+
+        Returns:
+            bool
+        """
+        reservation = self._byte_reservations.get(threading.current_thread())
+
+        if reservation:
+            enough_reserved = reservation.reserved >= reservation.needed
+        else:
+            enough_reserved = False
+
+        bytes_taken = self._total_bytes + self._reserved_bytes + message.ByteSize()
+        size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved
+        msg_count_overflow = self._message_count + 1 > self._settings.message_limit
+
+        return size_overflow or msg_count_overflow
+
+    def _load_info(self, message_count=None, total_bytes=None, reserved_bytes=None):
+        """Return the current flow control load information.
+
+        The caller can optionally adjust some of the values to fit its reporting
+        needs.
+
+        The method assumes that the caller has obtained ``_operational_lock``.
+
+        Args:
+            message_count (Optional[int]):
+                The value to override the current message count with.
+            total_bytes (Optional[int]):
+                The value to override the current total bytes with.
+            reserved_bytes (Optional[int]):
+                The value to override the current number of reserved bytes with.
+
+        Returns:
+            str
+        """
+        msg = "messages: {} / {}, bytes: {} / {} (reserved: {})"
+
+        if message_count is None:
+            message_count = self._message_count
+
+        if total_bytes is None:
+            total_bytes = self._total_bytes
+
+        if reserved_bytes is None:
+            reserved_bytes = self._reserved_bytes
+
+        return msg.format(
+            message_count,
+            self._settings.message_limit,
+            total_bytes,
+            self._settings.byte_limit,
+            reserved_bytes,
+        )
diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py
index eb4f00681..b52b3ea60 100644
--- a/google/cloud/pubsub_v1/types.py
+++ b/google/cloud/pubsub_v1/types.py
@@ -13,7 +13,9 @@
 # limitations under the License.
 
 from __future__ import absolute_import
+
 import collections
+import enum
 import sys
 
 from google.api import http_pb2
@@ -30,25 +32,6 @@
 from google.cloud.pubsub_v1.proto import pubsub_pb2
 
 
-# Define the default publisher options.
-#
-# This class is used when creating a publisher client to pass in options
-# to enable/disable features.
-PublisherOptions = collections.namedtuple(
-    "PublisherConfig", ["enable_message_ordering"]
-)
-PublisherOptions.__new__.__defaults__ = (False,)  # enable_message_ordering: False
-
-if sys.version_info >= (3, 5):
-    PublisherOptions.__doc__ = "The options for the publisher client."
-    PublisherOptions.enable_message_ordering.__doc__ = (
-        "Whether to order messages in a batch by a supplied ordering key."
-        "EXPERIMENTAL: Message ordering is an alpha feature that requires "
-        "special permissions to use. Please contact the Cloud Pub/Sub team for "
-        "more information."
-    )
-
-
 # Define the default values for batching.
 #
 # This class is used when creating a publisher or subscriber client, and
@@ -81,6 +64,63 @@
     )
 
 
+class LimitExceededBehavior(str, enum.Enum):
+    """The possible actions when exceeding the publish flow control limits."""
+
+    IGNORE = "ignore"
+    BLOCK = "block"
+    ERROR = "error"
+
+
+PublishFlowControl = collections.namedtuple(
+    "PublishFlowControl", ["message_limit", "byte_limit", "limit_exceeded_behavior"]
+)
+PublishFlowControl.__new__.__defaults__ = (
+    10 * BatchSettings.__new__.__defaults__[2],  # message limit
+    10 * BatchSettings.__new__.__defaults__[0],  # byte limit
+    LimitExceededBehavior.IGNORE,  # desired behavior
+)
+
+if sys.version_info >= (3, 5):
+    PublishFlowControl.__doc__ = (
+        "The client flow control settings for message publishing."
+    )
+    PublishFlowControl.message_limit.__doc__ = (
+        "The maximum number of messages awaiting to be published."
+    )
+    PublishFlowControl.byte_limit.__doc__ = (
+        "The maximum total size of messages awaiting to be published."
+    )
+    PublishFlowControl.limit_exceeded_behavior.__doc__ = (
+        "The action to take when publish flow control limits are exceeded."
+    )
+
+# Define the default publisher options.
+#
+# This class is used when creating a publisher client to pass in options
+# to enable/disable features.
+PublisherOptions = collections.namedtuple(
+    "PublisherConfig", ["enable_message_ordering", "flow_control"]
+)
+PublisherOptions.__new__.__defaults__ = (
+    False,  # enable_message_ordering: False
+    PublishFlowControl(),  # default flow control settings
+)
+
+if sys.version_info >= (3, 5):
+    PublisherOptions.__doc__ = "The options for the publisher client."
+    PublisherOptions.enable_message_ordering.__doc__ = (
+        "Whether to order messages in a batch by a supplied ordering key."
+        "EXPERIMENTAL: Message ordering is an alpha feature that requires "
+        "special permissions to use. Please contact the Cloud Pub/Sub team for "
+        "more information."
+    )
+    PublisherOptions.flow_control.__doc__ = (
+        "Flow control settings for message publishing by the client. By default "
+        "the publisher client does not do any throttling."
+    )
+
+
 # Define the type class and default values for flow control settings.
 #
 # This class is used when creating a publisher or subscriber client, and
diff --git a/tests/unit/pubsub_v1/publisher/test_flow_controller.py b/tests/unit/pubsub_v1/publisher/test_flow_controller.py
new file mode 100644
index 000000000..26a61663b
--- /dev/null
+++ b/tests/unit/pubsub_v1/publisher/test_flow_controller.py
@@ -0,0 +1,409 @@
+# Copyright 2020, Google LLC All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+import threading
+import time
+import warnings
+
+import pytest
+
+from google.cloud.pubsub_v1 import types
+from google.cloud.pubsub_v1.publisher import exceptions
+from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
+
+
+def _run_in_daemon(
+    flow_controller,
+    action,
+    messages,
+    all_done_event,
+    error_event=None,
+    action_pause=None,
+):
+    """Run flow controller action (add or remove messages) in a daemon thread.
+    """
+    assert action in ("add", "release")
+
+    def run_me():
+        method = getattr(flow_controller, action)
+
+        try:
+            for msg in messages:
+                if action_pause is not None:
+                    time.sleep(action_pause)
+                method(msg)
+        except Exception:
+            if error_event is not None:
+                error_event.set()
+        else:
+            all_done_event.set()
+
+    thread = threading.Thread(target=run_me)
+    thread.daemon = True
+    thread.start()
+
+
+def test_no_overflow_no_error():
+    settings = types.PublishFlowControl(
+        message_limit=100,
+        byte_limit=10000,
+        limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
+    )
+    flow_controller = FlowController(settings)
+
+    # there should be no errors
+    for data in (b"foo", b"bar", b"baz"):
+        msg = types.PubsubMessage(data=data)
+        flow_controller.add(msg)
+
+
+def test_overflow_no_error_on_ignore():
+    settings = types.PublishFlowControl(
+        message_limit=1,
+        byte_limit=2,
+        limit_exceeded_behavior=types.LimitExceededBehavior.IGNORE,
+    )
+    flow_controller = FlowController(settings)
+
+    # there should be no overflow errors
+    flow_controller.add(types.PubsubMessage(data=b"foo"))
+    flow_controller.add(types.PubsubMessage(data=b"bar"))
+
+
+def test_message_count_overflow_error():
+    settings = types.PublishFlowControl(
+        message_limit=1,
+        byte_limit=10000,
+        limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
+    )
+    flow_controller = FlowController(settings)
+
+    flow_controller.add(types.PubsubMessage(data=b"foo"))
+    with pytest.raises(exceptions.FlowControlLimitError) as error:
+        flow_controller.add(types.PubsubMessage(data=b"bar"))
+
+    assert "messages: 2 / 1" in str(error.value)
+
+
+def test_byte_size_overflow_error():
+    settings = types.PublishFlowControl(
+        message_limit=10000,
+        byte_limit=199,
+        limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
+    )
+    flow_controller = FlowController(settings)
+
+    # Since the message data itself occupies 100 bytes, it means that both
+    # messages combined will exceed the imposed byte limit of 199, but a single
+    # message will not (the message size overhead is way lower than data size).
+    msg1 = types.PubsubMessage(data=b"x" * 100)
+    msg2 = types.PubsubMessage(data=b"y" * 100)
+
+    flow_controller.add(msg1)
+    with pytest.raises(exceptions.FlowControlLimitError) as error:
+        flow_controller.add(msg2)
+
+    total_size = msg1.ByteSize() + msg2.ByteSize()
+    expected_info = "bytes: {} / 199".format(total_size)
+    assert expected_info in str(error.value)
+
+
+def test_no_error_on_moderate_message_flow():
+    settings = types.PublishFlowControl(
+        message_limit=2,
+        byte_limit=250,
+        limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
+    )
+    flow_controller = FlowController(settings)
+
+    msg1 = types.PubsubMessage(data=b"x" * 100)
+    msg2 = types.PubsubMessage(data=b"y" * 100)
+    msg3 = types.PubsubMessage(data=b"z" * 100)
+
+    # The flow control settings will accept two in-flight messages, but not three.
+    # If releasing messages works correctly, the sequence below will not raise errors.
+    flow_controller.add(msg1)
+    flow_controller.add(msg2)
+    flow_controller.release(msg1)
+    flow_controller.add(msg3)
+    flow_controller.release(msg2)
+    flow_controller.release(msg3)
+
+
+def test_rejected_messages_do_not_increase_total_load():
+    settings = types.PublishFlowControl(
+        message_limit=1,
+        byte_limit=150,
+        limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
+    )
+    flow_controller = FlowController(settings)
+
+    msg1 = types.PubsubMessage(data=b"x" * 100)
+    msg2 = types.PubsubMessage(data=b"y" * 100)
+
+    flow_controller.add(msg1)
+
+    for _ in range(5):
+        with pytest.raises(exceptions.FlowControlLimitError):
+            flow_controller.add(types.PubsubMessage(data=b"z" * 100))
+
+    # After releasing a message we should again be able to add another one, despite
+    # previously trying to add a lot of other messages.
+    flow_controller.release(msg1)
+    flow_controller.add(msg2)
+
+
+def test_incorrectly_releasing_too_many_messages():
+    settings = types.PublishFlowControl(
+        message_limit=1,
+        byte_limit=150,
+        limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
+    )
+    flow_controller = FlowController(settings)
+
+    msg1 = types.PubsubMessage(data=b"x" * 100)
+    msg2 = types.PubsubMessage(data=b"y" * 100)
+    msg3 = types.PubsubMessage(data=b"z" * 100)
+
+    # Releasing a message that would make the load negative should result in a warning.
+    with warnings.catch_warnings(record=True) as warned:
+        flow_controller.release(msg1)
+
+    assert len(warned) == 1
+    assert issubclass(warned[0].category, RuntimeWarning)
+    warning_msg = str(warned[0].message)
+    assert "never added or already released" in warning_msg
+
+    # Incorrectly removing a message does not mess up internal stats, we can
+    # still only add a single message at a time to this flow.
+    flow_controller.add(msg2)
+
+    with pytest.raises(exceptions.FlowControlLimitError) as error:
+        flow_controller.add(msg3)
+
+    error_msg = str(error.value)
+    assert "messages: 2 / 1" in error_msg
+    total_size = msg2.ByteSize() + msg3.ByteSize()
+    expected_size_info = "bytes: {} / 150".format(total_size)
+    assert expected_size_info in error_msg
+
+
+def test_blocking_on_overflow_until_free_capacity():
+    settings = types.PublishFlowControl(
+        message_limit=1,
+        byte_limit=150,
+        limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
+    )
+    flow_controller = FlowController(settings)
+
+    msg1 = types.PubsubMessage(data=b"x" * 100)
+    msg2 = types.PubsubMessage(data=b"y" * 100)
+    msg3 = types.PubsubMessage(data=b"z" * 100)
+    msg4 = types.PubsubMessage(data=b"w" * 100)
+
+    # If there is a concurrency bug in FlowController, we do not want to block
+    # the main thread running the tests, thus we delegate all add/release
+    # operations to daemon threads and check the outcome (blocked/not blocked)
+    # through Events.
+    adding_1_done = threading.Event()
+    adding_2_done = threading.Event()
+    adding_3_done = threading.Event()
+    adding_4_done = threading.Event()
+    releasing_1_done = threading.Event()
+    releasing_x_done = threading.Event()
+
+    # Adding a message with free capacity should not block.
+    _run_in_daemon(flow_controller, "add", [msg1], adding_1_done)
+    if not adding_1_done.wait(timeout=0.1):
+        pytest.fail("Adding a message with enough flow capacity blocked or errored.")
+
+    # Adding messages when there is not enough capacity should block, even if
+    # added through multiple threads.
+    _run_in_daemon(flow_controller, "add", [msg2], adding_2_done)
+    if adding_2_done.wait(timeout=0.1):
+        pytest.fail("Adding a message on overflow did not block.")
+
+    _run_in_daemon(flow_controller, "add", [msg3], adding_3_done)
+    if adding_3_done.wait(timeout=0.1):
+        pytest.fail("Adding a message on overflow did not block.")
+
+    _run_in_daemon(flow_controller, "add", [msg4], adding_4_done)
+    if adding_4_done.wait(timeout=0.1):
+        pytest.fail("Adding a message on overflow did not block.")
+
+    # After releasing one message, there should be room for a new message, which
+    # should result in unblocking one of the waiting threads.
+    _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done)
+    if not releasing_1_done.wait(timeout=0.1):
+        pytest.fail("Releasing a message blocked or errored.")
+
+    done_status = [
+        adding_2_done.wait(timeout=0.1),
+        adding_3_done.wait(timeout=0.1),
+        adding_4_done.wait(timeout=0.1),
+    ]
+
+    # In sum() we use the fact that True==1 and False==0, and that Event.wait()
+    # returns False only if it times out, i.e. its internal flag has not been set.
+    done_count = sum(done_status)
+    assert done_count == 1, "Exactly one thread should have been unblocked."
+
+    # Release another message and verify that yet another thread gets unblocked.
+    added_msg = [msg2, msg3, msg4][done_status.index(True)]
+    _run_in_daemon(flow_controller, "release", [added_msg], releasing_x_done)
+
+    if not releasing_x_done.wait(timeout=0.1):
+        pytest.fail("Releasing messages blocked or errored.")
+
+    released_count = sum(
+        (
+            adding_2_done.wait(timeout=0.1),
+            adding_3_done.wait(timeout=0.1),
+            adding_4_done.wait(timeout=0.1),
+        )
+    )
+    assert released_count == 2, "Exactly two threads should have been unblocked."
+
+
+def test_error_if_mesage_would_block_indefinitely():
+    settings = types.PublishFlowControl(
+        message_limit=0,  # simulate non-sane settings
+        byte_limit=1,
+        limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
+    )
+    flow_controller = FlowController(settings)
+
+    msg = types.PubsubMessage(data=b"xyz")
+    adding_done = threading.Event()
+    error_event = threading.Event()
+
+    _run_in_daemon(flow_controller, "add", [msg], adding_done, error_event=error_event)
+
+    assert error_event.wait(timeout=0.1), "No error on adding too large a message."
+
+    # Now that we know that an error occurs, we can check its type directly
+    # without the fear of blocking indefinitely.
+    flow_controller = FlowController(settings)  # we want a fresh controller
+    with pytest.raises(exceptions.FlowControlLimitError) as error_info:
+        flow_controller.add(msg)
+
+    error_msg = str(error_info.value)
+    assert "would block forever" in error_msg
+    assert "messages: 1 / 0" in error_msg
+    assert "bytes: {} / 1".format(msg.ByteSize()) in error_msg
+
+
+def test_threads_posting_large_messages_do_not_starve():
+    settings = types.PublishFlowControl(
+        message_limit=100,
+        byte_limit=110,
+        limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
+    )
+    flow_controller = FlowController(settings)
+
+    large_msg = types.PubsubMessage(data=b"x" * 100)  # close to entire byte limit
+
+    adding_initial_done = threading.Event()
+    adding_large_done = threading.Event()
+    adding_busy_done = threading.Event()
+    releasing_busy_done = threading.Event()
+    releasing_large_done = threading.Event()
+
+    # Occupy some of the flow capacity, then try to add a large message. Releasing
+    # enough messages should eventually allow the large message to come through, even
+    # if more messages are added after it (those should wait for the large message).
+    initial_messages = [types.PubsubMessage(data=b"x" * 10)] * 5
+    _run_in_daemon(flow_controller, "add", initial_messages, adding_initial_done)
+    assert adding_initial_done.wait(timeout=0.1)
+
+    _run_in_daemon(flow_controller, "add", [large_msg], adding_large_done)
+
+    # Continuously keep adding more messages after the large one.
+    messages = [types.PubsubMessage(data=b"x" * 10)] * 10
+    _run_in_daemon(flow_controller, "add", messages, adding_busy_done, action_pause=0.1)
+
+    # At the same time, gradually keep releasing the messages - the freeed up
+    # capacity should be consumed by the large message, not the other small messages
+    # being added after it.
+    _run_in_daemon(
+        flow_controller, "release", messages, releasing_busy_done, action_pause=0.1
+    )
+
+    # Sanity check - releasing should have completed by now.
+    if not releasing_busy_done.wait(timeout=1.1):
+        pytest.fail("Releasing messages blocked or errored.")
+
+    # Enough messages released, the large message should have come through in
+    # the meantime.
+    if not adding_large_done.wait(timeout=0.1):
+        pytest.fail("A thread adding a large message starved.")
+
+    if adding_busy_done.wait(timeout=0.1):
+        pytest.fail("Adding multiple small messages did not block.")
+
+    # Releasing the large message should unblock adding the remaining "busy" messages
+    # that have not been added yet.
+    _run_in_daemon(flow_controller, "release", [large_msg], releasing_large_done)
+    if not releasing_large_done.wait(timeout=0.1):
+        pytest.fail("Releasing a message blocked or errored.")
+
+    if not adding_busy_done.wait(timeout=1.0):
+        pytest.fail("Adding messages blocked or errored.")
+
+
+def test_warning_on_internal_reservation_stats_error_when_unblocking():
+    settings = types.PublishFlowControl(
+        message_limit=1,
+        byte_limit=150,
+        limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
+    )
+    flow_controller = FlowController(settings)
+
+    msg1 = types.PubsubMessage(data=b"x" * 100)
+    msg2 = types.PubsubMessage(data=b"y" * 100)
+
+    # If there is a concurrency bug in FlowController, we do not want to block
+    # the main thread running the tests, thus we delegate all add/release
+    # operations to daemon threads and check the outcome (blocked/not blocked)
+    # through Events.
+    adding_1_done = threading.Event()
+    adding_2_done = threading.Event()
+    releasing_1_done = threading.Event()
+
+    # Adding a message with free capacity should not block.
+    _run_in_daemon(flow_controller, "add", [msg1], adding_1_done)
+    if not adding_1_done.wait(timeout=0.1):
+        pytest.fail("Adding a message with enough flow capacity blocked or errored.")
+
+    # Adding messages when there is not enough capacity should block, even if
+    # added through multiple threads.
+    _run_in_daemon(flow_controller, "add", [msg2], adding_2_done)
+    if adding_2_done.wait(timeout=0.1):
+        pytest.fail("Adding a message on overflow did not block.")
+
+    # Intentionally corrupt internal stats
+    reservation = next(iter(flow_controller._byte_reservations.values()), None)
+    assert reservation is not None, "No messages blocked by flow controller."
+    reservation.reserved = reservation.needed + 1
+
+    with warnings.catch_warnings(record=True) as warned:
+        _run_in_daemon(flow_controller, "release", [msg1], releasing_1_done)
+        if not releasing_1_done.wait(timeout=0.1):
+            pytest.fail("Releasing a message blocked or errored.")
+
+    matches = [warning for warning in warned if warning.category is RuntimeWarning]
+    assert len(matches) == 1
+    assert "too many bytes reserved" in str(matches[0].message).lower()
diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py
index 4ca979892..4e3a3870f 100644
--- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py
+++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py
@@ -25,6 +25,7 @@
 from google.cloud.pubsub_v1 import publisher
 from google.cloud.pubsub_v1 import types
 
+from google.cloud.pubsub_v1.publisher import exceptions
 from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
 
 
@@ -125,11 +126,17 @@ def test_publish():
     creds = mock.Mock(spec=credentials.Credentials)
     client = publisher.Client(credentials=creds)
 
+    future1 = mock.sentinel.future1
+    future2 = mock.sentinel.future2
+    future1.add_done_callback = mock.Mock(spec=["__call__"])
+    future2.add_done_callback = mock.Mock(spec=["__call__"])
+
     # Use a mock in lieu of the actual batch class.
     batch = mock.Mock(spec=client._batch_class)
+
     # Set the mock up to claim indiscriminately that it accepts all messages.
     batch.will_accept.return_value = True
-    batch.publish.side_effect = (mock.sentinel.future1, mock.sentinel.future2)
+    batch.publish.side_effect = (future1, future2)
 
     topic = "topic/path"
     client._set_batch(topic, batch)
@@ -150,6 +157,30 @@ def test_publish():
     )
 
 
+def test_publish_error_exceeding_flow_control_limits():
+    creds = mock.Mock(spec=credentials.Credentials)
+    publisher_options = types.PublisherOptions(
+        flow_control=types.PublishFlowControl(
+            message_limit=10,
+            byte_limit=150,
+            limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
+        )
+    )
+    client = publisher.Client(credentials=creds, publisher_options=publisher_options)
+
+    mock_batch = mock.Mock(spec=client._batch_class)
+    mock_batch.will_accept.return_value = True
+    topic = "topic/path"
+    client._set_batch(topic, mock_batch)
+
+    future1 = client.publish(topic, b"a" * 100)
+    future2 = client.publish(topic, b"b" * 100)
+
+    future1.result()  # no error, still within flow control limits
+    with pytest.raises(exceptions.FlowControlLimitError):
+        future2.result()
+
+
 def test_publish_data_not_bytestring_error():
     creds = mock.Mock(spec=credentials.Credentials)
     client = publisher.Client(credentials=creds)
@@ -208,10 +239,13 @@ def test_publish_new_batch_needed():
     # Use mocks in lieu of the actual batch class.
     batch1 = mock.Mock(spec=client._batch_class)
     batch2 = mock.Mock(spec=client._batch_class)
+
     # Set the first mock up to claim indiscriminately that it rejects all
     # messages and the second accepts all.
+    future = mock.sentinel.future
+    future.add_done_callback = mock.Mock(spec=["__call__"])
     batch1.publish.return_value = None
-    batch2.publish.return_value = mock.sentinel.future
+    batch2.publish.return_value = future
 
     topic = "topic/path"
     client._set_batch(topic, batch1)
@@ -390,9 +424,15 @@ def test_publish_with_ordering_key():
 
     # Use a mock in lieu of the actual batch class.
     batch = mock.Mock(spec=client._batch_class)
+
     # Set the mock up to claim indiscriminately that it accepts all messages.
+    future1 = mock.sentinel.future1
+    future2 = mock.sentinel.future2
+    future1.add_done_callback = mock.Mock(spec=["__call__"])
+    future2.add_done_callback = mock.Mock(spec=["__call__"])
+
     batch.will_accept.return_value = True
-    batch.publish.side_effect = (mock.sentinel.future1, mock.sentinel.future2)
+    batch.publish.side_effect = (future1, future2)
 
     topic = "topic/path"
     ordering_key = "k1"