-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
1,104 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package shield | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" | ||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" | ||
sh "github.com/odpf/shield/proto/v1beta1" | ||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" | ||
"google.golang.org/grpc" | ||
"time" | ||
) | ||
|
||
const ( | ||
service = "shield" | ||
GRPCMaxClientSendSize = 45 << 20 // 45MB | ||
GRPCMaxClientRecvSize = 45 << 20 // 45MB | ||
GRPCMaxRetry uint = 3 | ||
) | ||
|
||
type Client interface { | ||
sh.ShieldServiceClient | ||
Connect(ctx context.Context, host string) error | ||
Close() error | ||
} | ||
|
||
func newClient() Client { | ||
return &client{} | ||
} | ||
|
||
type client struct { | ||
sh.ShieldServiceClient | ||
conn *grpc.ClientConn | ||
} | ||
|
||
func (c *client) Connect(ctx context.Context, host string) (err error) { | ||
dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2) | ||
defer dialCancel() | ||
|
||
if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil { | ||
err = fmt.Errorf("error creating connection: %w", err) | ||
return | ||
} | ||
|
||
c.ShieldServiceClient = sh.NewShieldServiceClient(c.conn) | ||
|
||
return | ||
} | ||
|
||
func (c *client) Close() error { | ||
return c.conn.Close() | ||
} | ||
|
||
func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) { | ||
retryOpts := []grpc_retry.CallOption{ | ||
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)), | ||
grpc_retry.WithMax(GRPCMaxRetry), | ||
} | ||
var opts []grpc.DialOption | ||
opts = append(opts, | ||
grpc.WithInsecure(), | ||
grpc.WithBlock(), | ||
grpc.WithDefaultCallOptions( | ||
grpc.MaxCallSendMsgSize(GRPCMaxClientSendSize), | ||
grpc.MaxCallRecvMsgSize(GRPCMaxClientRecvSize), | ||
), | ||
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( | ||
grpc_retry.UnaryClientInterceptor(retryOpts...), | ||
otelgrpc.UnaryClientInterceptor(), | ||
grpc_prometheus.UnaryClientInterceptor, | ||
)), | ||
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( | ||
otelgrpc.StreamClientInterceptor(), | ||
grpc_prometheus.StreamClientInterceptor, | ||
)), | ||
) | ||
|
||
return grpc.DialContext(ctx, host, opts...) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
package shield | ||
|
||
import ( | ||
"context" | ||
_ "embed" // used to print the embedded assets | ||
"fmt" | ||
"github.com/odpf/meteor/models" | ||
commonv1beta1 "github.com/odpf/meteor/models/odpf/assets/common/v1beta1" | ||
assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1" | ||
"github.com/odpf/meteor/plugins" | ||
"github.com/odpf/meteor/registry" | ||
"github.com/odpf/meteor/utils" | ||
"github.com/odpf/salt/log" | ||
sh "github.com/odpf/shield/proto/v1beta1" | ||
) | ||
|
||
//go:embed README.md | ||
var summary string | ||
|
||
// Config holds the set of configuration for the shield extractor | ||
type Config struct { | ||
Host string `mapstructure:"host" validate:"required"` | ||
} | ||
|
||
var sampleConfig = ` | ||
host: shield.com:80` | ||
|
||
// Extractor manages the communication with the shield service | ||
type Extractor struct { | ||
logger log.Logger | ||
config Config | ||
client Client | ||
} | ||
|
||
func New(logger log.Logger, client Client) *Extractor { | ||
return &Extractor{ | ||
logger: logger, | ||
client: client, | ||
} | ||
} | ||
|
||
// Info returns the detailed information about the extractor | ||
func (e *Extractor) Info() plugins.Info { | ||
return plugins.Info{ | ||
Description: "Shield' users metadata", | ||
SampleConfig: sampleConfig, | ||
Summary: summary, | ||
Tags: []string{"shield", "extractor"}, | ||
} | ||
} | ||
|
||
// Validate validates the configuration of the extractor | ||
func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { | ||
return utils.BuildConfig(configMap, &Config{}) | ||
} | ||
|
||
// Init initializes the extractor | ||
func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { | ||
if err := utils.BuildConfig(configMap, &e.config); err != nil { | ||
return plugins.InvalidConfigError{} | ||
} | ||
|
||
if err := e.client.Connect(ctx, e.config.Host); err != nil { | ||
return fmt.Errorf("error connecting to host: %w", err) | ||
} | ||
|
||
return | ||
} | ||
|
||
// Extract extracts the user information | ||
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error { | ||
defer e.client.Close() | ||
|
||
listUsers, err := e.client.ListUsers(ctx, &sh.ListUsersRequest{ | ||
Fields: nil, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("error fetching users: %w", err) | ||
} | ||
|
||
for _, user := range listUsers.Users { | ||
role, roleErr := e.client.GetRole(ctx, &sh.GetRoleRequest{Id: user.GetId()}) | ||
if roleErr != nil { | ||
return fmt.Errorf("error fetching user roles: %w", err) | ||
} | ||
|
||
grp, grpErr := e.client.GetGroup(ctx, &sh.GetGroupRequest{Id: user.GetId()}) | ||
if grpErr != nil { | ||
return fmt.Errorf("error fetching user groups: %w", err) | ||
} | ||
|
||
emit(models.NewRecord(&assetsv1beta1.User{ | ||
Resource: &commonv1beta1.Resource{ | ||
Urn: fmt.Sprintf("%s::%s/%s", service, e.config.Host, user.GetId()), | ||
Name: user.GetName(), | ||
Service: service, | ||
Type: "user", | ||
Description: user.GetSlug(), | ||
}, | ||
Email: user.GetEmail(), | ||
Username: user.GetId(), | ||
FullName: user.GetName(), | ||
DisplayName: "", | ||
Title: "", | ||
Status: "active", | ||
ManagerEmail: "", | ||
Profiles: nil, | ||
Memberships: []*assetsv1beta1.Membership{ | ||
{ | ||
GroupUrn: fmt.Sprintf("%s:%s", grp.Group.GetName(), grp.Group.GetId()), | ||
Role: []string{role.Role.GetName()}, | ||
}, | ||
}, | ||
Timestamps: &commonv1beta1.Timestamp{ | ||
CreateTime: user.GetCreatedAt(), | ||
UpdateTime: user.GetUpdatedAt(), | ||
}, | ||
Event: nil, | ||
})) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Register the extractor to catalog | ||
func init() { | ||
if err := registry.Extractors.Register("shield", func() plugins.Extractor { | ||
return New(plugins.GetLog(), newClient()) | ||
}); err != nil { | ||
panic(err) | ||
} | ||
} |
Oops, something went wrong.