diff --git a/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml b/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml
new file mode 100644
index 000000000000..aecdf987b857
--- /dev/null
+++ b/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: mysqlreceiver
+
+# A brief description of the change.  Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add replica metric support for versions of MySQL earlier than 8.0.22.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35217]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/receiver/mysqlreceiver/client.go b/receiver/mysqlreceiver/client.go
index 736257e6a623..3aeabf1b777b 100644
--- a/receiver/mysqlreceiver/client.go
+++ b/receiver/mysqlreceiver/client.go
@@ -12,11 +12,12 @@ import (
 
 	// registers the mysql driver
 	"github.com/go-sql-driver/mysql"
+	"github.com/hashicorp/go-version"
 )
 
 type client interface {
 	Connect() error
-	getVersion() (string, error)
+	getVersion() (*version.Version, error)
 	getGlobalStats() (map[string]string, error)
 	getInnodbStats() (map[string]string, error)
 	getTableStats() ([]TableStats, error)
@@ -110,66 +111,83 @@ type tableLockWaitEventStats struct {
 }
 
 type ReplicaStatusStats struct {
-	replicaIOState            string
-	sourceHost                string
-	sourceUser                string
-	sourcePort                int64
-	connectRetry              int64
-	sourceLogFile             string
-	readSourceLogPos          int64
-	relayLogFile              string
-	relayLogPos               int64
-	relaySourceLogFile        string
-	replicaIORunning          string
-	replicaSQLRunning         string
-	replicateDoDB             string
-	replicateIgnoreDB         string
-	replicateDoTable          string
-	replicateIgnoreTable      string
-	replicateWildDoTable      string
-	replicateWildIgnoreTable  string
-	lastErrno                 int64
-	lastError                 string
-	skipCounter               int64
-	execSourceLogPos          int64
-	relayLogSpace             int64
-	untilCondition            string
-	untilLogFile              string
-	untilLogPos               string
-	sourceSSLAllowed          string
-	sourceSSLCAFile           string
-	sourceSSLCAPath           string
-	sourceSSLCert             string
-	sourceSSLCipher           string
-	sourceSSLKey              string
-	secondsBehindSource       sql.NullInt64
-	sourceSSLVerifyServerCert string
-	lastIOErrno               int64
-	lastIOError               string
-	lastSQLErrno              int64
-	lastSQLError              string
-	replicateIgnoreServerIDs  string
-	sourceServerID            int64
-	sourceUUID                string
-	sourceInfoFile            string
-	sqlDelay                  int64
-	sqlRemainingDelay         sql.NullInt64
-	replicaSQLRunningState    string
-	sourceRetryCount          int64
-	sourceBind                string
-	lastIOErrorTimestamp      string
-	lastSQLErrorTimestamp     string
-	sourceSSLCrl              string
-	sourceSSLCrlpath          string
-	retrievedGtidSet          string
-	executedGtidSet           string
-	autoPosition              string
-	replicateRewriteDB        string
-	channelName               string
-	sourceTLSVersion          string
-	sourcePublicKeyPath       string
-	getSourcePublicKey        int64
-	networkNamespace          string
+	replicaIOState              string
+	sourceHost                  string
+	sourceUser                  string
+	sourcePort                  int64
+	connectRetry                int64
+	sourceLogFile               string
+	readSourceLogPos            int64
+	relayLogFile                string
+	relayLogPos                 int64
+	relaySourceLogFile          string
+	replicaIORunning            string
+	replicaSQLRunning           string
+	replicateDoDB               string
+	replicateIgnoreDB           string
+	replicateDoTable            string
+	replicateIgnoreTable        string
+	replicateWildDoTable        string
+	replicateWildIgnoreTable    string
+	lastErrno                   int64
+	lastError                   string
+	skipCounter                 int64
+	execSourceLogPos            int64
+	relayLogSpace               int64
+	untilCondition              string
+	untilLogFile                string
+	untilLogPos                 string
+	sourceSSLAllowed            string
+	sourceSSLCAFile             string
+	sourceSSLCAPath             string
+	sourceSSLCert               string
+	sourceSSLCipher             string
+	sourceSSLKey                string
+	secondsBehindSource         sql.NullInt64
+	sourceSSLVerifyServerCert   string
+	lastIOErrno                 int64
+	lastIOError                 string
+	lastSQLErrno                int64
+	lastSQLError                string
+	replicateIgnoreServerIDs    string
+	sourceServerID              int64
+	sourceUUID                  string
+	sourceInfoFile              string
+	sqlDelay                    int64
+	sqlRemainingDelay           sql.NullInt64
+	replicaSQLRunningState      string
+	sourceRetryCount            int64
+	sourceBind                  string
+	lastIOErrorTimestamp        string
+	lastSQLErrorTimestamp       string
+	sourceSSLCrl                string
+	sourceSSLCrlpath            string
+	retrievedGtidSet            string
+	executedGtidSet             string
+	autoPosition                string
+	replicateRewriteDB          string
+	channelName                 string
+	sourceTLSVersion            string
+	sourcePublicKeyPath         string
+	getSourcePublicKey          int64
+	networkNamespace            string
+	usingGtid                   string
+	gtidIoPos                   string
+	slaveDdlGroups              int64
+	slaveNonTransactionalGroups int64
+	slaveTransactionalGroups    int64
+	retriedTransactions         int64
+	maxRelayLogSize             int64
+	executedLogEntries          int64
+	slaveReceivedHeartbeats     int64
+	slaveHeartbeatPeriod        int64
+	gtidSlavePos                string
+	masterLastEventTime         string
+	slaveLastEventTime          string
+	masterSlaveTimeDiff         string
+	parallelMode                string
+	replicateDoDomainIDs        string
+	replicateIgnoreDomainIDs    string
 }
 
 var _ client = (*mySQLClient)(nil)
@@ -218,15 +236,15 @@ func (c *mySQLClient) Connect() error {
 }
 
 // getVersion queries the db for the version.
-func (c *mySQLClient) getVersion() (string, error) {
+func (c *mySQLClient) getVersion() (*version.Version, error) {
 	query := "SELECT VERSION();"
-	var version string
-	err := c.client.QueryRow(query).Scan(&version)
+	var versionStr string
+	err := c.client.QueryRow(query).Scan(&versionStr)
 	if err != nil {
-		return "", err
+		return nil, err
 	}
-
-	return version, nil
+	version, err := version.NewVersion(versionStr)
+	return version, err
 }
 
 // getGlobalStats queries the db for global status metrics.
@@ -397,16 +415,19 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e
 }
 
 func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
-	version, err := c.getVersion()
+	mysqlVersion, err := c.getVersion()
 	if err != nil {
 		return nil, err
 	}
 
-	if version < "8.0.22" {
-		return nil, nil
+	query := "SHOW REPLICA STATUS"
+	minMysqlVersion, _ := version.NewVersion("8.0.22")
+	if strings.Contains(mysqlVersion.String(), "MariaDB") {
+		query = "SHOW SLAVE STATUS"
+	} else if mysqlVersion.LessThan(minMysqlVersion) {
+		query = "SHOW SLAVE STATUS"
 	}
 
-	query := "SHOW REPLICA STATUS"
 	rows, err := c.client.Query(query)
 
 	if err != nil {
@@ -427,28 +448,46 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
 			switch strings.ToLower(col) {
 			case "replica_io_state":
 				dest = append(dest, &s.replicaIOState)
+			case "slave_io_state":
+				dest = append(dest, &s.replicaIOState)
 			case "source_host":
 				dest = append(dest, &s.sourceHost)
+			case "master_host":
+				dest = append(dest, &s.sourceHost)
 			case "source_user":
 				dest = append(dest, &s.sourceUser)
+			case "master_user":
+				dest = append(dest, &s.sourceUser)
 			case "source_port":
 				dest = append(dest, &s.sourcePort)
+			case "master_port":
+				dest = append(dest, &s.sourcePort)
 			case "connect_retry":
 				dest = append(dest, &s.connectRetry)
 			case "source_log_file":
 				dest = append(dest, &s.sourceLogFile)
+			case "master_log_file":
+				dest = append(dest, &s.sourceLogFile)
 			case "read_source_log_pos":
 				dest = append(dest, &s.readSourceLogPos)
+			case "read_master_log_pos":
+				dest = append(dest, &s.readSourceLogPos)
 			case "relay_log_file":
 				dest = append(dest, &s.relayLogFile)
 			case "relay_log_pos":
 				dest = append(dest, &s.relayLogPos)
 			case "relay_source_log_file":
 				dest = append(dest, &s.relaySourceLogFile)
+			case "relay_master_log_file":
+				dest = append(dest, &s.relaySourceLogFile)
 			case "replica_io_running":
 				dest = append(dest, &s.replicaIORunning)
+			case "slave_io_running":
+				dest = append(dest, &s.replicaIORunning)
 			case "replica_sql_running":
 				dest = append(dest, &s.replicaSQLRunning)
+			case "slave_sql_running":
+				dest = append(dest, &s.replicaSQLRunning)
 			case "replicate_do_db":
 				dest = append(dest, &s.replicateDoDB)
 			case "replicate_ignore_db":
@@ -469,6 +508,8 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
 				dest = append(dest, &s.skipCounter)
 			case "exec_source_log_pos":
 				dest = append(dest, &s.execSourceLogPos)
+			case "exec_master_log_pos":
+				dest = append(dest, &s.execSourceLogPos)
 			case "relay_log_space":
 				dest = append(dest, &s.relayLogSpace)
 			case "until_condition":
@@ -479,20 +520,36 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
 				dest = append(dest, &s.untilLogPos)
 			case "source_ssl_allowed":
 				dest = append(dest, &s.sourceSSLAllowed)
+			case "master_ssl_allowed":
+				dest = append(dest, &s.sourceSSLAllowed)
 			case "source_ssl_ca_file":
 				dest = append(dest, &s.sourceSSLCAFile)
+			case "master_ssl_ca_file":
+				dest = append(dest, &s.sourceSSLCAFile)
 			case "source_ssl_ca_path":
 				dest = append(dest, &s.sourceSSLCAPath)
+			case "master_ssl_ca_path":
+				dest = append(dest, &s.sourceSSLCAPath)
 			case "source_ssl_cert":
 				dest = append(dest, &s.sourceSSLCert)
+			case "master_ssl_cert":
+				dest = append(dest, &s.sourceSSLCert)
 			case "source_ssl_cipher":
 				dest = append(dest, &s.sourceSSLCipher)
+			case "master_ssl_cipher":
+				dest = append(dest, &s.sourceSSLCipher)
 			case "source_ssl_key":
 				dest = append(dest, &s.sourceSSLKey)
+			case "master_ssl_key":
+				dest = append(dest, &s.sourceSSLKey)
 			case "seconds_behind_source":
 				dest = append(dest, &s.secondsBehindSource)
+			case "seconds_behind_master":
+				dest = append(dest, &s.secondsBehindSource)
 			case "source_ssl_verify_server_cert":
 				dest = append(dest, &s.sourceSSLVerifyServerCert)
+			case "master_ssl_verify_server_cert":
+				dest = append(dest, &s.sourceSSLVerifyServerCert)
 			case "last_io_errno":
 				dest = append(dest, &s.lastIOErrno)
 			case "last_io_error":
@@ -505,28 +562,44 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
 				dest = append(dest, &s.replicateIgnoreServerIDs)
 			case "source_server_id":
 				dest = append(dest, &s.sourceServerID)
+			case "master_server_id":
+				dest = append(dest, &s.sourceServerID)
 			case "source_uuid":
 				dest = append(dest, &s.sourceUUID)
+			case "master_uuid":
+				dest = append(dest, &s.sourceUUID)
 			case "source_info_file":
 				dest = append(dest, &s.sourceInfoFile)
+			case "master_info_file":
+				dest = append(dest, &s.sourceInfoFile)
 			case "sql_delay":
 				dest = append(dest, &s.sqlDelay)
 			case "sql_remaining_delay":
 				dest = append(dest, &s.sqlRemainingDelay)
 			case "replica_sql_running_state":
 				dest = append(dest, &s.replicaSQLRunningState)
+			case "slave_sql_running_state":
+				dest = append(dest, &s.replicaSQLRunningState)
 			case "source_retry_count":
 				dest = append(dest, &s.sourceRetryCount)
+			case "master_retry_count":
+				dest = append(dest, &s.sourceRetryCount)
 			case "source_bind":
 				dest = append(dest, &s.sourceBind)
+			case "master_bind":
+				dest = append(dest, &s.sourceBind)
 			case "last_io_error_timestamp":
 				dest = append(dest, &s.lastIOErrorTimestamp)
 			case "last_sql_error_timestamp":
 				dest = append(dest, &s.lastSQLErrorTimestamp)
 			case "source_ssl_crl":
 				dest = append(dest, &s.sourceSSLCrl)
+			case "master_ssl_crl":
+				dest = append(dest, &s.sourceSSLCrl)
 			case "source_ssl_crlpath":
 				dest = append(dest, &s.sourceSSLCrlpath)
+			case "master_ssl_crlpath":
+				dest = append(dest, &s.sourceSSLCrlpath)
 			case "retrieved_gtid_set":
 				dest = append(dest, &s.retrievedGtidSet)
 			case "executed_gtid_set":
@@ -539,12 +612,52 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
 				dest = append(dest, &s.channelName)
 			case "source_tls_version":
 				dest = append(dest, &s.sourceTLSVersion)
+			case "master_tls_version":
+				dest = append(dest, &s.sourceTLSVersion)
 			case "source_public_key_path":
 				dest = append(dest, &s.sourcePublicKeyPath)
+			case "master_public_key_path":
+				dest = append(dest, &s.sourcePublicKeyPath)
 			case "get_source_public_key":
 				dest = append(dest, &s.getSourcePublicKey)
+			case "get_master_public_key":
+				dest = append(dest, &s.getSourcePublicKey)
 			case "network_namespace":
 				dest = append(dest, &s.networkNamespace)
+			case "using_gtid":
+				dest = append(dest, &s.usingGtid)
+			case "gtid_io_pos":
+				dest = append(dest, &s.gtidIoPos)
+			case "slave_ddl_groups":
+				dest = append(dest, &s.slaveDdlGroups)
+			case "slave_non_transactional_groups":
+				dest = append(dest, &s.slaveNonTransactionalGroups)
+			case "slave_transactional_groups":
+				dest = append(dest, &s.slaveTransactionalGroups)
+			case "retried_transactions":
+				dest = append(dest, &s.retriedTransactions)
+			case "max_relay_log_size":
+				dest = append(dest, &s.maxRelayLogSize)
+			case "executed_log_entries":
+				dest = append(dest, &s.executedLogEntries)
+			case "slave_received_heartbeats":
+				dest = append(dest, &s.slaveReceivedHeartbeats)
+			case "slave_heartbeat_period":
+				dest = append(dest, &s.slaveHeartbeatPeriod)
+			case "gtid_slave_pos":
+				dest = append(dest, &s.gtidSlavePos)
+			case "master_last_event_time":
+				dest = append(dest, &s.masterLastEventTime)
+			case "slave_last_event_time":
+				dest = append(dest, &s.slaveLastEventTime)
+			case "master_slave_time_diff":
+				dest = append(dest, &s.masterSlaveTimeDiff)
+			case "parallel_mode":
+				dest = append(dest, &s.parallelMode)
+			case "replicate_do_domain_ids":
+				dest = append(dest, &s.replicateDoDomainIDs)
+			case "replicate_ignore_domain_ids":
+				dest = append(dest, &s.replicateIgnoreDomainIDs)
 			default:
 				return nil, fmt.Errorf("unknown column name %s for replica status", col)
 			}
diff --git a/receiver/mysqlreceiver/go.mod b/receiver/mysqlreceiver/go.mod
index 44c73fe67420..9f70a56d04ab 100644
--- a/receiver/mysqlreceiver/go.mod
+++ b/receiver/mysqlreceiver/go.mod
@@ -48,6 +48,7 @@ require (
 	github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/google/uuid v1.6.0 // indirect
+	github.com/hashicorp/go-version v1.7.0
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/klauspost/compress v1.17.9 // indirect
 	github.com/knadh/koanf/maps v0.1.1 // indirect
diff --git a/receiver/mysqlreceiver/go.sum b/receiver/mysqlreceiver/go.sum
index cf30c4b18167..a054b1a95e2a 100644
--- a/receiver/mysqlreceiver/go.sum
+++ b/receiver/mysqlreceiver/go.sum
@@ -58,6 +58,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
+github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
+github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
diff --git a/receiver/mysqlreceiver/scraper_test.go b/receiver/mysqlreceiver/scraper_test.go
index 01dc4dd11840..8753752e948a 100644
--- a/receiver/mysqlreceiver/scraper_test.go
+++ b/receiver/mysqlreceiver/scraper_test.go
@@ -12,6 +12,7 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/hashicorp/go-version"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/config/confignet"
@@ -158,8 +159,9 @@ func (c *mockClient) Connect() error {
 	return nil
 }
 
-func (c *mockClient) getVersion() (string, error) {
-	return "8.0.27", nil
+func (c *mockClient) getVersion() (*version.Version, error) {
+	version, _ := version.NewVersion("8.0.27")
+	return version, nil
 }
 
 func (c *mockClient) getGlobalStats() (map[string]string, error) {