diff --git a/.gitignore b/.gitignore index ce9291d..b0c14b7 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,12 @@ vendor/ dist/ -.vscode/ \ No newline at end of file +.vscode/ +.idea/ +.DS_Store +.terraform/ +*/.terraform/ +.terraform.lock.hcl +terraform.tfstate +terraform.tfstate.backup +*.tfvars \ No newline at end of file diff --git a/README.md b/README.md index 6155a43..0cc60c6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,18 @@ # terraform-provider-confluent-schema-registry A terraform provider for managing schemas in a Confluent schema registry +## Developing at Fetch +The local development workflow +https://github.com/hashicorp/terraform-provider-aws/issues/5396 +08:32 +1. make go code change +2. make build +3. cp to new place in plugins dir + 4. $ cp dist/terraform-provider-schemaregistry ~/.terraform.d/plugins/local/fetch-rewards/confluent-schema-registry/1.1.0/darwin_arm64/terraform-provider-confluent-schema-registry_1.1.0 +4. delete providers in .terraform +5. delete .terraform.lock.hcl +6. terraform init + ## Provider configuration ``` terraform { @@ -117,3 +129,13 @@ output "schema_string" { ` terraform import schemaregistry_schema.main ` + +The `test-new-build.sh` script can be used to easily test the provider locally. It will build the provider, copy it to the local plugins directory, and run `terraform init` in the `terraform-test-files` directory. + +From the terraform-test-files directory, you can run your commands as you normally would to test your provider changes, ie `terraform apply`, `terraform plan`, etc. + +To see any logging that you added to the provider (using tflog) you can add the environment variable `TF_LOG=INFO` to your command, ie `TF_LOG=INFO terraform apply`. + +An example of using tflog to log a message in the provider is below: + +```tflog.Info(ctx, "Configuring SchemaRegistry client")``` \ No newline at end of file diff --git a/schemaregistry/data_source_schema.go b/schemaregistry/data_source_schema.go index c00679b..8fe8b9d 100644 --- a/schemaregistry/data_source_schema.go +++ b/schemaregistry/data_source_schema.go @@ -2,6 +2,7 @@ package schemaregistry import ( "context" + "fmt" "github.com/ashleybill/srclient" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" @@ -81,9 +82,17 @@ func dataSourceSubjectRead(ctx context.Context, d *schema.ResourceData, m interf return diag.FromErr(err) } - d.Set("schema", schema.Schema()) - d.Set("schema_id", schema.ID()) - d.Set("version", schema.Version()) + if err = d.Set("schema_id", schema.ID()); err != nil { + return diag.FromErr(fmt.Errorf("error in dataSourceSubjectRead with setting schema_id: %w", err)) + } + + if err = d.Set("schema", schema.Schema()); err != nil { + return diag.FromErr(fmt.Errorf("error in dataSourceSubjectRead with setting schema: %w", err)) + } + + if err = d.Set("version", schema.Version()); err != nil { + return diag.FromErr(fmt.Errorf("error in dataSourceSubjectRead with setting version: %w", err)) + } if err = d.Set("references", FromRegistryReferences(schema.References())); err != nil { return diag.FromErr(err) diff --git a/schemaregistry/provider.go b/schemaregistry/provider.go index 286bee3..b46dc23 100644 --- a/schemaregistry/provider.go +++ b/schemaregistry/provider.go @@ -3,6 +3,7 @@ package schemaregistry import ( "context" "errors" + "github.com/hashicorp/terraform-plugin-log/tflog" "github.com/ashleybill/srclient" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" @@ -43,7 +44,7 @@ func providerConfigure(ctx context.Context, d *schema.ResourceData) (interface{} url := d.Get("schema_registry_url").(string) username := d.Get("username").(string) password := d.Get("password").(string) - + tflog.Info(ctx, "Configuring SchemaRegistry client") // Warning or errors can be collected in a slice type var diags diag.Diagnostics diff --git a/schemaregistry/resource_schema.go b/schemaregistry/resource_schema.go index 23f4fb5..4c97f9d 100644 --- a/schemaregistry/resource_schema.go +++ b/schemaregistry/resource_schema.go @@ -105,22 +105,13 @@ func schemaCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) schema, err := client.CreateSchema(subject, schemaString, schemaType, references...) if err != nil { - return diag.FromErr(fmt.Errorf("error creating in the createschema function: %w", err)) + return diag.FromErr(err) } d.SetId(formatSchemaVersionID(subject)) - - if err = d.Set("schema_id", schema.ID()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaCreate with setting schema_id: %w", err)) - } - - if err = d.Set("schema", schema.Schema()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaCreate with setting schema: %w", err)) - } - - if err = d.Set("version", schema.Version()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaCreate with setting version: %w", err)) - } + d.Set("schema_id", schema.ID()) + d.Set("schema", schema.Schema()) + d.Set("version", schema.Version()) if err = d.Set("reference", FromRegistryReferences(schema.References())); err != nil { return diag.FromErr(err) @@ -136,28 +127,40 @@ func schemaUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) schemaString := d.Get("schema").(string) references := ToRegistryReferences(d.Get("reference").([]interface{})) schemaType := ToSchemaType(d.Get("schema_type")) - + currentSchemaId := d.Get("schema_id").(int) client := meta.(*srclient.SchemaRegistryClient) + // This CreateSchema call does not fail if the schema already exists -- it just returns the schema. + // This isn't ideal because if we update a schema with an OLD schema string, it will just return that old version + // without updating the newest version to that version. + // This results in a permanent diff in terraform -- because the latest schema is not matching what is in our new terraform. schema, err := client.CreateSchema(subject, schemaString, schemaType, references...) if err != nil { if strings.Contains(err.Error(), "409") { - return diag.Errorf(`invalid "schema": incompatible. your schema has the compatability type set to BACKWARD. this means you can only perform the following: delete field, create OPTIONAL fields.`) + return diag.FromErr(fmt.Errorf("invalid 'schema': Incompatible. Please check the compatability level of your schema and compare it against the allowed actions found here: https://docs.confluent.io/cloud/current/sr/fundamentals/schema-evolution.html#compatibility-types.")) } - return diag.FromErr(fmt.Errorf("error creating in the updateschema function: %w", err)) - } - - if err = d.Set("schema_id", schema.ID()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaUpdate with setting schema_id: %w", err)) - } - - if err = d.Set("schema", schema.Schema()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaUpdate with setting schema: %w", err)) + return diag.FromErr(err) } - if err = d.Set("version", schema.Version()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaUpdate with setting version: %w", err)) + // If the schema returned from the above call is that of an EXISTING schema, we now do a soft delete on the old + //schema and then recreate it, so that the "old" version is now the most updated version and our state matches + //(soft delete just de-registers if from the subject it i think? but it still exists) + if schema.ID() < currentSchemaId { + err = client.DeleteSubjectByVersion(subject, schema.Version(), false) + if err != nil { + return diag.FromErr(err) + } + schema, err = client.CreateSchema(subject, schemaString, schemaType, references...) + if err != nil { + if strings.Contains(err.Error(), "409") { + return diag.FromErr(fmt.Errorf("invalid 'schema': Incompatible. Please check the compatability level of your schema and compare it against the allowed actions found here: https://docs.confluent.io/cloud/current/sr/fundamentals/schema-evolution.html#compatibility-types.")) + } + return diag.FromErr(err) + } } + d.Set("schema_id", schema.ID()) + d.Set("schema", schema.Schema()) + d.Set("version", schema.Version()) if err = d.Set("reference", FromRegistryReferences(schema.References())); err != nil { return diag.FromErr(err) @@ -168,50 +171,26 @@ func schemaUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) func schemaRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { var diags diag.Diagnostics + client := meta.(*srclient.SchemaRegistryClient) subject := extractSchemaVersionID(d.Id()) - //TODO - // I feel like we want to 1. get the latest schema from the client (in case a change was made in UI) - // 2. compare the latest schema to the schema in tf state - // 3. if they are different, show the difference and say that the schema is going to be changed (back to what it was?) -- that would possibly give incompaatable error, because we - // TODO - newSchema := d.Get("schema") // TODO this is getting it from state. so if it is fucked up in state.... - references := ToRegistryReferences(d.Get("reference").([]interface{})) - schemaType := ToSchemaType(d.Get("schema_type")) - var schema *srclient.Schema var err error - - if newSchema == nil { - fmt.Println("getting latest schema") - schema, err = client.GetLatestSchema(subject) - if err != nil { - return diag.FromErr(fmt.Errorf("error getting last schema: %w", err)) - } - } else { - fmt.Println("looking up schema") - schema, err = client.LookupSchema(subject, newSchema.(string), schemaType, references...) - if err != nil { - // TODO: it is breaking here on terraform plan when we change things in terraform - return diag.FromErr(fmt.Errorf("error looking up schema: %w. newSchema is %v", err, newSchema)) - } - } - - if err = d.Set("schema_id", schema.ID()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaRead with setting schema_id: %w", err)) + // before, the provider tried to look up the schema by the schema string. + // The issue was that when a terraform apply ran and failed, it was looking for a schema string that didn't exist (before the tf state gets updated even on a failure) + // now, we do not use the tf state to refresh -- we get the latest schema from the registry + latestSchema, err := client.GetLatestSchema(subject) + if err != nil { + return diag.FromErr(fmt.Errorf("error getting last schema: %w", err)) } - if err = d.Set("schema", schema.Schema()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaRead with setting schema: %w", err)) - } - if err = d.Set("subject", subject); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaRead with setting subject: %w", err)) - } - if err = d.Set("version", schema.Version()); err != nil { - return diag.FromErr(fmt.Errorf("error in schemaRead with setting version: %w", err)) - } + // At this point, the schema read in matches the most recent version found in the kafka ui/registry + d.Set("schema", latestSchema.Schema()) + d.Set("schema_id", latestSchema.ID()) + d.Set("subject", subject) + d.Set("version", latestSchema.Version()) - if err = d.Set("reference", FromRegistryReferences(schema.References())); err != nil { + if err = d.Set("reference", FromRegistryReferences(latestSchema.References())); err != nil { return diag.FromErr(err) } diff --git a/terraform-test-files/provider.tf b/terraform-test-files/provider.tf new file mode 100644 index 0000000..0a4169f --- /dev/null +++ b/terraform-test-files/provider.tf @@ -0,0 +1,32 @@ +terraform { + required_providers { + schemaregistry = { + source = "local/fetch-rewards/confluent-schema-registry" + version = "1.1.0" + } + + kafka = { + source = "mongey/kafka", + version = "0.5.3" + } + aws = { + source = "hashicorp/aws", + version = "5.21.0" + } + } +} + + +provider "schemaregistry" { + alias = "schema-registry-dev" + schema_registry_url = "https://dev-event-tracking-schema-registry.fetchrewards.com" +} + +provider "kafka" { + alias = "event-tracking-dev" + bootstrap_servers = [ + "dev-event-tracking-kafka.fetchrewards.com:9094" + ] + skip_tls_verify = true + tls_enabled = false +} \ No newline at end of file diff --git a/terraform-test-files/schema.tf b/terraform-test-files/schema.tf new file mode 100644 index 0000000..d5ec632 --- /dev/null +++ b/terraform-test-files/schema.tf @@ -0,0 +1,16 @@ +resource "kafka_topic" "test_provider_topic" { + name = "provider-test-topic" + partitions = 1 + replication_factor = 1 + config = { + "min.insync.replicas": "1" + } + provider= kafka.event-tracking-dev +} + +resource "schemaregistry_schema" "test_provider_topic_schema" { + provider= schemaregistry.schema-registry-dev + schema = "{\n\t\"type\": \"record\",\n\t\"name\": \"AwesomeEvent\",\n\t\"namespace\": \"com.fetchrewards.kia\",\n\t\"fields\": [\n\t\t{\n\t\t\t\"name\": \"id\",\n\t\t\t\"type\": \"int\",\n\t\t\t\"default\": \"null\"\n\t\t},\n\t\t{\n\t\t\t\"name\": \"eventType\",\n\t\t\t\"type\": \"string\"\n\t\t},\n\t\t{\n\t\t\t\"name\": \"updateTs\",\n\t\t\t\"type\": \"int\"\n\t\t}\n\t]\n}" + schema_type = "AVRO" + subject = "provider-test-schema" +} diff --git a/test-new-build.sh b/test-new-build.sh new file mode 100755 index 0000000..8cf27f1 --- /dev/null +++ b/test-new-build.sh @@ -0,0 +1,7 @@ +cd .. || exit +make build +cp dist/terraform-provider-schemaregistry ~/.terraform.d/plugins/local/fetch-rewards/confluent-schema-registry/1.1.0/darwin_arm64/terraform-provider-confluent-schema-registry_1.1.0 +cd terraform-files-test || exit +rm -rf .terraform/providers +rm .terraform.lock.hcl +terraform init \ No newline at end of file