diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py
index bc41601be..a99c9dab5 100644
--- a/singer_sdk/streams/core.py
+++ b/singer_sdk/streams/core.py
@@ -695,8 +695,11 @@ def _increment_stream_state(
     ) -> None:
         """Update state of stream or partition with data from the provided record.
 
-        Raises InvalidStreamSortException is self.is_sorted = True and unsorted data is
-        detected.
+        Raises `InvalidStreamSortException` is `self.is_sorted = True` and unsorted data
+        is detected.
+
+        Note: The default implementation does not advance any bookmarks unless
+        `self.replication_method == 'INCREMENTAL'.
 
         Args:
             latest_record: TODO
@@ -705,28 +708,27 @@ def _increment_stream_state(
         Raises:
             ValueError: TODO
         """
+        # This also creates a state entry if one does not yet exist:
         state_dict = self.get_context_state(context)
-        if latest_record:
-            if self.replication_method in [
-                REPLICATION_INCREMENTAL,
-                REPLICATION_LOG_BASED,
-            ]:
-                if not self.replication_key:
-                    raise ValueError(
-                        f"Could not detect replication key for '{self.name}' stream"
-                        f"(replication method={self.replication_method})"
-                    )
-                treat_as_sorted = self.is_sorted
-                if not treat_as_sorted and self.state_partitioning_keys is not None:
-                    # Streams with custom state partitioning are not resumable.
-                    treat_as_sorted = False
-                increment_state(
-                    state_dict,
-                    replication_key=self.replication_key,
-                    latest_record=latest_record,
-                    is_sorted=treat_as_sorted,
-                    check_sorted=self.check_sorted,
+
+        # Advance state bookmark values if applicable
+        if latest_record and self.replication_method == REPLICATION_INCREMENTAL:
+            if not self.replication_key:
+                raise ValueError(
+                    f"Could not detect replication key for '{self.name}' stream"
+                    f"(replication method={self.replication_method})"
                 )
+            treat_as_sorted = self.is_sorted
+            if not treat_as_sorted and self.state_partitioning_keys is not None:
+                # Streams with custom state partitioning are not resumable.
+                treat_as_sorted = False
+            increment_state(
+                state_dict,
+                replication_key=self.replication_key,
+                latest_record=latest_record,
+                is_sorted=treat_as_sorted,
+                check_sorted=self.check_sorted,
+            )
 
     # Private message authoring methods: