From 457b2e2beb425c2aa8210c04ca5e7be48234ab50 Mon Sep 17 00:00:00 2001
From: Peng Huo <penghuo@gmail.com>
Date: Thu, 17 Nov 2022 21:58:19 -0800
Subject: [PATCH 1/3] Move DataSourceServiceImpl to core module

Signed-off-by: Peng Huo <penghuo@gmail.com>
---
 .../sql/datasource/DataSourceService.java     |  25 ++-
 .../sql/datasource/DataSourceServiceImpl.java |  94 +++++++++
 .../sql/datasource/model/DataSource.java      |   5 +-
 .../datasource/model/DataSourceMetadata.java  |  20 +-
 ...ConnectorType.java => DataSourceType.java} |   2 +-
 .../sql/storage/DataSourceFactory.java        |  31 +++
 .../sql/storage/StorageEngineFactory.java     |  19 --
 .../sql/analysis/AnalyzerTestBase.java        |  17 +-
 .../catalog/DataSourceServiceImplTest.java    | 140 +++++++++++++
 .../model/auth/AuthenticationTypeTest.java    |  30 +++
 .../datasource/DataSourceTableScanTest.java   |   6 +-
 .../org/opensearch/sql/ppl/StandaloneIT.java  |  16 +-
 .../storage/OpenSearchDataSourceFactory.java  |  34 ++++
 .../OpenSearchDataSourceFactoryTest.java      |  51 +++++
 .../org/opensearch/sql/plugin/SQLPlugin.java  |  87 ++++++--
 .../datasource/DataSourceServiceImpl.java     | 191 ------------------
 .../datasource/DataSourceServiceImplTest.java | 167 ++++++++-------
 .../resources/datasource_missing_name.json    |  11 -
 .../src/test/resources/empty_datasource.json  |   1 -
 .../resources/illegal_datasource_name.json    |  12 --
 .../storage/PrometheusStorageFactory.java     |  22 +-
 .../storage/PrometheusStorageFactoryTest.java |  22 +-
 22 files changed, 634 insertions(+), 369 deletions(-)
 create mode 100644 core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
 rename core/src/main/java/org/opensearch/sql/datasource/model/{ConnectorType.java => DataSourceType.java} (84%)
 create mode 100644 core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java
 delete mode 100644 core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java
 create mode 100644 core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java
 create mode 100644 core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java
 create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java
 create mode 100644 opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java
 delete mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImpl.java
 delete mode 100644 plugin/src/test/resources/datasource_missing_name.json
 delete mode 100644 plugin/src/test/resources/empty_datasource.json
 delete mode 100644 plugin/src/test/resources/illegal_datasource_name.json

diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java
index 88ba8e508c..6a2c7f46d3 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java
@@ -7,34 +7,37 @@
 
 import java.util.Set;
 import org.opensearch.sql.datasource.model.DataSource;
-import org.opensearch.sql.storage.StorageEngine;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
 
 /**
- * DataSource Service manages datasources.
+ * DataSource Service manage {@link DataSource}.
  */
 public interface DataSourceService {
 
   /**
-   * Returns all datasource objects.
+   * Returns all DataSource objects.
    *
-   * @return DataSource datasources.
+   * @return set of {@link DataSource}.
    */
   Set<DataSource> getDataSources();
 
   /**
-   * Returns DataSource with corresponding to the datasource name.
+   * Returns {@link DataSource} with corresponding to the DataSource name.
    *
-   * @param dataSourceName Name of the datasource.
-   * @return DataSource datasource.
+   * @param dataSourceName Name of the {@link DataSource}.
+   * @return {@link DataSource}.
    */
   DataSource getDataSource(String dataSourceName);
 
   /**
-   * Default opensearch engine is not defined in datasources config.
-   * So the registration of default datasource happens separately.
+   * Register {@link DataSource} defined by {@link DataSourceMetadata}.
    *
-   * @param storageEngine StorageEngine.
+   * @param dataSourceMetadata {@link DataSourceMetadata}.
    */
-  void registerDefaultOpenSearchDataSource(StorageEngine storageEngine);
+  void addDataSource(DataSourceMetadata dataSourceMetadata);
 
+  /**
+   * remove all the registered {@link DataSource}.
+   */
+  void clear();
 }
diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
new file mode 100644
index 0000000000..ab1de8256d
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.datasource;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
+import org.opensearch.sql.storage.DataSourceFactory;
+
+/**
+ * Default implementation of {@link DataSourceService}. It is per-jvm single instance.
+ *
+ * <p>{@link DataSourceService} is constructed by the list of {@link DataSourceFactory} at service
+ * bootstrap time. The set of {@link DataSourceFactory} is immutable. Client could add {@link
+ * DataSource} defined by {@link DataSourceMetadata} at any time. {@link DataSourceService} use
+ * {@link DataSourceFactory} to create {@link DataSource}.
+ */
+public class DataSourceServiceImpl implements DataSourceService {
+
+  private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";
+
+  private final ConcurrentHashMap<String, DataSource> dataSourceMap;
+
+  private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;
+
+  /**
+   * Construct from the set of {@link DataSourceFactory} at bootstrap time.
+   */
+  public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
+    dataSourceFactoryMap =
+        dataSourceFactories.stream()
+            .collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
+    dataSourceMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public Set<DataSource> getDataSources() {
+    return new HashSet<>(dataSourceMap.values());
+  }
+
+  @Override
+  public DataSource getDataSource(String dataSourceName) {
+    if (!dataSourceMap.containsKey(dataSourceName)) {
+      throw new IllegalArgumentException(
+          String.format("DataSource with name %s doesn't exist.", dataSourceName));
+    }
+    return dataSourceMap.get(dataSourceName);
+  }
+
+  @Override
+  public void addDataSource(DataSourceMetadata metadata) {
+    validateDataSource(metadata);
+    dataSourceMap.put(
+        metadata.getName(),
+        dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
+  }
+
+  @Override
+  public void clear() {
+    dataSourceMap.clear();
+  }
+
+  /**
+   * This can be moved to a different validator class when we introduce more connectors.
+   *
+   * @param metadata {@link DataSourceMetadata}.
+   */
+  private void validateDataSource(DataSourceMetadata metadata) {
+    if (StringUtils.isEmpty(metadata.getName())) {
+      throw new IllegalArgumentException(
+          "Missing Name Field from a DataSource. Name is a required parameter.");
+    }
+    if (!metadata.getName().matches(DATASOURCE_NAME_REGEX)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
+              metadata.getName()));
+    }
+    if (Objects.isNull(metadata.getProperties())) {
+      throw new IllegalArgumentException(
+          "Missing properties field in catalog configuration. Properties are required parameters.");
+    }
+  }
+}
diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java
index a6ac9a6d66..5deb460961 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java
@@ -12,6 +12,9 @@
 import lombok.RequiredArgsConstructor;
 import org.opensearch.sql.storage.StorageEngine;
 
+/**
+ * Each user configured datasource mapping to one instance of DataSource per JVM.
+ */
 @Getter
 @RequiredArgsConstructor
 @EqualsAndHashCode
@@ -19,7 +22,7 @@ public class DataSource {
 
   private final String name;
 
-  private final ConnectorType connectorType;
+  private final DataSourceType connectorType;
 
   @EqualsAndHashCode.Exclude
   private final StorageEngine storageEngine;
diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java
index dbde5040e9..f97d272bb9 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java
@@ -5,16 +5,23 @@
 
 package org.opensearch.sql.datasource.model;
 
+
+import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
+
 import com.fasterxml.jackson.annotation.JsonFormat;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
+import org.opensearch.sql.datasource.DataSourceService;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 @Getter
 @Setter
+@EqualsAndHashCode
 public class DataSourceMetadata {
 
   @JsonProperty(required = true)
@@ -22,9 +29,20 @@ public class DataSourceMetadata {
 
   @JsonProperty(required = true)
   @JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
-  private ConnectorType connector;
+  private DataSourceType connector;
 
   @JsonProperty(required = true)
   private Map<String, String> properties;
 
+  /**
+   * Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch
+   * {@link DataSource} to {@link DataSourceService}.
+   */
+  public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
+    DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
+    dataSourceMetadata.setName(DEFAULT_DATASOURCE_NAME);
+    dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH);
+    dataSourceMetadata.setProperties(ImmutableMap.of());
+    return dataSourceMetadata;
+  }
 }
diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/ConnectorType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java
similarity index 84%
rename from core/src/main/java/org/opensearch/sql/datasource/model/ConnectorType.java
rename to core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java
index b540d7d401..b6176de5b4 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/model/ConnectorType.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java
@@ -5,6 +5,6 @@
 
 package org.opensearch.sql.datasource.model;
 
-public enum ConnectorType {
+public enum DataSourceType {
   PROMETHEUS,OPENSEARCH
 }
diff --git a/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java
new file mode 100644
index 0000000000..20d263e601
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ *  * Copyright OpenSearch Contributors
+ *  * SPDX-License-Identifier: Apache-2.0
+ *
+ */
+
+package org.opensearch.sql.storage;
+
+import org.opensearch.sql.datasource.DataSourceService;
+import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
+
+/**
+ * {@link DataSourceFactory} is used to create {@link DataSource} from {@link DataSourceMetadata}.
+ * Each data source define {@link DataSourceFactory} and register to {@link DataSourceService}.
+ * {@link DataSourceFactory} is one instance per JVM . Each {@link DataSourceType} mapping to one
+ * {@link DataSourceFactory}.
+ */
+public interface DataSourceFactory {
+  /**
+   * Get {@link DataSourceType}.
+   */
+  DataSourceType getDataSourceType();
+
+  /**
+   * Create {@link DataSource}.
+   */
+  DataSource createDataSource(DataSourceMetadata metadata);
+}
diff --git a/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java b/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java
deleted file mode 100644
index 85d29abf5c..0000000000
--- a/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- *
- *  * Copyright OpenSearch Contributors
- *  * SPDX-License-Identifier: Apache-2.0
- *
- */
-
-package org.opensearch.sql.storage;
-
-import java.util.Map;
-import org.opensearch.sql.datasource.model.ConnectorType;
-
-public interface StorageEngineFactory {
-
-  ConnectorType getConnectorType();
-
-  StorageEngine getStorageEngine(String catalogName, Map<String, String> requiredConfig);
-
-}
diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java
index b1da7b3e86..ff421a1b79 100644
--- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java
+++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java
@@ -22,10 +22,10 @@
 import org.opensearch.sql.config.TestConfig;
 import org.opensearch.sql.data.type.ExprType;
 import org.opensearch.sql.datasource.DataSourceService;
-import org.opensearch.sql.datasource.model.ConnectorType;
 import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
 import org.opensearch.sql.exception.ExpressionEvaluationException;
-import org.opensearch.sql.expression.DSL;
 import org.opensearch.sql.expression.Expression;
 import org.opensearch.sql.expression.ReferenceExpression;
 import org.opensearch.sql.expression.env.Environment;
@@ -144,9 +144,7 @@ protected Environment<Expression, ExprType> typeEnv() {
   @Bean
   protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer,
                       DataSourceService dataSourceService,
-                      StorageEngine storageEngine,
                       Table table) {
-    dataSourceService.registerDefaultOpenSearchDataSource(storageEngine);
     BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance();
     functionRepository.register("prometheus", new FunctionResolver() {
 
@@ -195,7 +193,7 @@ private class DefaultDataSourceService implements DataSourceService {
 
     private StorageEngine storageEngine = storageEngine();
     private final DataSource dataSource
-        = new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine);
+        = new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine);
 
 
     @Override
@@ -209,8 +207,13 @@ public DataSource getDataSource(String dataSourceName) {
     }
 
     @Override
-    public void registerDefaultOpenSearchDataSource(StorageEngine storageEngine) {
-      this.storageEngine = storageEngine;
+    public void addDataSource(DataSourceMetadata metadata) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+      throw new UnsupportedOperationException();
     }
   }
 
diff --git a/core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java b/core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java
new file mode 100644
index 0000000000..8470d87470
--- /dev/null
+++ b/core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.sql.datasource.DataSourceService;
+import org.opensearch.sql.datasource.DataSourceServiceImpl;
+import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
+import org.opensearch.sql.storage.DataSourceFactory;
+import org.opensearch.sql.storage.StorageEngine;
+
+@ExtendWith(MockitoExtension.class)
+class DataSourceServiceImplTest {
+
+  static final String NAME = "opensearch";
+
+  @Mock private DataSourceFactory dataSourceFactory;
+
+  @Mock private StorageEngine storageEngine;
+
+  private DataSourceService dataSourceService;
+
+  @BeforeEach
+  public void setup() {
+    lenient()
+        .doAnswer(
+            invocation -> {
+              DataSourceMetadata metadata = invocation.getArgument(0);
+              return new DataSource(metadata.getName(), metadata.getConnector(), storageEngine);
+            })
+        .when(dataSourceFactory)
+        .createDataSource(any());
+    when(dataSourceFactory.getDataSourceType()).thenReturn(DataSourceType.OPENSEARCH);
+    dataSourceService =
+        new DataSourceServiceImpl(
+            new HashSet<>() {
+              {
+                add(dataSourceFactory);
+              }
+            });
+  }
+
+  @Test
+  void getDataSourceSuccess() {
+    dataSourceService.addDataSource(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
+
+    assertEquals(
+        new DataSource(DEFAULT_DATASOURCE_NAME, DataSourceType.OPENSEARCH, storageEngine),
+        dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME));
+  }
+
+  @Test
+  void getNotExistDataSourceShouldFail() {
+    IllegalArgumentException exception =
+        assertThrows(IllegalArgumentException.class, () -> dataSourceService.getDataSource("mock"));
+    assertEquals("DataSource with name mock doesn't exist.", exception.getMessage());
+  }
+
+  @Test
+  void getAddDataSourcesShouldSuccess() {
+    assertEquals(0, dataSourceService.getDataSources().size());
+
+    dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of()));
+    assertEquals(1, dataSourceService.getDataSources().size());
+  }
+
+  @Test
+  void noDataSourceExistAfterClear() {
+    dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of()));
+    assertEquals(1, dataSourceService.getDataSources().size());
+
+    dataSourceService.clear();
+    assertEquals(0, dataSourceService.getDataSources().size());
+  }
+
+  @Test
+  void metaDataMissingNameShouldFail() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                dataSourceService.addDataSource(
+                    metadata(null, DataSourceType.OPENSEARCH, ImmutableMap.of())));
+    assertEquals(
+        "Missing Name Field from a DataSource. Name is a required parameter.",
+        exception.getMessage());
+  }
+
+  @Test
+  void metaDataHasIllegalDataSourceNameShouldFail() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                dataSourceService.addDataSource(
+                    metadata("prometheus.test", DataSourceType.OPENSEARCH, ImmutableMap.of())));
+    assertEquals(
+        "DataSource Name: prometheus.test contains illegal characters. "
+            + "Allowed characters: a-zA-Z0-9_-*@.",
+        exception.getMessage());
+  }
+
+  @Test
+  void metaDataMissingPropertiesShouldFail() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, null)));
+    assertEquals(
+        "Missing properties field in catalog configuration. Properties are required parameters.",
+        exception.getMessage());
+  }
+
+  DataSourceMetadata metadata(String name, DataSourceType type, Map<String, String> properties) {
+    DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
+    dataSourceMetadata.setName(name);
+    dataSourceMetadata.setConnector(type);
+    dataSourceMetadata.setProperties(properties);
+    return dataSourceMetadata;
+  }
+}
diff --git a/core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java b/core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java
new file mode 100644
index 0000000000..1dd91517f3
--- /dev/null
+++ b/core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.catalog.model.auth;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+import org.opensearch.sql.datasource.model.auth.AuthenticationType;
+
+class AuthenticationTypeTest {
+  @Test
+  void getAuthType() {
+    assertEquals(
+        AuthenticationType.BASICAUTH,
+        AuthenticationType.get(AuthenticationType.BASICAUTH.getName()));
+    assertEquals(
+        AuthenticationType.AWSSIGV4AUTH,
+        AuthenticationType.get(AuthenticationType.AWSSIGV4AUTH.getName()));
+  }
+
+  @Test
+  void getNotExistAuthType() {
+    assertNull(AuthenticationType.get("mock"));
+  }
+}
diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java
index a57d1b1a89..2f7188a248 100644
--- a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java
+++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java
@@ -24,8 +24,8 @@
 import org.opensearch.sql.data.model.ExprTupleValue;
 import org.opensearch.sql.data.model.ExprValueUtils;
 import org.opensearch.sql.datasource.DataSourceService;
-import org.opensearch.sql.datasource.model.ConnectorType;
 import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceType;
 import org.opensearch.sql.storage.StorageEngine;
 
 @ExtendWith(MockitoExtension.class)
@@ -52,8 +52,8 @@ void testExplain() {
   @Test
   void testIterator() {
     Set<DataSource> dataSourceSet = new HashSet<>();
-    dataSourceSet.add(new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine));
-    dataSourceSet.add(new DataSource("opensearch", ConnectorType.OPENSEARCH, storageEngine));
+    dataSourceSet.add(new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine));
+    dataSourceSet.add(new DataSource("opensearch", DataSourceType.OPENSEARCH, storageEngine));
     when(dataSourceService.getDataSources()).thenReturn(dataSourceSet);
 
     assertFalse(dataSourceTableScan.hasNext());
diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java
index a3b341c4a8..b8abc4d45e 100644
--- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java
@@ -6,9 +6,11 @@
 
 package org.opensearch.sql.ppl;
 
+import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
 import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -23,6 +25,7 @@
 import org.opensearch.sql.common.response.ResponseListener;
 import org.opensearch.sql.common.setting.Settings;
 import org.opensearch.sql.datasource.DataSourceService;
+import org.opensearch.sql.datasource.DataSourceServiceImpl;
 import org.opensearch.sql.executor.DefaultQueryManager;
 import org.opensearch.sql.executor.ExecutionEngine;
 import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
@@ -35,14 +38,14 @@
 import org.opensearch.sql.opensearch.client.OpenSearchRestClient;
 import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine;
 import org.opensearch.sql.opensearch.executor.protector.OpenSearchExecutionProtector;
-import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine;
+import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory;
 import org.opensearch.sql.planner.Planner;
 import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
-import org.opensearch.sql.plugin.datasource.DataSourceServiceImpl;
 import org.opensearch.sql.ppl.config.PPLServiceConfig;
 import org.opensearch.sql.ppl.domain.PPLQueryRequest;
 import org.opensearch.sql.protocol.response.QueryResult;
 import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter;
+import org.opensearch.sql.storage.DataSourceFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
 import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -72,9 +75,12 @@ public void init() {
         new OpenSearchExecutionProtector(new AlwaysHealthyMonitor())));
     context.registerBean(OpenSearchClient.class, () -> client);
     context.registerBean(Settings.class, () -> defaultSettings());
-    OpenSearchStorageEngine openSearchStorageEngine = new OpenSearchStorageEngine(client, defaultSettings());
-    DataSourceServiceImpl.getInstance().registerDefaultOpenSearchDataSource(openSearchStorageEngine);
-    context.registerBean(DataSourceService.class, DataSourceServiceImpl::getInstance);
+    DataSourceService dataSourceService = new DataSourceServiceImpl(
+        new ImmutableSet.Builder<DataSourceFactory>()
+            .add(new OpenSearchDataSourceFactory(client, defaultSettings()))
+            .build());
+    dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata());
+    context.registerBean(DataSourceService.class, () -> dataSourceService);
     context.register(StandaloneConfig.class);
     context.register(PPLServiceConfig.class);
     context.refresh();
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java
new file mode 100644
index 0000000000..011f6236fb
--- /dev/null
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.opensearch.storage;
+
+import lombok.RequiredArgsConstructor;
+import org.opensearch.sql.common.setting.Settings;
+import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
+import org.opensearch.sql.opensearch.client.OpenSearchClient;
+import org.opensearch.sql.storage.DataSourceFactory;
+
+@RequiredArgsConstructor
+public class OpenSearchDataSourceFactory implements DataSourceFactory {
+
+  /** OpenSearch client connection. */
+  private final OpenSearchClient client;
+
+  private final Settings settings;
+
+  @Override
+  public DataSourceType getDataSourceType() {
+    return DataSourceType.OPENSEARCH;
+  }
+
+  @Override
+  public DataSource createDataSource(DataSourceMetadata metadata) {
+    return new DataSource(metadata.getName(), DataSourceType.OPENSEARCH,
+        new OpenSearchStorageEngine(client, settings));
+  }
+}
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java
new file mode 100644
index 0000000000..a9e4e153fc
--- /dev/null
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.opensearch.storage;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.sql.common.setting.Settings;
+import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
+import org.opensearch.sql.opensearch.client.OpenSearchClient;
+
+@ExtendWith(MockitoExtension.class)
+class OpenSearchDataSourceFactoryTest {
+
+  @Mock private OpenSearchClient client;
+
+  @Mock private Settings settings;
+
+  @Mock private DataSourceMetadata dataSourceMetadata;
+
+  private OpenSearchDataSourceFactory factory;
+
+  @BeforeEach
+  public void setup() {
+    factory = new OpenSearchDataSourceFactory(client, settings);
+  }
+
+  @Test
+  void getDataSourceType() {
+    assertEquals(DataSourceType.OPENSEARCH, factory.getDataSourceType());
+  }
+
+  @Test
+  void createDataSource() {
+    when(dataSourceMetadata.getName()).thenReturn("opensearch");
+
+    DataSource dataSource = factory.createDataSource(dataSourceMetadata);
+    assertEquals("opensearch", dataSource.getName());
+    assertEquals(DataSourceType.OPENSEARCH, dataSource.getConnectorType());
+  }
+}
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
index 137422d4ac..577b396d62 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -5,13 +5,27 @@
 
 package org.opensearch.sql.plugin;
 
+import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Supplier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.opensearch.action.ActionRequest;
 import org.opensearch.action.ActionResponse;
 import org.opensearch.action.ActionType;
@@ -41,6 +55,9 @@
 import org.opensearch.script.ScriptEngine;
 import org.opensearch.script.ScriptService;
 import org.opensearch.sql.datasource.DataSourceService;
+import org.opensearch.sql.datasource.DataSourceServiceImpl;
+import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.legacy.esdomain.LocalClusterState;
 import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
 import org.opensearch.sql.legacy.metrics.Metrics;
@@ -50,11 +67,10 @@
 import org.opensearch.sql.opensearch.security.SecurityAccess;
 import org.opensearch.sql.opensearch.setting.LegacyOpenDistroSettings;
 import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
-import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine;
+import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory;
 import org.opensearch.sql.opensearch.storage.script.ExpressionScriptEngine;
 import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer;
 import org.opensearch.sql.plugin.config.OpenSearchPluginConfig;
-import org.opensearch.sql.plugin.datasource.DataSourceServiceImpl;
 import org.opensearch.sql.plugin.datasource.DataSourceSettings;
 import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
 import org.opensearch.sql.plugin.rest.RestPPLStatsAction;
@@ -63,8 +79,9 @@
 import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
 import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
 import org.opensearch.sql.ppl.config.PPLServiceConfig;
+import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
 import org.opensearch.sql.sql.config.SQLServiceConfig;
-import org.opensearch.sql.storage.StorageEngine;
+import org.opensearch.sql.storage.DataSourceFactory;
 import org.opensearch.threadpool.ExecutorBuilder;
 import org.opensearch.threadpool.FixedExecutorBuilder;
 import org.opensearch.threadpool.ThreadPool;
@@ -73,6 +90,8 @@
 
 public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin {
 
+  private static final Logger LOG = LogManager.getLogger();
+
   private ClusterService clusterService;
 
   /**
@@ -84,6 +103,8 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Rel
 
   private AnnotationConfigApplicationContext applicationContext;
 
+  private DataSourceService dataSourceService;
+
   public String name() {
     return "sql";
   }
@@ -142,9 +163,15 @@ public Collection<Object> createComponents(
     this.clusterService = clusterService;
     this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings());
     this.client = (NodeClient) client;
-    DataSourceServiceImpl.getInstance().loadConnectors(clusterService.getSettings());
-    DataSourceServiceImpl.getInstance()
-        .registerDefaultOpenSearchDataSource(openSearchStorageEngine());
+    this.dataSourceService =
+        new DataSourceServiceImpl(
+            new ImmutableSet.Builder<DataSourceFactory>()
+                .add(new OpenSearchDataSourceFactory(
+                        new OpenSearchNodeClient(this.client), pluginSettings))
+                .add(new PrometheusStorageFactory())
+                .build());
+    dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata());
+    loadDataSources(dataSourceService, clusterService.getSettings());
     LocalClusterState.state().setClusterService(clusterService);
     LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
 
@@ -156,7 +183,7 @@ public Collection<Object> createComponents(
           applicationContext.registerBean(
               org.opensearch.sql.common.setting.Settings.class, () -> pluginSettings);
           applicationContext.registerBean(
-              DataSourceService.class, () -> DataSourceServiceImpl.getInstance());
+              DataSourceService.class, () -> dataSourceService);
           applicationContext.register(OpenSearchPluginConfig.class);
           applicationContext.register(PPLServiceConfig.class);
           applicationContext.register(SQLServiceConfig.class);
@@ -196,13 +223,49 @@ public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<
 
   @Override
   public void reload(Settings settings) {
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    DataSourceServiceImpl.getInstance()
-        .registerDefaultOpenSearchDataSource(openSearchStorageEngine());
+    dataSourceService.clear();
+    dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata());
+    loadDataSources(dataSourceService, settings);
   }
 
-  private StorageEngine openSearchStorageEngine() {
-    return new OpenSearchStorageEngine(new OpenSearchNodeClient(client), pluginSettings);
+  /**
+   * load {@link DataSource} from settings.
+   */
+  @VisibleForTesting
+  public static void loadDataSources(DataSourceService dataSourceService, Settings settings) {
+    SecurityAccess.doPrivileged(
+        () -> {
+          InputStream inputStream = DataSourceSettings.DATASOURCE_CONFIG.get(settings);
+          if (inputStream != null) {
+            ObjectMapper objectMapper = new ObjectMapper();
+            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            try {
+              List<DataSourceMetadata> metadataList =
+                  objectMapper.readValue(inputStream, new TypeReference<>() {});
+              verifyDuplicateName(metadataList);
+              metadataList.forEach(metadata -> dataSourceService.addDataSource(metadata));
+            } catch (IOException e) {
+              LOG.error(
+                  "DataSource Configuration File uploaded is malformed. Verify and re-upload.", e);
+            } catch (Throwable e) {
+              LOG.error("DataSource construction failed.", e);
+            }
+          }
+          return null;
+        });
   }
 
+  static void verifyDuplicateName(List<DataSourceMetadata> metadataList) {
+    Set<String> seenNames = new HashSet<>();
+    for (DataSourceMetadata metadata : metadataList) {
+      if (seenNames.contains(metadata.getName())) {
+        throw new IllegalArgumentException(
+            String.format(
+                Locale.ROOT,
+                "Datasource name should be unique, Duplicate datasource found %s",
+                metadata.getName()));
+      }
+      seenNames.add(metadata.getName());
+    }
+  }
 }
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImpl.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImpl.java
deleted file mode 100644
index be7cc70af0..0000000000
--- a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImpl.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.sql.plugin.datasource;
-
-import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.sql.datasource.DataSourceService;
-import org.opensearch.sql.datasource.model.ConnectorType;
-import org.opensearch.sql.datasource.model.DataSource;
-import org.opensearch.sql.datasource.model.DataSourceMetadata;
-import org.opensearch.sql.expression.function.BuiltinFunctionRepository;
-import org.opensearch.sql.opensearch.security.SecurityAccess;
-import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
-import org.opensearch.sql.storage.StorageEngine;
-import org.opensearch.sql.storage.StorageEngineFactory;
-
-/**
- * This class manages datasources and responsible for creating connectors to the datasources.
- */
-public class DataSourceServiceImpl implements DataSourceService {
-
-  private static final DataSourceServiceImpl INSTANCE = new DataSourceServiceImpl();
-
-  private static final String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";
-
-  private static final Logger LOG = LogManager.getLogger();
-
-  private Map<String, DataSource> datasourceMap = new HashMap<>();
-
-  private final Map<ConnectorType, StorageEngineFactory> connectorTypeStorageEngineFactoryMap;
-
-  public static DataSourceServiceImpl getInstance() {
-    return INSTANCE;
-  }
-
-  private DataSourceServiceImpl() {
-    connectorTypeStorageEngineFactoryMap = new HashMap<>();
-    PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory();
-    connectorTypeStorageEngineFactoryMap.put(prometheusStorageFactory.getConnectorType(),
-        prometheusStorageFactory);
-  }
-
-  /**
-   * This function reads settings and loads connectors to the data stores.
-   * This will be invoked during start up and also when settings are updated.
-   *
-   * @param settings settings.
-   */
-  public void loadConnectors(Settings settings) {
-    SecurityAccess.doPrivileged(() -> {
-      InputStream inputStream = DataSourceSettings.DATASOURCE_CONFIG.get(settings);
-      if (inputStream != null) {
-        ObjectMapper objectMapper = new ObjectMapper();
-        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-        try {
-          List<DataSourceMetadata> dataSourceMetadataList =
-              objectMapper.readValue(inputStream, new TypeReference<>() {
-              });
-          validateDataSourceMetadata(dataSourceMetadataList);
-          constructConnectors(dataSourceMetadataList);
-        } catch (IOException e) {
-          LOG.error("DataSources Configuration File uploaded is malformed. Verify and re-upload.",
-              e);
-        } catch (Throwable e) {
-          LOG.error("DataSource construction failed.", e);
-        }
-      }
-      return null;
-    });
-  }
-
-  @Override
-  public Set<DataSource> getDataSources() {
-    return new HashSet<>(datasourceMap.values());
-  }
-
-  @Override
-  public DataSource getDataSource(String dataSourceName) {
-    if (!datasourceMap.containsKey(dataSourceName)) {
-      throw new IllegalArgumentException(
-          String.format("DataSource with name %s doesn't exist.", dataSourceName));
-    }
-    return datasourceMap.get(dataSourceName);
-  }
-
-
-  @Override
-  public void registerDefaultOpenSearchDataSource(StorageEngine storageEngine) {
-    if (storageEngine == null) {
-      throw new IllegalArgumentException("Default storage engine can't be null");
-    }
-    datasourceMap.put(DEFAULT_DATASOURCE_NAME,
-        new DataSource(DEFAULT_DATASOURCE_NAME, ConnectorType.OPENSEARCH, storageEngine));
-    registerFunctions(DEFAULT_DATASOURCE_NAME, storageEngine);
-  }
-
-  private StorageEngine createStorageEngine(DataSourceMetadata dataSourceMetadata) {
-    ConnectorType connector = dataSourceMetadata.getConnector();
-    switch (connector) {
-      case PROMETHEUS:
-        StorageEngine storageEngine = connectorTypeStorageEngineFactoryMap
-            .get(dataSourceMetadata.getConnector())
-            .getStorageEngine(dataSourceMetadata.getName(), dataSourceMetadata.getProperties());
-        registerFunctions(dataSourceMetadata.getName(), storageEngine);
-        return storageEngine;
-      default:
-        throw new IllegalStateException(
-            String.format("Unsupported Connector: %s", connector.name()));
-    }
-  }
-
-  private void constructConnectors(List<DataSourceMetadata> dataSourceMetadataList) {
-    datasourceMap = new HashMap<>();
-    for (DataSourceMetadata dataSourceMetadata : dataSourceMetadataList) {
-      try {
-        String dataSourceName = dataSourceMetadata.getName();
-        StorageEngine storageEngine = createStorageEngine(dataSourceMetadata);
-        datasourceMap.put(dataSourceName,
-            new DataSource(dataSourceMetadata.getName(), dataSourceMetadata.getConnector(),
-                storageEngine));
-      } catch (Throwable e) {
-        LOG.error("DataSource : {} storage engine creation failed with the following message: {}",
-            dataSourceMetadata.getName(), e.getMessage(), e);
-      }
-    }
-  }
-
-  /**
-   * This can be moved to a different validator class
-   * when we introduce more connectors.
-   *
-   * @param dataSourceMetadataList dataSourceMetadataList.
-   */
-  private void validateDataSourceMetadata(List<DataSourceMetadata> dataSourceMetadataList) {
-
-    Set<String> reviewedDataSources = new HashSet<>();
-    for (DataSourceMetadata dataSourceMetadata : dataSourceMetadataList) {
-
-      if (StringUtils.isEmpty(dataSourceMetadata.getName())) {
-        throw new IllegalArgumentException(
-            "Missing Name Field for a dataSource. Name is a required parameter.");
-      }
-
-      if (!dataSourceMetadata.getName().matches(DATASOURCE_NAME_REGEX)) {
-        throw new IllegalArgumentException(
-            String.format("DataSource Name: %s contains illegal characters."
-            + " Allowed characters: a-zA-Z0-9_-*@ ", dataSourceMetadata.getName()));
-      }
-
-      String dataSourceName = dataSourceMetadata.getName();
-      if (reviewedDataSources.contains(dataSourceName)) {
-        throw new IllegalArgumentException("DataSources with same name are not allowed.");
-      } else {
-        reviewedDataSources.add(dataSourceName);
-      }
-
-      if (Objects.isNull(dataSourceMetadata.getProperties())) {
-        throw new IllegalArgumentException("Missing properties field in dataSource configuration."
-            + "Properties are required parameters");
-      }
-
-    }
-  }
-
-  // TODO: for now register storage engine functions here which should be static per storage engine
-  private void registerFunctions(String catalogName, StorageEngine storageEngine) {
-    storageEngine.getFunctions()
-        .forEach(functionResolver ->
-            BuiltinFunctionRepository.getInstance().register(catalogName, functionResolver));
-  }
-
-
-}
diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java
index e53cdd64c3..a0ea4032d5 100644
--- a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java
+++ b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java
@@ -5,137 +5,134 @@
 
 package org.opensearch.sql.plugin.datasource;
 
-import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import lombok.SneakyThrows;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opensearch.common.settings.MockSecureSettings;
 import org.opensearch.common.settings.Settings;
-import org.opensearch.sql.datasource.model.ConnectorType;
-import org.opensearch.sql.datasource.model.DataSource;
-import org.opensearch.sql.storage.StorageEngine;
+import org.opensearch.sql.datasource.DataSourceService;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
+import org.opensearch.sql.plugin.SQLPlugin;
 
 @RunWith(MockitoJUnitRunner.class)
 public class DataSourceServiceImplTest {
 
-  public static final String DATASOURCE_SETTING_METADATA_KEY =
+  public static final String CATALOG_SETTING_METADATA_KEY =
       "plugins.query.federation.datasources.config";
 
   @Mock
-  private StorageEngine storageEngine;
+  private DataSourceService dataSourceService;
 
   @SneakyThrows
   @Test
   public void testLoadConnectors() {
-    Settings settings = getDataSourceSettings("datasources.json");
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    Set<DataSource> expected = new HashSet<>() {{
-        add(new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine));
-      }};
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
+    Settings settings = getCatalogSettings("datasources.json");
+    loadConnectors(settings);
+    List<DataSourceMetadata> expected =
+        new ArrayList<>() {
+          {
+            add(
+                metadata(
+                    "prometheus",
+                    DataSourceType.PROMETHEUS,
+                    ImmutableMap.of(
+                        "prometheus.uri", "http://localhost:9090",
+                        "prometheus.auth.type", "basicauth",
+                        "prometheus.auth.username", "admin",
+                        "prometheus.auth.password", "type")));
+          }
+        };
+
+    verifyAddDataSourceWithMetadata(expected);
   }
 
-
   @SneakyThrows
   @Test
-  public void testLoadConnectorsWithMultipleDataSources() {
-    Settings settings = getDataSourceSettings("multiple_datasources.json");
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    Set<DataSource> expected = new HashSet<>() {{
-        add(new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine));
-        add(new DataSource("prometheus-1", ConnectorType.PROMETHEUS, storageEngine));
+  public void testLoadConnectorsWithMultipleCatalogs() {
+    Settings settings = getCatalogSettings("multiple_datasources.json");
+    loadConnectors(settings);
+    List<DataSourceMetadata> expected = new ArrayList<>() {{
+        add(metadata("prometheus", DataSourceType.PROMETHEUS, ImmutableMap.of(
+            "prometheus.uri", "http://localhost:9090",
+            "prometheus.auth.type", "basicauth",
+            "prometheus.auth.username", "admin",
+            "prometheus.auth.password", "type"
+        )));
+        add(metadata("prometheus-1", DataSourceType.PROMETHEUS, ImmutableMap.of(
+            "prometheus.uri", "http://localhost:9090",
+            "prometheus.auth.type", "awssigv4",
+            "prometheus.auth.region", "us-east-1",
+            "prometheus.auth.access_key", "accessKey",
+            "prometheus.auth.secret_key", "secretKey"
+        )));
       }};
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
-  }
 
-  @SneakyThrows
-  @Test
-  public void testLoadConnectorsWithMissingName() {
-    Settings settings = getDataSourceSettings("datasource_missing_name.json");
-    Set<DataSource> expected = DataSourceServiceImpl.getInstance().getDataSources();
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
+    verifyAddDataSourceWithMetadata(expected);
   }
 
   @SneakyThrows
   @Test
-  public void testLoadConnectorsWithDuplicateDataSourceNames() {
-    Settings settings = getDataSourceSettings("duplicate_datasource_names.json");
-    Set<DataSource> expected = DataSourceServiceImpl.getInstance().getDataSources();
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
-  }
+  public void testLoadConnectorsWithDuplicateCatalogNames() {
+    Settings settings = getCatalogSettings("duplicate_datasource_names.json");
+    loadConnectors(settings);
 
-  @SneakyThrows
-  @Test
-  public void testLoadConnectorsWithMalformedJson() {
-    Settings settings = getDataSourceSettings("malformed_datasources.json");
-    Set<DataSource> expected = DataSourceServiceImpl.getInstance().getDataSources();
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
+    verify(dataSourceService, never()).addDataSource(any());
   }
 
   @SneakyThrows
   @Test
-  public void testGetStorageEngineAfterGetDataSources() {
-    Settings settings = getDataSourceSettings("empty_datasource.json");
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    DataSourceServiceImpl.getInstance().registerDefaultOpenSearchDataSource(storageEngine);
-    Set<DataSource> expected = new HashSet<>();
-    expected.add(new DataSource(DEFAULT_DATASOURCE_NAME, ConnectorType.OPENSEARCH, storageEngine));
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
-    Assert.assertEquals(storageEngine,
-        DataSourceServiceImpl.getInstance()
-            .getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
-    Assert.assertEquals(storageEngine,
-        DataSourceServiceImpl.getInstance()
-            .getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
-    IllegalArgumentException illegalArgumentException
-        = Assert.assertThrows(IllegalArgumentException.class,
-          () -> DataSourceServiceImpl.getInstance().getDataSource("test"));
-    Assert.assertEquals("DataSource with name test doesn't exist.",
-        illegalArgumentException.getMessage());
-  }
-
-
-  @SneakyThrows
-  @Test
-  public void testGetStorageEngineAfterLoadingConnectors() {
-    Settings settings = getDataSourceSettings("empty_datasource.json");
-    DataSourceServiceImpl.getInstance().registerDefaultOpenSearchDataSource(storageEngine);
-    //Load Connectors will empty the dataSourceMap.So OpenSearch Storage Engine
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    Set<DataSource> expected = new HashSet<>();
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
-  }
+  public void testLoadConnectorsWithMalformedJson() {
+    Settings settings = getCatalogSettings("malformed_datasources.json");
+    loadConnectors(settings);
 
-  @SneakyThrows
-  @Test
-  public void testLoadConnectorsWithIllegalDataSourceNames() {
-    Settings settings = getDataSourceSettings("illegal_datasource_name.json");
-    Set<DataSource> expected = DataSourceServiceImpl.getInstance().getDataSources();
-    DataSourceServiceImpl.getInstance().loadConnectors(settings);
-    Assert.assertEquals(expected, DataSourceServiceImpl.getInstance().getDataSources());
+    verify(dataSourceService, never()).addDataSource(any());
   }
 
-  private Settings getDataSourceSettings(String filename) throws URISyntaxException, IOException {
+  private Settings getCatalogSettings(String filename) throws URISyntaxException, IOException {
     MockSecureSettings mockSecureSettings = new MockSecureSettings();
     ClassLoader classLoader = getClass().getClassLoader();
     Path filepath = Paths.get(classLoader.getResource(filename).toURI());
-    mockSecureSettings.setFile(DATASOURCE_SETTING_METADATA_KEY, Files.readAllBytes(filepath));
+    mockSecureSettings.setFile(CATALOG_SETTING_METADATA_KEY, Files.readAllBytes(filepath));
     return Settings.builder().setSecureSettings(mockSecureSettings).build();
   }
 
+  void loadConnectors(Settings settings) {
+    SQLPlugin.loadDataSources(dataSourceService, settings);
+  }
+
+  void verifyAddDataSourceWithMetadata(List<DataSourceMetadata> metadataList) {
+    int expectCount = metadataList.size();
+    ArgumentCaptor<DataSourceMetadata> metadataCaptor =
+        ArgumentCaptor.forClass(DataSourceMetadata.class);
+    verify(dataSourceService, times(expectCount)).addDataSource(metadataCaptor.capture());
+    List<DataSourceMetadata> actualValues = metadataCaptor.getAllValues();
+    assertEquals(metadataList, actualValues);
+  }
+
+  DataSourceMetadata metadata(String name, DataSourceType type, Map<String, String> properties) {
+    DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
+    dataSourceMetadata.setName(name);
+    dataSourceMetadata.setConnector(type);
+    dataSourceMetadata.setProperties(properties);
+    return dataSourceMetadata;
+  }
 }
diff --git a/plugin/src/test/resources/datasource_missing_name.json b/plugin/src/test/resources/datasource_missing_name.json
deleted file mode 100644
index 4491ebb0db..0000000000
--- a/plugin/src/test/resources/datasource_missing_name.json
+++ /dev/null
@@ -1,11 +0,0 @@
-[
-  {
-    "connector": "prometheus",
-    "properties" : {
-      "prometheus.uri" : "http://localhost:9090",
-      "prometheus.auth.type" : "basicauth",
-      "prometheus.auth.username" : "admin",
-      "prometheus.auth.password" : "type"
-    }
-  }
-]
\ No newline at end of file
diff --git a/plugin/src/test/resources/empty_datasource.json b/plugin/src/test/resources/empty_datasource.json
deleted file mode 100644
index 0637a088a0..0000000000
--- a/plugin/src/test/resources/empty_datasource.json
+++ /dev/null
@@ -1 +0,0 @@
-[]
\ No newline at end of file
diff --git a/plugin/src/test/resources/illegal_datasource_name.json b/plugin/src/test/resources/illegal_datasource_name.json
deleted file mode 100644
index 212ca6ec93..0000000000
--- a/plugin/src/test/resources/illegal_datasource_name.json
+++ /dev/null
@@ -1,12 +0,0 @@
-[
-  {
-    "name" : "prometheus.test",
-    "connector": "prometheus",
-    "properties" : {
-      "prometheus.uri" : "http://localhost:9090",
-      "prometheus.auth.type" : "basicauth",
-      "prometheus.auth.username" : "admin",
-      "prometheus.auth.password" : "type"
-    }
-  }
-]
\ No newline at end of file
diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java
index 6aec951b3d..4e8b30af2f 100644
--- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java
+++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java
@@ -14,16 +14,18 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import okhttp3.OkHttpClient;
-import org.opensearch.sql.datasource.model.ConnectorType;
+import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
 import org.opensearch.sql.datasource.model.auth.AuthenticationType;
 import org.opensearch.sql.prometheus.authinterceptors.AwsSigningInterceptor;
 import org.opensearch.sql.prometheus.authinterceptors.BasicAuthenticationInterceptor;
 import org.opensearch.sql.prometheus.client.PrometheusClient;
 import org.opensearch.sql.prometheus.client.PrometheusClientImpl;
+import org.opensearch.sql.storage.DataSourceFactory;
 import org.opensearch.sql.storage.StorageEngine;
-import org.opensearch.sql.storage.StorageEngineFactory;
 
-public class PrometheusStorageFactory implements StorageEngineFactory {
+public class PrometheusStorageFactory implements DataSourceFactory {
 
   public static final String URI = "prometheus.uri";
   public static final String AUTH_TYPE = "prometheus.auth.type";
@@ -33,14 +35,20 @@ public class PrometheusStorageFactory implements StorageEngineFactory {
   public static final String ACCESS_KEY = "prometheus.auth.access_key";
   public static final String SECRET_KEY = "prometheus.auth.secret_key";
 
-
   @Override
-  public ConnectorType getConnectorType() {
-    return ConnectorType.PROMETHEUS;
+  public DataSourceType getDataSourceType() {
+    return DataSourceType.PROMETHEUS;
   }
 
   @Override
-  public StorageEngine getStorageEngine(String catalogName, Map<String, String> requiredConfig) {
+  public DataSource createDataSource(DataSourceMetadata metadata) {
+    return new DataSource(
+        metadata.getName(),
+        DataSourceType.PROMETHEUS,
+        getStorageEngine(metadata.getName(), metadata.getProperties()));
+  }
+
+  StorageEngine getStorageEngine(String catalogName, Map<String, String> requiredConfig) {
     validateFieldsInConfig(requiredConfig, Set.of(URI));
     PrometheusClient prometheusClient;
     try {
diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java
index d2f445a093..91cb8df1ea 100644
--- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java
+++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java
@@ -13,7 +13,9 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.opensearch.sql.datasource.model.ConnectorType;
+import org.opensearch.sql.datasource.model.DataSource;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
 import org.opensearch.sql.storage.StorageEngine;
 
 @ExtendWith(MockitoExtension.class)
@@ -22,7 +24,8 @@ public class PrometheusStorageFactoryTest {
   @Test
   void testGetConnectorType() {
     PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory();
-    Assertions.assertEquals(ConnectorType.PROMETHEUS, prometheusStorageFactory.getConnectorType());
+    Assertions.assertEquals(
+        DataSourceType.PROMETHEUS, prometheusStorageFactory.getDataSourceType());
   }
 
   @Test
@@ -130,6 +133,21 @@ void testGetStorageEngineWithInvalidURISyntax() {
         exception.getMessage().contains("Prometheus Client creation failed due to:"));
   }
 
+  @Test
+  void createDataSourceSuccess() {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("prometheus.uri", "http://dummyprometheus:9090");
+    properties.put("prometheus.auth.type", "basicauth");
+    properties.put("prometheus.auth.username", "admin");
+    properties.put("prometheus.auth.password", "admin");
 
+    DataSourceMetadata metadata = new DataSourceMetadata();
+    metadata.setName("prometheus");
+    metadata.setConnector(DataSourceType.PROMETHEUS);
+    metadata.setProperties(properties);
+
+    DataSource dataSource = new PrometheusStorageFactory().createDataSource(metadata);
+    Assertions.assertTrue(dataSource.getStorageEngine() instanceof PrometheusStorageEngine);
+  }
 }
 

From 1373172149f6f9895d5c13d1a37bc388c665f9c8 Mon Sep 17 00:00:00 2001
From: Peng Huo <penghuo@gmail.com>
Date: Fri, 18 Nov 2022 11:34:38 -0800
Subject: [PATCH 2/3] update addSource() interface

Signed-off-by: Peng Huo <penghuo@gmail.com>
---
 .../sql/datasource/DataSourceService.java     |  4 +-
 .../sql/datasource/DataSourceServiceImpl.java | 48 +++++++++++--------
 .../sql/analysis/AnalyzerTestBase.java        |  2 +-
 .../DataSourceServiceImplTest.java            | 24 ++++++++--
 .../model/auth/AuthenticationTypeTest.java    |  3 +-
 .../org/opensearch/sql/plugin/SQLPlugin.java  | 20 +-------
 ...lTest.java => DataSourceMetaDataTest.java} | 29 ++++-------
 .../resources/duplicate_datasource_names.json | 22 ---------
 8 files changed, 63 insertions(+), 89 deletions(-)
 rename core/src/test/java/org/opensearch/sql/{catalog => datasource}/DataSourceServiceImplTest.java (86%)
 rename core/src/test/java/org/opensearch/sql/{catalog => datasource}/model/auth/AuthenticationTypeTest.java (85%)
 rename plugin/src/test/java/org/opensearch/sql/plugin/datasource/{DataSourceServiceImplTest.java => DataSourceMetaDataTest.java} (80%)
 delete mode 100644 plugin/src/test/resources/duplicate_datasource_names.json

diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java
index 6a2c7f46d3..37e6f8e085 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java
@@ -32,9 +32,9 @@ public interface DataSourceService {
   /**
    * Register {@link DataSource} defined by {@link DataSourceMetadata}.
    *
-   * @param dataSourceMetadata {@link DataSourceMetadata}.
+   * @param metadatas list of {@link DataSourceMetadata}.
    */
-  void addDataSource(DataSourceMetadata dataSourceMetadata);
+  void addDataSource(DataSourceMetadata... metadatas);
 
   /**
    * remove all the registered {@link DataSource}.
diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
index ab1de8256d..3319f0dde6 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
@@ -5,13 +5,15 @@
 
 package org.opensearch.sql.datasource;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
+import org.opensearch.sql.common.utils.StringUtils;
 import org.opensearch.sql.datasource.model.DataSource;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.datasource.model.DataSourceType;
@@ -58,11 +60,13 @@ public DataSource getDataSource(String dataSourceName) {
   }
 
   @Override
-  public void addDataSource(DataSourceMetadata metadata) {
-    validateDataSource(metadata);
-    dataSourceMap.put(
-        metadata.getName(),
-        dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
+  public void addDataSource(DataSourceMetadata... metadatas) {
+    for (DataSourceMetadata metadata : metadatas) {
+      validateDataSourceMetaData(metadata);
+      dataSourceMap.put(
+          metadata.getName(),
+          dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
+    }
   }
 
   @Override
@@ -75,20 +79,22 @@ public void clear() {
    *
    * @param metadata {@link DataSourceMetadata}.
    */
-  private void validateDataSource(DataSourceMetadata metadata) {
-    if (StringUtils.isEmpty(metadata.getName())) {
-      throw new IllegalArgumentException(
-          "Missing Name Field from a DataSource. Name is a required parameter.");
-    }
-    if (!metadata.getName().matches(DATASOURCE_NAME_REGEX)) {
-      throw new IllegalArgumentException(
-          String.format(
-              "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
-              metadata.getName()));
-    }
-    if (Objects.isNull(metadata.getProperties())) {
-      throw new IllegalArgumentException(
-          "Missing properties field in catalog configuration. Properties are required parameters.");
-    }
+  private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(metadata.getName()),
+        "Missing Name Field from a DataSource. Name is a required parameter.");
+    Preconditions.checkArgument(
+        !dataSourceMap.containsKey(metadata.getName()),
+        StringUtils.format(
+            "Datasource name should be unique, Duplicate datasource found %s.",
+            metadata.getName()));
+    Preconditions.checkArgument(
+        metadata.getName().matches(DATASOURCE_NAME_REGEX),
+        StringUtils.format(
+            "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
+            metadata.getName()));
+    Preconditions.checkArgument(
+        !Objects.isNull(metadata.getProperties()),
+        "Missing properties field in catalog configuration. Properties are required parameters.");
   }
 }
diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java
index ff421a1b79..a485541a34 100644
--- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java
+++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java
@@ -207,7 +207,7 @@ public DataSource getDataSource(String dataSourceName) {
     }
 
     @Override
-    public void addDataSource(DataSourceMetadata metadata) {
+    public void addDataSource(DataSourceMetadata... metadatas) {
       throw new UnsupportedOperationException();
     }
 
diff --git a/core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java
similarity index 86%
rename from core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java
rename to core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java
index 8470d87470..2b40b32ee6 100644
--- a/core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java
+++ b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java
@@ -3,7 +3,7 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.opensearch.sql.catalog;
+package org.opensearch.sql.datasource;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -15,13 +15,12 @@
 import com.google.common.collect.ImmutableMap;
 import java.util.HashSet;
 import java.util.Map;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.opensearch.sql.datasource.DataSourceService;
-import org.opensearch.sql.datasource.DataSourceServiceImpl;
 import org.opensearch.sql.datasource.model.DataSource;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.datasource.model.DataSourceType;
@@ -59,6 +58,11 @@ public void setup() {
             });
   }
 
+  @AfterEach
+  public void clear() {
+    dataSourceService.clear();
+  }
+
   @Test
   void getDataSourceSuccess() {
     dataSourceService.addDataSource(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
@@ -130,6 +134,20 @@ void metaDataMissingPropertiesShouldFail() {
         exception.getMessage());
   }
 
+  @Test
+  void metaDataHasDuplicateNameShouldFail() {
+    dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of()));
+    assertEquals(1, dataSourceService.getDataSources().size());
+
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, null)));
+    assertEquals(
+        String.format("Datasource name should be unique, Duplicate datasource found %s.", NAME),
+        exception.getMessage());
+  }
+
   DataSourceMetadata metadata(String name, DataSourceType type, Map<String, String> properties) {
     DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
     dataSourceMetadata.setName(name);
diff --git a/core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java b/core/src/test/java/org/opensearch/sql/datasource/model/auth/AuthenticationTypeTest.java
similarity index 85%
rename from core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java
rename to core/src/test/java/org/opensearch/sql/datasource/model/auth/AuthenticationTypeTest.java
index 1dd91517f3..f9e4f3ce59 100644
--- a/core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java
+++ b/core/src/test/java/org/opensearch/sql/datasource/model/auth/AuthenticationTypeTest.java
@@ -3,14 +3,13 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.opensearch.sql.catalog.model.auth;
+package org.opensearch.sql.datasource.model.auth;
 
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 import org.junit.jupiter.api.Test;
-import org.opensearch.sql.datasource.model.auth.AuthenticationType;
 
 class AuthenticationTypeTest {
   @Test
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
index 577b396d62..fab14966d8 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -18,11 +18,8 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Objects;
-import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -242,8 +239,7 @@ public static void loadDataSources(DataSourceService dataSourceService, Settings
             try {
               List<DataSourceMetadata> metadataList =
                   objectMapper.readValue(inputStream, new TypeReference<>() {});
-              verifyDuplicateName(metadataList);
-              metadataList.forEach(metadata -> dataSourceService.addDataSource(metadata));
+              dataSourceService.addDataSource(metadataList.toArray(new DataSourceMetadata[0]));
             } catch (IOException e) {
               LOG.error(
                   "DataSource Configuration File uploaded is malformed. Verify and re-upload.", e);
@@ -254,18 +250,4 @@ public static void loadDataSources(DataSourceService dataSourceService, Settings
           return null;
         });
   }
-
-  static void verifyDuplicateName(List<DataSourceMetadata> metadataList) {
-    Set<String> seenNames = new HashSet<>();
-    for (DataSourceMetadata metadata : metadataList) {
-      if (seenNames.contains(metadata.getName())) {
-        throw new IllegalArgumentException(
-            String.format(
-                Locale.ROOT,
-                "Datasource name should be unique, Duplicate datasource found %s",
-                metadata.getName()));
-      }
-      seenNames.add(metadata.getName());
-    }
-  }
 }
diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java
similarity index 80%
rename from plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java
rename to plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java
index a0ea4032d5..dc6eadbf7e 100644
--- a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceServiceImplTest.java
+++ b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java
@@ -34,9 +34,9 @@
 import org.opensearch.sql.plugin.SQLPlugin;
 
 @RunWith(MockitoJUnitRunner.class)
-public class DataSourceServiceImplTest {
+public class DataSourceMetaDataTest {
 
-  public static final String CATALOG_SETTING_METADATA_KEY =
+  public static final String DATASOURCE_SETTING_METADATA_KEY =
       "plugins.query.federation.datasources.config";
 
   @Mock
@@ -45,7 +45,7 @@ public class DataSourceServiceImplTest {
   @SneakyThrows
   @Test
   public void testLoadConnectors() {
-    Settings settings = getCatalogSettings("datasources.json");
+    Settings settings = getDataSourceSettings("datasources.json");
     loadConnectors(settings);
     List<DataSourceMetadata> expected =
         new ArrayList<>() {
@@ -67,8 +67,8 @@ public void testLoadConnectors() {
 
   @SneakyThrows
   @Test
-  public void testLoadConnectorsWithMultipleCatalogs() {
-    Settings settings = getCatalogSettings("multiple_datasources.json");
+  public void testLoadConnectorsWithMultipleDataSources() {
+    Settings settings = getDataSourceSettings("multiple_datasources.json");
     loadConnectors(settings);
     List<DataSourceMetadata> expected = new ArrayList<>() {{
         add(metadata("prometheus", DataSourceType.PROMETHEUS, ImmutableMap.of(
@@ -89,29 +89,20 @@ public void testLoadConnectorsWithMultipleCatalogs() {
     verifyAddDataSourceWithMetadata(expected);
   }
 
-  @SneakyThrows
-  @Test
-  public void testLoadConnectorsWithDuplicateCatalogNames() {
-    Settings settings = getCatalogSettings("duplicate_datasource_names.json");
-    loadConnectors(settings);
-
-    verify(dataSourceService, never()).addDataSource(any());
-  }
-
   @SneakyThrows
   @Test
   public void testLoadConnectorsWithMalformedJson() {
-    Settings settings = getCatalogSettings("malformed_datasources.json");
+    Settings settings = getDataSourceSettings("malformed_datasources.json");
     loadConnectors(settings);
 
     verify(dataSourceService, never()).addDataSource(any());
   }
 
-  private Settings getCatalogSettings(String filename) throws URISyntaxException, IOException {
+  private Settings getDataSourceSettings(String filename) throws URISyntaxException, IOException {
     MockSecureSettings mockSecureSettings = new MockSecureSettings();
     ClassLoader classLoader = getClass().getClassLoader();
     Path filepath = Paths.get(classLoader.getResource(filename).toURI());
-    mockSecureSettings.setFile(CATALOG_SETTING_METADATA_KEY, Files.readAllBytes(filepath));
+    mockSecureSettings.setFile(DATASOURCE_SETTING_METADATA_KEY, Files.readAllBytes(filepath));
     return Settings.builder().setSecureSettings(mockSecureSettings).build();
   }
 
@@ -120,11 +111,11 @@ void loadConnectors(Settings settings) {
   }
 
   void verifyAddDataSourceWithMetadata(List<DataSourceMetadata> metadataList) {
-    int expectCount = metadataList.size();
     ArgumentCaptor<DataSourceMetadata> metadataCaptor =
         ArgumentCaptor.forClass(DataSourceMetadata.class);
-    verify(dataSourceService, times(expectCount)).addDataSource(metadataCaptor.capture());
+    verify(dataSourceService, times(1)).addDataSource(metadataCaptor.capture());
     List<DataSourceMetadata> actualValues = metadataCaptor.getAllValues();
+    assertEquals(metadataList.size(), actualValues.size());
     assertEquals(metadataList, actualValues);
   }
 
diff --git a/plugin/src/test/resources/duplicate_datasource_names.json b/plugin/src/test/resources/duplicate_datasource_names.json
deleted file mode 100644
index eefc56b6ef..0000000000
--- a/plugin/src/test/resources/duplicate_datasource_names.json
+++ /dev/null
@@ -1,22 +0,0 @@
-[
-  {
-    "name" : "prometheus",
-    "connector": "prometheus",
-    "properties" : {
-      "prometheus.uri" : "http://localhost:9090",
-      "prometheus.auth.type" : "basicauth",
-      "prometheus.auth.username" : "admin",
-      "prometheus.auth.password" : "type"
-    }
-  },
-  {
-    "name" : "prometheus",
-    "connector": "prometheus",
-    "properties" : {
-      "prometheus.uri" : "http://localhost:9090",
-      "prometheus.auth.type" : "basicauth",
-      "prometheus.auth.username" : "admin",
-      "prometheus.auth.password" : "type"
-    }
-  }
-]
\ No newline at end of file

From bbe396c3f5d6ae12e710f5a1f46844a845df8723 Mon Sep 17 00:00:00 2001
From: Peng Huo <penghuo@gmail.com>
Date: Fri, 18 Nov 2022 11:42:42 -0800
Subject: [PATCH 3/3] address comments

Signed-off-by: Peng Huo <penghuo@gmail.com>
---
 .../org/opensearch/sql/datasource/DataSourceServiceImpl.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
index 3319f0dde6..274024e548 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java
@@ -7,7 +7,6 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -47,7 +46,7 @@ public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
 
   @Override
   public Set<DataSource> getDataSources() {
-    return new HashSet<>(dataSourceMap.values());
+    return Set.copyOf(dataSourceMap.values());
   }
 
   @Override