Skip to content

Commit

Permalink
MINOR: Kafka Setup SSL Arg Fix (#16469)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayush-shah committed May 30, 2024
1 parent 55400a9 commit 40ecd60
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 5 deletions.
13 changes: 13 additions & 0 deletions ingestion/src/metadata/examples/workflows/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ source:
# example of scema registry config, for more info about accepted values visit:
# https://docs.confluent.io/5.5.1/clients/confluent-kafka-python/index.html#confluent_kafka.schema_registry.SchemaRegistryClient
#basic.auth.user.info: username:password
# schemaRegistrySSL:
# caCertificate: |
# -----BEGIN CERTIFICATE-----
# sample caCertificateData
# -----END CERTIFICATE-----
# sslCertificate: |
# -----BEGIN CERTIFICATE-----
# sample sslCertificateData
# -----END CERTIFICATE-----
# sslKey: |
# -----BEGIN RSA PRIVATE KEY
# sample sslKeyData
# -----END RSA PRIVATE KEY
sourceConfig:
config:
type: MessagingMetadata
Expand Down
46 changes: 46 additions & 0 deletions ingestion/src/metadata/examples/workflows/kafka_ssl_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
source:
type: kafka
serviceName: local_kafka
serviceConnection:
config:
type: Kafka
bootstrapServers: localhost:9092
schemaRegistryURL: http://localhost:8081
#consumerConfig:
# example of consume config, for more info about accepted values visit:
# https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
#ssl.truststore.password: password
#schemaRegistryConfig:
# example of scema registry config, for more info about accepted values visit:
# https://docs.confluent.io/5.5.1/clients/confluent-kafka-python/index.html#confluent_kafka.schema_registry.SchemaRegistryClient
#basic.auth.user.info: username:password
schemaRegistrySSL:
caCertificate: |
-----BEGIN CERTIFICATE-----
sample caCertificateData
-----END CERTIFICATE-----
sslCertificate: |
-----BEGIN CERTIFICATE-----
sample sslCertificateData
-----END CERTIFICATE-----
sslKey: |
-----BEGIN RSA PRIVATE KEY
sample sslKeyData
-----END RSA PRIVATE KEY
sourceConfig:
config:
type: MessagingMetadata
topicFilterPattern:
excludes:
- _confluent.*
generateSampleData: true
sink:
type: metadata-rest
config: {}
workflowConfig:
# loggerLevel: INFO # DEBUG, INFO, WARN or ERROR
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"""
Kafka source ingestion
"""
from typing import Optional
from typing import Optional, cast

from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
Expand All @@ -28,7 +28,9 @@
class KafkaSource(CommonBrokerSource):
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
self.ssl_manager = None
service_connection = config.serviceConnection.__root__.config
service_connection = cast(
KafkaConnection, config.serviceConnection.__root__.config
)
if service_connection.schemaRegistrySSL:

self.ssl_manager = SSLManager(
Expand All @@ -37,7 +39,7 @@ def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
cert=service_connection.schemaRegistrySSL.__root__.sslCertificate,
)
service_connection = self.ssl_manager.setup_ssl(
config.serviceConnection.__root__.config.sslConfig
config.serviceConnection.__root__.config
)
super().__init__(config, metadata)

Expand Down
84 changes: 82 additions & 2 deletions ingestion/tests/unit/test_ssl_manager.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
"""
Manage SSL test cases
"""

import os
import unittest
from unittest import TestCase
from unittest.mock import patch

from pydantic import SecretStr

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.messaging.kafka.metadata import KafkaSource
from metadata.utils.ssl_manager import SSLManager


class SSLManagerTest(unittest.TestCase):
class SSLManagerTest(TestCase):
"""
Tests to verify the functionality of SSLManager
"""
Expand Down Expand Up @@ -44,3 +57,70 @@ def test_cleanup_temp_files(self):
temp_file = self.ssl_manager.create_temp_file(SecretStr("Test content"))
self.ssl_manager.cleanup_temp_files()
self.assertFalse(os.path.exists(temp_file))


class KafkaSourceSSLTest(TestCase):
@patch(
"metadata.ingestion.source.messaging.messaging_service.MessagingServiceSource.test_connection"
)
@patch("metadata.ingestion.source.messaging.kafka.metadata.SSLManager")
def test_init(self, mock_ssl_manager, test_connection):
test_connection.return_value = True
config = WorkflowSource(
**{
"type": "kafka",
"serviceName": "local_kafka",
"serviceConnection": {
"config": {
"type": "Kafka",
"bootstrapServers": "localhost:9092",
}
},
"sourceConfig": {"config": {"type": "MessagingMetadata"}},
}
)
metadata = OpenMetadata(
OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(jwtToken="token"),
)
)
kafka_source = KafkaSource(config, metadata)

self.assertIsNone(kafka_source.ssl_manager)
mock_ssl_manager.assert_not_called()

config_with_ssl = WorkflowSource(
**{
"type": "kafka",
"serviceName": "local_kafka",
"serviceConnection": {
"config": {
"type": "Kafka",
"bootstrapServers": "localhost:9092",
"schemaRegistrySSL": {
"caCertificate": "caCertificateData",
"sslKey": "sslKeyData",
"sslCertificate": "sslCertificateData",
},
},
},
"sourceConfig": {"config": {"type": "MessagingMetadata"}},
}
)
kafka_source_with_ssl = KafkaSource(config_with_ssl, metadata)

self.assertIsNotNone(kafka_source_with_ssl.ssl_manager)
self.assertEqual(
kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.caCertificate.get_secret_value(),
"caCertificateData",
)
self.assertEqual(
kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.sslKey.get_secret_value(),
"sslKeyData",
)
self.assertEqual(
kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.sslCertificate.get_secret_value(),
"sslCertificateData",
)

0 comments on commit 40ecd60

Please sign in to comment.