Skip to content

Commit 77d4140

Browse files
feat: new mongodbatlas_stream_connection and mongodbatlas_stream_connections data sources (#1757)
* feat: stream connection data sources with docs * add data sources in example section * addressing doc comments
1 parent 064eb06 commit 77d4140

18 files changed

+730
-62
lines changed

examples/atlas-streams/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Atlas Stream Processing is composed of multiple components, and users can levera
77
- `mongodbatlas_stream_instance`: Enables creating, modifying, and deleting Stream Instances. As part of this resource, a computed `hostnames` attribute is available for connecting to the created instance.
88
- `mongodbatlas_stream_connection`: Enables creating, modifying, and deleting Stream Instance Connections, which serve as data sources and sinks for your instance.
99

10-
**NOTE**: To leverage these resources you'll need to set the environment variable `MONGODB_ATLAS_ENABLE_BETA=true` as this functionality is currently in preview. Also see [Limitations](https://www.mongodb.com/docs/atlas/atlas-sp/limitations/#std-label-atlas-sp-limitations) of Atlas Streams during this preview period.
10+
**NOTE**: To leverage these resources you'll need to set the environment variable `MONGODB_ATLAS_ENABLE_BETA=true` as this functionality is currently in preview. Also see [Limitations](https://www.mongodb.com/docs/atlas/atlas-sp/limitations/#std-label-atlas-sp-limitations) of Atlas Stream Processing during this preview period.
1111

1212
### Managing Stream Processors
1313

examples/atlas-streams/stream-connection/main.tf

+20
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,24 @@ resource "mongodbatlas_stream_connection" "example-kafka-ssl" {
5252
broker_public_certificate = var.kafka_ssl_cert
5353
protocol = "SSL"
5454
}
55+
}
56+
57+
# Reads back the Kafka SSL connection created above through the singular data source.
data "mongodbatlas_stream_connection" "example-kafka-ssl" {
  project_id      = var.project_id
  instance_name   = mongodbatlas_stream_instance.example.instance_name
  connection_name = mongodbatlas_stream_connection.example-kafka-ssl.connection_name
}
62+
63+
# Lists every connection defined in the stream instance (plural data source).
data "mongodbatlas_stream_connections" "example" {
  project_id    = var.project_id
  instance_name = mongodbatlas_stream_instance.example.instance_name
}
67+
68+
# example making use of data sources
# Exposes the Kafka bootstrap servers resolved by the singular data source.
output "stream_connection_bootstrap_servers" {
  value = data.mongodbatlas_stream_connection.example-kafka-ssl.bootstrap_servers
}
72+
73+
# Exposes the total number of connections reported by the plural data source.
output "stream_connection_total_count" {
  value = data.mongodbatlas_stream_connections.example.total_count
}

internal/provider/provider.go

+2
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,8 @@ func (p *MongodbtlasProvider) DataSources(context.Context) []func() datasource.D
418418
betaDataSources := []func() datasource.DataSource{
419419
streaminstance.DataSource,
420420
streaminstance.PluralDataSource,
421+
streamconnection.DataSource,
422+
streamconnection.PluralDataSource,
421423
}
422424
if ProviderEnableBeta {
423425
dataSources = append(dataSources, betaDataSources...)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package streamconnection
2+
3+
import (
4+
"context"
5+
6+
"github.com/hashicorp/terraform-plugin-framework/datasource"
7+
"github.com/hashicorp/terraform-plugin-framework/datasource/schema"
8+
"github.com/hashicorp/terraform-plugin-framework/types"
9+
"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
10+
)
11+
12+
var _ datasource.DataSource = &streamConnectionDS{}
13+
var _ datasource.DataSourceWithConfigure = &streamConnectionDS{}
14+
15+
func DataSource() datasource.DataSource {
16+
return &streamConnectionDS{
17+
DSCommon: config.DSCommon{
18+
DataSourceName: streamConnectionName,
19+
},
20+
}
21+
}
22+
23+
// streamConnectionDS implements the singular stream connection data source.
// It embeds config.DSCommon to pick up the shared data-source plumbing
// (name registration and client configuration).
type streamConnectionDS struct {
	config.DSCommon
}
26+
27+
func (d *streamConnectionDS) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) {
28+
resp.Schema = schema.Schema{
29+
Attributes: DSAttributes(true),
30+
}
31+
}
32+
33+
// DSAttributes returns the attribute definitions for a single stream connection.
34+
// `withArguments` marks certain attributes as required (for singular data source) or as computed (for plural data source)
35+
func DSAttributes(withArguments bool) map[string]schema.Attribute {
36+
return map[string]schema.Attribute{
37+
"id": schema.StringAttribute{
38+
Computed: true,
39+
},
40+
"project_id": schema.StringAttribute{
41+
Required: withArguments,
42+
Computed: !withArguments,
43+
},
44+
"instance_name": schema.StringAttribute{
45+
Required: withArguments,
46+
Computed: !withArguments,
47+
},
48+
"connection_name": schema.StringAttribute{
49+
Required: withArguments,
50+
Computed: !withArguments,
51+
},
52+
"type": schema.StringAttribute{
53+
Computed: true,
54+
},
55+
56+
// cluster type specific
57+
"cluster_name": schema.StringAttribute{
58+
Computed: true,
59+
},
60+
61+
// kafka type specific
62+
"authentication": schema.SingleNestedAttribute{
63+
Computed: true,
64+
Attributes: map[string]schema.Attribute{
65+
"mechanism": schema.StringAttribute{
66+
Computed: true,
67+
},
68+
"password": schema.StringAttribute{
69+
Computed: true,
70+
Sensitive: true,
71+
},
72+
"username": schema.StringAttribute{
73+
Computed: true,
74+
},
75+
},
76+
},
77+
"bootstrap_servers": schema.StringAttribute{
78+
Computed: true,
79+
},
80+
"config": schema.MapAttribute{
81+
ElementType: types.StringType,
82+
Computed: true,
83+
},
84+
"security": schema.SingleNestedAttribute{
85+
Computed: true,
86+
Attributes: map[string]schema.Attribute{
87+
"broker_public_certificate": schema.StringAttribute{
88+
Computed: true,
89+
},
90+
"protocol": schema.StringAttribute{
91+
Computed: true,
92+
},
93+
},
94+
},
95+
}
96+
}
97+
98+
func (d *streamConnectionDS) Read(ctx context.Context, req datasource.ReadRequest, resp *datasource.ReadResponse) {
99+
var streamConnectionConfig TFStreamConnectionModel
100+
resp.Diagnostics.Append(req.Config.Get(ctx, &streamConnectionConfig)...)
101+
if resp.Diagnostics.HasError() {
102+
return
103+
}
104+
105+
connV2 := d.Client.AtlasV2
106+
projectID := streamConnectionConfig.ProjectID.ValueString()
107+
instanceName := streamConnectionConfig.InstanceName.ValueString()
108+
connectionName := streamConnectionConfig.ConnectionName.ValueString()
109+
apiResp, _, err := connV2.StreamsApi.GetStreamConnection(ctx, projectID, instanceName, connectionName).Execute()
110+
if err != nil {
111+
resp.Diagnostics.AddError("error fetching resource", err.Error())
112+
return
113+
}
114+
115+
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, nil, apiResp)
116+
if diags.HasError() {
117+
resp.Diagnostics.Append(diags...)
118+
return
119+
}
120+
resp.Diagnostics.Append(resp.State.Set(ctx, newStreamConnectionModel)...)
121+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package streamconnection_test
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"testing"
7+
8+
"github.com/hashicorp/terraform-plugin-testing/helper/acctest"
9+
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
10+
"github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/acc"
11+
)
12+
13+
func TestAccStreamDSStreamConnection_kafkaPlaintext(t *testing.T) {
14+
var (
15+
orgID = os.Getenv("MONGODB_ATLAS_ORG_ID")
16+
projectName = acctest.RandomWithPrefix("test-acc-stream")
17+
instanceName = acctest.RandomWithPrefix("test-acc-instance")
18+
dataSourceName = "data.mongodbatlas_stream_connection.test"
19+
)
20+
resource.ParallelTest(t, resource.TestCase{
21+
PreCheck: func() { acc.PreCheckBetaFlag(t); acc.PreCheckBasic(t) },
22+
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
23+
CheckDestroy: CheckDestroyStreamConnection,
24+
Steps: []resource.TestStep{
25+
{
26+
Config: streamConnectionDataSourceConfig(kafkaStreamConnectionConfig(orgID, projectName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false)),
27+
Check: kafkaStreamConnectionAttributeChecks(dataSourceName, orgID, projectName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false, false),
28+
},
29+
},
30+
})
31+
}
32+
33+
func TestAccStreamDSStreamConnection_kafkaSSL(t *testing.T) {
34+
var (
35+
orgID = os.Getenv("MONGODB_ATLAS_ORG_ID")
36+
projectName = acctest.RandomWithPrefix("test-acc-stream")
37+
instanceName = acctest.RandomWithPrefix("test-acc-instance")
38+
dataSourceName = "data.mongodbatlas_stream_connection.test"
39+
)
40+
resource.ParallelTest(t, resource.TestCase{
41+
PreCheck: func() { acc.PreCheckBetaFlag(t); acc.PreCheckBasic(t) },
42+
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
43+
CheckDestroy: CheckDestroyStreamConnection,
44+
Steps: []resource.TestStep{
45+
{
46+
Config: streamConnectionDataSourceConfig(kafkaStreamConnectionConfig(orgID, projectName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true)),
47+
Check: kafkaStreamConnectionAttributeChecks(dataSourceName, orgID, projectName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true, false),
48+
},
49+
},
50+
})
51+
}
52+
53+
func TestAccStreamDSStreamConnection_cluster(t *testing.T) {
54+
var (
55+
orgID = os.Getenv("MONGODB_ATLAS_ORG_ID")
56+
clusterInfo = acc.GetClusterInfo(orgID)
57+
instanceName = acctest.RandomWithPrefix("test-acc-name")
58+
dataSourceName = "data.mongodbatlas_stream_connection.test"
59+
)
60+
resource.ParallelTest(t, resource.TestCase{
61+
PreCheck: func() { acc.PreCheckBetaFlag(t); acc.PreCheckBasic(t) },
62+
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
63+
CheckDestroy: CheckDestroyStreamConnection,
64+
Steps: []resource.TestStep{
65+
{
66+
Config: streamConnectionDataSourceConfig(clusterStreamConnectionConfig(clusterInfo.ProjectIDStr, instanceName, clusterInfo.ClusterNameStr, clusterInfo.ClusterTerraformStr)),
67+
Check: clusterStreamConnectionAttributeChecks(dataSourceName, clusterInfo.ClusterName),
68+
},
69+
},
70+
})
71+
}
72+
73+
func streamConnectionDataSourceConfig(streamConnectionConfig string) string {
74+
return fmt.Sprintf(`
75+
%s
76+
77+
data "mongodbatlas_stream_connection" "test" {
78+
project_id = mongodbatlas_stream_connection.test.project_id
79+
instance_name = mongodbatlas_stream_connection.test.instance_name
80+
connection_name = mongodbatlas_stream_connection.test.connection_name
81+
}
82+
`, streamConnectionConfig)
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package streamconnection
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/hashicorp/terraform-plugin-framework/datasource"
8+
"github.com/hashicorp/terraform-plugin-framework/datasource/schema"
9+
"github.com/hashicorp/terraform-plugin-framework/types"
10+
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion"
11+
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/dsschema"
12+
"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
13+
"go.mongodb.org/atlas-sdk/v20231115002/admin"
14+
)
15+
16+
var _ datasource.DataSource = &streamConnectionsDS{}
17+
var _ datasource.DataSourceWithConfigure = &streamConnectionsDS{}
18+
19+
func PluralDataSource() datasource.DataSource {
20+
return &streamConnectionsDS{
21+
DSCommon: config.DSCommon{
22+
DataSourceName: fmt.Sprintf("%ss", streamConnectionName),
23+
},
24+
}
25+
}
26+
27+
// streamConnectionsDS implements the plural stream connections data source.
// It embeds config.DSCommon to pick up the shared data-source plumbing
// (name registration and client configuration).
type streamConnectionsDS struct {
	config.DSCommon
}
30+
31+
// TFStreamConnectionsDSModel is the Terraform config/state model for the
// plural stream connections data source.
type TFStreamConnectionsDSModel struct {
	ID           types.String              `tfsdk:"id"`
	ProjectID    types.String              `tfsdk:"project_id"`
	InstanceName types.String              `tfsdk:"instance_name"`
	// Results holds one entry per connection returned by the list API.
	Results []TFStreamConnectionModel `tfsdk:"results"`
	// PageNum and ItemsPerPage are optional pagination arguments; TotalCount
	// is the server-reported total across all pages.
	PageNum      types.Int64 `tfsdk:"page_num"`
	ItemsPerPage types.Int64 `tfsdk:"items_per_page"`
	TotalCount   types.Int64 `tfsdk:"total_count"`
}
40+
41+
func (d *streamConnectionsDS) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) {
42+
resp.Schema = dsschema.PaginatedDSSchema(
43+
map[string]schema.Attribute{
44+
"project_id": schema.StringAttribute{
45+
Required: true,
46+
},
47+
"instance_name": schema.StringAttribute{
48+
Required: true,
49+
},
50+
},
51+
DSAttributes(false))
52+
}
53+
54+
func (d *streamConnectionsDS) Read(ctx context.Context, req datasource.ReadRequest, resp *datasource.ReadResponse) {
55+
var streamConnectionsConfig TFStreamConnectionsDSModel
56+
resp.Diagnostics.Append(req.Config.Get(ctx, &streamConnectionsConfig)...)
57+
if resp.Diagnostics.HasError() {
58+
return
59+
}
60+
61+
connV2 := d.Client.AtlasV2
62+
projectID := streamConnectionsConfig.ProjectID.ValueString()
63+
instanceName := streamConnectionsConfig.InstanceName.ValueString()
64+
itemsPerPage := streamConnectionsConfig.ItemsPerPage.ValueInt64Pointer()
65+
pageNum := streamConnectionsConfig.PageNum.ValueInt64Pointer()
66+
67+
apiResp, _, err := connV2.StreamsApi.ListStreamConnectionsWithParams(ctx, &admin.ListStreamConnectionsApiParams{
68+
GroupId: projectID,
69+
TenantName: instanceName,
70+
ItemsPerPage: conversion.Int64PtrToIntPtr(itemsPerPage),
71+
PageNum: conversion.Int64PtrToIntPtr(pageNum),
72+
}).Execute()
73+
74+
if err != nil {
75+
resp.Diagnostics.AddError("error fetching results", err.Error())
76+
return
77+
}
78+
79+
newStreamConnectionsModel, diags := NewTFStreamConnections(ctx, &streamConnectionsConfig, apiResp)
80+
if diags.HasError() {
81+
resp.Diagnostics.Append(diags...)
82+
return
83+
}
84+
resp.Diagnostics.Append(resp.State.Set(ctx, newStreamConnectionsModel)...)
85+
}

0 commit comments

Comments
 (0)