diff --git a/docs/guides/sql-target.md b/docs/guides/sql-target.md
index 7a9930a2a..d05e038be 100644
--- a/docs/guides/sql-target.md
+++ b/docs/guides/sql-target.md
@@ -50,3 +50,56 @@ class MyConnector(SQLConnector):
         to_sql.register_format_handler("uri", URI)
         return to_sql
 ```
+
+### Use the `x-sql-datatype` JSON Schema extension
+
+You can register new type handlers for the `x-sql-datatype` extension:
+
+```python
+from my_sqlalchemy_dialect import URI
+
+
+class MyConnector(SQLConnector):
+    @functools.cached_property
+    def jsonschema_to_sql(self):
+        to_sql = JSONSchemaToSQL()
+        to_sql.register_sql_datatype_handler("smallint", sa.types.SMALLINT)
+        return to_sql
+```
+
+Then you can annotate the tap' catalog to specify the SQL type:
+
+````{tab} meltano.yml
+```yaml
+# https://docs.meltano.com/concepts/plugins/#schema-extra
+plugins:
+  extractors:
+  - name: tap-example
+    schema:
+      addresses:
+        number:
+          x-sql-datatype: smallint
+```
+````
+
+````{tab} JSON catalog
+```json
+{
+  "streams": [
+    {
+      "stream": "addresses",
+      "tap_stream_id": "addresses",
+      "schema": {
+        "type": "object",
+        "properties": {
+          "number": {
+            "type": "integer",
+            "x-sql-datatype": "smallint"
+          }
+        }
+      }
+    }
+  ]
+}
+```
+````
diff --git a/singer_sdk/_singerlib/schema.py b/singer_sdk/_singerlib/schema.py
index bb7851b2b..ecbae296f 100644
--- a/singer_sdk/_singerlib/schema.py
+++ b/singer_sdk/_singerlib/schema.py
@@ -42,6 +42,8 @@
     "anyOf",
     "patternProperties",
     "allOf",
+    # JSON Schema extensions
+    "x-sql-datatype",
 ]
 
 
@@ -84,6 +86,9 @@ class Schema:
     contentMediaType: str | None = None  # noqa: N815
     contentEncoding: str | None = None  # noqa: N815
 
+    # JSON Schema extensions
+    x_sql_datatype: str | None = None
+
     def to_dict(self) -> dict[str, t.Any]:
         """Return the raw JSON Schema as a (possibly nested) dict.
 
@@ -99,12 +104,14 @@ def to_dict(self) -> dict[str, t.Any]:
             result["items"] = self.items.to_dict()
 
         for key in STANDARD_KEYS:
-            if self.__dict__.get(key) is not None:
-                result[key] = self.__dict__[key]
+            attr = key.replace("-", "_")
+            if (val := self.__dict__.get(attr)) is not None:
+                result[key] = val
 
         for key in META_KEYS:
-            if self.__dict__.get(key) is not None:
-                result[f"${key}"] = self.__dict__[key]
+            attr = key.replace("-", "_")
+            if (val := self.__dict__.get(attr)) is not None:
+                result[f"${key}"] = val
 
         return result
 
@@ -142,6 +149,7 @@ def from_dict(
             ...             "description": "Age in years which must be equal to or greater than zero.",
             ...             "type": "integer",
             ...             "minimum": 0,
+            ...             "x-sql-datatype": "smallint",
             ...         },
             ...     },
             ...     "required": ["firstName", "lastName"],
@@ -153,6 +161,8 @@ def from_dict(
             "The person's first name."
             >>> schema.properties["age"].minimum
             0
+            >>> schema.properties["age"].x_sql_datatype
+            'smallint'
             >>> schema.schema
             'http://json-schema.org/draft/2020-12/schema'
         """  # noqa: E501
@@ -168,12 +178,14 @@ def from_dict(
             kwargs["items"] = cls.from_dict(items, **schema_defaults)
 
         for key in STANDARD_KEYS:
+            attr = key.replace("-", "_")
             if key in data:
-                kwargs[key] = data[key]
+                kwargs[attr] = data[key]
 
         for key in META_KEYS:
+            attr = key.replace("-", "_")
             if f"${key}" in data:
-                kwargs[key] = data[f"${key}"]
+                kwargs[attr] = data[f"${key}"]
 
         return cls(**kwargs)
 
diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py
index 00c83b884..1b0f646cc 100644
--- a/singer_sdk/connectors/sql.py
+++ b/singer_sdk/connectors/sql.py
@@ -240,6 +240,10 @@ class JSONSchemaToSQL:
     This class provides a mapping from JSON Schema types to SQLAlchemy types.
 
     .. versionadded:: 0.42.0
+    .. versionchanged:: 0.44.0
+       Added the
+       :meth:`singer_sdk.connectors.sql.JSONSchemaToSQL.register_sql_datatype_handler`
+       method to map custom ``x-sql-datatype`` annotations into SQLAlchemy types.
     """
 
     def __init__(self, *, max_varchar_length: int | None = None) -> None:
@@ -276,6 +280,8 @@ def __init__(self, *, max_varchar_length: int | None = None) -> None:
             "ipv6": lambda _: sa.types.VARCHAR(45),
         }
 
+        self._sql_datatype_mapping: dict[str, JSONtoSQLHandler] = {}
+
         self._fallback_type: type[sa.types.TypeEngine] = sa.types.VARCHAR
 
     def _invoke_handler(  # noqa: PLR6301
@@ -338,6 +344,25 @@ def register_format_handler(
         """  # noqa: E501
         self._format_handlers[format_name] = handler
 
+    def register_sql_datatype_handler(
+        self,
+        sql_datatype: str,
+        handler: JSONtoSQLHandler,
+    ) -> None:
+        """Register a custom ``x-sql-datatype`` handler.
+
+        Args:
+            sql_datatype: The x-sql-datatype string.
+            handler: Either a SQLAlchemy type class or a callable that takes a schema
+                    dict and returns a SQLAlchemy type instance.
+
+        Example:
+            >>> from sqlalchemy.types import SMALLINT
+            >>> to_sql = JSONSchemaToSQL()
+            >>> to_sql.register_sql_datatype_handler("smallint", SMALLINT)
+        """
+        self._sql_datatype_mapping[sql_datatype] = handler
+
     def handle_multiple_types(self, types: t.Sequence[str]) -> sa.types.TypeEngine:  # noqa: ARG002, PLR6301
         """Handle multiple types by returning a VARCHAR.
 
@@ -374,10 +399,20 @@ def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None:
         Returns:
             SQL type if one can be determined, None otherwise.
         """
-        # Check if this is a string with format first
-        if schema.get("type") == "string" and "format" in schema:
-            format_type = self._handle_format(schema)
-            if format_type is not None:
+        # Check x-sql-datatype first
+        if x_sql_datatype := schema.get("x-sql-datatype"):
+            if handler := self._sql_datatype_mapping.get(x_sql_datatype):
+                return self._invoke_handler(handler, schema)
+
+            warnings.warn(
+                f"This target does not support the x-sql-datatype '{x_sql_datatype}'",
+                UserWarning,
+                stacklevel=2,
+            )
+
+        # Check if this is a string with format then
+        if schema.get("type") == "string" and "format" in schema:  # noqa: SIM102
+            if (format_type := self._handle_format(schema)) is not None:
                 return format_type
 
         # Then check regular types
diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py
index a8b39521e..b32bae207 100644
--- a/tests/core/test_connector_sql.py
+++ b/tests/core/test_connector_sql.py
@@ -717,6 +717,22 @@ def handle_raw_string(self, schema):
         result = json_schema_to_sql.to_sql_type(image_type)
         assert isinstance(result, sa.types.LargeBinary)
 
+    def test_annotation_sql_datatype(self):
+        json_schema_to_sql = JSONSchemaToSQL()
+        json_schema_to_sql.register_sql_datatype_handler("json", sa.types.JSON)
+        jsonschema_type = {"type": ["string"], "x-sql-datatype": "json"}
+        result = json_schema_to_sql.to_sql_type(jsonschema_type)
+        assert isinstance(result, sa.types.JSON)
+
+        unknown_type = {"type": ["string"], "x-sql-datatype": "unknown"}
+        with pytest.warns(
+            UserWarning,
+            match="This target does not support the x-sql-datatype",
+        ):
+            result = json_schema_to_sql.to_sql_type(unknown_type)
+
+        assert isinstance(result, sa.types.VARCHAR)
+
 
 def test_bench_discovery(benchmark, tmp_path: Path):
     def _discover_catalog(connector):