Skip to content

Commit

Permalink
Prow secret agent: Add secret parsing hook mechanism
Browse files Browse the repository at this point in the history
This change addresses an old TODO of mine to add a parsing hook
mechanism to the secret agent. We need this for the github rsa key,
where we will currently panic if we get something that is not parseable.

This change uses generics, so before we can merge it, we need to update
to go 1.18.

This change consists of:
* Adding a new AddWithParser func to the secret package that will
  executed a passed parsing func as a post-load hook and not load a
  secret that doesn't parse
* Update the data map of the secret agent to not use []byte as key type
  but an interface. This is needed because the type information of
  generics is inaccessible at runtime, which means a generic type is not
  really a type that can be referenced elsewhere, only concrete
  instantiations of it can - But there might be more than one, which is
  the reason for using generics in the first place, thus we are required
  to generalize them back throgh the interface
* This also means that the only way of getting the parsed secret is by
  using a getter returned by AddWithParser - This is different from raw
  secrets, where we can get them later using the path (as they all have
  the same type)
  • Loading branch information
alvaroaleman committed Apr 20, 2022
1 parent 991be95 commit 0e771b6
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 97 deletions.
121 changes: 49 additions & 72 deletions prow/config/secret/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ limitations under the License.
package secret

import (
"os"
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -34,8 +33,7 @@ var secretAgent *agent

func init() {
secretAgent = &agent{
RWMutex: sync.RWMutex{},
secretsMap: map[string][]byte{},
secretsMap: map[string]secretReloader{},
ReloadingCensorer: secretutil.NewCensorer(),
}
logrus.SetFormatter(logrusutil.NewFormatterWithCensor(logrus.StandardLogger().Formatter, secretAgent.ReloadingCensorer))
Expand All @@ -45,38 +43,39 @@ func init() {
// Additionally, Start wraps the current standard logger formatter with a
// censoring formatter that removes secret occurrences from the logs.
func (a *agent) Start(paths []string) error {
secretsMap, err := loadSecrets(paths)
if err != nil {
return err
}

a.secretsMap = secretsMap
a.secretsMap = make(map[string]secretReloader, len(paths))
a.ReloadingCensorer = secretutil.NewCensorer()
a.refreshCensorer()

logrus.SetFormatter(logrusutil.NewFormatterWithCensor(logrus.StandardLogger().Formatter, a.ReloadingCensorer))

// Start one goroutine for each file to monitor and update the secret's values.
for secretPath := range secretsMap {
go a.reloadSecret(secretPath)
for _, path := range paths {
if err := a.Add(path); err != nil {
return fmt.Errorf("failed to load secret at %s: %w", path, err)
}
}

logrus.SetFormatter(logrusutil.NewFormatterWithCensor(logrus.StandardLogger().Formatter, a.ReloadingCensorer))

return nil
}

// Add registers a new path to the agent.
func Add(paths ...string) error {
secrets, err := loadSecrets(paths)
if err != nil {
return err
for _, path := range paths {
if err := secretAgent.Add(path); err != nil {
return err
}
}
return nil
}

for path, value := range secrets {
secretAgent.setSecret(path, value)
// Start one goroutine for each file to monitor and update the secret's values.
go secretAgent.reloadSecret(path)
// AddWithParser registers a new path to the agent. The secret will only be updated if it can
// be successfully parsed. The returned getter must be kept, as it is the only way of accessing
// the typed secret.
func AddWithParser[T any](path string, parsingFN func([]byte) (T, error)) (func() T, error) {
loader := &parsingSecretReloader[T]{
path: path,
parsingFN: parsingFN,
}
return nil
return loader.get, secretAgent.add(path, loader)
}

// GetSecret returns the value of a secret stored in a map.
Expand All @@ -98,81 +97,59 @@ func Censor(content []byte) []byte {
// agent watches a path and automatically loads the secrets stored.
type agent struct {
sync.RWMutex
secretsMap map[string][]byte
secretsMap map[string]secretReloader
*secretutil.ReloadingCensorer
}

type secretReloader interface {
getRaw() []byte
start(reloadCensor func()) error
}

// Add registers a new path to the agent.
func (a *agent) Add(path string) error {
secret, err := loadSingleSecret(path)
if err != nil {
return a.add(path, &parsingSecretReloader[[]byte]{
path: path,
parsingFN: func(b []byte) ([]byte, error) { return b, nil },
})
}

func (a *agent) add(path string, loader secretReloader) error {
if err := loader.start(a.refreshCensorer); err != nil {
return err
}

a.setSecret(path, secret)
a.setSecret(path, loader)

// Start one goroutine for each file to monitor and update the secret's values.
go a.reloadSecret(path)
return nil
}

// reloadSecret will begin polling the secret file at the path. If the first load
// fails, Start with return the error and abort. Future load failures will log
// the failure message but continue attempting to load.
func (a *agent) reloadSecret(secretPath string) {
var lastModTime time.Time
logger := logrus.NewEntry(logrus.StandardLogger())

skips := 0
for range time.Tick(1 * time.Second) {
if skips < 600 {
// Check if the file changed to see if it needs to be re-read.
secretStat, err := os.Stat(secretPath)
if err != nil {
logger.WithField("secret-path", secretPath).
WithError(err).Error("Error loading secret file.")
continue
}

recentModTime := secretStat.ModTime()
if !recentModTime.After(lastModTime) {
skips++
continue // file hasn't been modified
}
lastModTime = recentModTime
}

if secretValue, err := loadSingleSecret(secretPath); err != nil {
logger.WithField("secret-path: ", secretPath).
WithError(err).Error("Error loading secret.")
} else {
a.setSecret(secretPath, secretValue)
skips = 0
}
}
}

// GetSecret returns the value of a secret stored in a map.
func (a *agent) GetSecret(secretPath string) []byte {
a.RLock()
defer a.RUnlock()
return a.secretsMap[secretPath]
if val, set := a.secretsMap[secretPath]; set {
return val.getRaw()
}
return nil
}

// setSecret sets a value in a map of secrets.
func (a *agent) setSecret(secretPath string, secretValue []byte) {
func (a *agent) setSecret(secretPath string, secretValue secretReloader) {
a.Lock()
defer a.Unlock()
a.secretsMap[secretPath] = secretValue
a.Unlock()
a.refreshCensorer()
}

// refreshCensorer should be called when the lock is held and the secrets map changes
// refreshCensorer should be called when the secrets map changes
func (a *agent) refreshCensorer() {
var secrets [][]byte
a.RLock()
for _, value := range a.secretsMap {
secrets = append(secrets, value)
secrets = append(secrets, value.getRaw())
}
a.RUnlock()
a.ReloadingCensorer.RefreshBytes(secrets...)
}

Expand Down Expand Up @@ -200,7 +177,7 @@ func (a *agent) getSecrets() sets.String {
defer a.RUnlock()
secrets := sets.NewString()
for _, v := range a.secretsMap {
secrets.Insert(string(v))
secrets.Insert(string(v.getRaw()))
}
return secrets
}
77 changes: 77 additions & 0 deletions prow/config/secret/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ limitations under the License.
package secret

import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -98,3 +102,76 @@ func TestCensoringFormatter(t *testing.T) {
})
}
}

func TestAddWithParser(t *testing.T) {
t.Parallel()
// Go never runs a test in parallel with itself, so run
// the test twice to make sure the race detector checks
// the thread safety.
for idx := range []int{0, 1} {
t.Run(strconv.Itoa(idx), testAddWithParser)
}
}

func testAddWithParser(t *testing.T) {
t.Parallel()

tmpDir := t.TempDir()

secretPath := filepath.Join(tmpDir, "secret")
if err := ioutil.WriteFile(secretPath, []byte("1"), 0644); err != nil {
t.Fatalf("failed to write initial content of secret: %v", err)
}

vals := make(chan int, 3)
errs := make(chan error, 3)
generator, err := AddWithParser(
secretPath,
func(raw []byte) (int, error) {
val, err := strconv.Atoi(string(raw))
if err != nil {
errs <- err
return val, err
}
vals <- val
return val, err
},
)
if err != nil {
t.Fatalf("AddWithParser failed: %v", err)
}

checkValueAndErr := func(expected int, e error) {
t.Helper()
select {
case v := <-vals:
if v != expected {
t.Errorf("expected value to get updated to %d but got updated to %d", expected, v)
}
break
case err := <-errs:
if e == nil || e.Error() != err.Error() {
t.Fatalf("expected error %v, got %v", e, err)
}
// the agent reloads every second, so ten seconds should be plenty
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for value %d and error %d", expected, e)
}
if actual := generator(); actual != expected {
t.Errorf("expected value %d from generator, got %d", expected, actual)
}
}
checkValueAndErr(1, nil)

if err := ioutil.WriteFile(secretPath, []byte("2"), 0644); err != nil {
t.Fatalf("failed to update secret on disk: %v", err)
}
// expect secret to get updated
checkValueAndErr(2, nil)

if err := ioutil.WriteFile(secretPath, []byte("not-a-number"), 0644); err != nil {
t.Fatalf("failed to update secret on disk: %v", err)
}
// expect secret to remain unchanged and an error in the parsing func
checkValueAndErr(2, errors.New(`strconv.Atoi: parsing "not-a-number": invalid syntax`))
}
99 changes: 99 additions & 0 deletions prow/config/secret/reloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package secret

import (
"os"
"sync"
"time"

"github.com/sirupsen/logrus"
)

type parsingSecretReloader[T any] struct {
lock sync.RWMutex
path string
rawValue []byte
parsed T
parsingFN func([]byte) (T, error)
}

func (p *parsingSecretReloader[T]) start(reloadCensor func()) error {
raw, parsed, err := loadSingleSecretWithParser(p.path, p.parsingFN)
if err != nil {
return err
}
p.lock.Lock()
p.rawValue = raw
p.parsed = parsed
p.lock.Unlock()
reloadCensor()

go p.reloadSecret(reloadCensor)
return nil
}

func (p *parsingSecretReloader[T]) reloadSecret(reloadCensor func()) {
var lastModTime time.Time
logger := logrus.NewEntry(logrus.StandardLogger())

skips := 0
for range time.Tick(1 * time.Second) {
if skips < 600 {
// Check if the file changed to see if it needs to be re-read.
secretStat, err := os.Stat(p.path)
if err != nil {
logger.WithField("secret-path", p.path).WithError(err).Error("Error loading secret file.")
continue
}

recentModTime := secretStat.ModTime()
if !recentModTime.After(lastModTime) {
skips++
continue // file hasn't been modified
}
lastModTime = recentModTime
}

raw, parsed, err := loadSingleSecretWithParser(p.path, p.parsingFN)
if err != nil {
logger.WithField("secret-path", p.path).WithError(err).Error("Error loading secret.")
continue
}

p.lock.Lock()
p.rawValue = raw
p.parsed = parsed
p.lock.Unlock()
reloadCensor()

skips = 0
}

}

func (p *parsingSecretReloader[T]) getRaw() []byte {
p.lock.RLock()
defer p.lock.RUnlock()
return p.rawValue
}

func (p *parsingSecretReloader[T]) get() T {
p.lock.RLock()
defer p.lock.RUnlock()
return p.parsed
}
Loading

0 comments on commit 0e771b6

Please sign in to comment.