diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer/__init__.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer/__init__.py
index bcde8ce8a3e4..2aed6efc9c5d 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer/__init__.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer/__init__.py
@@ -38,6 +38,9 @@
 from google.cloud.bigquery_datatransfer_v1.types.datatransfer import (
     DeleteTransferRunRequest,
 )
+from google.cloud.bigquery_datatransfer_v1.types.datatransfer import (
+    EnrollDataSourcesRequest,
+)
 from google.cloud.bigquery_datatransfer_v1.types.datatransfer import (
     GetDataSourceRequest,
 )
@@ -91,6 +94,7 @@
 from google.cloud.bigquery_datatransfer_v1.types.transfer import TransferConfig
 from google.cloud.bigquery_datatransfer_v1.types.transfer import TransferMessage
 from google.cloud.bigquery_datatransfer_v1.types.transfer import TransferRun
+from google.cloud.bigquery_datatransfer_v1.types.transfer import UserInfo
 from google.cloud.bigquery_datatransfer_v1.types.transfer import TransferState
 from google.cloud.bigquery_datatransfer_v1.types.transfer import TransferType
 
@@ -104,6 +108,7 @@
     "DataSourceParameter",
     "DeleteTransferConfigRequest",
     "DeleteTransferRunRequest",
+    "EnrollDataSourcesRequest",
     "GetDataSourceRequest",
     "GetTransferConfigRequest",
     "GetTransferRunRequest",
@@ -125,6 +130,7 @@
     "TransferConfig",
     "TransferMessage",
     "TransferRun",
+    "UserInfo",
     "TransferState",
     "TransferType",
 )
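
For reference, a quick usage sketch (hypothetical; the project and data
source IDs below are placeholders) showing that the new names are
re-exported from both the versionless and the versioned package:

    from google.cloud import bigquery_datatransfer
    from google.cloud import bigquery_datatransfer_v1

    # Both packages should expose the same new types after this change.
    assert bigquery_datatransfer.UserInfo is bigquery_datatransfer_v1.UserInfo

    request = bigquery_datatransfer.EnrollDataSourcesRequest(
        name="projects/my-project",
        data_source_ids=["scheduled_query"],
    )
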
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/__init__.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/__init__.py
index 1c6f0fc8ca37..bb7dd5853177 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/__init__.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/__init__.py
@@ -24,6 +24,7 @@
 from .types.datatransfer import DataSourceParameter
 from .types.datatransfer import DeleteTransferConfigRequest
 from .types.datatransfer import DeleteTransferRunRequest
+from .types.datatransfer import EnrollDataSourcesRequest
 from .types.datatransfer import GetDataSourceRequest
 from .types.datatransfer import GetTransferConfigRequest
 from .types.datatransfer import GetTransferRunRequest
@@ -45,6 +46,7 @@
 from .types.transfer import TransferConfig
 from .types.transfer import TransferMessage
 from .types.transfer import TransferRun
+from .types.transfer import UserInfo
 from .types.transfer import TransferState
 from .types.transfer import TransferType
 
@@ -59,6 +61,7 @@
     "DeleteTransferConfigRequest",
     "DeleteTransferRunRequest",
     "EmailPreferences",
+    "EnrollDataSourcesRequest",
     "GetDataSourceRequest",
     "GetTransferConfigRequest",
     "GetTransferRunRequest",
@@ -81,4 +84,5 @@
     "TransferState",
     "TransferType",
     "UpdateTransferConfigRequest",
+    "UserInfo",
 )
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/gapic_metadata.json b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/gapic_metadata.json
index 75ee9340a410..3b914fe7c9fb 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/gapic_metadata.json
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/gapic_metadata.json
@@ -30,6 +30,11 @@
                 "delete_transfer_run"
               ]
             },
+            "EnrollDataSources": {
+              "methods": [
+                "enroll_data_sources"
+              ]
+            },
             "GetDataSource": {
               "methods": [
                 "get_data_source"
@@ -105,6 +110,11 @@
                 "delete_transfer_run"
               ]
             },
+            "EnrollDataSources": {
+              "methods": [
+                "enroll_data_sources"
+              ]
+            },
             "GetDataSource": {
               "methods": [
                 "get_data_source"
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/async_client.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/async_client.py
index 76ce97508751..6c45ebbc43dc 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/async_client.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/async_client.py
@@ -16,7 +16,7 @@
 from collections import OrderedDict
 import functools
 import re
-from typing import Dict, Sequence, Tuple, Type, Union
+from typing import Dict, Optional, Sequence, Tuple, Type, Union
 import pkg_resources
 import warnings
 
@@ -46,10 +46,8 @@
 
 
 class DataTransferServiceAsyncClient:
-    """The Google BigQuery Data Transfer Service API enables
-    BigQuery users to configure the transfer of their data from
-    other Google Products into BigQuery. This service contains
-    methods that are end user exposed. It backs up the frontend.
+    """This API allows users to manage their data transfers into
+    BigQuery.
     """
 
     _client: DataTransferServiceClient
@@ -125,6 +123,42 @@ def from_service_account_file(cls, filename: str, *args, **kwargs):
 
     from_service_account_json = from_service_account_file
 
+    @classmethod
+    def get_mtls_endpoint_and_cert_source(
+        cls, client_options: Optional[ClientOptions] = None
+    ):
+        """Return the API endpoint and client cert source for mutual TLS.
+
+        The client cert source is determined in the following order:
+        (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
+        client cert source is None.
+        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
+        default client cert source exists, use the default one; otherwise the client cert
+        source is None.
+
+        The API endpoint is determined in the following order:
+        (1) if `client_options.api_endpoint` is provided, use the provided one.
+        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
+        default mTLS endpoint; if it is "never", use the default API endpoint; otherwise
+        ("auto"), use the default mTLS endpoint if a client cert source exists and the
+        default API endpoint otherwise.
+
+        More details can be found at https://google.aip.dev/auth/4114.
+
+        Args:
+            client_options (google.api_core.client_options.ClientOptions): Custom options for the
+                client. Only the `api_endpoint` and `client_cert_source` properties may be used
+                in this method.
+
+        Returns:
+            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
+                client cert source to use.
+
+        Raises:
+            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
+        """
+        return DataTransferServiceClient.get_mtls_endpoint_and_cert_source(client_options)  # type: ignore
+
     @property
     def transport(self) -> DataTransferServiceTransport:
         """Returns the transport used by the client instance.
@@ -196,7 +230,7 @@ async def get_data_source(
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> datatransfer.DataSource:
         r"""Retrieves a supported data source and returns its
-        settings, which can be used for UI rendering.
+        settings.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.GetDataSourceRequest, dict]):
@@ -219,13 +253,12 @@ async def get_data_source(
 
         Returns:
             google.cloud.bigquery_datatransfer_v1.types.DataSource:
-                Represents data source metadata.
-                Metadata is sufficient to render UI and
-                request proper OAuth tokens.
+                Defines the properties and custom
+                parameters for a data source.
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -281,7 +314,7 @@ async def list_data_sources(
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> pagers.ListDataSourcesAsyncPager:
         r"""Lists supported data sources and returns their
-        settings, which can be used for UI rendering.
+        settings.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.ListDataSourcesRequest, dict]):
@@ -312,7 +345,7 @@ async def list_data_sources(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent])
         if request is not None and has_flattened_params:
@@ -423,7 +456,7 @@ async def create_transfer_config(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent, transfer_config])
         if request is not None and has_flattened_params:
@@ -513,7 +546,7 @@ async def update_transfer_config(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([transfer_config, update_mask])
         if request is not None and has_flattened_params:
@@ -562,8 +595,8 @@ async def delete_transfer_config(
         timeout: float = None,
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> None:
-        r"""Deletes a data transfer configuration,
-        including any associated transfer runs and logs.
+        r"""Deletes a data transfer configuration, including any
+        associated transfer runs and logs.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.DeleteTransferConfigRequest, dict]):
@@ -586,7 +619,7 @@ async def delete_transfer_config(
                 sent along with the request as metadata.
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -674,7 +707,7 @@ async def get_transfer_config(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -760,7 +793,7 @@ async def list_transfer_configs(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent])
         if request is not None and has_flattened_params:
@@ -873,7 +906,7 @@ async def schedule_transfer_runs(
         )
 
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent, start_time, end_time])
         if request is not None and has_flattened_params:
@@ -1002,7 +1035,7 @@ async def get_transfer_run(
                 Represents a data transfer run.
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -1080,7 +1113,7 @@ async def delete_transfer_run(
                 sent along with the request as metadata.
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -1134,14 +1167,13 @@ async def list_transfer_runs(
         timeout: float = None,
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> pagers.ListTransferRunsAsyncPager:
-        r"""Returns information about running and completed jobs.
+        r"""Returns information about running and completed
+        transfer runs.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.ListTransferRunsRequest, dict]):
                 The request object. A request to list data transfer
-                runs. UI can use this method to show/filter specific
-                data transfer runs. The data source can use this method
-                to request all scheduled transfer runs.
+                runs.
             parent (:class:`str`):
                 Required. Name of transfer configuration for which
                 transfer runs should be retrieved. Format of transfer
@@ -1168,7 +1200,7 @@ async def list_transfer_runs(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent])
         if request is not None and has_flattened_params:
@@ -1229,8 +1261,7 @@ async def list_transfer_logs(
         timeout: float = None,
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> pagers.ListTransferLogsAsyncPager:
-        r"""Returns user facing log messages for the data
-        transfer run.
+        r"""Returns log messages for the transfer run.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.ListTransferLogsRequest, dict]):
@@ -1261,7 +1292,7 @@ async def list_transfer_logs(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent])
         if request is not None and has_flattened_params:
@@ -1324,11 +1355,6 @@ async def check_valid_creds(
     ) -> datatransfer.CheckValidCredsResponse:
         r"""Returns true if valid credentials exist for the given
         data source and requesting user.
-        Some data sources doesn't support service account, so we
-        need to talk to them on behalf of the end user. This API
-        just checks whether we have OAuth token for the
-        particular user, which is a pre-requisite before user
-        can create a transfer config.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.CheckValidCredsRequest, dict]):
@@ -1362,7 +1388,7 @@ async def check_valid_creds(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -1408,6 +1434,56 @@ async def check_valid_creds(
         # Done; return the response.
         return response
 
+    async def enroll_data_sources(
+        self,
+        request: Union[datatransfer.EnrollDataSourcesRequest, dict] = None,
+        *,
+        retry: OptionalRetry = gapic_v1.method.DEFAULT,
+        timeout: float = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> None:
+        r"""Enroll data sources in a user project. This allows
+        users to create transfer configurations for these data
+        sources. They will also appear in the ListDataSources
+        RPC and as such, will appear in the BigQuery UI
+        'https://bigquery.cloud.google.com' (and the documents
+        can be found at
+        https://cloud.google.com/bigquery/bigquery-web-ui and
+        https://cloud.google.com/bigquery/docs/working-with-transfers).
+
+        Args:
+            request (Union[google.cloud.bigquery_datatransfer_v1.types.EnrollDataSourcesRequest, dict]):
+                The request object. A request to enroll a set of data
+                sources so they are visible in the BigQuery UI's
+                `Transfer` tab.
+            retry (google.api_core.retry.Retry): Designation of what errors, if any,
+                should be retried.
+            timeout (float): The timeout for this request.
+            metadata (Sequence[Tuple[str, str]]): Strings which should be
+                sent along with the request as metadata.
+        """
+        # Create or coerce a protobuf request object.
+        request = datatransfer.EnrollDataSourcesRequest(request)
+
+        # Wrap the RPC method; this adds retry and timeout information,
+        # and friendly error handling.
+        rpc = gapic_v1.method_async.wrap_method(
+            self._client._transport.enroll_data_sources,
+            default_timeout=None,
+            client_info=DEFAULT_CLIENT_INFO,
+        )
+
+        # Certain fields should be provided within the metadata header;
+        # add these here.
+        metadata = tuple(metadata) + (
+            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
+        )
+
+        # Send the request.
+        await rpc(
+            request, retry=retry, timeout=timeout, metadata=metadata,
+        )
+
     async def __aenter__(self):
         return self
 
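A hedged usage sketch for the new async RPC (the project and data source
IDs are placeholders, and Application Default Credentials are assumed);
enroll_data_sources returns Empty, so success is signaled by the absence
of an exception:

    import asyncio

    from google.cloud import bigquery_datatransfer_v1


    async def main():
        client = bigquery_datatransfer_v1.DataTransferServiceAsyncClient()
        # The request may be passed as a dict, which the method coerces
        # into an EnrollDataSourcesRequest.
        await client.enroll_data_sources(
            request={
                "name": "projects/my-project",
                "data_source_ids": ["scheduled_query"],
            }
        )


    asyncio.run(main())
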
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/client.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/client.py
index c60767dade00..785f38c4f020 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/client.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/client.py
@@ -84,10 +84,8 @@ def get_transport_class(
 
 
 class DataTransferServiceClient(metaclass=DataTransferServiceClientMeta):
-    """The Google BigQuery Data Transfer Service API enables
-    BigQuery users to configure the transfer of their data from
-    other Google Products into BigQuery. This service contains
-    methods that are end user exposed. It backs up the frontend.
+    """This API allows users to manage their data transfers into
+    BigQuery.
     """
 
     @staticmethod
@@ -278,6 +276,73 @@ def parse_common_location_path(path: str) -> Dict[str, str]:
         m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
         return m.groupdict() if m else {}
 
+    @classmethod
+    def get_mtls_endpoint_and_cert_source(
+        cls, client_options: Optional[client_options_lib.ClientOptions] = None
+    ):
+        """Return the API endpoint and client cert source for mutual TLS.
+
+        The client cert source is determined in the following order:
+        (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
+        client cert source is None.
+        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
+        default client cert source exists, use the default one; otherwise the client cert
+        source is None.
+
+        The API endpoint is determined in the following order:
+        (1) if `client_options.api_endpoint` is provided, use the provided one.
+        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
+        default mTLS endpoint; if it is "never", use the default API endpoint; otherwise
+        ("auto"), use the default mTLS endpoint if a client cert source exists and the
+        default API endpoint otherwise.
+
+        More details can be found at https://google.aip.dev/auth/4114.
+
+        Args:
+            client_options (google.api_core.client_options.ClientOptions): Custom options for the
+                client. Only the `api_endpoint` and `client_cert_source` properties may be used
+                in this method.
+
+        Returns:
+            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
+                client cert source to use.
+
+        Raises:
+            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
+        """
+        if client_options is None:
+            client_options = client_options_lib.ClientOptions()
+        use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
+        use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
+        if use_client_cert not in ("true", "false"):
+            raise ValueError(
+                "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
+            )
+        if use_mtls_endpoint not in ("auto", "never", "always"):
+            raise MutualTLSChannelError(
+                "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
+            )
+
+        # Figure out the client cert source to use.
+        client_cert_source = None
+        if use_client_cert == "true":
+            if client_options.client_cert_source:
+                client_cert_source = client_options.client_cert_source
+            elif mtls.has_default_client_cert_source():
+                client_cert_source = mtls.default_client_cert_source()
+
+        # Figure out which api endpoint to use.
+        if client_options.api_endpoint is not None:
+            api_endpoint = client_options.api_endpoint
+        elif use_mtls_endpoint == "always" or (
+            use_mtls_endpoint == "auto" and client_cert_source
+        ):
+            api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
+        else:
+            api_endpoint = cls.DEFAULT_ENDPOINT
+
+        return api_endpoint, client_cert_source
+
     def __init__(
         self,
         *,
@@ -328,57 +393,22 @@ def __init__(
         if client_options is None:
             client_options = client_options_lib.ClientOptions()
 
-        # Create SSL credentials for mutual TLS if needed.
-        if os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") not in (
-            "true",
-            "false",
-        ):
-            raise ValueError(
-                "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
-            )
-        use_client_cert = (
-            os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") == "true"
+        api_endpoint, client_cert_source_func = self.get_mtls_endpoint_and_cert_source(
+            client_options
         )
 
-        client_cert_source_func = None
-        is_mtls = False
-        if use_client_cert:
-            if client_options.client_cert_source:
-                is_mtls = True
-                client_cert_source_func = client_options.client_cert_source
-            else:
-                is_mtls = mtls.has_default_client_cert_source()
-                if is_mtls:
-                    client_cert_source_func = mtls.default_client_cert_source()
-                else:
-                    client_cert_source_func = None
-
-        # Figure out which api endpoint to use.
-        if client_options.api_endpoint is not None:
-            api_endpoint = client_options.api_endpoint
-        else:
-            use_mtls_env = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
-            if use_mtls_env == "never":
-                api_endpoint = self.DEFAULT_ENDPOINT
-            elif use_mtls_env == "always":
-                api_endpoint = self.DEFAULT_MTLS_ENDPOINT
-            elif use_mtls_env == "auto":
-                if is_mtls:
-                    api_endpoint = self.DEFAULT_MTLS_ENDPOINT
-                else:
-                    api_endpoint = self.DEFAULT_ENDPOINT
-            else:
-                raise MutualTLSChannelError(
-                    "Unsupported GOOGLE_API_USE_MTLS_ENDPOINT value. Accepted "
-                    "values: never, auto, always"
-                )
+        api_key_value = getattr(client_options, "api_key", None)
+        if api_key_value and credentials:
+            raise ValueError(
+                "client_options.api_key and credentials are mutually exclusive"
+            )
 
         # Save or instantiate the transport.
         # Ordinarily, we provide the transport, but allowing a custom transport
         # instance provides an extensibility point for unusual situations.
         if isinstance(transport, DataTransferServiceTransport):
             # transport is a DataTransferServiceTransport instance.
-            if credentials or client_options.credentials_file:
+            if credentials or client_options.credentials_file or api_key_value:
                 raise ValueError(
                     "When providing a transport instance, "
                     "provide its credentials directly."
@@ -390,6 +420,15 @@ def __init__(
                 )
             self._transport = transport
         else:
+            import google.auth._default  # type: ignore
+
+            if api_key_value and hasattr(
+                google.auth._default, "get_api_key_credentials"
+            ):
+                credentials = google.auth._default.get_api_key_credentials(
+                    api_key_value
+                )
+
             Transport = type(self).get_transport_class(transport)
             self._transport = Transport(
                 credentials=credentials,
@@ -412,7 +451,7 @@ def get_data_source(
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> datatransfer.DataSource:
         r"""Retrieves a supported data source and returns its
-        settings, which can be used for UI rendering.
+        settings.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.GetDataSourceRequest, dict]):
@@ -435,13 +474,12 @@ def get_data_source(
 
         Returns:
             google.cloud.bigquery_datatransfer_v1.types.DataSource:
-                Represents data source metadata.
-                Metadata is sufficient to render UI and
-                request proper OAuth tokens.
+                Defines the properties and custom
+                parameters for a data source.
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -487,7 +525,7 @@ def list_data_sources(
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> pagers.ListDataSourcesPager:
         r"""Lists supported data sources and returns their
-        settings, which can be used for UI rendering.
+        settings.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.ListDataSourcesRequest, dict]):
@@ -518,7 +556,7 @@ def list_data_sources(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent])
         if request is not None and has_flattened_params:
@@ -619,7 +657,7 @@ def create_transfer_config(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent, transfer_config])
         if request is not None and has_flattened_params:
@@ -709,7 +747,7 @@ def update_transfer_config(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([transfer_config, update_mask])
         if request is not None and has_flattened_params:
@@ -758,8 +796,8 @@ def delete_transfer_config(
         timeout: float = None,
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> None:
-        r"""Deletes a data transfer configuration,
-        including any associated transfer runs and logs.
+        r"""Deletes a data transfer configuration, including any
+        associated transfer runs and logs.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.DeleteTransferConfigRequest, dict]):
@@ -782,7 +820,7 @@ def delete_transfer_config(
                 sent along with the request as metadata.
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -860,7 +898,7 @@ def get_transfer_config(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -936,7 +974,7 @@ def list_transfer_configs(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent])
         if request is not None and has_flattened_params:
@@ -1039,7 +1077,7 @@ def schedule_transfer_runs(
         )
 
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent, start_time, end_time])
         if request is not None and has_flattened_params:
@@ -1171,7 +1209,7 @@ def get_transfer_run(
                 Represents a data transfer run.
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -1239,7 +1277,7 @@ def delete_transfer_run(
                 sent along with the request as metadata.
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -1283,14 +1321,13 @@ def list_transfer_runs(
         timeout: float = None,
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> pagers.ListTransferRunsPager:
-        r"""Returns information about running and completed jobs.
+        r"""Returns information about running and completed
+        transfer runs.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.ListTransferRunsRequest, dict]):
                 The request object. A request to list data transfer
-                runs. UI can use this method to show/filter specific
-                data transfer runs. The data source can use this method
-                to request all scheduled transfer runs.
+                runs.
             parent (str):
                 Required. Name of transfer configuration for which
                 transfer runs should be retrieved. Format of transfer
@@ -1317,7 +1354,7 @@ def list_transfer_runs(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent])
         if request is not None and has_flattened_params:
@@ -1368,8 +1405,7 @@ def list_transfer_logs(
         timeout: float = None,
         metadata: Sequence[Tuple[str, str]] = (),
     ) -> pagers.ListTransferLogsPager:
-        r"""Returns user facing log messages for the data
-        transfer run.
+        r"""Returns log messages for the transfer run.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.ListTransferLogsRequest, dict]):
@@ -1400,7 +1436,7 @@ def list_transfer_logs(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([parent])
         if request is not None and has_flattened_params:
@@ -1453,11 +1489,6 @@ def check_valid_creds(
     ) -> datatransfer.CheckValidCredsResponse:
         r"""Returns true if valid credentials exist for the given
         data source and requesting user.
-        Some data sources doesn't support service account, so we
-        need to talk to them on behalf of the end user. This API
-        just checks whether we have OAuth token for the
-        particular user, which is a pre-requisite before user
-        can create a transfer config.
 
         Args:
             request (Union[google.cloud.bigquery_datatransfer_v1.types.CheckValidCredsRequest, dict]):
@@ -1491,7 +1522,7 @@ def check_valid_creds(
 
         """
         # Create or coerce a protobuf request object.
-        # Sanity check: If we got a request object, we should *not* have
+        # Quick check: If we got a request object, we should *not* have
         # gotten any keyword arguments that map to the request.
         has_flattened_params = any([name])
         if request is not None and has_flattened_params:
@@ -1527,6 +1558,57 @@ def check_valid_creds(
         # Done; return the response.
         return response
 
+    def enroll_data_sources(
+        self,
+        request: Union[datatransfer.EnrollDataSourcesRequest, dict] = None,
+        *,
+        retry: OptionalRetry = gapic_v1.method.DEFAULT,
+        timeout: float = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> None:
+        r"""Enroll data sources in a user project. This allows
+        users to create transfer configurations for these data
+        sources. They will also appear in the ListDataSources
+        RPC and as such, will appear in the BigQuery UI
+        'https://bigquery.cloud.google.com' (and the documents
+        can be found at
+        https://cloud.google.com/bigquery/bigquery-web-ui and
+        https://cloud.google.com/bigquery/docs/working-with-transfers).
+
+        Args:
+            request (Union[google.cloud.bigquery_datatransfer_v1.types.EnrollDataSourcesRequest, dict]):
+                The request object. A request to enroll a set of data
+                sources so they are visible in the BigQuery UI's
+                `Transfer` tab.
+            retry (google.api_core.retry.Retry): Designation of what errors, if any,
+                should be retried.
+            timeout (float): The timeout for this request.
+            metadata (Sequence[Tuple[str, str]]): Strings which should be
+                sent along with the request as metadata.
+        """
+        # Create or coerce a protobuf request object.
+        # Minor optimization to avoid making a copy if the user passes
+        # in a datatransfer.EnrollDataSourcesRequest.
+        # There's no risk of modifying the input as we've already verified
+        # there are no flattened fields.
+        if not isinstance(request, datatransfer.EnrollDataSourcesRequest):
+            request = datatransfer.EnrollDataSourcesRequest(request)
+
+        # Wrap the RPC method; this adds retry and timeout information,
+        # and friendly error handling.
+        rpc = self._transport._wrapped_methods[self._transport.enroll_data_sources]
+
+        # Certain fields should be provided within the metadata header;
+        # add these here.
+        metadata = tuple(metadata) + (
+            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
+        )
+
+        # Send the request.
+        rpc(
+            request, retry=retry, timeout=timeout, metadata=metadata,
+        )
+
     def __enter__(self):
         return self
 
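A small sketch of how the refactored endpoint resolution behaves
(assuming a default environment, i.e. GOOGLE_API_USE_CLIENT_CERTIFICATE
unset or "false" and GOOGLE_API_USE_MTLS_ENDPOINT unset or "auto"):

    from google.api_core.client_options import ClientOptions

    from google.cloud import bigquery_datatransfer_v1

    # With no client certificate configured, the regular endpoint is
    # returned and the cert source is None.
    client_cls = bigquery_datatransfer_v1.DataTransferServiceClient
    endpoint, cert_source = client_cls.get_mtls_endpoint_and_cert_source(
        ClientOptions()
    )
    assert endpoint == "bigquerydatatransfer.googleapis.com"
    assert cert_source is None
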
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/base.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/base.py
index 2d662a5007a0..4120fab2a74d 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/base.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/base.py
@@ -292,6 +292,9 @@ def _prep_wrapped_messages(self, client_info):
                 default_timeout=20.0,
                 client_info=client_info,
             ),
+            self.enroll_data_sources: gapic_v1.method.wrap_method(
+                self.enroll_data_sources, default_timeout=None, client_info=client_info,
+            ),
         }
 
     def close(self):
@@ -450,5 +453,14 @@ def check_valid_creds(
     ]:
         raise NotImplementedError()
 
+    @property
+    def enroll_data_sources(
+        self,
+    ) -> Callable[
+        [datatransfer.EnrollDataSourcesRequest],
+        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
+    ]:
+        raise NotImplementedError()
+
 
 __all__ = ("DataTransferServiceTransport",)
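
Because the new method is registered with ``default_timeout=None`` and no
retry policy, a deadline applies only when the caller supplies one. A
sketch (placeholder IDs, Application Default Credentials assumed, and the
60-second value is arbitrary):

    from google.cloud import bigquery_datatransfer_v1

    client = bigquery_datatransfer_v1.DataTransferServiceClient()
    client.enroll_data_sources(
        request={
            "name": "projects/my-project",
            "data_source_ids": ["scheduled_query"],
        },
        timeout=60.0,  # per-call deadline; none is applied by default
    )
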
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/grpc.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/grpc.py
index d00ec1040914..c58aa96d6679 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/grpc.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/grpc.py
@@ -33,10 +33,8 @@
 class DataTransferServiceGrpcTransport(DataTransferServiceTransport):
     """gRPC backend transport for DataTransferService.
 
-    The Google BigQuery Data Transfer Service API enables
-    BigQuery users to configure the transfer of their data from
-    other Google Products into BigQuery. This service contains
-    methods that are end user exposed. It backs up the frontend.
+    This API allows users to manage their data transfers into
+    BigQuery.
 
     This class defines the same methods as the primary client, so the
     primary client can load the underlying transport implementation
@@ -163,8 +161,11 @@ def __init__(
         if not self._grpc_channel:
             self._grpc_channel = type(self).create_channel(
                 self._host,
+                # Use the credentials saved at construction time.
                 credentials=self._credentials,
-                credentials_file=credentials_file,
+                # Set ``credentials_file`` to ``None`` here as
+                # the credentials that we saved earlier should be used.
+                credentials_file=None,
                 scopes=self._scopes,
                 ssl_credentials=self._ssl_channel_credentials,
                 quota_project_id=quota_project_id,
@@ -237,7 +238,7 @@ def get_data_source(
         r"""Return a callable for the get data source method over gRPC.
 
         Retrieves a supported data source and returns its
-        settings, which can be used for UI rendering.
+        settings.
 
         Returns:
             Callable[[~.GetDataSourceRequest],
@@ -266,7 +267,7 @@ def list_data_sources(
         r"""Return a callable for the list data sources method over gRPC.
 
         Lists supported data sources and returns their
-        settings, which can be used for UI rendering.
+        settings.
 
         Returns:
             Callable[[~.ListDataSourcesRequest],
@@ -345,8 +346,8 @@ def delete_transfer_config(
     ) -> Callable[[datatransfer.DeleteTransferConfigRequest], empty_pb2.Empty]:
         r"""Return a callable for the delete transfer config method over gRPC.
 
-        Deletes a data transfer configuration,
-        including any associated transfer runs and logs.
+        Deletes a data transfer configuration, including any
+        associated transfer runs and logs.
 
         Returns:
             Callable[[~.DeleteTransferConfigRequest],
@@ -548,7 +549,8 @@ def list_transfer_runs(
     ]:
         r"""Return a callable for the list transfer runs method over gRPC.
 
-        Returns information about running and completed jobs.
+        Returns information about running and completed
+        transfer runs.
 
         Returns:
             Callable[[~.ListTransferRunsRequest],
@@ -576,8 +578,7 @@ def list_transfer_logs(
     ]:
         r"""Return a callable for the list transfer logs method over gRPC.
 
-        Returns user facing log messages for the data
-        transfer run.
+        Returns log messages for the transfer run.
 
         Returns:
             Callable[[~.ListTransferLogsRequest],
@@ -607,11 +608,6 @@ def check_valid_creds(
 
         Returns true if valid credentials exist for the given
         data source and requesting user.
-        Some data sources doesn't support service account, so we
-        need to talk to them on behalf of the end user. This API
-        just checks whether we have OAuth token for the
-        particular user, which is a pre-requisite before user
-        can create a transfer config.
 
         Returns:
             Callable[[~.CheckValidCredsRequest],
@@ -631,6 +627,39 @@ def check_valid_creds(
             )
         return self._stubs["check_valid_creds"]
 
+    @property
+    def enroll_data_sources(
+        self,
+    ) -> Callable[[datatransfer.EnrollDataSourcesRequest], empty_pb2.Empty]:
+        r"""Return a callable for the enroll data sources method over gRPC.
+
+        Enrolls data sources in a user project. This allows
+        users to create transfer configurations for these data
+        sources. Enrolled data sources also appear in the
+        ListDataSources RPC and, as such, in the BigQuery UI at
+        'https://bigquery.cloud.google.com' (documentation is
+        available at
+        https://cloud.google.com/bigquery/bigquery-web-ui and
+        https://cloud.google.com/bigquery/docs/working-with-transfers).
+
+        Returns:
+            Callable[[~.EnrollDataSourcesRequest],
+                    ~.Empty]:
+                A function that, when called, will call the underlying RPC
+                on the server.
+        """
+        # Generate a "stub function" on-the-fly which will actually make
+        # the request.
+        # gRPC handles serialization and deserialization, so we just need
+        # to pass in the functions for each.
+        if "enroll_data_sources" not in self._stubs:
+            self._stubs["enroll_data_sources"] = self.grpc_channel.unary_unary(
+                "/google.cloud.bigquery.datatransfer.v1.DataTransferService/EnrollDataSources",
+                request_serializer=datatransfer.EnrollDataSourcesRequest.serialize,
+                response_deserializer=empty_pb2.Empty.FromString,
+            )
+        return self._stubs["enroll_data_sources"]
+
     def close(self):
         self.grpc_channel.close()
 
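The lazily created stub can also be invoked directly at the transport
layer, bypassing the client-level wrapping. A sketch (hypothetical;
assumes Application Default Credentials and placeholder IDs):

    from google.cloud.bigquery_datatransfer_v1.services.data_transfer_service.transports import (
        DataTransferServiceGrpcTransport,
    )
    from google.cloud.bigquery_datatransfer_v1.types import datatransfer

    transport = DataTransferServiceGrpcTransport()
    # Returns google.protobuf.empty_pb2.Empty on success.
    transport.enroll_data_sources(
        datatransfer.EnrollDataSourcesRequest(
            name="projects/my-project",
            data_source_ids=["scheduled_query"],
        )
    )
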
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/grpc_asyncio.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/grpc_asyncio.py
index bcc8e7aafaa8..f13df3dd6bfb 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/grpc_asyncio.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/services/data_transfer_service/transports/grpc_asyncio.py
@@ -34,10 +34,8 @@
 class DataTransferServiceGrpcAsyncIOTransport(DataTransferServiceTransport):
     """gRPC AsyncIO backend transport for DataTransferService.
 
-    The Google BigQuery Data Transfer Service API enables
-    BigQuery users to configure the transfer of their data from
-    other Google Products into BigQuery. This service contains
-    methods that are end user exposed. It backs up the frontend.
+    This API allows users to manage their data transfers into
+    BigQuery.
 
     This class defines the same methods as the primary client, so the
     primary client can load the underlying transport implementation
@@ -208,8 +206,11 @@ def __init__(
         if not self._grpc_channel:
             self._grpc_channel = type(self).create_channel(
                 self._host,
+                # Use the credentials saved at construction time.
                 credentials=self._credentials,
-                credentials_file=credentials_file,
+                # Set ``credentials_file`` to ``None`` here as
+                # the credentials that we saved earlier should be used.
+                credentials_file=None,
                 scopes=self._scopes,
                 ssl_credentials=self._ssl_channel_credentials,
                 quota_project_id=quota_project_id,
@@ -241,7 +242,7 @@ def get_data_source(
         r"""Return a callable for the get data source method over gRPC.
 
         Retrieves a supported data source and returns its
-        settings, which can be used for UI rendering.
+        settings.
 
         Returns:
             Callable[[~.GetDataSourceRequest],
@@ -271,7 +272,7 @@ def list_data_sources(
         r"""Return a callable for the list data sources method over gRPC.
 
         Lists supported data sources and returns their
-        settings, which can be used for UI rendering.
+        settings.
 
         Returns:
             Callable[[~.ListDataSourcesRequest],
@@ -356,8 +357,8 @@ def delete_transfer_config(
     ]:
         r"""Return a callable for the delete transfer config method over gRPC.
 
-        Deletes a data transfer configuration,
-        including any associated transfer runs and logs.
+        Deletes a data transfer configuration, including any
+        associated transfer runs and logs.
 
         Returns:
             Callable[[~.DeleteTransferConfigRequest],
@@ -564,7 +565,8 @@ def list_transfer_runs(
     ]:
         r"""Return a callable for the list transfer runs method over gRPC.
 
-        Returns information about running and completed jobs.
+        Returns information about running and completed
+        transfer runs.
 
         Returns:
             Callable[[~.ListTransferRunsRequest],
@@ -593,8 +595,7 @@ def list_transfer_logs(
     ]:
         r"""Return a callable for the list transfer logs method over gRPC.
 
-        Returns user facing log messages for the data
-        transfer run.
+        Returns log messages for the transfer run.
 
         Returns:
             Callable[[~.ListTransferLogsRequest],
@@ -625,11 +626,6 @@ def check_valid_creds(
 
         Returns true if valid credentials exist for the given
         data source and requesting user.
-        Some data sources doesn't support service account, so we
-        need to talk to them on behalf of the end user. This API
-        just checks whether we have OAuth token for the
-        particular user, which is a pre-requisite before user
-        can create a transfer config.
 
         Returns:
             Callable[[~.CheckValidCredsRequest],
@@ -649,6 +645,39 @@ def check_valid_creds(
             )
         return self._stubs["check_valid_creds"]
 
+    @property
+    def enroll_data_sources(
+        self,
+    ) -> Callable[[datatransfer.EnrollDataSourcesRequest], Awaitable[empty_pb2.Empty]]:
+        r"""Return a callable for the enroll data sources method over gRPC.
+
+        Enrolls data sources in a user project. This allows
+        users to create transfer configurations for these data
+        sources. Enrolled data sources also appear in the
+        ListDataSources RPC and, as such, in the BigQuery UI at
+        'https://bigquery.cloud.google.com' (documentation is
+        available at
+        https://cloud.google.com/bigquery/bigquery-web-ui and
+        https://cloud.google.com/bigquery/docs/working-with-transfers).
+
+        Returns:
+            Callable[[~.EnrollDataSourcesRequest],
+                    Awaitable[~.Empty]]:
+                A function that, when called, will call the underlying RPC
+                on the server.
+        """
+        # Generate a "stub function" on-the-fly which will actually make
+        # the request.
+        # gRPC handles serialization and deserialization, so we just need
+        # to pass in the functions for each.
+        if "enroll_data_sources" not in self._stubs:
+            self._stubs["enroll_data_sources"] = self.grpc_channel.unary_unary(
+                "/google.cloud.bigquery.datatransfer.v1.DataTransferService/EnrollDataSources",
+                request_serializer=datatransfer.EnrollDataSourcesRequest.serialize,
+                response_deserializer=empty_pb2.Empty.FromString,
+            )
+        return self._stubs["enroll_data_sources"]
+
     def close(self):
         return self.grpc_channel.close()
 
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/__init__.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/__init__.py
index b79fc3ff37fe..178c2aa68920 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/__init__.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/__init__.py
@@ -21,6 +21,7 @@
     DataSourceParameter,
     DeleteTransferConfigRequest,
     DeleteTransferRunRequest,
+    EnrollDataSourcesRequest,
     GetDataSourceRequest,
     GetTransferConfigRequest,
     GetTransferRunRequest,
@@ -44,6 +45,7 @@
     TransferConfig,
     TransferMessage,
     TransferRun,
+    UserInfo,
     TransferState,
     TransferType,
 )
@@ -56,6 +58,7 @@
     "DataSourceParameter",
     "DeleteTransferConfigRequest",
     "DeleteTransferRunRequest",
+    "EnrollDataSourcesRequest",
     "GetDataSourceRequest",
     "GetTransferConfigRequest",
     "GetTransferRunRequest",
@@ -77,6 +80,7 @@
     "TransferConfig",
     "TransferMessage",
     "TransferRun",
+    "UserInfo",
     "TransferState",
     "TransferType",
 )
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/datatransfer.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/datatransfer.py
index f47c64269133..04a8e5eb8926 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/datatransfer.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/datatransfer.py
@@ -48,17 +48,14 @@
         "ScheduleTransferRunsResponse",
         "StartManualTransferRunsRequest",
         "StartManualTransferRunsResponse",
+        "EnrollDataSourcesRequest",
     },
 )
 
 
 class DataSourceParameter(proto.Message):
-    r"""Represents a data source parameter with validation rules, so
-    that parameters can be rendered in the UI. These parameters are
-    given to us by supported data sources, and include all needed
-    information for rendering and validation.
-    Thus, whoever uses this api can decide to generate either
-    generic ui, or custom data source specific forms.
+    r"""A parameter used to define custom fields in a data source
+    definition.
 
     Attributes:
         param_id (str):
@@ -134,8 +131,8 @@ class Type(proto.Enum):
 
 
 class DataSource(proto.Message):
-    r"""Represents data source metadata. Metadata is sufficient to
-    render UI and request proper OAuth tokens.
+    r"""Defines the properties and custom parameters for a data
+    source.
 
     Attributes:
         name (str):
@@ -532,9 +529,7 @@ def raw_page(self):
 
 
 class ListTransferRunsRequest(proto.Message):
-    r"""A request to list data transfer runs. UI can use this method
-    to show/filter specific data transfer runs. The data source can
-    use this method to request all scheduled transfer runs.
+    r"""A request to list data transfer runs.
 
     Attributes:
         parent (str):
@@ -790,4 +785,21 @@ class StartManualTransferRunsResponse(proto.Message):
     runs = proto.RepeatedField(proto.MESSAGE, number=1, message=transfer.TransferRun,)
 
 
+class EnrollDataSourcesRequest(proto.Message):
+    r"""A request to enroll a set of data sources so they are visible in the
+    BigQuery UI's ``Transfer`` tab.
+
+    Attributes:
+        name (str):
+            The name of the project resource in the form:
+            ``projects/{project_id}``
+        data_source_ids (Sequence[str]):
+            Data sources that are enrolled. At
+            least one data source ID is required.
+    """
+
+    name = proto.Field(proto.STRING, number=1,)
+    data_source_ids = proto.RepeatedField(proto.STRING, number=2,)
+
+
 __all__ = tuple(sorted(__protobuf__.manifest))
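
A short sketch of the new message (the IDs are placeholders). proto-plus
messages accept keyword arguments or a mapping, which is what allows the
client methods above to coerce ``dict`` requests:

    from google.cloud.bigquery_datatransfer_v1.types import datatransfer

    # Keyword-argument construction.
    req = datatransfer.EnrollDataSourcesRequest(
        name="projects/my-project",
        data_source_ids=["scheduled_query", "google_cloud_storage"],
    )

    # Mapping construction, as used by the dict-coercion path.
    same = datatransfer.EnrollDataSourcesRequest(
        {
            "name": "projects/my-project",
            "data_source_ids": ["scheduled_query", "google_cloud_storage"],
        }
    )
    assert req == same
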
diff --git a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/transfer.py b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/transfer.py
index 432922145d3b..8128710102f9 100644
--- a/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/transfer.py
+++ b/packages/google-cloud-bigquery-datatransfer/google/cloud/bigquery_datatransfer_v1/types/transfer.py
@@ -27,6 +27,7 @@
         "TransferState",
         "EmailPreferences",
         "ScheduleOptions",
+        "UserInfo",
         "TransferConfig",
         "TransferRun",
         "TransferMessage",
@@ -98,6 +99,19 @@ class ScheduleOptions(proto.Message):
     end_time = proto.Field(proto.MESSAGE, number=2, message=timestamp_pb2.Timestamp,)
 
 
+class UserInfo(proto.Message):
+    r"""Information about a user.
+
+    Attributes:
+        email (str):
+            E-mail address of the user.
+
+            This field is a member of `oneof`_ ``_email``.
+    """
+
+    email = proto.Field(proto.STRING, number=1, optional=True,)
+
+
 class TransferConfig(proto.Message):
     r"""Represents a data transfer configuration. A transfer configuration
     contains all metadata needed to perform a data transfer. For
@@ -144,8 +158,10 @@ class TransferConfig(proto.Message):
             ``first sunday of quarter 00:00``. See more explanation
             about the format here:
             https://cloud.google.com/appengine/docs/flexible/python/scheduling-jobs-with-cron-yaml#the_schedule_format
-            NOTE: the granularity should be at least 8 hours, or less
-            frequent.
+
+            NOTE: The minimum interval time between recurring transfers
+            depends on the data source; refer to the documentation for
+            your data source.
         schedule_options (google.cloud.bigquery_datatransfer_v1.types.ScheduleOptions):
             Options customizing the data transfer
             schedule.
@@ -184,6 +200,14 @@ class TransferConfig(proto.Message):
             Email notifications will be sent according to
             these preferences to the email address of the
             user who owns this transfer config.
+        owner_info (google.cloud.bigquery_datatransfer_v1.types.UserInfo):
+            Output only. Information about the user whose credentials
+            are used to transfer data. Populated only for
+            ``transferConfigs.get`` requests. If the user
+            information is not available, this field is not
+            populated.
+
+            This field is a member of `oneof`_ ``_owner_info``.
     """
 
     name = proto.Field(proto.STRING, number=1,)
@@ -206,6 +230,9 @@ class TransferConfig(proto.Message):
     email_preferences = proto.Field(
         proto.MESSAGE, number=18, message="EmailPreferences",
     )
+    owner_info = proto.Field(
+        proto.MESSAGE, number=27, optional=True, message="UserInfo",
+    )
 
 
 class TransferRun(proto.Message):
@@ -243,8 +270,7 @@ class TransferRun(proto.Message):
             the 'Setting up a data transfer' section for
             each data source. For example the parameters for
             Cloud Storage transfers are listed here:
-            https://cloud.google.com/bigquery-
-            transfer/docs/cloud-storage-transfer#bq
+            https://cloud.google.com/bigquery-transfer/docs/cloud-storage-transfer#bq
         destination_dataset_id (str):
             Output only. The BigQuery target dataset id.
 
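A hedged sketch, not part of this patch, of reading the new owner_info field; per
the docstring above it is populated only for ``transferConfigs.get`` requests, and
the config name below is a placeholder:

# Hypothetical example: fetch a single transfer config and inspect its owner.
from google.cloud import bigquery_datatransfer_v1

client = bigquery_datatransfer_v1.DataTransferServiceClient()

config = client.get_transfer_config(
    name="projects/your-project-id/transferConfigs/your-config-id"
)

# owner_info is declared optional, so presence can be checked with ``in``
# (proto-plus field-presence semantics).
if "owner_info" in config:
    print(config.owner_info.email)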
diff --git a/packages/google-cloud-bigquery-datatransfer/scripts/fixup_bigquery_datatransfer_v1_keywords.py b/packages/google-cloud-bigquery-datatransfer/scripts/fixup_bigquery_datatransfer_v1_keywords.py
index 0db416f046fb..d26de65952b1 100644
--- a/packages/google-cloud-bigquery-datatransfer/scripts/fixup_bigquery_datatransfer_v1_keywords.py
+++ b/packages/google-cloud-bigquery-datatransfer/scripts/fixup_bigquery_datatransfer_v1_keywords.py
@@ -43,6 +43,7 @@ class bigquery_datatransferCallTransformer(cst.CSTTransformer):
         'create_transfer_config': ('parent', 'transfer_config', 'authorization_code', 'version_info', 'service_account_name', ),
         'delete_transfer_config': ('name', ),
         'delete_transfer_run': ('name', ),
+        'enroll_data_sources': ('name', 'data_source_ids', ),
         'get_data_source': ('name', ),
         'get_transfer_config': ('name', ),
         'get_transfer_run': ('name', ),
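For context, a hedged before/after sketch, not part of this patch, of the call
shapes the new fixup entry migrates; the project and data source ids are
placeholders:

# Before: old-style positional arguments.
# client.enroll_data_sources("projects/your-project-id", ["scheduled_query"])
#
# After: the script folds them into a single request dict, keyed by the
# ('name', 'data_source_ids') tuple registered above.
# client.enroll_data_sources(
#     request={
#         "name": "projects/your-project-id",
#         "data_source_ids": ["scheduled_query"],
#     }
# )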
diff --git a/packages/google-cloud-bigquery-datatransfer/tests/unit/gapic/bigquery_datatransfer_v1/test_data_transfer_service.py b/packages/google-cloud-bigquery-datatransfer/tests/unit/gapic/bigquery_datatransfer_v1/test_data_transfer_service.py
index 4e76586ab32c..ebab1fe350e1 100644
--- a/packages/google-cloud-bigquery-datatransfer/tests/unit/gapic/bigquery_datatransfer_v1/test_data_transfer_service.py
+++ b/packages/google-cloud-bigquery-datatransfer/tests/unit/gapic/bigquery_datatransfer_v1/test_data_transfer_service.py
@@ -422,6 +422,87 @@ def test_data_transfer_service_client_mtls_env_auto(
                 )
 
 
+@pytest.mark.parametrize(
+    "client_class", [DataTransferServiceClient, DataTransferServiceAsyncClient]
+)
+@mock.patch.object(
+    DataTransferServiceClient,
+    "DEFAULT_ENDPOINT",
+    modify_default_endpoint(DataTransferServiceClient),
+)
+@mock.patch.object(
+    DataTransferServiceAsyncClient,
+    "DEFAULT_ENDPOINT",
+    modify_default_endpoint(DataTransferServiceAsyncClient),
+)
+def test_data_transfer_service_client_get_mtls_endpoint_and_cert_source(client_class):
+    mock_client_cert_source = mock.Mock()
+
+    # Test the case GOOGLE_API_USE_CLIENT_CERTIFICATE is "true".
+    with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "true"}):
+        mock_api_endpoint = "foo"
+        options = client_options.ClientOptions(
+            client_cert_source=mock_client_cert_source, api_endpoint=mock_api_endpoint
+        )
+        api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source(
+            options
+        )
+        assert api_endpoint == mock_api_endpoint
+        assert cert_source == mock_client_cert_source
+
+    # Test the case GOOGLE_API_USE_CLIENT_CERTIFICATE is "false".
+    with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "false"}):
+        mock_client_cert_source = mock.Mock()
+        mock_api_endpoint = "foo"
+        options = client_options.ClientOptions(
+            client_cert_source=mock_client_cert_source, api_endpoint=mock_api_endpoint
+        )
+        api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source(
+            options
+        )
+        assert api_endpoint == mock_api_endpoint
+        assert cert_source is None
+
+    # Test the case GOOGLE_API_USE_MTLS_ENDPOINT is "never".
+    with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "never"}):
+        api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source()
+        assert api_endpoint == client_class.DEFAULT_ENDPOINT
+        assert cert_source is None
+
+    # Test the case GOOGLE_API_USE_MTLS_ENDPOINT is "always".
+    with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "always"}):
+        api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source()
+        assert api_endpoint == client_class.DEFAULT_MTLS_ENDPOINT
+        assert cert_source is None
+
+    # Test the case GOOGLE_API_USE_MTLS_ENDPOINT is "auto" and default cert doesn't exist.
+    with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "true"}):
+        with mock.patch(
+            "google.auth.transport.mtls.has_default_client_cert_source",
+            return_value=False,
+        ):
+            api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source()
+            assert api_endpoint == client_class.DEFAULT_ENDPOINT
+            assert cert_source is None
+
+    # Test the case GOOGLE_API_USE_MTLS_ENDPOINT is "auto" and default cert exists.
+    with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "true"}):
+        with mock.patch(
+            "google.auth.transport.mtls.has_default_client_cert_source",
+            return_value=True,
+        ):
+            with mock.patch(
+                "google.auth.transport.mtls.default_client_cert_source",
+                return_value=mock_client_cert_source,
+            ):
+                (
+                    api_endpoint,
+                    cert_source,
+                ) = client_class.get_mtls_endpoint_and_cert_source()
+                assert api_endpoint == client_class.DEFAULT_MTLS_ENDPOINT
+                assert cert_source == mock_client_cert_source
+
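A short summary, not part of this patch and stated as an assumption from the test's
assertions, of the endpoint resolution it exercises:

#   GOOGLE_API_USE_MTLS_ENDPOINT=never  -> DEFAULT_ENDPOINT
#   GOOGLE_API_USE_MTLS_ENDPOINT=always -> DEFAULT_MTLS_ENDPOINT
#   "auto" with a default client cert   -> DEFAULT_MTLS_ENDPOINT
#   "auto" with no default client cert  -> DEFAULT_ENDPOINT
#   An explicit api_endpoint in ClientOptions takes precedence in every case.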
+
 @pytest.mark.parametrize(
     "client_class,transport_class,transport_name",
     [
@@ -458,25 +539,28 @@ def test_data_transfer_service_client_client_options_scopes(
 
 
 @pytest.mark.parametrize(
-    "client_class,transport_class,transport_name",
+    "client_class,transport_class,transport_name,grpc_helpers",
     [
         (
             DataTransferServiceClient,
             transports.DataTransferServiceGrpcTransport,
             "grpc",
+            grpc_helpers,
         ),
         (
             DataTransferServiceAsyncClient,
             transports.DataTransferServiceGrpcAsyncIOTransport,
             "grpc_asyncio",
+            grpc_helpers_async,
         ),
     ],
 )
 def test_data_transfer_service_client_client_options_credentials_file(
-    client_class, transport_class, transport_name
+    client_class, transport_class, transport_name, grpc_helpers
 ):
     # Check the case credentials file is provided.
     options = client_options.ClientOptions(credentials_file="credentials.json")
+
     with mock.patch.object(transport_class, "__init__") as patched:
         patched.return_value = None
         client = client_class(client_options=options, transport=transport_name)
@@ -512,6 +596,72 @@ def test_data_transfer_service_client_client_options_from_dict():
         )
 
 
+@pytest.mark.parametrize(
+    "client_class,transport_class,transport_name,grpc_helpers",
+    [
+        (
+            DataTransferServiceClient,
+            transports.DataTransferServiceGrpcTransport,
+            "grpc",
+            grpc_helpers,
+        ),
+        (
+            DataTransferServiceAsyncClient,
+            transports.DataTransferServiceGrpcAsyncIOTransport,
+            "grpc_asyncio",
+            grpc_helpers_async,
+        ),
+    ],
+)
+def test_data_transfer_service_client_create_channel_credentials_file(
+    client_class, transport_class, transport_name, grpc_helpers
+):
+    # Check the case credentials file is provided.
+    options = client_options.ClientOptions(credentials_file="credentials.json")
+
+    with mock.patch.object(transport_class, "__init__") as patched:
+        patched.return_value = None
+        client = client_class(client_options=options, transport=transport_name)
+        patched.assert_called_once_with(
+            credentials=None,
+            credentials_file="credentials.json",
+            host=client.DEFAULT_ENDPOINT,
+            scopes=None,
+            client_cert_source_for_mtls=None,
+            quota_project_id=None,
+            client_info=transports.base.DEFAULT_CLIENT_INFO,
+            always_use_jwt_access=True,
+        )
+
+    # Test that the credentials loaded from the file are used as the client credentials.
+    with mock.patch.object(
+        google.auth, "load_credentials_from_file", autospec=True
+    ) as load_creds, mock.patch.object(
+        google.auth, "default", autospec=True
+    ) as adc, mock.patch.object(
+        grpc_helpers, "create_channel"
+    ) as create_channel:
+        creds = ga_credentials.AnonymousCredentials()
+        file_creds = ga_credentials.AnonymousCredentials()
+        load_creds.return_value = (file_creds, None)
+        adc.return_value = (creds, None)
+        client = client_class(client_options=options, transport=transport_name)
+        create_channel.assert_called_with(
+            "bigquerydatatransfer.googleapis.com:443",
+            credentials=file_creds,
+            credentials_file=None,
+            quota_project_id=None,
+            default_scopes=("https://www.googleapis.com/auth/cloud-platform",),
+            scopes=None,
+            default_host="bigquerydatatransfer.googleapis.com",
+            ssl_credentials=None,
+            options=[
+                ("grpc.max_send_message_length", -1),
+                ("grpc.max_receive_message_length", -1),
+            ],
+        )
+
+
 @pytest.mark.parametrize("request_type", [datatransfer.GetDataSourceRequest, dict,])
 def test_get_data_source(request_type, transport: str = "grpc"):
     client = DataTransferServiceClient(
@@ -4472,6 +4622,141 @@ async def test_check_valid_creds_flattened_error_async():
         )
 
 
+@pytest.mark.parametrize("request_type", [datatransfer.EnrollDataSourcesRequest, dict,])
+def test_enroll_data_sources(request_type, transport: str = "grpc"):
+    client = DataTransferServiceClient(
+        credentials=ga_credentials.AnonymousCredentials(), transport=transport,
+    )
+
+    # Everything is optional in proto3 as far as the runtime is concerned,
+    # and we are mocking out the actual API, so just send an empty request.
+    request = request_type()
+
+    # Mock the actual call within the gRPC stub, and fake the request.
+    with mock.patch.object(
+        type(client.transport.enroll_data_sources), "__call__"
+    ) as call:
+        # Designate an appropriate return value for the call.
+        call.return_value = None
+        response = client.enroll_data_sources(request)
+
+        # Establish that the underlying gRPC stub method was called.
+        assert len(call.mock_calls) == 1
+        _, args, _ = call.mock_calls[0]
+        assert args[0] == datatransfer.EnrollDataSourcesRequest()
+
+    # Establish that the response is the type that we expect.
+    assert response is None
+
+
+def test_enroll_data_sources_empty_call():
+    # This test is a coverage failsafe to make sure that totally empty calls
+    # (i.e. request == None and no flattened fields passed) work.
+    client = DataTransferServiceClient(
+        credentials=ga_credentials.AnonymousCredentials(), transport="grpc",
+    )
+
+    # Mock the actual call within the gRPC stub, and fake the request.
+    with mock.patch.object(
+        type(client.transport.enroll_data_sources), "__call__"
+    ) as call:
+        client.enroll_data_sources()
+        call.assert_called()
+        _, args, _ = call.mock_calls[0]
+        assert args[0] == datatransfer.EnrollDataSourcesRequest()
+
+
+@pytest.mark.asyncio
+async def test_enroll_data_sources_async(
+    transport: str = "grpc_asyncio", request_type=datatransfer.EnrollDataSourcesRequest
+):
+    client = DataTransferServiceAsyncClient(
+        credentials=ga_credentials.AnonymousCredentials(), transport=transport,
+    )
+
+    # Everything is optional in proto3 as far as the runtime is concerned,
+    # and we are mocking out the actual API, so just send an empty request.
+    request = request_type()
+
+    # Mock the actual call within the gRPC stub, and fake the request.
+    with mock.patch.object(
+        type(client.transport.enroll_data_sources), "__call__"
+    ) as call:
+        # Designate an appropriate return value for the call.
+        call.return_value = grpc_helpers_async.FakeUnaryUnaryCall(None)
+        response = await client.enroll_data_sources(request)
+
+        # Establish that the underlying gRPC stub method was called.
+        assert len(call.mock_calls)
+        _, args, _ = call.mock_calls[0]
+        assert args[0] == datatransfer.EnrollDataSourcesRequest()
+
+    # Establish that the response is the type that we expect.
+    assert response is None
+
+
+@pytest.mark.asyncio
+async def test_enroll_data_sources_async_from_dict():
+    await test_enroll_data_sources_async(request_type=dict)
+
+
+def test_enroll_data_sources_field_headers():
+    client = DataTransferServiceClient(
+        credentials=ga_credentials.AnonymousCredentials(),
+    )
+
+    # Any value that is part of the HTTP/1.1 URI should be sent as
+    # a field header. Set these to a non-empty value.
+    request = datatransfer.EnrollDataSourcesRequest()
+
+    request.name = "name/value"
+
+    # Mock the actual call within the gRPC stub, and fake the request.
+    with mock.patch.object(
+        type(client.transport.enroll_data_sources), "__call__"
+    ) as call:
+        call.return_value = None
+        client.enroll_data_sources(request)
+
+        # Establish that the underlying gRPC stub method was called.
+        assert len(call.mock_calls) == 1
+        _, args, _ = call.mock_calls[0]
+        assert args[0] == request
+
+    # Establish that the field header was sent.
+    _, _, kw = call.mock_calls[0]
+    assert ("x-goog-request-params", "name=name/value",) in kw["metadata"]
+
+
+@pytest.mark.asyncio
+async def test_enroll_data_sources_field_headers_async():
+    client = DataTransferServiceAsyncClient(
+        credentials=ga_credentials.AnonymousCredentials(),
+    )
+
+    # Any value that is part of the HTTP/1.1 URI should be sent as
+    # a field header. Set these to a non-empty value.
+    request = datatransfer.EnrollDataSourcesRequest()
+
+    request.name = "name/value"
+
+    # Mock the actual call within the gRPC stub, and fake the request.
+    with mock.patch.object(
+        type(client.transport.enroll_data_sources), "__call__"
+    ) as call:
+        call.return_value = grpc_helpers_async.FakeUnaryUnaryCall(None)
+        await client.enroll_data_sources(request)
+
+        # Establish that the underlying gRPC stub method was called.
+        assert len(call.mock_calls)
+        _, args, _ = call.mock_calls[0]
+        assert args[0] == request
+
+    # Establish that the field header was sent.
+    _, _, kw = call.mock_calls[0]
+    assert ("x-goog-request-params", "name=name/value",) in kw["metadata"]
+
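A hedged note, not part of this patch, on the routing header both field-header
tests assert: the generated client derives it from the request's resource name,
roughly as below.

# metadata = (("x-goog-request-params", "name={}".format(request.name)),)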
+
 def test_credentials_transport_error():
     # It is an error to provide credentials and a transport instance.
     transport = transports.DataTransferServiceGrpcTransport(
@@ -4492,6 +4777,23 @@ def test_credentials_transport_error():
             transport=transport,
         )
 
+    # It is an error to provide an api_key and a transport instance.
+    transport = transports.DataTransferServiceGrpcTransport(
+        credentials=ga_credentials.AnonymousCredentials(),
+    )
+    options = client_options.ClientOptions()
+    options.api_key = "api_key"
+    with pytest.raises(ValueError):
+        client = DataTransferServiceClient(client_options=options, transport=transport,)
+
+    # It is an error to provide an api_key and credentials.
+    options = mock.Mock()
+    options.api_key = "api_key"
+    with pytest.raises(ValueError):
+        client = DataTransferServiceClient(
+            client_options=options, credentials=ga_credentials.AnonymousCredentials()
+        )
+
     # It is an error to provide scopes and a transport instance.
     transport = transports.DataTransferServiceGrpcTransport(
         credentials=ga_credentials.AnonymousCredentials(),
@@ -4585,6 +4887,7 @@ def test_data_transfer_service_base_transport():
         "list_transfer_runs",
         "list_transfer_logs",
         "check_valid_creds",
+        "enroll_data_sources",
     )
     for method in methods:
         with pytest.raises(NotImplementedError):
@@ -5112,3 +5415,36 @@ def test_client_ctx():
             with client:
                 pass
             close.assert_called()
+
+
+@pytest.mark.parametrize(
+    "client_class,transport_class",
+    [
+        (DataTransferServiceClient, transports.DataTransferServiceGrpcTransport),
+        (
+            DataTransferServiceAsyncClient,
+            transports.DataTransferServiceGrpcAsyncIOTransport,
+        ),
+    ],
+)
+def test_api_key_credentials(client_class, transport_class):
+    with mock.patch.object(
+        google.auth._default, "get_api_key_credentials", create=True
+    ) as get_api_key_credentials:
+        mock_cred = mock.Mock()
+        get_api_key_credentials.return_value = mock_cred
+        options = client_options.ClientOptions()
+        options.api_key = "api_key"
+        with mock.patch.object(transport_class, "__init__") as patched:
+            patched.return_value = None
+            client = client_class(client_options=options)
+            patched.assert_called_once_with(
+                credentials=mock_cred,
+                credentials_file=None,
+                host=client.DEFAULT_ENDPOINT,
+                scopes=None,
+                client_cert_source_for_mtls=None,
+                quota_project_id=None,
+                client_info=transports.base.DEFAULT_CLIENT_INFO,
+                always_use_jwt_access=True,
+            )
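Finally, a minimal sketch, not part of this patch, of the api-key path the new
tests cover; the key value is a placeholder and mirrors how the test sets it:

# Hypothetical example: build a client from an API key via client_options.
from google.api_core import client_options as client_options_lib
from google.cloud import bigquery_datatransfer_v1

options = client_options_lib.ClientOptions()
options.api_key = "your-api-key"  # placeholder

client = bigquery_datatransfer_v1.DataTransferServiceClient(client_options=options)

# Per test_credentials_transport_error above, pairing api_key with explicit
# credentials or a transport instance raises ValueError.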