Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented a foundation for gRPC streaming #786

Merged
merged 57 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
dbe4fd4
Added streaming compatible interceptor for user verification
JosteinLindhom Sep 9, 2022
a089029
Implemented the full `Interceptor` interface for all interceptors
JosteinLindhom Sep 13, 2022
3aca989
Merge branch 'master' into submission-stream
JosteinLindhom Sep 14, 2022
9944ade
Added stream package
JosteinLindhom Sep 21, 2022
ae6e652
Added logic to streaming handler in userInterceptor
JosteinLindhom Sep 21, 2022
651af08
Removed streaming services for now
JosteinLindhom Sep 21, 2022
fe45648
Added StreamServices & removed service.GetStreams()
JosteinLindhom Sep 21, 2022
f2f70eb
Export structs in stream package
JosteinLindhom Sep 21, 2022
3268edc
Export interceptor structs
JosteinLindhom Sep 21, 2022
1a27ab0
Split `stream.go` into three different files.
JosteinLindhom Sep 23, 2022
63f986b
Removed named receivers where not used
JosteinLindhom Sep 23, 2022
95d62b7
Merge branch 'master' into submission-stream
JosteinLindhom Sep 23, 2022
c17cea0
Made interceptor error messages idiomatic Go style
meling Sep 28, 2022
79b179e
Merge branch 'master' into submission-stream
meling Sep 28, 2022
81a8929
Fixed some locking and cleanup issues in `Pool`
JosteinLindhom Sep 29, 2022
34752c1
Added a simple test for `Pool` and its API
JosteinLindhom Sep 29, 2022
144bcb1
Removed `pool`
JosteinLindhom Sep 29, 2022
c7e4e52
Moved previous `pool` functionality into stream and service
JosteinLindhom Sep 29, 2022
85a0ab5
Added a for loop around `stream.Run's` select statement.
JosteinLindhom Sep 30, 2022
f3f6d94
Added a closed flag to allow outsiders to check if channel has been c…
JosteinLindhom Sep 30, 2022
8c41067
Moved mockStream to a separate test file.
JosteinLindhom Sep 30, 2022
0a97fc5
Added check to see if channel is already closed
JosteinLindhom Sep 30, 2022
09f74ce
Update type to StreamInterface
JosteinLindhom Sep 30, 2022
d773745
Added a test to check that multiple streams of same ID
JosteinLindhom Sep 30, 2022
3a3b015
Merge remote-tracking branch 'origin/master' into submission-stream
JosteinLindhom Sep 30, 2022
c6e05ec
Fixed an issue where a deadlock would occur.
JosteinLindhom Oct 4, 2022
3c26fc2
Updated mock stream to reflect changes in stream.go
JosteinLindhom Oct 4, 2022
e76c4f0
Added a test that tries to trigger a data race
JosteinLindhom Oct 4, 2022
0b914cc
Added missing cancel() call for contexts
meling Oct 4, 2022
a124039
Fixed for loop issue
meling Oct 4, 2022
25e7cbc
Unexported newStream and newMockStream
meling Oct 4, 2022
fba24e9
Send more messages
meling Oct 4, 2022
859f8c8
Underscored unused t variable in TestStreamClose
meling Oct 4, 2022
7dd9305
Removed error on CloseBy(); ignore error from Run()
meling Oct 4, 2022
823c51e
Minor adjustment of doc comments
meling Oct 4, 2022
98d5098
Minor: added another doc comment
meling Oct 4, 2022
b141093
Removed `GetID` from `StreamInterface`
JosteinLindhom Oct 5, 2022
a82173f
Removed unused exported method `Closed()`
JosteinLindhom Oct 5, 2022
63e8135
Removed printing from mockStream
JosteinLindhom Oct 5, 2022
0eb0b59
Updated docstring to reflect renamed method
JosteinLindhom Oct 5, 2022
ea85083
Added `processHeader` method UserInterceptor
JosteinLindhom Oct 5, 2022
9d58e1c
Added missing lock for `Service.Close()`
JosteinLindhom Oct 5, 2022
af9e957
Renamed `st` -> `stream` for both `Add()` and `AddStream()`
JosteinLindhom Oct 5, 2022
7c41b63
Revised processHeader doc comment
meling Oct 5, 2022
5b2707e
Return nil for unsupported StreamingClientFunc
meling Oct 5, 2022
34be63e
Use regular lock rather than read lock for `SendTo`
JosteinLindhom Oct 5, 2022
7c8768f
Merge branch 'submission-stream' of github.com:quickfeed/quickfeed in…
JosteinLindhom Oct 5, 2022
e755658
Fixed method name mismatch in docstring
JosteinLindhom Oct 5, 2022
5d12775
Reverted return nil for StreamClientFunc
meling Oct 5, 2022
9561da8
Removed mistaken comment in req_validation.go
meling Oct 5, 2022
649d970
Added GetID() back to StreamInterface
JosteinLindhom Oct 5, 2022
a49a761
Removed `AddStream`, `Add` now accepts only `StreamInterface`
JosteinLindhom Oct 5, 2022
49d3c2c
Export `stream.NewStream`
JosteinLindhom Oct 5, 2022
21d5e40
Update stream tests to reflect removal of `AddStream`
JosteinLindhom Oct 5, 2022
474fc5c
Exported the Stream[T] struct from NewStream()
meling Oct 5, 2022
f5bb511
Removed GetID() from interface, added userID to Add()
meling Oct 5, 2022
04bcfbb
Use sync.Mutex instead of RWMutex
meling Oct 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/quickfeed-words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Godkjent
gofumports
gofumpt
golangci
Gollum
googleapis
gopath
gopb
Expand Down Expand Up @@ -139,9 +140,11 @@ qfconnect
qlog
qtest
quickfeed
Radagast
Repos
repotoken
run
Sauron
scms
scorelimit
scriptfile
Expand Down
10 changes: 5 additions & 5 deletions web/grpc_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func TestGrpcAuth(t *testing.T) {
t.Fatal(err)
}
shutdown := web.MockQuickFeedServer(t, logger, db, connect.WithInterceptors(
interceptor.Metrics(),
interceptor.Validation(logger),
interceptor.UnaryUserVerifier(logger, tm),
interceptor.AccessControl(tm),
interceptor.TokenRefresher(tm),
interceptor.NewMetricsInterceptor(),
interceptor.NewValidationInterceptor(logger),
interceptor.NewUserInterceptor(logger, tm),
interceptor.NewAccessControlInterceptor(tm),
interceptor.NewTokenInterceptor(tm),
))

client := qtest.QuickFeedClient("")
Expand Down
166 changes: 92 additions & 74 deletions web/interceptor/access_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,89 +76,107 @@ var accessRolesFor = map[string]roles{
"CreateCourse": {admin},
}

// AccessControl checks user information stored in the JWT claims against the list of roles required to call the method.
func AccessControl(tm *auth.TokenManager) connect.Interceptor {
return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
procedure := request.Spec().Procedure
method := procedure[strings.LastIndex(procedure, "/")+1:]
req, ok := request.Any().(requestID)
if !ok {
return nil, connect.NewError(connect.CodeUnimplemented,
fmt.Errorf("%s failed: message type %T does not implement IDFor interface", method, request))
}
claims, ok := auth.ClaimsFromContext(ctx)
if !ok {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("AccessControl(%s): failed to get claims from request context", method))
}
for _, role := range accessRolesFor[method] {
switch role {
case none:
return next(ctx, request)
case user:
if claims.SameUser(req) {
// Make sure the user is not updating own admin status.
if method == "UpdateUser" {
if req.(*qf.User).GetIsAdmin() && !claims.Admin {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("AccessControl(%s): user %d attempted to change admin status from %v to %v",
method, claims.UserID, claims.Admin, req.(*qf.User).GetIsAdmin()))
}
}
return next(ctx, request)
}
case student:
// GetSubmissions is used to fetch individual and group submissions.
// For individual submissions needs an extra check for user ID in request.
if method == "GetSubmissions" && req.IDFor("group") == 0 {
if !claims.SameUser(req) {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("AccessControl(%s): ID mismatch in claims (%d) and request (%d)",
method, claims.UserID, req.IDFor("user")))
}
}
if claims.HasCourseStatus(req, qf.Enrollment_STUDENT) {
return next(ctx, request)
}
case group:
// Request for CreateGroup will not have ID yet, need to check
// if the user is in the group (unless teacher).
if method == "CreateGroup" {
notMember := !req.(*qf.Group).Contains(&qf.User{ID: claims.UserID})
notTeacher := !claims.HasCourseStatus(req, qf.Enrollment_TEACHER)
if notMember && notTeacher {
type AccessControlInterceptor struct {
tokenManager *auth.TokenManager
}

func NewAccessControlInterceptor(tm *auth.TokenManager) *AccessControlInterceptor {
return &AccessControlInterceptor{tokenManager: tm}
}

func (*AccessControlInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return connect.StreamingHandlerFunc(func(ctx context.Context, conn connect.StreamingHandlerConn) error {
return next(ctx, conn)
})
}

func (*AccessControlInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return connect.StreamingClientFunc(func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
return next(ctx, spec)
})
}

// WrapUnary checks user information stored in the JWT claims against the list of roles required to call the method.
func (a *AccessControlInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
procedure := request.Spec().Procedure
method := procedure[strings.LastIndex(procedure, "/")+1:]
req, ok := request.Any().(requestID)
if !ok {
return nil, connect.NewError(connect.CodeUnimplemented,
fmt.Errorf("access denied for %s: message type %T does not implement 'requestID' interface", method, request))
}
claims, ok := auth.ClaimsFromContext(ctx)
if !ok {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("access denied for %s: failed to get claims from request context", method))
}
for _, role := range accessRolesFor[method] {
switch role {
case none:
return next(ctx, request)
case user:
if claims.SameUser(req) {
// Make sure the user is not updating own admin status.
if method == "UpdateUser" {
if req.(*qf.User).GetIsAdmin() && !claims.Admin {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("AccessControl(%s): user %d tried to create group while not teacher or group member", method, claims.UserID))
fmt.Errorf("access denied for %s: user %d attempted to change admin status from %v to %v",
method, claims.UserID, claims.Admin, req.(*qf.User).GetIsAdmin()))
}
// Otherwise, create the group.
return next(ctx, request)
}
groupID := req.IDFor("group")
for _, group := range claims.Groups {
if group == groupID {
return next(ctx, request)
}
return next(ctx, request)
}
case student:
// GetSubmissions is used to fetch individual and group submissions.
// For individual submissions needs an extra check for user ID in request.
if method == "GetSubmissions" && req.IDFor("group") == 0 {
if !claims.SameUser(req) {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("access denied for %s: ID mismatch in claims (%d) and request (%d)",
method, claims.UserID, req.IDFor("user")))
}
case teacher:
if method == "GetUserByCourse" {
if err := claims.IsCourseTeacher(tm.Database(), request.Any().(*qf.CourseUserRequest)); err != nil {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("AccessControl(%s): %w", method, err))
}
return next(ctx, request)
}
if claims.HasCourseStatus(req, qf.Enrollment_STUDENT) {
return next(ctx, request)
}
case group:
// Request for CreateGroup will not have ID yet, need to check
// if the user is in the group (unless teacher).
if method == "CreateGroup" {
notMember := !req.(*qf.Group).Contains(&qf.User{ID: claims.UserID})
notTeacher := !claims.HasCourseStatus(req, qf.Enrollment_TEACHER)
if notMember && notTeacher {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("access denied for %s: user %d tried to create group while not teacher or group member", method, claims.UserID))
}
if claims.HasCourseStatus(req, qf.Enrollment_TEACHER) {
// Otherwise, create the group.
return next(ctx, request)
}
groupID := req.IDFor("group")
for _, group := range claims.Groups {
if group == groupID {
return next(ctx, request)
}
case admin:
if claims.Admin {
return next(ctx, request)
}
case teacher:
if method == "GetUserByCourse" {
if err := claims.IsCourseTeacher(a.tokenManager.Database(), request.Any().(*qf.CourseUserRequest)); err != nil {
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("access denied for %s: %w", method, err))
}
return next(ctx, request)
}
if claims.HasCourseStatus(req, qf.Enrollment_TEACHER) {
return next(ctx, request)
}
case admin:
if claims.Admin {
return next(ctx, request)
}
}
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("AccessDenied(%s): required roles %v not satisfied by claims: %s", method, accessRolesFor[method], claims))
})
}
return nil, connect.NewError(connect.CodePermissionDenied,
fmt.Errorf("access denied for %s: required roles %v not satisfied by claims: %s", method, accessRolesFor[method], claims))
})
}
4 changes: 2 additions & 2 deletions web/interceptor/access_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestAccessControl(t *testing.T) {
t.Fatal(err)
}
shutdown := web.MockQuickFeedServer(t, logger, db, connect.WithInterceptors(
interceptor.UnaryUserVerifier(logger, tm),
interceptor.AccessControl(tm),
interceptor.NewUserInterceptor(logger, tm),
interceptor.NewAccessControlInterceptor(tm),
))

client := qtest.QuickFeedClient("")
Expand Down
62 changes: 43 additions & 19 deletions web/interceptor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,50 @@ var (
}, []string{"user"})
)

func Metrics() connect.Interceptor {
return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
procedure := request.Spec().Procedure
methodName := procedure[strings.LastIndex(procedure, "/")+1:]
defer metricsTimer(methodName)()
resp, err := next(ctx, request)
accessedMethodsCounter.WithLabelValues(methodName).Inc()
if resp != nil {
respondedMethodsCounter.WithLabelValues(methodName).Inc()
}
if err != nil {
failedMethodsCounter.WithLabelValues(methodName).Inc()
if methodName == "GetUser" {
// Can't get the user ID from err; so just counting
loginCounter.WithLabelValues("").Inc()
}
type MetricsInterceptor struct{}

func NewMetricsInterceptor() *MetricsInterceptor {
return &MetricsInterceptor{}
}

func (*MetricsInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return connect.StreamingHandlerFunc(func(ctx context.Context, conn connect.StreamingHandlerConn) error {
procedure := conn.Spec().Procedure
methodName := procedure[strings.LastIndex(procedure, "/")+1:]
defer metricsTimer(methodName)()
accessedMethodsCounter.WithLabelValues(methodName).Inc()
err := next(ctx, conn)
if err != nil {
failedMethodsCounter.WithLabelValues(methodName).Inc()
}
return err
})
}

func (*MetricsInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return connect.StreamingClientFunc(func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
return next(ctx, spec)
})
}

func (*MetricsInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
procedure := request.Spec().Procedure
methodName := procedure[strings.LastIndex(procedure, "/")+1:]
defer metricsTimer(methodName)()
resp, err := next(ctx, request)
accessedMethodsCounter.WithLabelValues(methodName).Inc()
if resp != nil {
respondedMethodsCounter.WithLabelValues(methodName).Inc()
}
if err != nil {
failedMethodsCounter.WithLabelValues(methodName).Inc()
if methodName == "GetUser" {
// Can't get the user ID from err; so just counting
loginCounter.WithLabelValues("").Inc()
}
return resp, err
})
}
return resp, err
})
}

Expand Down
56 changes: 37 additions & 19 deletions web/interceptor/req_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,50 @@ type idCleaner interface {
RemoveRemoteID()
}

// Validation returns a new unary server interceptor that validates requests
type ValidationInterceptor struct {
logger *zap.SugaredLogger
}

func NewValidationInterceptor(logger *zap.SugaredLogger) *ValidationInterceptor {
return &ValidationInterceptor{logger: logger}
}

func (*ValidationInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return connect.StreamingHandlerFunc(func(ctx context.Context, conn connect.StreamingHandlerConn) error {
return next(ctx, conn)
})
}

func (*ValidationInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return connect.StreamingClientFunc(func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
return next(ctx, spec)
})
}

// WrapUnary returns a new unary server interceptor that validates requests
// that implements the validator interface.
// Invalid requests are rejected without logging and before it reaches any
// user-level code and returns an illegal argument to the client.
// Further, the response values are cleaned of any remote IDs.
// In addition, the interceptor also implements a cancellation mechanism.
func Validation(logger *zap.SugaredLogger) connect.Interceptor {
return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
if request.Any() != nil {
if err := validate(logger, request.Any()); err != nil {
// Reject the request if it is invalid.
return nil, err
}
}
resp, err := next(ctx, request)
if err != nil {
// Do not return the message to the client if an error occurs.
// We log the error and return an empty response.
logger.Errorf("Method '%s' failed: %v", request.Spec().Procedure, err)
logger.Errorf("Request Message: %T: %v", request.Any(), request.Any())
func (v *ValidationInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
if request.Any() != nil {
if err := validate(v.logger, request.Any()); err != nil {
// Reject the request if it is invalid.
return nil, err
}
clean(resp.Any())
return resp, err
})
}
resp, err := next(ctx, request)
if err != nil {
// Do not return the message to the client if an error occurs.
// We log the error and return an empty response.
v.logger.Errorf("Method '%s' failed: %v", request.Spec().Procedure, err)
v.logger.Errorf("Request Message: %T: %v", request.Any(), request.Any())
return nil, err
}
clean(resp.Any())
return resp, err
})
}

Expand Down
Loading