diff --git a/changelog/unreleased/notifications-service.md b/changelog/unreleased/notifications-service.md new file mode 100644 index 00000000000..a6bc56932fa --- /dev/null +++ b/changelog/unreleased/notifications-service.md @@ -0,0 +1,5 @@ +Enhancement: Implement notifications service + +Implemented the minimal version of the notifications service to be able to notify a user when they received a share. + +https://github.com/owncloud/ocis/pull/3217 diff --git a/notifications/cmd/notifications/main.go b/notifications/cmd/notifications/main.go new file mode 100644 index 00000000000..6b43234330b --- /dev/null +++ b/notifications/cmd/notifications/main.go @@ -0,0 +1,14 @@ +package main + +import ( + "os" + + "github.com/owncloud/ocis/notifications/pkg/command" + "github.com/owncloud/ocis/notifications/pkg/config" +) + +func main() { + if err := command.Execute(config.DefaultConfig()); err != nil { + os.Exit(1) + } +} diff --git a/notifications/pkg/channels/channels.go b/notifications/pkg/channels/channels.go new file mode 100644 index 00000000000..f4e427b018c --- /dev/null +++ b/notifications/pkg/channels/channels.go @@ -0,0 +1,105 @@ +// Package channels provides different communication channels to notify users. +package channels + +import ( + "context" + "net/smtp" + + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + groups "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + "github.com/cs3org/reva/pkg/rgrpc/todo/pool" + "github.com/owncloud/ocis/notifications/pkg/config" + "github.com/owncloud/ocis/ocis-pkg/log" + "github.com/pkg/errors" +) + +// Channel defines the methods of a communication channel. +type Channel interface { + // SendMessage sends a message to users. + SendMessage(userIDs []string, msg string) error + // SendMessageToGroup sends a message to a group. + SendMessageToGroup(groupdID *groups.GroupId, msg string) error +} + +// NewMailChannel instantiates a new mail communication channel. +func NewMailChannel(cfg config.Config, logger log.Logger) (Channel, error) { + gc, err := pool.GetGatewayServiceClient(cfg.Notifications.RevaGateway) + if err != nil { + logger.Error().Err(err).Msg("could not get gateway client") + return nil, err + } + return Mail{ + gatewayClient: gc, + conf: cfg, + logger: logger, + }, nil +} + +// Mail is the communcation channel for email. +type Mail struct { + gatewayClient gateway.GatewayAPIClient + conf config.Config + logger log.Logger +} + +// SendMessage sends a message to all given users. +func (m Mail) SendMessage(userIDs []string, msg string) error { + to, err := m.getReceiverAddresses(userIDs) + if err != nil { + return err + } + body := []byte(msg) + + smtpConf := m.conf.Notifications.SMTP + auth := smtp.PlainAuth("", smtpConf.Sender, smtpConf.Password, smtpConf.Host) + if err := smtp.SendMail(smtpConf.Host+":"+smtpConf.Port, auth, smtpConf.Sender, to, body); err != nil { + return errors.Wrap(err, "could not send mail") + } + return nil +} + +// SendMessageToGroup sends a message to all members of the given group. +func (m Mail) SendMessageToGroup(groupID *groups.GroupId, msg string) error { + // TODO We need an authenticated context here... + res, err := m.gatewayClient.GetGroup(context.Background(), &groups.GetGroupRequest{GroupId: groupID}) + if err != nil { + return err + } + if res.Status.Code != rpc.Code_CODE_OK { + return errors.New("could not get group") + } + + members := make([]string, 0, len(res.Group.Members)) + for _, id := range res.Group.Members { + members = append(members, id.OpaqueId) + } + + return m.SendMessage(members, msg) +} + +func (m Mail) getReceiverAddresses(receivers []string) ([]string, error) { + addresses := make([]string, 0, len(receivers)) + for _, id := range receivers { + // Authenticate is too costly but at the moment our only option to get the user. + // We don't have an authenticated context so calling `GetUser` doesn't work. + res, err := m.gatewayClient.Authenticate(context.Background(), &gateway.AuthenticateRequest{ + Type: "machine", + ClientId: "userid:" + id, + ClientSecret: m.conf.Notifications.MachineAuthSecret, + }) + if err != nil { + return nil, err + } + if res.Status.Code != rpc.Code_CODE_OK { + m.logger.Error(). + Interface("status", res.Status). + Str("receiver_id", id). + Msg("could not get user") + continue + } + addresses = append(addresses, res.User.Mail) + } + + return addresses, nil +} diff --git a/notifications/pkg/command/health.go b/notifications/pkg/command/health.go new file mode 100644 index 00000000000..7f2adb85b5d --- /dev/null +++ b/notifications/pkg/command/health.go @@ -0,0 +1,18 @@ +package command + +import ( + "github.com/owncloud/ocis/notifications/pkg/config" + "github.com/urfave/cli/v2" +) + +// Health is the entrypoint for the health command. +func Health(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "health", + Usage: "Check health status", + Action: func(c *cli.Context) error { + // Not implemented + return nil + }, + } +} diff --git a/notifications/pkg/command/root.go b/notifications/pkg/command/root.go new file mode 100644 index 00000000000..805d8cec3ac --- /dev/null +++ b/notifications/pkg/command/root.go @@ -0,0 +1,64 @@ +package command + +import ( + "context" + "os" + + "github.com/owncloud/ocis/notifications/pkg/config" + "github.com/owncloud/ocis/ocis-pkg/clihelper" + ociscfg "github.com/owncloud/ocis/ocis-pkg/config" + "github.com/thejerf/suture/v4" + "github.com/urfave/cli/v2" +) + +// GetCommands provides all commands for this service +func GetCommands(cfg *config.Config) cli.Commands { + return []*cli.Command{ + // start this service + Server(cfg), + + // interaction with this service + + // infos about this service + Health(cfg), + Version(cfg), + } +} + +// Execute is the entry point for the notifications command. +func Execute(cfg *config.Config) error { + app := clihelper.DefaultApp(&cli.App{ + Name: "notifications", + Usage: "starts notifications service", + Commands: GetCommands(cfg), + }) + + cli.HelpFlag = &cli.BoolFlag{ + Name: "help,h", + Usage: "Show the help", + } + + return app.Run(os.Args) +} + +// SutureService allows for the notifications command to be embedded and supervised by a suture supervisor tree. +type SutureService struct { + cfg *config.Config +} + +// NewSutureService creates a new notifications.SutureService +func NewSutureService(cfg *ociscfg.Config) suture.Service { + cfg.Settings.Commons = cfg.Commons + return SutureService{ + cfg: cfg.Notifications, + } +} + +func (s SutureService) Serve(ctx context.Context) error { + s.cfg.Context = ctx + if err := Execute(s.cfg); err != nil { + return err + } + + return nil +} diff --git a/notifications/pkg/command/server.go b/notifications/pkg/command/server.go new file mode 100644 index 00000000000..7f22fa1cb7c --- /dev/null +++ b/notifications/pkg/command/server.go @@ -0,0 +1,50 @@ +package command + +import ( + "fmt" + + "github.com/asim/go-micro/plugins/events/nats/v4" + "github.com/cs3org/reva/pkg/events" + "github.com/cs3org/reva/pkg/events/server" + "github.com/owncloud/ocis/notifications/pkg/channels" + "github.com/owncloud/ocis/notifications/pkg/config" + "github.com/owncloud/ocis/notifications/pkg/config/parser" + "github.com/owncloud/ocis/notifications/pkg/logging" + "github.com/owncloud/ocis/notifications/pkg/service" + "github.com/urfave/cli/v2" +) + +// Server is the entrypoint for the server command. +func Server(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "server", + Usage: fmt.Sprintf("start %s extension without runtime (unsupervised mode)", cfg.Service.Name), + Category: "server", + Before: func(c *cli.Context) error { + return parser.ParseConfig(cfg) + }, + Action: func(c *cli.Context) error { + logger := logging.Configure(cfg.Service.Name, cfg.Log) + + evs := []events.Unmarshaller{ + events.ShareCreated{}, + } + + evtsCfg := cfg.Notifications.Events + client, err := server.NewNatsStream(nats.Address(evtsCfg.Endpoint), nats.ClusterID(evtsCfg.Cluster)) + if err != nil { + return err + } + evts, err := events.Consume(client, evtsCfg.ConsumerGroup, evs...) + if err != nil { + return err + } + channel, err := channels.NewMailChannel(*cfg, logger) + if err != nil { + return err + } + svc := service.NewEventsNotifier(evts, channel, logger) + return svc.Run() + }, + } +} diff --git a/notifications/pkg/command/version.go b/notifications/pkg/command/version.go new file mode 100644 index 00000000000..f2d47a569a5 --- /dev/null +++ b/notifications/pkg/command/version.go @@ -0,0 +1,19 @@ +package command + +import ( + "github.com/owncloud/ocis/notifications/pkg/config" + "github.com/urfave/cli/v2" +) + +// Version prints the service versions of all running instances. +func Version(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "version", + Usage: "print the version of this binary and the running extension instances", + Category: "info", + Action: func(c *cli.Context) error { + // not implemented + return nil + }, + } +} diff --git a/notifications/pkg/config/config.go b/notifications/pkg/config/config.go new file mode 100644 index 00000000000..8ac7da5d8f6 --- /dev/null +++ b/notifications/pkg/config/config.go @@ -0,0 +1,44 @@ +package config + +import ( + "context" + + "github.com/owncloud/ocis/ocis-pkg/shared" +) + +// Config combines all available configuration parts. +type Config struct { + *shared.Commons + + Service Service + + Log *Log `ocisConfig:"log"` + Debug Debug `ocisConfig:"debug"` + + Notifications Notifications `ocisConfig:"notifications"` + + Context context.Context +} + +// Notifications definces the config options for the notifications service. +type Notifications struct { + SMTP SMTP `ocisConfig:"SMTP"` + Events Events `ocisConfig:"events"` + RevaGateway string `ocisConfig:"reva_gateway" env:"REVA_GATEWAY;NOTIFICATIONS_REVA_GATEWAY"` + MachineAuthSecret string `ocisConfig:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;NOTIFICATIONS_MACHINE_AUTH_API_KEY"` +} + +// SMTP combines the smtp configuration options. +type SMTP struct { + Host string `ocisConfig:"smtp_host" env:"NOTIFICATIONS_SMTP_HOST"` + Port string `ocisConfig:"smtp_port" env:"NOTIFICATIONS_SMTP_PORT"` + Sender string `ocisConfig:"smtp_sender" env:"NOTIFICATIONS_SMTP_SENDER"` + Password string `ocisConfig:"smtp_password" env:"NOTIFICATIONS_SMTP_PASSWORD"` +} + +// Events combines the configuration options for the event bus. +type Events struct { + Endpoint string `ocisConfig:"events_endpoint" env:"NOTIFICATIONS_EVENTS_ENDPOINT"` + Cluster string `ocisConfig:"events_cluster" env:"NOTIFICATIONS_EVENTS_CLUSTER"` + ConsumerGroup string `ocisConfig:"events_group" env:"NOTIFICATIONS_EVENTS_GROUP"` +} diff --git a/notifications/pkg/config/debug.go b/notifications/pkg/config/debug.go new file mode 100644 index 00000000000..da6d2d59060 --- /dev/null +++ b/notifications/pkg/config/debug.go @@ -0,0 +1,9 @@ +package config + +// Debug defines the available debug configuration. +type Debug struct { + Addr string `ocisConfig:"addr" env:"NOTIFICATIONS_DEBUG_ADDR"` + Token string `ocisConfig:"token" env:"NOTIFICATIONS_DEBUG_TOKEN"` + Pprof bool `ocisConfig:"pprof" env:"NOTIFICATIONS_DEBUG_PPROF"` + Zpages bool `ocisConfig:"zpages" env:"NOTIFICATIONS_DEBUG_ZPAGES"` +} diff --git a/notifications/pkg/config/defaultconfig.go b/notifications/pkg/config/defaultconfig.go new file mode 100644 index 00000000000..2d9169f7b2c --- /dev/null +++ b/notifications/pkg/config/defaultconfig.go @@ -0,0 +1,27 @@ +package config + +// NOTE: Most of this configuration is not needed to keep it as simple as possible +// TODO: Clean up unneeded configuration + +func DefaultConfig() *Config { + return &Config{ + Service: Service{ + Name: "notifications", + }, + Notifications: Notifications{ + SMTP: SMTP{ + Host: "127.0.0.1", + Port: "1025", + Sender: "god@example.com", + Password: "godisdead", + }, + Events: Events{ + Endpoint: "127.0.0.1:4222", + Cluster: "test-cluster", + ConsumerGroup: "notifications", + }, + RevaGateway: "127.0.0.1:9142", + MachineAuthSecret: "change-me-please", + }, + } +} diff --git a/notifications/pkg/config/log.go b/notifications/pkg/config/log.go new file mode 100644 index 00000000000..ddb4d391f04 --- /dev/null +++ b/notifications/pkg/config/log.go @@ -0,0 +1,9 @@ +package config + +// Log defines the available log configuration. +type Log struct { + Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;NOTIFICATIONS_LOG_LEVEL"` + Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;NOTIFICATIONS_LOG_PRETTY"` + Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;NOTIFICATIONS_LOG_COLOR"` + File string `mapstructure:"file" env:"OCIS_LOG_FILE;NOTIFICATIONS_LOG_FILE"` +} diff --git a/notifications/pkg/config/parser/parse.go b/notifications/pkg/config/parser/parse.go new file mode 100644 index 00000000000..5bc4e6e5713 --- /dev/null +++ b/notifications/pkg/config/parser/parse.go @@ -0,0 +1,40 @@ +package parser + +import ( + "errors" + + "github.com/owncloud/ocis/notifications/pkg/config" + ociscfg "github.com/owncloud/ocis/ocis-pkg/config" + + "github.com/owncloud/ocis/ocis-pkg/config/envdecode" +) + +// ParseConfig loads accounts configuration from known paths. +func ParseConfig(cfg *config.Config) error { + _, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg) + if err != nil { + return err + } + + // provide with defaults for shared logging, since we need a valid destination address for BindEnv. + if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil { + cfg.Log = &config.Log{ + Level: cfg.Commons.Log.Level, + Pretty: cfg.Commons.Log.Pretty, + Color: cfg.Commons.Log.Color, + File: cfg.Commons.Log.File, + } + } else if cfg.Log == nil { + cfg.Log = &config.Log{} + } + + // load all env variables relevant to the config in the current context. + if err := envdecode.Decode(cfg); err != nil { + // no environment variable set for this config is an expected "error" + if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) { + return err + } + } + + return nil +} diff --git a/notifications/pkg/config/service.go b/notifications/pkg/config/service.go new file mode 100644 index 00000000000..f98aa3d27e5 --- /dev/null +++ b/notifications/pkg/config/service.go @@ -0,0 +1,6 @@ +package config + +// Service defines the available service configuration. +type Service struct { + Name string +} diff --git a/notifications/pkg/logging/logging.go b/notifications/pkg/logging/logging.go new file mode 100644 index 00000000000..039b0451c47 --- /dev/null +++ b/notifications/pkg/logging/logging.go @@ -0,0 +1,17 @@ +package logging + +import ( + "github.com/owncloud/ocis/notifications/pkg/config" + "github.com/owncloud/ocis/ocis-pkg/log" +) + +// LoggerFromConfig initializes a service-specific logger instance. +func Configure(name string, cfg *config.Log) log.Logger { + return log.NewLogger( + log.Name(name), + log.Level(cfg.Level), + log.Pretty(cfg.Pretty), + log.Color(cfg.Color), + log.File(cfg.File), + ) +} diff --git a/notifications/pkg/service/service.go b/notifications/pkg/service/service.go new file mode 100644 index 00000000000..81e4a26b4ac --- /dev/null +++ b/notifications/pkg/service/service.go @@ -0,0 +1,64 @@ +package service + +import ( + "os" + "os/signal" + "syscall" + + "github.com/cs3org/reva/pkg/events" + "github.com/owncloud/ocis/notifications/pkg/channels" + "github.com/owncloud/ocis/ocis-pkg/log" +) + +type Service interface { + Run() error +} + +func NewEventsNotifier(events <-chan interface{}, channel channels.Channel, logger log.Logger) Service { + return eventsNotifier{ + logger: logger, + channel: channel, + events: events, + signals: make(chan os.Signal, 1), + } +} + +type eventsNotifier struct { + logger log.Logger + channel channels.Channel + events <-chan interface{} + signals chan os.Signal +} + +func (s eventsNotifier) Run() error { + signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM) + s.logger.Debug(). + Msg("eventsNotifier started") + for { + select { + case evt := <-s.events: + go func() { + switch e := evt.(type) { + case events.ShareCreated: + msg := "You got a share!" + var err error + if e.GranteeUserID != nil { + err = s.channel.SendMessage([]string{e.GranteeUserID.OpaqueId}, msg) + } else if e.GranteeGroupID != nil { + err = s.channel.SendMessageToGroup(e.GranteeGroupID, msg) + } + if err != nil { + s.logger.Error(). + Err(err). + Str("event", "ShareCreated"). + Msg("failed to send a message") + } + } + }() + case <-s.signals: + s.logger.Debug(). + Msg("eventsNotifier stopped") + return nil + } + } +} diff --git a/ocis-pkg/config/config.go b/ocis-pkg/config/config.go index 76aaeb35f69..66707055a68 100644 --- a/ocis-pkg/config/config.go +++ b/ocis-pkg/config/config.go @@ -9,6 +9,7 @@ import ( graph "github.com/owncloud/ocis/graph/pkg/config" idp "github.com/owncloud/ocis/idp/pkg/config" nats "github.com/owncloud/ocis/nats/pkg/config" + notifications "github.com/owncloud/ocis/notifications/pkg/config" ocs "github.com/owncloud/ocis/ocs/pkg/config" proxy "github.com/owncloud/ocis/proxy/pkg/config" settings "github.com/owncloud/ocis/settings/pkg/config" @@ -62,6 +63,7 @@ type Config struct { GraphExplorer *graphExplorer.Config `ocisConfig:"graph_explorer"` IDP *idp.Config `ocisConfig:"idp"` Nats *nats.Config `ocisConfig:"nats"` + Notifications *notifications.Config `ocisConfig:"notifications"` OCS *ocs.Config `ocisConfig:"ocs"` Web *web.Config `ocisConfig:"web"` Proxy *proxy.Config `ocisConfig:"proxy"` diff --git a/ocis-pkg/config/defaultconfig.go b/ocis-pkg/config/defaultconfig.go index 53853cbfaf0..60eea86a67b 100644 --- a/ocis-pkg/config/defaultconfig.go +++ b/ocis-pkg/config/defaultconfig.go @@ -7,6 +7,7 @@ import ( graph "github.com/owncloud/ocis/graph/pkg/config" idp "github.com/owncloud/ocis/idp/pkg/config" nats "github.com/owncloud/ocis/nats/pkg/config" + notifications "github.com/owncloud/ocis/notifications/pkg/config" ocs "github.com/owncloud/ocis/ocs/pkg/config" proxy "github.com/owncloud/ocis/proxy/pkg/config" settings "github.com/owncloud/ocis/settings/pkg/config" @@ -31,6 +32,7 @@ func DefaultConfig() *Config { Graph: graph.DefaultConfig(), IDP: idp.DefaultConfig(), Nats: nats.DefaultConfig(), + Notifications: notifications.DefaultConfig(), Proxy: proxy.DefaultConfig(), GraphExplorer: graphExplorer.DefaultConfig(), OCS: ocs.DefaultConfig(), diff --git a/ocis/pkg/command/notifications.go b/ocis/pkg/command/notifications.go new file mode 100644 index 00000000000..ec2eb357066 --- /dev/null +++ b/ocis/pkg/command/notifications.go @@ -0,0 +1,26 @@ +package command + +import ( + "github.com/owncloud/ocis/notifications/pkg/command" + "github.com/owncloud/ocis/ocis-pkg/config" + "github.com/owncloud/ocis/ocis-pkg/config/parser" + "github.com/owncloud/ocis/ocis/pkg/register" + "github.com/urfave/cli/v2" +) + +// NatsServerCommand is the entrypoint for the nats server command. +func NotificationsCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "notifications", + Usage: "start notifications service", + Category: "extensions", + Before: func(ctx *cli.Context) error { + return parser.ParseConfig(cfg) + }, + Subcommands: command.GetCommands(cfg.Notifications), + } +} + +func init() { + register.AddCommand(NotificationsCommand) +} diff --git a/ocis/pkg/runtime/service/service.go b/ocis/pkg/runtime/service/service.go index d160305ee06..fa1a129088b 100644 --- a/ocis/pkg/runtime/service/service.go +++ b/ocis/pkg/runtime/service/service.go @@ -24,6 +24,7 @@ import ( graph "github.com/owncloud/ocis/graph/pkg/command" idp "github.com/owncloud/ocis/idp/pkg/command" nats "github.com/owncloud/ocis/nats/pkg/command" + notifications "github.com/owncloud/ocis/notifications/pkg/command" "github.com/owncloud/ocis/ocis-pkg/config" ociscfg "github.com/owncloud/ocis/ocis-pkg/config" "github.com/owncloud/ocis/ocis-pkg/log" @@ -114,6 +115,7 @@ func NewService(options ...Option) (*Service, error) { s.ServicesRegistry["storage-shares"] = storage.NewStorageShares s.ServicesRegistry["storage-public-link"] = storage.NewStoragePublicLink s.ServicesRegistry["storage-appprovider"] = storage.NewAppProvider + s.ServicesRegistry["notifications"] = notifications.NewSutureService // populate delayed services s.Delayed["storage-sharing"] = storage.NewSharing