Skip to content

Commit

Permalink
schema returned from read is always the latest version from the schem…
Browse files Browse the repository at this point in the history
…a registry
  • Loading branch information
kia-martinez committed Dec 13, 2023
1 parent 631b5be commit 7884b57
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 68 deletions.
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,12 @@
vendor/

dist/
.vscode/
.vscode/
.idea/
.DS_Store
.terraform/
*/.terraform/
.terraform.lock.hcl
terraform.tfstate
terraform.tfstate.backup
*.tfvars
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
# terraform-provider-confluent-schema-registry
A terraform provider for managing schemas in a Confluent schema registry

## Developing at Fetch
The local development workflow
https://github.com/hashicorp/terraform-provider-aws/issues/5396
08:32
1. make go code change
2. make build
3. cp to new place in plugins dir
4. $ cp dist/terraform-provider-schemaregistry ~/.terraform.d/plugins/local/fetch-rewards/confluent-schema-registry/1.1.0/darwin_arm64/terraform-provider-confluent-schema-registry_1.1.0
4. delete providers in .terraform
5. delete .terraform.lock.hcl
6. terraform init

## Provider configuration
```
terraform {
Expand Down Expand Up @@ -117,3 +129,13 @@ output "schema_string" {
`
terraform import schemaregistry_schema.main <subject_name>
`

The `test-new-build.sh` script can be used to easily test the provider locally. It will build the provider, copy it to the local plugins directory, and run `terraform init` in the `terraform-test-files` directory.

From the terraform-test-files directory, you can run your commands as you normally would to test your provider changes, ie `terraform apply`, `terraform plan`, etc.

To see any logging that you added to the provider (using tflog) you can add the environment variable `TF_LOG=INFO` to your command, ie `TF_LOG=INFO terraform apply`.

An example of using tflog to log a message in the provider is below:

```tflog.Info(ctx, "Configuring SchemaRegistry client")```
15 changes: 12 additions & 3 deletions schemaregistry/data_source_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package schemaregistry

import (
"context"
"fmt"

"github.com/ashleybill/srclient"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand Down Expand Up @@ -81,9 +82,17 @@ func dataSourceSubjectRead(ctx context.Context, d *schema.ResourceData, m interf
return diag.FromErr(err)
}

d.Set("schema", schema.Schema())
d.Set("schema_id", schema.ID())
d.Set("version", schema.Version())
if err = d.Set("schema_id", schema.ID()); err != nil {
return diag.FromErr(fmt.Errorf("error in dataSourceSubjectRead with setting schema_id: %w", err))
}

if err = d.Set("schema", schema.Schema()); err != nil {
return diag.FromErr(fmt.Errorf("error in dataSourceSubjectRead with setting schema: %w", err))
}

if err = d.Set("version", schema.Version()); err != nil {
return diag.FromErr(fmt.Errorf("error in dataSourceSubjectRead with setting version: %w", err))
}

if err = d.Set("references", FromRegistryReferences(schema.References())); err != nil {
return diag.FromErr(err)
Expand Down
3 changes: 2 additions & 1 deletion schemaregistry/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package schemaregistry
import (
"context"
"errors"
"github.com/hashicorp/terraform-plugin-log/tflog"

"github.com/ashleybill/srclient"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand Down Expand Up @@ -43,7 +44,7 @@ func providerConfigure(ctx context.Context, d *schema.ResourceData) (interface{}
url := d.Get("schema_registry_url").(string)
username := d.Get("username").(string)
password := d.Get("password").(string)

tflog.Info(ctx, "Configuring SchemaRegistry client")
// Warning or errors can be collected in a slice type
var diags diag.Diagnostics

Expand Down
105 changes: 42 additions & 63 deletions schemaregistry/resource_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,13 @@ func schemaCreate(ctx context.Context, d *schema.ResourceData, meta interface{})

schema, err := client.CreateSchema(subject, schemaString, schemaType, references...)
if err != nil {
return diag.FromErr(fmt.Errorf("error creating in the createschema function: %w", err))
return diag.FromErr(err)
}

d.SetId(formatSchemaVersionID(subject))

if err = d.Set("schema_id", schema.ID()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaCreate with setting schema_id: %w", err))
}

if err = d.Set("schema", schema.Schema()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaCreate with setting schema: %w", err))
}

if err = d.Set("version", schema.Version()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaCreate with setting version: %w", err))
}
d.Set("schema_id", schema.ID())
d.Set("schema", schema.Schema())
d.Set("version", schema.Version())

if err = d.Set("reference", FromRegistryReferences(schema.References())); err != nil {
return diag.FromErr(err)
Expand All @@ -136,28 +127,40 @@ func schemaUpdate(ctx context.Context, d *schema.ResourceData, meta interface{})
schemaString := d.Get("schema").(string)
references := ToRegistryReferences(d.Get("reference").([]interface{}))
schemaType := ToSchemaType(d.Get("schema_type"))

currentSchemaId := d.Get("schema_id").(int)
client := meta.(*srclient.SchemaRegistryClient)

// This CreateSchema call does not fail if the schema already exists -- it just returns the schema.
// This isn't ideal because if we update a schema with an OLD schema string, it will just return that old version
// without updating the newest version to that version.
// This results in a permanent diff in terraform -- because the latest schema is not matching what is in our new terraform.
schema, err := client.CreateSchema(subject, schemaString, schemaType, references...)
if err != nil {
if strings.Contains(err.Error(), "409") {
return diag.Errorf(`invalid "schema": incompatible. your schema has the compatability type set to BACKWARD. this means you can only perform the following: delete field, create OPTIONAL fields.`)
return diag.FromErr(fmt.Errorf("invalid 'schema': Incompatible. Please check the compatability level of your schema and compare it against the allowed actions found here: https://docs.confluent.io/cloud/current/sr/fundamentals/schema-evolution.html#compatibility-types."))
}
return diag.FromErr(fmt.Errorf("error creating in the updateschema function: %w", err))
}

if err = d.Set("schema_id", schema.ID()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaUpdate with setting schema_id: %w", err))
}

if err = d.Set("schema", schema.Schema()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaUpdate with setting schema: %w", err))
return diag.FromErr(err)
}

if err = d.Set("version", schema.Version()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaUpdate with setting version: %w", err))
// If the schema returned from the above call is that of an EXISTING schema, we now do a soft delete on the old
//schema and then recreate it, so that the "old" version is now the most updated version and our state matches
//(soft delete just de-registers if from the subject it i think? but it still exists)
if schema.ID() < currentSchemaId {
err = client.DeleteSubjectByVersion(subject, schema.Version(), false)
if err != nil {
return diag.FromErr(err)
}
schema, err = client.CreateSchema(subject, schemaString, schemaType, references...)
if err != nil {
if strings.Contains(err.Error(), "409") {
return diag.FromErr(fmt.Errorf("invalid 'schema': Incompatible. Please check the compatability level of your schema and compare it against the allowed actions found here: https://docs.confluent.io/cloud/current/sr/fundamentals/schema-evolution.html#compatibility-types."))
}
return diag.FromErr(err)
}
}
d.Set("schema_id", schema.ID())
d.Set("schema", schema.Schema())
d.Set("version", schema.Version())

if err = d.Set("reference", FromRegistryReferences(schema.References())); err != nil {
return diag.FromErr(err)
Expand All @@ -168,50 +171,26 @@ func schemaUpdate(ctx context.Context, d *schema.ResourceData, meta interface{})

func schemaRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics

client := meta.(*srclient.SchemaRegistryClient)
subject := extractSchemaVersionID(d.Id())
//TODO
// I feel like we want to 1. get the latest schema from the client (in case a change was made in UI)
// 2. compare the latest schema to the schema in tf state
// 3. if they are different, show the difference and say that the schema is going to be changed (back to what it was?) -- that would possibly give incompaatable error, because we
// TODO
newSchema := d.Get("schema") // TODO this is getting it from state. so if it is fucked up in state....
references := ToRegistryReferences(d.Get("reference").([]interface{}))
schemaType := ToSchemaType(d.Get("schema_type"))

var schema *srclient.Schema
var err error

if newSchema == nil {
fmt.Println("getting latest schema")
schema, err = client.GetLatestSchema(subject)
if err != nil {
return diag.FromErr(fmt.Errorf("error getting last schema: %w", err))
}
} else {
fmt.Println("looking up schema")
schema, err = client.LookupSchema(subject, newSchema.(string), schemaType, references...)
if err != nil {
// TODO: it is breaking here on terraform plan when we change things in terraform
return diag.FromErr(fmt.Errorf("error looking up schema: %w. newSchema is %v", err, newSchema))
}
}

if err = d.Set("schema_id", schema.ID()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaRead with setting schema_id: %w", err))
// before, the provider tried to look up the schema by the schema string.
// The issue was that when a terraform apply ran and failed, it was looking for a schema string that didn't exist (before the tf state gets updated even on a failure)
// now, we do not use the tf state to refresh -- we get the latest schema from the registry
latestSchema, err := client.GetLatestSchema(subject)
if err != nil {
return diag.FromErr(fmt.Errorf("error getting last schema: %w", err))
}

if err = d.Set("schema", schema.Schema()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaRead with setting schema: %w", err))
}
if err = d.Set("subject", subject); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaRead with setting subject: %w", err))
}
if err = d.Set("version", schema.Version()); err != nil {
return diag.FromErr(fmt.Errorf("error in schemaRead with setting version: %w", err))
}
// At this point, the schema read in matches the most recent version found in the kafka ui/registry
d.Set("schema", latestSchema.Schema())
d.Set("schema_id", latestSchema.ID())
d.Set("subject", subject)
d.Set("version", latestSchema.Version())

if err = d.Set("reference", FromRegistryReferences(schema.References())); err != nil {
if err = d.Set("reference", FromRegistryReferences(latestSchema.References())); err != nil {
return diag.FromErr(err)
}

Expand Down
32 changes: 32 additions & 0 deletions terraform-test-files/provider.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
terraform {
required_providers {
schemaregistry = {
source = "local/fetch-rewards/confluent-schema-registry"
version = "1.1.0"
}

kafka = {
source = "mongey/kafka",
version = "0.5.3"
}
aws = {
source = "hashicorp/aws",
version = "5.21.0"
}
}
}


provider "schemaregistry" {
alias = "schema-registry-dev"
schema_registry_url = "https://dev-event-tracking-schema-registry.fetchrewards.com"
}

provider "kafka" {
alias = "event-tracking-dev"
bootstrap_servers = [
"dev-event-tracking-kafka.fetchrewards.com:9094"
]
skip_tls_verify = true
tls_enabled = false
}
16 changes: 16 additions & 0 deletions terraform-test-files/schema.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
resource "kafka_topic" "test_provider_topic" {
name = "provider-test-topic"
partitions = 1
replication_factor = 1
config = {
"min.insync.replicas": "1"
}
provider= kafka.event-tracking-dev
}

resource "schemaregistry_schema" "test_provider_topic_schema" {
provider= schemaregistry.schema-registry-dev
schema = "{\n\t\"type\": \"record\",\n\t\"name\": \"AwesomeEvent\",\n\t\"namespace\": \"com.fetchrewards.kia\",\n\t\"fields\": [\n\t\t{\n\t\t\t\"name\": \"id\",\n\t\t\t\"type\": \"int\",\n\t\t\t\"default\": \"null\"\n\t\t},\n\t\t{\n\t\t\t\"name\": \"eventType\",\n\t\t\t\"type\": \"string\"\n\t\t},\n\t\t{\n\t\t\t\"name\": \"updateTs\",\n\t\t\t\"type\": \"int\"\n\t\t}\n\t]\n}"
schema_type = "AVRO"
subject = "provider-test-schema"
}
7 changes: 7 additions & 0 deletions test-new-build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
cd .. || exit
make build
cp dist/terraform-provider-schemaregistry ~/.terraform.d/plugins/local/fetch-rewards/confluent-schema-registry/1.1.0/darwin_arm64/terraform-provider-confluent-schema-registry_1.1.0
cd terraform-files-test || exit
rm -rf .terraform/providers
rm .terraform.lock.hcl
terraform init

0 comments on commit 7884b57

Please sign in to comment.