diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5b694c028db..5c0c9efa79c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,6 +32,7 @@
 
 - **General:** Automatically release container image for ARM ([#2263]https://github.com/kedacore/keda/issues/2263))
 - **General:** Automatically run end-to-end tests on ARM ([#2262]https://github.com/kedacore/keda/issues/2262))
+- **General:** Introduce new Azure Data Explorer Scaler ([#1488](https://github.com/kedacore/keda/issues/1488))
 
 ### Improvements
 
diff --git a/go.mod b/go.mod
index bae8c89bf9d..cba453a2db8 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
 	cloud.google.com/go/monitoring v1.2.0
 	github.com/Azure/azure-amqp-common-go/v3 v3.2.2
 	github.com/Azure/azure-event-hubs-go/v3 v3.3.16
+	github.com/Azure/azure-kusto-go v0.5.0
 	github.com/Azure/azure-sdk-for-go v61.4.0+incompatible
 	github.com/Azure/azure-service-bus-go v0.11.5
 	github.com/Azure/azure-storage-blob-go v0.14.0
diff --git a/go.sum b/go.sum
index 0252b3edc74..9090ba7148c 100644
--- a/go.sum
+++ b/go.sum
@@ -62,11 +62,14 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.2 h1:CJpxNAGxP7UBhDusRUoaOn0uOorQy
 github.com/Azure/azure-amqp-common-go/v3 v3.2.2/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
 github.com/Azure/azure-event-hubs-go/v3 v3.3.16 h1:e3iHaU6Tgq5A8F313uIUWwwXtXAn75iIC6ekgzW6TG8=
 github.com/Azure/azure-event-hubs-go/v3 v3.3.16/go.mod h1:xgDvUi1+8/bb11WTEaU7VwZREYufzKzjWE4YiPZixb0=
+github.com/Azure/azure-kusto-go v0.5.0 h1:BUD6WK22gXlWp2x81uy8ztWyyAKoP8Mv2RU82XaiZhw=
+github.com/Azure/azure-kusto-go v0.5.0/go.mod h1:2xOhBxRcHyyNifFHmNMcqYL6AMdhyrUHCkEJkrZ+EI4=
 github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
 github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
 github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
 github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
+github.com/Azure/azure-sdk-for-go v61.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
 github.com/Azure/azure-sdk-for-go v61.4.0+incompatible h1:BF2Pm3aQWIa6q9KmxyF1JYKYXtVw67vtvu2Wd54NGuY=
 github.com/Azure/azure-sdk-for-go v61.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw=
@@ -388,6 +391,7 @@ github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/E
 github.com/gocql/gocql v0.0.0-20211222173705-d73e6b1002a7 h1:jmIMM+nEO+vjz9xaRIg9sZNtNLq5nsSbsxwe1OtRwv4=
 github.com/gocql/gocql v0.0.0-20211222173705-d73e6b1002a7/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
+github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
 github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -687,6 +691,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
 github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
 github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
 github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
diff --git a/pkg/scalers/azure/azure_data_explorer.go b/pkg/scalers/azure/azure_data_explorer.go
new file mode 100644
index 00000000000..ed1f3444031
--- /dev/null
+++ b/pkg/scalers/azure/azure_data_explorer.go
@@ -0,0 +1,131 @@
+/*
+Copyright 2021 The KEDA Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package azure
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"strconv"
+
+	"github.com/Azure/azure-kusto-go/kusto"
+	"github.com/Azure/azure-kusto-go/kusto/data/table"
+	"github.com/Azure/azure-kusto-go/kusto/unsafe"
+	"github.com/Azure/go-autorest/autorest/azure/auth"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+type DataExplorerMetadata struct {
+	ClientID     string
+	ClientSecret string
+	DatabaseName string
+	Endpoint     string
+	MetricName   string
+	PodIdentity  string
+	Query        string
+	TenantID     string
+	Threshold    int64
+}
+
+var azureDataExplorerLogger = logf.Log.WithName("azure_data_explorer_scaler")
+
+func CreateAzureDataExplorerClient(metadata *DataExplorerMetadata) (*kusto.Client, error) {
+	authConfig, err := getDataExplorerAuthConfig(metadata)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get data explorer auth config: %v", err)
+	}
+
+	client, err := kusto.New(metadata.Endpoint, kusto.Authorization{Config: *authConfig})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create kusto client: %v", err)
+	}
+
+	return client, nil
+}
+
+func getDataExplorerAuthConfig(metadata *DataExplorerMetadata) (*auth.AuthorizerConfig, error) {
+	var authConfig auth.AuthorizerConfig
+
+	if metadata.PodIdentity != "" {
+		authConfig = auth.NewMSIConfig()
+		azureDataExplorerLogger.V(1).Info("Creating Azure Data Explorer Client using Pod Identity")
+		return &authConfig, nil
+	}
+
+	if metadata.ClientID != "" && metadata.ClientSecret != "" && metadata.TenantID != "" {
+		authConfig = auth.NewClientCredentialsConfig(metadata.ClientID, metadata.ClientSecret, metadata.TenantID)
+		azureDataExplorerLogger.V(1).Info("Creating Azure Data Explorer Client using clientID, clientSecret and tenantID")
+		return &authConfig, nil
+	}
+
+	return nil, fmt.Errorf("missing credentials. please reconfigure your scaled object metadata")
+}
+
+func GetAzureDataExplorerMetricValue(ctx context.Context, client *kusto.Client, db string, query string) (int64, error) {
+	azureDataExplorerLogger.V(1).Info("Querying Azure Data Explorer", "db", db, "query", query)
+
+	iter, err := client.Query(ctx, db, kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: false})).UnsafeAdd(query))
+	if err != nil {
+		return -1, fmt.Errorf("failed to get azure data explorer metric result from query %s: %v", query, err)
+	}
+	defer iter.Stop()
+
+	row, err := iter.Next()
+	if err != nil {
+		return -1, fmt.Errorf("failed to get query %s result: %v", query, err)
+	}
+
+	// Guard the column slice before indexing it: a malformed result panics otherwise.
+	if len(row.ColumnTypes) == 0 || !row.ColumnTypes[0].Type.Valid() {
+		return -1, fmt.Errorf("query %s result has no valid column type", query)
+	}
+
+	// Return error if there is more than one row.
+	_, err = iter.Next()
+	if err != io.EOF {
+		return -1, fmt.Errorf("query %s result had more than a single result row", query)
+	}
+
+	metricValue, err := extractDataExplorerMetricValue(row)
+	if err != nil {
+		return -1, fmt.Errorf("failed to extract value from query %s: %v", query, err)
+	}
+
+	return metricValue, nil
+}
+
+func extractDataExplorerMetricValue(row *table.Row) (int64, error) {
+	if row == nil || len(row.ColumnTypes) == 0 || len(row.Values) == 0 {
+		return -1, fmt.Errorf("query has no results")
+	}
+
+	// Query result validation.
+	dataType := row.ColumnTypes[0].Type
+	if dataType != "real" && dataType != "int" && dataType != "long" {
+		return -1, fmt.Errorf("data type %s is not valid", dataType)
+	}
+	// ParseFloat covers all three accepted types; Atoi would reject every "real" result (e.g. "1.5").
+	value, err := strconv.ParseFloat(row.Values[0].String(), 64)
+	if err != nil {
+		return -1, fmt.Errorf("failed to convert result %s to float", row.Values[0].String())
+	}
+	if value < 0 {
+		return -1, fmt.Errorf("query result must be >= 0 but received: %f", value)
+	}
+
+	azureDataExplorerLogger.V(1).Info("Query Result", "value", value, "dataType", dataType)
+	return int64(value), nil
+}
diff --git a/pkg/scalers/azure/azure_data_explorer_test.go b/pkg/scalers/azure/azure_data_explorer_test.go
new file mode 100644
index 00000000000..8f467f6605c
--- /dev/null
+++ b/pkg/scalers/azure/azure_data_explorer_test.go
@@ -0,0 +1,100 @@
+/*
+Copyright 2021 The KEDA Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package azure
+
+import (
+	"testing"
+
+	"github.com/Azure/azure-kusto-go/kusto/data/errors"
+	"github.com/Azure/azure-kusto-go/kusto/data/table"
+	"github.com/Azure/azure-kusto-go/kusto/data/types"
+	"github.com/Azure/azure-kusto-go/kusto/data/value"
+)
+
+type testExtractDataExplorerMetricValue struct {
+	testRow *table.Row
+	isError bool
+}
+
+type testGetDataExplorerAuthConfig struct {
+	testMetadata *DataExplorerMetadata
+	isError      bool
+}
+
+var (
+	clientID                 = "test_client_id"
+	rowName                  = "result"
+	rowType     types.Column = "long"
+	rowValue    int64        = 3
+	podIdentity              = "Azure"
+	secret                   = "test_secret"
+	tenantID                 = "test_tenant_id"
+)
+
+var testExtractDataExplorerMetricValues = []testExtractDataExplorerMetricValue{
+	// pass
+	{testRow: &table.Row{ColumnTypes: table.Columns{{Name: rowName, Type: rowType}}, Values: value.Values{value.Long{Value: rowValue, Valid: true}}, Op: errors.OpQuery}, isError: false},
+	// nil row - fail
+	{testRow: nil, isError: true},
+	// Empty row - fail
+	{testRow: &table.Row{}, isError: true},
+	// Metric value is less than 0 - fail
+	{testRow: &table.Row{ColumnTypes: table.Columns{{Name: rowName, Type: rowType}}, Values: value.Values{value.Long{Value: -1, Valid: true}}, Op: errors.OpQuery}, isError: true},
+	// Metric result is invalid - fail
+	{testRow: &table.Row{ColumnTypes: table.Columns{{Name: rowName, Type: rowType}}, Values: value.Values{value.String{Value: "invalid", Valid: true}}, Op: errors.OpQuery}, isError: true},
+	// Metric Type is not valid - fail
+	{testRow: &table.Row{ColumnTypes: table.Columns{{Name: rowName, Type: "String"}}, Values: value.Values{value.Long{Value: rowValue, Valid: true}}, Op: errors.OpQuery}, isError: true},
+}
+
+var testGetDataExplorerAuthConfigs = []testGetDataExplorerAuthConfig{
+	// Auth with aad app - pass
+	{testMetadata: &DataExplorerMetadata{ClientID: clientID, ClientSecret: secret, TenantID: tenantID}, isError: false},
+	// Auth with podIdentity - pass
+	{testMetadata: &DataExplorerMetadata{PodIdentity: podIdentity}, isError: false},
+	// Empty metadata - fail
+	{testMetadata: &DataExplorerMetadata{}, isError: true},
+	// Empty tenantID - fail
+	{testMetadata: &DataExplorerMetadata{ClientID: clientID, ClientSecret: secret}, isError: true},
+	// Empty clientID - fail
+	{testMetadata: &DataExplorerMetadata{ClientSecret: secret, TenantID: tenantID}, isError: true},
+	// Empty clientSecret - fail
+	{testMetadata: &DataExplorerMetadata{ClientID: clientID, TenantID: tenantID}, isError: true},
+}
+
+func TestExtractDataExplorerMetricValue(t *testing.T) {
+	for _, testData := range testExtractDataExplorerMetricValues {
+		_, err := extractDataExplorerMetricValue(testData.testRow)
+		if err != nil && !testData.isError {
+			t.Error("Expected success but got error", err)
+		}
+		if testData.isError && err == nil {
+			t.Error("Expected error but got success")
+		}
+	}
+}
+
+func TestGetDataExplorerAuthConfig(t *testing.T) {
+	for _, testData := range testGetDataExplorerAuthConfigs {
+		_, err := getDataExplorerAuthConfig(testData.testMetadata)
+		if err != nil && !testData.isError {
+			t.Error("Expected success but got error", err)
+		}
+		if testData.isError && err == nil {
+			t.Error("Expected error but got success")
+		}
+	}
+}
diff --git a/pkg/scalers/azure_data_explorer_scaler.go b/pkg/scalers/azure_data_explorer_scaler.go
new file mode 100644
index 00000000000..35a26cf8613
--- /dev/null
+++ b/pkg/scalers/azure_data_explorer_scaler.go
@@ -0,0 +1,187 @@
+/*
+Copyright 2021 The KEDA Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scalers
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	"github.com/Azure/azure-kusto-go/kusto"
+	"k8s.io/api/autoscaling/v2beta2"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/metrics/pkg/apis/external_metrics"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+
+	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
+	"github.com/kedacore/keda/v2/pkg/scalers/azure"
+	kedautil "github.com/kedacore/keda/v2/pkg/util"
+)
+
+type azureDataExplorerScaler struct {
+	metadata  *azure.DataExplorerMetadata
+	client    *kusto.Client
+	name      string
+	namespace string
+}
+
+const adxName = "azure-data-explorer"
+
+var dataExplorerLogger = logf.Log.WithName("azure_data_explorer_scaler")
+
+func NewAzureDataExplorerScaler(config *ScalerConfig) (Scaler, error) {
+	metadata, err := parseAzureDataExplorerMetadata(config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse azure data explorer metadata: %s", err)
+	}
+
+	client, err := azure.CreateAzureDataExplorerClient(metadata)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create azure data explorer client: %s", err)
+	}
+
+	return &azureDataExplorerScaler{
+		metadata:  metadata,
+		client:    client,
+		name:      config.Name,
+		namespace: config.Namespace,
+	}, nil
+}
+
+func parseAzureDataExplorerMetadata(config *ScalerConfig) (*azure.DataExplorerMetadata, error) {
+	metadata, err := parseAzureDataExplorerAuthParams(config)
+	if err != nil {
+		return nil, err
+	}
+
+	// Get database name.
+	databaseName, err := getParameterFromConfig(config, "databaseName", false)
+	if err != nil {
+		return nil, err
+	}
+	metadata.DatabaseName = databaseName
+
+	// Get endpoint.
+	endpoint, err := getParameterFromConfig(config, "endpoint", false)
+	if err != nil {
+		return nil, err
+	}
+	metadata.Endpoint = endpoint
+
+	// Get query.
+	query, err := getParameterFromConfig(config, "query", false)
+	if err != nil {
+		return nil, err
+	}
+	metadata.Query = query
+
+	// Get threshold (required; a missing or malformed value must fail parsing, not silently target 0).
+	val, err := getParameterFromConfig(config, "threshold", false)
+	if err != nil {
+		return nil, err
+	}
+	if metadata.Threshold, err = strconv.ParseInt(val, 10, 64); err != nil {
+		return nil, fmt.Errorf("error parsing metadata. Details: can't parse threshold. Inner Error: %v", err)
+	}
+
+	// Generate metricName.
+	metadata.MetricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s", adxName, metadata.DatabaseName)))
+
+	dataExplorerLogger.V(1).Info("Parsed azureDataExplorerMetadata",
+		"database", metadata.DatabaseName,
+		"endpoint", metadata.Endpoint,
+		"metricName", metadata.MetricName,
+		"query", metadata.Query,
+		"threshold", metadata.Threshold)
+
+	return metadata, nil
+}
+
+func parseAzureDataExplorerAuthParams(config *ScalerConfig) (*azure.DataExplorerMetadata, error) {
+	metadata := azure.DataExplorerMetadata{}
+
+	switch config.PodIdentity {
+	case kedav1alpha1.PodIdentityProviderAzure:
+		metadata.PodIdentity = string(config.PodIdentity)
+	case "", kedav1alpha1.PodIdentityProviderNone:
+		dataExplorerLogger.V(1).Info("Pod Identity is not provided. Trying to resolve clientId, clientSecret and tenantId.")
+
+		tenantID, err := getParameterFromConfig(config, "tenantId", true)
+		if err != nil {
+			return nil, err
+		}
+		metadata.TenantID = tenantID
+
+		clientID, err := getParameterFromConfig(config, "clientId", true)
+		if err != nil {
+			return nil, err
+		}
+		metadata.ClientID = clientID
+
+		clientSecret, err := getParameterFromConfig(config, "clientSecret", true)
+		if err != nil {
+			return nil, err
+		}
+		metadata.ClientSecret = clientSecret
+	default:
+		return nil, fmt.Errorf("error parsing auth params")
+	}
+	return &metadata, nil
+}
+
+func (s azureDataExplorerScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
+	metricValue, err := azure.GetAzureDataExplorerMetricValue(ctx, s.client, s.metadata.DatabaseName, s.metadata.Query)
+	if err != nil {
+		return []external_metrics.ExternalMetricValue{}, fmt.Errorf("failed to get metrics for scaled object %s in namespace %s: %v", s.name, s.namespace, err)
+	}
+
+	metric := external_metrics.ExternalMetricValue{
+		MetricName: metricName,
+		Value:      *resource.NewQuantity(metricValue, resource.DecimalSI),
+		Timestamp:  metav1.Now(),
+	}
+	return append([]external_metrics.ExternalMetricValue{}, metric), nil
+}
+
+func (s azureDataExplorerScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
+	externalMetric := &v2beta2.ExternalMetricSource{
+		Metric: v2beta2.MetricIdentifier{
+			Name: s.metadata.MetricName,
+		},
+		Target: v2beta2.MetricTarget{
+			Type:         v2beta2.AverageValueMetricType,
+			AverageValue: resource.NewQuantity(s.metadata.Threshold, resource.DecimalSI),
+		},
+	}
+	metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
+	return []v2beta2.MetricSpec{metricSpec}
+}
+
+func (s azureDataExplorerScaler) IsActive(ctx context.Context) (bool, error) {
+	metricValue, err := azure.GetAzureDataExplorerMetricValue(ctx, s.client, s.metadata.DatabaseName, s.metadata.Query)
+	if err != nil {
+		return false, fmt.Errorf("failed to get azure data explorer metric value: %s", err)
+	}
+
+	return metricValue > 0, nil
+}
+
+func (s azureDataExplorerScaler) Close(context.Context) error {
+	return nil
+}
diff --git a/pkg/scalers/azure_data_explorer_scaler_test.go b/pkg/scalers/azure_data_explorer_scaler_test.go
new file mode 100644
index 00000000000..00c82709f60
--- /dev/null
+++ b/pkg/scalers/azure_data_explorer_scaler_test.go
@@ -0,0 +1,157 @@
+/*
+Copyright 2021 The KEDA Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scalers
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
+	kedautil "github.com/kedacore/keda/v2/pkg/util"
+)
+
+type parseDataExplorerMetadataTestData struct {
+	metadata map[string]string
+	isError  bool
+}
+
+type dataExplorerMetricIdentifier struct {
+	metadataTestData *parseDataExplorerMetadataTestData
+	scalerIndex      int
+	name             string
+}
+
+var (
+	aadAppClientID        = "eebdbbab-cf74-4791-a5c6-1ef5d90b1fa8"
+	aadAppSecret          = "test_app_secret"
+	azureTenantID         = "8fe57c22-02b1-4b87-8c24-ae21dea4fa6a"
+	databaseName          = "test_database"
+	dataExplorerQuery     = "print 3"
+	dataExplorerThreshold = "1"
+	dataExplorerEndpoint  = "https://test-keda-e2e.eastus.kusto.windows.net"
+)
+
+// Valid auth params with AAD application and password
+var dataExplorerResolvedEnv = map[string]string{
+	"tenantId":     azureTenantID,
+	"clientId":     aadAppClientID,
+	"clientSecret": aadAppSecret,
+}
+
+var testDataExplorerMetadataWithClientAndSecret = []parseDataExplorerMetadataTestData{
+	// Empty metadata - fail
+	{map[string]string{}, true},
+	// Missing tenantId - fail
+	{map[string]string{"tenantId": "", "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true},
+	// Missing clientId - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": "", "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true},
+	// Missing clientSecret - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": "", "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true},
+	// Missing endpoint - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": "", "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true},
+	// Missing databaseName - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": "", "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true},
+	// Missing query - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": "", "threshold": dataExplorerThreshold}, true},
+	// Missing threshold - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": ""}, true},
+	// All parameters set - pass
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, false},
+}
+
+var testDataExplorerMetadataWithPodIdentity = []parseDataExplorerMetadataTestData{
+	// Empty metadata - fail
+	{map[string]string{}, true},
+	// Missing endpoint - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": "", "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true},
+	// Missing query - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": "", "threshold": dataExplorerThreshold}, true},
+	// Missing threshold - fail
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": ""}, true},
+	// All parameters set - pass
+	{map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, false},
+}
+
+var testDataExplorerMetricIdentifiers = []dataExplorerMetricIdentifier{
+	{&testDataExplorerMetadataWithClientAndSecret[len(testDataExplorerMetadataWithClientAndSecret)-1], 0, GenerateMetricNameWithIndex(0, kedautil.NormalizeString(fmt.Sprintf("%s-%s", adxName, databaseName)))},
+	{&testDataExplorerMetadataWithPodIdentity[len(testDataExplorerMetadataWithPodIdentity)-1], 1, GenerateMetricNameWithIndex(1, kedautil.NormalizeString(fmt.Sprintf("%s-%s", adxName, databaseName)))},
+}
+
+func TestDataExplorerParseMetadata(t *testing.T) {
+	// Auth through clientId, clientSecret and tenantId
+	for _, testData := range testDataExplorerMetadataWithClientAndSecret {
+		_, err := parseAzureDataExplorerMetadata(
+			&ScalerConfig{
+				ResolvedEnv:     dataExplorerResolvedEnv,
+				TriggerMetadata: testData.metadata,
+				AuthParams:      map[string]string{},
+				PodIdentity:     ""})
+
+		if err != nil && !testData.isError {
+			t.Error("Expected success but got error", err)
+		}
+		if testData.isError && err == nil {
+			t.Error("Expected error but got success")
+		}
+	}
+
+	// Auth through Pod Identity
+	for _, testData := range testDataExplorerMetadataWithPodIdentity {
+		_, err := parseAzureDataExplorerMetadata(
+			&ScalerConfig{
+				ResolvedEnv:     dataExplorerResolvedEnv,
+				TriggerMetadata: testData.metadata,
+				AuthParams:      map[string]string{},
+				PodIdentity:     kedav1alpha1.PodIdentityProviderAzure})
+
+		if err != nil && !testData.isError {
+			t.Error("Expected success but got error", err)
+		}
+		if testData.isError && err == nil {
+			t.Error("Expected error but got success")
+		}
+	}
+}
+
+func TestDataExplorerGetMetricSpecForScaling(t *testing.T) {
+	for _, testData := range testDataExplorerMetricIdentifiers {
+		meta, err := parseAzureDataExplorerMetadata(
+			&ScalerConfig{
+				ResolvedEnv:     dataExplorerResolvedEnv,
+				TriggerMetadata: testData.metadataTestData.metadata,
+				AuthParams:      map[string]string{},
+				PodIdentity:     "",
+				ScalerIndex:     testData.scalerIndex})
+		if err != nil {
+			t.Error("Failed to parse metadata:", err)
+		}
+
+		mockDataExplorerScaler := azureDataExplorerScaler{
+			metadata:  meta,
+			client:    nil,
+			name:      "mock_scaled_object",
+			namespace: "mock_namespace",
+		}
+
+		metricSpec := mockDataExplorerScaler.GetMetricSpecForScaling(context.Background())
+		metricName := metricSpec[0].External.Metric.Name
+		if metricName != testData.name {
+			t.Error("Wrong External metric source name:", metricName)
+		}
+	}
+}
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index 9f384ee24f2..3e98f28b39f 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -362,6 +362,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
 		return scalers.NewAzureAppInsightsScaler(config)
 	case "azure-blob":
 		return scalers.NewAzureBlobScaler(config)
+	case "azure-data-explorer":
+		return scalers.NewAzureDataExplorerScaler(config)
 	case "azure-eventhub":
 		return scalers.NewAzureEventHubScaler(config)
 	case "azure-log-analytics":
diff --git a/tests/scalers/azure-data-explorer.test.ts b/tests/scalers/azure-data-explorer.test.ts
new file mode 100644
index 00000000000..62360bc810b
--- /dev/null
+++ b/tests/scalers/azure-data-explorer.test.ts
@@ -0,0 +1,245 @@
+import * as fs from 'fs'
+import * as sh from 'shelljs'
+import * as tmp from 'tmp'
+import test from 'ava'
+
+// Azure Data Explorer connection details and service-principal credentials,
+// resolved from the CI environment (test fails fast in test.before if the
+// SP values are missing).
+const dataExplorerDb = process.env['AZURE_DATA_EXPLORER_DB']
+const dataExplorerEndpoint = process.env['AZURE_DATA_EXPLORER_ENDPOINT']
+const spId = process.env['AZURE_SP_ID']
+const spSecret = process.env['AZURE_SP_KEY']
+const spTenantId = process.env['AZURE_SP_TENANT']
+
+// Names of the K8s objects created by this test, all derived from testName,
+// plus the metric values the Data Explorer query returns for each phase
+// (threshold in the ScaledObject is "5": 18 -> scale out to 4, 0 -> scale in to 0).
+const testName = 'test-azure-data-explorer'
+const dataExplorerNamespace = `${testName}-ns`
+const scaleInDesiredReplicaCount = '0'
+const scaleInMetricValue = '0'
+const scaleOutDesiredReplicaCount = '4'
+const scaleOutMetricValue = '18'
+const scaledObjectName = `${testName}-scaled-object`
+const secretName = `${testName}-secret`
+const serviceName = `${testName}-sts`
+const triggerAuthThroughClientAndSecretName = `${testName}-trigger-auth-client-and-secret`
+const triggerAuthThroughPodIdentityName = `${testName}-trigger-auth-pod-identity`
+
+test.before(t => {
+    // The service principal credentials are mandatory for both auth flows.
+    if (!spId || !spSecret || !spTenantId) {
+        t.fail('required parameters for data explorer e2e test were not resolved')
+    }
+
+    sh.config.silent = true
+
+    // Reuse the namespace when it already exists (after wiping its
+    // resources); otherwise create it from scratch.
+    const namespaceExists = sh.exec(`kubectl get namespace ${dataExplorerNamespace}`).code === 0
+    if (namespaceExists) {
+        t.is(
+            0,
+            sh.exec(`kubectl delete all --all -n ${dataExplorerNamespace}`).code,
+            'Clean namespace should work.')
+    } else {
+        t.is(
+            0,
+            sh.exec(`kubectl create namespace ${dataExplorerNamespace}`).code,
+            'Create namespace should work.')
+    }
+
+    // Render the secret manifest with base64-encoded SP credentials and apply it.
+    const renderedSecretYaml = secretYaml
+        .replace('{{CLIENT_ID}}', Buffer.from(spId).toString('base64'))
+        .replace('{{CLIENT_SECRET}}', Buffer.from(spSecret).toString('base64'))
+        .replace('{{TENANT_ID}}', Buffer.from(spTenantId).toString('base64'))
+    t.is(
+        0,
+        sh.exec(`kubectl apply -f ${createYamlFile(renderedSecretYaml)} -n ${dataExplorerNamespace}`).code,
+        'Creating a secret should work.')
+
+    // Deploy the workload that KEDA will scale.
+    t.is(
+        0,
+        sh.exec(`kubectl apply -f ${createYamlFile(stsYaml)} -n ${dataExplorerNamespace}`).code,
+        'Creating a statefulset should work.')
+
+    // The statefulset must start scaled in.
+    const initialReplicas = sh.exec(`kubectl get sts ${serviceName} -n ${dataExplorerNamespace} -o jsonpath="{.spec.replicas}"`).stdout
+    t.is(
+        initialReplicas,
+        scaleInDesiredReplicaCount,
+        `Replica count should start with ${scaleInDesiredReplicaCount} replicas.`)
+})
+
+test.serial.cb(`Replica count should be scaled out to ${scaleOutDesiredReplicaCount} replicas [Pod Identity]`, t => {
+    // Trigger auth backed by the cluster's Azure pod identity.
+    const triggerAuthApply = sh.exec(`kubectl apply -f ${createYamlFile(triggerAuthPodIdentityYaml)}`)
+    t.is(0, triggerAuthApply.code, 'Creating a trigger auth should work.')
+
+    // The scaled object's query initially yields the scale-out metric value.
+    const scaledObjectApply = sh.exec(`kubectl apply -f ${createYamlFile(scaledObjectYaml)}`)
+    t.is(0, scaledObjectApply.code, 'Creating a scaled object should work.')
+
+    // KEDA should now scale the statefulset out.
+    testDeploymentScale(t, scaleOutDesiredReplicaCount)
+    t.end()
+})
+
+test.serial.cb(`Replica count should be scaled in to ${scaleInDesiredReplicaCount} replicas [Pod Identity]`, t => {
+    // Rewrite the Data Explorer query so it now returns the scale-in metric value.
+    const scaledInObjectYaml = scaledObjectYaml.replace(scaleOutMetricValue, scaleInMetricValue)
+    t.is(
+        0,
+        sh.exec(`kubectl apply -f ${createYamlFile(scaledInObjectYaml)} -n ${dataExplorerNamespace}`).code,
+        'Edit scaled object query should work.')
+
+    // KEDA should now scale the statefulset back in.
+    testDeploymentScale(t, scaleInDesiredReplicaCount)
+    t.end()
+})
+
+test.serial.cb(`Replica count should be scaled out to ${scaleOutDesiredReplicaCount} replicas [clientId & clientSecret]`, t => {
+    // Create trigger auth through clientId, clientSecret and tenantId.
+    // (Assertion message fixed: it previously repeated the scaled-object
+    // edit message instead of describing this creation step.)
+    t.is(
+        0,
+        sh.exec(`kubectl apply -f ${createYamlFile(triggerAuthClientIdAndSecretYaml)} -n ${dataExplorerNamespace}`).code,
+        'Creating a trigger auth should work.')
+
+    // Change trigger auth of scaled object from pod identity to clientId and clientSecret
+    const scaledObjectFile = tmp.fileSync()
+    fs.writeFileSync(scaledObjectFile.name, scaledObjectYaml.replace(triggerAuthThroughPodIdentityName, triggerAuthThroughClientAndSecretName))
+    t.is(
+        0,
+        sh.exec(`kubectl apply -f ${scaledObjectFile.name} -n ${dataExplorerNamespace}`).code,
+        'Change trigger of scaled object auth from pod identity to aad app should work.')
+
+    // Test scale out [clientId & clientSecret]
+    testDeploymentScale(t, scaleOutDesiredReplicaCount)
+    t.end()
+})
+
+test.after.always.cb('Clean up E2E K8s objects', t => {
+    // TYPE/NAME references for `kubectl delete`. Note: `v1/<name>` is not a
+    // valid kubectl resource reference ("v1" is an API version, not a kind),
+    // so the secret and namespace use their resource kinds instead.
+    const resources = [
+        `scaledobject.keda.sh/${scaledObjectName}`,
+        `triggerauthentications.keda.sh/${triggerAuthThroughClientAndSecretName}`,
+        `triggerauthentications.keda.sh/${triggerAuthThroughPodIdentityName}`,
+        `statefulsets.apps/${serviceName}`,
+        `secret/${secretName}`,
+        `namespace/${dataExplorerNamespace}`
+    ]
+
+    // Best-effort cleanup: exit codes are deliberately ignored so that a
+    // missing resource does not fail the teardown.
+    for (const resource of resources) {
+        sh.exec(`kubectl delete ${resource} -n ${dataExplorerNamespace}`)
+    }
+
+    t.end()
+})
+
+// Polls the statefulset's replica count (up to 120 iterations, sleeping 2s
+// between attempts, i.e. ~4 minutes) until it reaches desiredReplicaCount,
+// then asserts on the final value.
+function testDeploymentScale(t, desiredReplicaCount: string) {
+    let currentReplicaCount = '-1'
+    for (let i = 0; i < 120 && currentReplicaCount !== desiredReplicaCount; i++) {
+        currentReplicaCount = sh.exec(`kubectl get sts ${serviceName} -n ${dataExplorerNamespace} -o jsonpath="{.spec.replicas}"`).stdout
+        if (currentReplicaCount !== desiredReplicaCount) {
+            sh.exec(`sleep 2s`)
+        }
+    }
+    // ava's t.is signature is (actual, expected, message) — actual first,
+    // matching the usage in test.before.
+    t.is(currentReplicaCount, desiredReplicaCount, `Replica count should be ${desiredReplicaCount} after some time`)
+}
+
+// Persists a rendered manifest to a throwaway temp file and returns its
+// path so kubectl can consume it via `-f`.
+function createYamlFile(yaml: string) {
+    const manifestFile = tmp.fileSync()
+    fs.writeFileSync(manifestFile.name, yaml)
+    return manifestFile.name
+}
+
+// Workload under test: a plain nginx statefulset that starts with
+// scaleInDesiredReplicaCount (0) replicas; KEDA drives its replica count.
+const stsYaml =
+`apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: ${serviceName}
+  namespace: ${dataExplorerNamespace}
+spec:
+  serviceName: ${serviceName}
+  replicas: ${scaleInDesiredReplicaCount}
+  selector:
+    matchLabels:
+      app: ${serviceName}
+  template:
+    metadata:
+      labels:
+        app: ${serviceName}
+    spec:
+      containers:
+      - name: nginx
+        image: nginx:1.16.1`
+
+// ScaledObject whose Data Explorer query constantly returns
+// scaleOutMetricValue (18) against threshold 5. The tests later string-replace
+// the metric value (scale-in test) and the authenticationRef name
+// (clientId/clientSecret test), so those substrings must stay unique here.
+const scaledObjectYaml =
+`apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: ${scaledObjectName}
+  namespace: ${dataExplorerNamespace}
+  labels:
+    deploymentName: ${serviceName}
+spec:
+  scaleTargetRef:
+    kind: StatefulSet
+    name: ${serviceName}
+  cooldownPeriod: 10
+  minReplicaCount: 0
+  maxReplicaCount: 10
+  pollingInterval: 30
+  triggers:
+  - type: azure-data-explorer
+    metadata:
+      databaseName: ${dataExplorerDb}
+      endpoint: ${dataExplorerEndpoint}
+      query: print result = ${scaleOutMetricValue}
+      threshold: "5"
+    authenticationRef:
+      name: ${triggerAuthThroughPodIdentityName}`
+
+// K8s manifests for auth through Pod Identity
+// (referenced by scaledObjectYaml's authenticationRef by default).
+const triggerAuthPodIdentityYaml =
+`apiVersion: keda.sh/v1alpha1
+kind: TriggerAuthentication
+metadata:
+  name: ${triggerAuthThroughPodIdentityName}
+  namespace: ${dataExplorerNamespace}
+spec:
+  podIdentity:
+    provider: azure`
+
+// K8s manifests for auth through clientId, clientSecret and tenantId.
+// The {{...}} placeholders are filled with base64-encoded values in
+// test.before before the secret is applied.
+const secretYaml =
+`apiVersion: v1
+kind: Secret
+metadata:
+  name: ${secretName}
+  namespace: ${dataExplorerNamespace}
+type: Opaque
+data:
+  clientId: {{CLIENT_ID}}
+  clientSecret: {{CLIENT_SECRET}}
+  tenantId: {{TENANT_ID}}`
+
+// TriggerAuthentication that maps the secret's keys onto the scaler's
+// clientId / clientSecret / tenantId trigger parameters.
+const triggerAuthClientIdAndSecretYaml =
+`apiVersion: keda.sh/v1alpha1
+kind: TriggerAuthentication
+metadata:
+  name: ${triggerAuthThroughClientAndSecretName}
+  namespace: ${dataExplorerNamespace}
+spec:
+  secretTargetRef:
+    - parameter: clientId
+      name: ${secretName}
+      key: clientId
+    - parameter: clientSecret
+      name: ${secretName}
+      key: clientSecret
+    - parameter: tenantId
+      name: ${secretName}
+      key: tenantId`