Skip to content

Commit

Permalink
Use YAML modifications instead of regex replacement (kubeflow#2815)
Browse files Browse the repository at this point in the history
* use YAML support for IAM bindings

* write to iam_bindings.yaml

* yaml config to cluster-kubeflow

* yaml support to storage-kubeflow

* comments

* fix

* add desc
  • Loading branch information
gabrielwen authored and k8s-ci-robot committed Mar 28, 2019
1 parent aced3de commit 640b2e8
Showing 1 changed file with 216 additions and 67 deletions.
283 changes: 216 additions & 67 deletions bootstrap/pkg/kfapp/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"os/exec"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -693,16 +692,209 @@ func setNameVal(entries []configtypes.NameValue, name string, val string, requir
return entries
}

//TODO(#2515)
func (gcp *Gcp) replaceText(regex string, repl string, src []byte) []byte {
re := regexp.MustCompile(regex)
buf := re.ReplaceAll(src, []byte(repl))
return buf
// Helper function to generate account field for IAP.
func (gcp *Gcp) getIapAccount() string {
iapAcct := "serviceAccount:" + gcp.Spec.Email
if !strings.Contains(gcp.Spec.Email, "iam.gserviceaccount.com") {
iapAcct = "user:" + gcp.Spec.Email
}
return iapAcct
}

// Write IAM binding rules based on GCP app config.
func (gcp *Gcp) writeIamBindingsFile(src string, dest string) error {
buf, err := ioutil.ReadFile(src)
if err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when reading template %v: %v", src, err),
}
}

var data map[string]interface{}
if err = yaml.Unmarshal(buf, &data); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when unmarshaling template %v: %v", src, err),
}
}

e, ok := data["bindings"]
if !ok {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: "Invalid IAM bindings format: not able to find `bindings` entry.",
}
}

roles := map[string]string{
"set-kubeflow-admin-service-account": "serviceAccount:" + getSA(gcp.Name, "admin", gcp.Spec.Project),
"set-kubeflow-user-service-account": "serviceAccount:" + getSA(gcp.Name, "user", gcp.Spec.Project),
"set-kubeflow-vm-service-account": "serviceAccount:" + getSA(gcp.Name, "vm", gcp.Spec.Project),
"set-kubeflow-iap-account": gcp.getIapAccount(),
}

bindings := e.([]interface{})
for idx, b := range bindings {
binding := b.(map[string]interface{})
if mem, ok := binding["members"]; ok {
members := mem.([]interface{})
var newMembers []string
for _, m := range members {
member := m.(string)
if acct, ok := roles[member]; ok {
newMembers = append(newMembers, acct)
} else {
newMembers = append(newMembers, member)
}
}
binding["members"] = newMembers
bindings[idx] = binding
} else {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: "Invalid IAM bindings format: not able to find `members` entry.",
}
}
}
data["bindings"] = bindings

if buf, err = yaml.Marshal(data); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when marshaling IAM bindings: %v", err),
}
}
if err = ioutil.WriteFile(dest, buf, 0644); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when writing IAM bindings: %v", err),
}
}
return nil
}

// Replace placeholders and write to cluster-kubeflow.yaml
func (gcp *Gcp) writeClusterConfig(src string, dest string) error {
buf, err := ioutil.ReadFile(src)
if err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when reading template %v: %v", src, err),
}
}

var data map[string]interface{}
if err = yaml.Unmarshal(buf, &data); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when unmarshaling template %v: %v", src, err),
}
}

res, ok := data["resources"]
if !ok {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: "Invalid cluster config - not able to find resources entry.",
}
}

resources := res.([]interface{})
for idx, re := range resources {
resource := re.(map[string]interface{})
var properties map[string]interface{}
if props, ok := resource["properties"]; ok {
properties = props.(map[string]interface{})
} else {
properties = make(map[string]interface{})
}
properties["gkeApiVersion"] = kftypes.DefaultGkeApiVer
properties["zone"] = gcp.Spec.Zone
properties["users"] = []string{
gcp.getIapAccount(),
}
properties["ipName"] = gcp.Spec.IpName
resource["properties"] = properties
resources[idx] = resource
}
data["resources"] = resources

if buf, err = yaml.Marshal(data); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when marshaling for %v: %v", dest, err),
}
}
if err = ioutil.WriteFile(dest, buf, 0644); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when writing to %v: %v", dest, err),
}
}

return nil
}

// Replace placeholders and write to storage-kubeflow.yaml
func (gcp *Gcp) writeStorageConfig(src string, dest string) error {
buf, err := ioutil.ReadFile(src)
if err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when reading storage-kubeflow template: %v", err),
}
}

var data map[string]interface{}
if err = yaml.Unmarshal(buf, &data); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when unmarshaling template %v: %v", src, err),
}
}

res, ok := data["resources"]
if !ok {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: "Invalid storage config - not able to find resources entry.",
}
}

resources := res.([]interface{})
for idx, re := range resources {
resource := re.(map[string]interface{})
var properties map[string]interface{}
if props, ok := resource["properties"]; ok {
properties = props.(map[string]interface{})
} else {
properties = make(map[string]interface{})
}
properties["zone"] = gcp.Spec.Zone
properties["createPipelinePersistentStorage"] = true
resource["properties"] = properties
resources[idx] = resource
}
data["resources"] = resources

if buf, err = yaml.Marshal(data); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when marshaling for %v: %v", dest, err),
}
}
if err = ioutil.WriteFile(dest, buf, 0644); err != nil {
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Message: fmt.Sprintf("Error when writing to %v: %v", dest, err),
}
}

return nil
}

// TODO(#2515): Switch from string replacement to YAML config.
func (gcp *Gcp) generateDMConfigs() error {
// TODO(gabrielwen): Use YAML support instead of string replacement.
appDir := gcp.Spec.AppDir
gcpConfigDir := path.Join(appDir, GCP_CONFIG)
gcpConfigDirErr := os.MkdirAll(gcpConfigDir, os.ModePerm)
Expand All @@ -712,8 +904,8 @@ func (gcp *Gcp) generateDMConfigs() error {
repo := gcp.Spec.Repo
parentDir := path.Dir(repo)
sourceDir := path.Join(parentDir, "deployment/gke/deployment_manager_configs")
files := []string{"cluster-kubeflow.yaml", "cluster.jinja", "cluster.jinja.schema",
"storage-kubeflow.yaml", "storage.jinja", "storage.jinja.schema"}
files := []string{"cluster.jinja", "cluster.jinja.schema", "storage.jinja",
"storage.jinja.schema"}
for _, file := range files {
sourceFile := filepath.Join(sourceDir, file)
destFile := filepath.Join(gcpConfigDir, file)
Expand All @@ -722,68 +914,25 @@ func (gcp *Gcp) generateDMConfigs() error {
return fmt.Errorf("could not copy %v to %v Error %v", sourceFile, destFile, copyErr)
}
}

// Reading from templates and write to gcp_config directory with content had placeholders
// replaced.
from := filepath.Join(sourceDir, "iam_bindings_template.yaml")
to := filepath.Join(gcpConfigDir, "iam_bindings.yaml")
iamBindings := map[string]string{
"from": from,
"to": to,
}
iamBindingsErr := gcp.copyFile(iamBindings["from"], iamBindings["to"])
if iamBindingsErr != nil {
return fmt.Errorf("could not copy iam_bindings Error %v", iamBindingsErr)
if err := gcp.writeIamBindingsFile(from, to); err != nil {
return err
}
iamBindingsData, iamBindingsDataErr := ioutil.ReadFile(to) // just pass the file name
if iamBindingsDataErr != nil {
return fmt.Errorf("could not read %v Error %v", to, iamBindingsDataErr)
from = filepath.Join(sourceDir, CONFIG_FILE)
to = filepath.Join(gcpConfigDir, CONFIG_FILE)
if err := gcp.writeClusterConfig(from, to); err != nil {
return err
}
adminEmail := getSA(gcp.Name, "admin", gcp.Spec.Project)
repl := "serviceAccount:" + adminEmail
iamBindingsData = gcp.replaceText("set-kubeflow-admin-service-account", repl, iamBindingsData)
userEmail := getSA(gcp.Name, "user", gcp.Spec.Project)
repl = "serviceAccount:" + userEmail
iamBindingsData = gcp.replaceText("set-kubeflow-user-service-account", repl, iamBindingsData)
vmEmail := getSA(gcp.Name, "vm", gcp.Spec.Project)
repl = "serviceAccount:" + vmEmail
iamBindingsData = gcp.replaceText("set-kubeflow-vm-service-account", repl, iamBindingsData)
iamEntry := "serviceAccount:" + gcp.Spec.Email
re := regexp.MustCompile("iam.gserviceaccount.com")
if !re.MatchString(gcp.Spec.Email) {
iamEntry = "user:" + gcp.Spec.Email
}
iamBindingsData = gcp.replaceText("set-kubeflow-iap-account", iamEntry, iamBindingsData)
srcErr := ioutil.WriteFile(to, iamBindingsData, 0644)
if srcErr != nil {
return fmt.Errorf("cound not write to %v Error %v", to, srcErr)
}
configFile := filepath.Join(gcpConfigDir, CONFIG_FILE)
configFileData, configFileDataErr := ioutil.ReadFile(configFile)
if configFileDataErr != nil {
return fmt.Errorf("could not read %v Error %v", configFile, configFileDataErr)
}
storageFile := filepath.Join(gcpConfigDir, STORAGE_FILE)
storageFileData, storageFileDataErr := ioutil.ReadFile(storageFile)
if storageFileDataErr != nil {
return fmt.Errorf("could not read %v Error %v", storageFile, storageFileDataErr)
}
configFileData = gcp.replaceText("SET_GKE_API_VERSION", kftypes.DefaultGkeApiVer, configFileData)
repl = "zone: " + gcp.Spec.Zone
configFileData = gcp.replaceText("zone: SET_THE_ZONE", repl, configFileData)
storageFileData = gcp.replaceText("zone: SET_THE_ZONE", repl, storageFileData)
repl = "users: [\"" + iamEntry + "\"]"
configFileData = gcp.replaceText("users:", repl, configFileData)
repl = "ipName: " + gcp.Spec.IpName
configFileData = gcp.replaceText("ipName: kubeflow-ip", repl, configFileData)
configFileErr := ioutil.WriteFile(configFile, configFileData, 0644)
if configFileErr != nil {
return fmt.Errorf("cound not write to %v Error %v", configFile, configFileErr)
}
repl = "createPipelinePersistentStorage: true"
storageFileData = gcp.replaceText("createPipelinePersistentStorage: SET_CREATE_PIPELINE_PERSISTENT_STORAGE",
repl, storageFileData)
storageFileErr := ioutil.WriteFile(storageFile, storageFileData, 0644)
if storageFileErr != nil {
return fmt.Errorf("cound not write to %v Error %v", storageFile, storageFileErr)
from = filepath.Join(sourceDir, STORAGE_FILE)
to = filepath.Join(gcpConfigDir, STORAGE_FILE)
if err := gcp.writeStorageConfig(from, to); err != nil {
return err
}

return nil
}

Expand Down

0 comments on commit 640b2e8

Please sign in to comment.