Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix recover from non proto remote message #1014

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions _examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
It is recommanded to create your own go.work to use the local version of the main module.

Besides, go.work is not recommanded to be commited to the repository.

Not all examples are up-to-date, please help to update them if you find any outdated examples.
4 changes: 2 additions & 2 deletions _examples/cluster-grain/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ module cluster-grain

go 1.21

replace github.com/asynkron/protoactor-go => ../..
// replace github.com/asynkron/protoactor-go => ../..

require (
github.com/asynkron/goconsole v0.0.0-20160504192649-bfa12eebf716
github.com/asynkron/protoactor-go v0.0.0-00010101000000-000000000000
github.com/asynkron/protoactor-go v0.0.0-20231231215642-2ecba7517929
google.golang.org/protobuf v1.31.0
)

Expand Down
4 changes: 2 additions & 2 deletions _examples/cluster-grain/shared/build.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
protoc --go_out=. --go_opt=paths=source_relative --proto_path=. protos.proto
protoc --go_out=. --go_opt=paths=source_relative --plugin=$GOPATH/bin/protoc-gen-go-grain --go-grain_out=. --go-grain_opt=paths=source_relative protos.proto
protoc --go_out=. --go_opt=paths=source_relative \
--plugin=protoc-gen-go-grain=../../../protobuf/protoc-gen-go-grain/protoc-gen-go-grain.sh --go-grain_out=. --go-grain_opt=paths=source_relative -I../../ -I. protos.proto
4 changes: 2 additions & 2 deletions _examples/cluster-grain/shared/protos.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 22 additions & 21 deletions _examples/cluster-grain/shared/protos_grain.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion actor/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"runtime/debug"
"sync/atomic"
"time"

Expand Down Expand Up @@ -702,10 +703,10 @@ func (ctx *actorContext) finalizeStop() {
//

func (ctx *actorContext) EscalateFailure(reason interface{}, message interface{}) {
// TODO: add callstack to log?
ctx.Logger().Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason))
// debug setting, allows to output supervision failures in console/error level
if ctx.actorSystem.Config.DeveloperSupervisionLogging {
fmt.Printf("debug.Stack(): %s\n", debug.Stack())
fmt.Println("[Supervision] Actor:", ctx.self, " failed with message:", message, " exception:", reason)
ctx.Logger().Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason))
}
Expand Down
5 changes: 1 addition & 4 deletions cluster/identitylookup/disthash/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ func (pm *Manager) onClusterTopology(tplg *clustering.ClusterTopology) {
pm.cluster.Logger().Info("onClusterTopology", slog.Uint64("topology-hash", tplg.TopologyHash))

for _, m := range tplg.Members {
pm.cluster.Logger().Info("Got member ", slog.String("MemberId", m.Id))
for _, k := range m.Kinds {
pm.cluster.Logger().Info("" + m.Id + " - " + k)
}
pm.cluster.Logger().Info("Got member", slog.Any("member", m))
}

pm.rdv = clustering.NewRendezvous()
Expand Down
70 changes: 50 additions & 20 deletions cluster/pubsub_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ import (
"github.com/asynkron/protoactor-go/remote"
)

var (
_ remote.RootSerializable = (*PubSubBatch)(nil)
_ remote.RootSerializable = (*DeliverBatchRequest)(nil)
_ remote.RootSerializable = (*PubSubAutoRespondBatch)(nil)

_ remote.RootSerialized = (*PubSubBatchTransport)(nil)
_ remote.RootSerialized = (*DeliverBatchRequestTransport)(nil)
_ remote.RootSerialized = (*PubSubAutoRespondBatchTransport)(nil)
)

type PubSubBatch struct {
Envelopes []interface{}
}

// Serialize converts a PubSubBatch to a PubSubBatchTransport.
func (b *PubSubBatch) Serialize() remote.RootSerialized {
func (b *PubSubBatch) Serialize() (remote.RootSerialized, error) {
batch := &PubSubBatchTransport{
TypeNames: make([]string, 0),
Envelopes: make([]*PubSubEnvelope, 0),
Expand All @@ -20,7 +30,7 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized {
var serializerId int32
messageData, typeName, err := remote.Serialize(envelope, serializerId)
if err != nil {
panic(err)
return nil, err
}
// batch.TypeNames.IndexOf(typeName)
typeIndex := -1
Expand All @@ -40,23 +50,23 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized {
SerializerId: serializerId,
})
}
return batch
return batch, nil
}

// Deserialize converts a PubSubBatchTransport to a PubSubBatch.
func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable {
func (t *PubSubBatchTransport) Deserialize() (remote.RootSerializable, error) {
b := &PubSubBatch{
Envelopes: make([]interface{}, 0),
}

for _, envelope := range t.Envelopes {
message, err := remote.Deserialize(envelope.MessageData, t.TypeNames[envelope.TypeId], envelope.SerializerId)
if err != nil {
panic(err)
return nil, err
}
b.Envelopes = append(b.Envelopes, message)
}
return b
return b, nil
}

type DeliverBatchRequest struct {
Expand All @@ -65,34 +75,49 @@ type DeliverBatchRequest struct {
Topic string
}

func (d *DeliverBatchRequest) Serialize() remote.RootSerialized {
func (d *DeliverBatchRequest) Serialize() (remote.RootSerialized, error) {
rs, err := d.PubSubBatch.Serialize()
if err != nil {
return nil, err
}

return &DeliverBatchRequestTransport{
Subscribers: d.Subscribers,
Batch: d.PubSubBatch.Serialize().(*PubSubBatchTransport),
Batch: rs.(*PubSubBatchTransport),
Topic: d.Topic,
}
}, nil
}

func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable {
func (t *DeliverBatchRequestTransport) Deserialize() (remote.RootSerializable, error) {
rs, err := t.Batch.Deserialize()
if err != nil {
return nil, err
}

return &DeliverBatchRequest{
Subscribers: t.Subscribers,
PubSubBatch: t.Batch.Deserialize().(*PubSubBatch),
PubSubBatch: rs.(*PubSubBatch),
Topic: t.Topic,
}
}, nil
}

type PubSubAutoRespondBatch struct {
Envelopes []interface{}
}

// Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.
func (b *PubSubAutoRespondBatch) Serialize() remote.RootSerialized {
func (b *PubSubAutoRespondBatch) Serialize() (remote.RootSerialized, error) {
batch := &PubSubBatch{Envelopes: b.Envelopes}
transport := batch.Serialize().(*PubSubBatchTransport)
return &PubSubAutoRespondBatchTransport{
TypeNames: transport.TypeNames,
Envelopes: transport.Envelopes,

rs, err := batch.Serialize()
if err != nil {
return nil, err
}

return &PubSubAutoRespondBatchTransport{
TypeNames: rs.(*PubSubBatchTransport).TypeNames,
Envelopes: rs.(*PubSubBatchTransport).Envelopes,
}, nil
}

// GetAutoResponse returns a PublishResponse.
Expand All @@ -108,12 +133,17 @@ func (b *PubSubAutoRespondBatch) GetMessages() []interface{} {
}

// Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.
func (t *PubSubAutoRespondBatchTransport) Deserialize() remote.RootSerializable {
func (t *PubSubAutoRespondBatchTransport) Deserialize() (remote.RootSerializable, error) {
batch := &PubSubBatchTransport{
TypeNames: t.TypeNames,
Envelopes: t.Envelopes,
}
return &PubSubAutoRespondBatch{
Envelopes: batch.Deserialize().(*PubSubBatch).Envelopes,
rs, err := batch.Deserialize()
if err != nil {
return nil, err
}

return &PubSubAutoRespondBatch{
Envelopes: rs.(*PubSubBatch).Envelopes,
}, nil
}
14 changes: 10 additions & 4 deletions remote/endpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type endpointLazy struct {
// valueFunc func() *endpoint
unloaded uint32
unloaded atomic.Bool
once sync.Once
endpoint atomic.Value
manager *endpointManager
Expand All @@ -27,6 +27,7 @@ func NewEndpointLazy(em *endpointManager, address string) *endpointLazy {
}

func (el *endpointLazy) connect() {
el.manager.remote.actorSystem.Logger().Debug("connecting to remote address", slog.String("address", el.address))
em := el.manager
system := em.remote.actorSystem
rst, _ := system.Root.RequestFuture(em.endpointSupervisor, el.address, -1).Result()
Expand Down Expand Up @@ -247,10 +248,10 @@ func (em *endpointManager) removeEndpoint(msg *EndpointTerminatedEvent) {
v, ok := em.connections.Load(msg.Address)
if ok {
le := v.(*endpointLazy)
if atomic.CompareAndSwapUint32(&le.unloaded, 0, 1) {
if le.unloaded.CompareAndSwap(false, true) {
em.connections.Delete(msg.Address)
ep := le.Get()
em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher ans EndpointWriter", slog.String("address", msg.Address))
em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher and EndpointWriter", slog.String("address", msg.Address))
em.remote.actorSystem.Root.Send(ep.watcher, msg)
em.remote.actorSystem.Root.Send(ep.writer, msg)
}
Expand All @@ -274,13 +275,18 @@ func (state *endpointSupervisor) Receive(ctx actor.Context) {
writer: state.spawnEndpointWriter(state.remote, address, ctx),
watcher: state.spawnEndpointWatcher(state.remote, address, ctx),
}
ctx.Logger().Debug("id", slog.String("ewr", e.writer.Id), slog.String("ewa", e.watcher.Id))
ctx.Respond(e)
}
}

func (state *endpointSupervisor) HandleFailure(actorSystem *actor.ActorSystem, supervisor actor.Supervisor, child *actor.PID, rs *actor.RestartStatistics, reason interface{}, message interface{}) {
actorSystem.Logger().Debug("EndpointSupervisor handling failure", slog.Any("reason", reason), slog.Any("message", message))
supervisor.RestartChildren(child)
// use restart will cause a start loop, just stop it for now
// supervisor.RestartChildren(child)

// TODO: an extra stop is sent to the deadletter caused by EndpointTerminatedEvent
supervisor.StopChildren(child)
}

func (state *endpointSupervisor) spawnEndpointWriter(remote *Remote, address string, ctx actor.Context) *actor.PID {
Expand Down
6 changes: 5 additions & 1 deletion remote/endpoint_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ func (s *endpointReader) onMessageBatch(m *MessageBatch) error {
// translate from on-the-wire representation to in-process representation
// this only applies to root level messages, and never on nested child messages
if v, ok := message.(RootSerialized); ok {
message = v.Deserialize()
message, err = v.Deserialize()
if err != nil {
s.remote.Logger().Error("EndpointReader failed to deserialize", slog.Any("error", err))
return err
}
}

switch msg := message.(type) {
Expand Down
Loading
Loading