Skip to content

Commit

Permalink
[SDK] Fix closing event stream in rest client (#449)
Browse files Browse the repository at this point in the history
* Close all stream connections at shutdown

* Fix

* Make rest client close event stream
  • Loading branch information
altafan authored Feb 7, 2025
1 parent b8ba288 commit 50c22e7
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 83 deletions.
173 changes: 93 additions & 80 deletions pkg/client-sdk/client/rest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,9 @@ func (c *restClient) GetEventStream(
ctx context.Context, requestID string,
) (<-chan client.RoundEventChannel, func(), error) {
eventsCh := make(chan client.RoundEventChannel)
closeCh := make(chan struct{})

go func(eventsCh chan client.RoundEventChannel) {
go func(eventsCh chan client.RoundEventChannel, closeCh chan struct{}) {
httpClient := &http.Client{Timeout: time.Second * 0}

resp, err := httpClient.Get(fmt.Sprintf("%s/v1/events", c.serverURL))
Expand All @@ -273,103 +274,115 @@ func (c *restClient) GetEventStream(
return
}

chunkCh := make(chan []byte)
reader := bufio.NewReader(resp.Body)

for {
chunk, err := reader.ReadBytes('\n')
if err != nil {
// Stream ended
if err == io.EOF {
go func() {
for {
chunk, err := reader.ReadBytes('\n')
if err != nil {
// Stream ended
if err == io.EOF {
return
}
log.WithError(err).Warn("failed to read from round event stream")
return
}
log.WithError(err).Warn("failed to read from round event stream")
return
}

chunk = bytes.Trim(chunk, "\n")
resp := ark_service.ArkServiceGetEventStreamOKBody{}
if err := json.Unmarshal(chunk, &resp); err != nil {
eventsCh <- client.RoundEventChannel{
Err: fmt.Errorf("failed to parse message from round event stream: %s", err),
}
return
}
chunk = bytes.Trim(chunk, "\n")

emptyResp := ark_service.ArkServiceGetEventStreamOKBody{}
if resp == emptyResp {
continue
chunkCh <- chunk
}
}()

if resp.Error != nil {
eventsCh <- client.RoundEventChannel{
Err: fmt.Errorf("received error %d: %s", resp.Error.Code, resp.Error.Message),
for {
select {
case <-closeCh:
return
case chunk := <-chunkCh:
resp := ark_service.ArkServiceGetEventStreamOKBody{}
if err := json.Unmarshal(chunk, &resp); err != nil {
eventsCh <- client.RoundEventChannel{
Err: fmt.Errorf("failed to parse message from round event stream: %s", err),
}
return
}
continue
}

// Handle different event types
var event client.RoundEvent
var _err error
switch {
case resp.Result.RoundFailed != nil:
e := resp.Result.RoundFailed
event = client.RoundFailedEvent{
ID: e.ID,
Reason: e.Reason,
emptyResp := ark_service.ArkServiceGetEventStreamOKBody{}
if resp == emptyResp {
continue
}
case resp.Result.RoundFinalization != nil:
e := resp.Result.RoundFinalization
tree := treeFromProto{e.VtxoTree}.parse()

minRelayFeeRate, err := strconv.Atoi(e.MinRelayFeeRate)
if err != nil {
_err = err
break
if resp.Error != nil {
eventsCh <- client.RoundEventChannel{
Err: fmt.Errorf("received error %d: %s", resp.Error.Code, resp.Error.Message),
}
continue
}

event = client.RoundFinalizationEvent{
ID: e.ID,
Tx: e.RoundTx,
Tree: tree,
Connectors: e.Connectors,
MinRelayFeeRate: chainfee.SatPerKVByte(minRelayFeeRate),
}
case resp.Result.RoundFinalized != nil:
e := resp.Result.RoundFinalized
event = client.RoundFinalizedEvent{
ID: e.ID,
Txid: e.RoundTxid,
// Handle different event types
var event client.RoundEvent
var _err error
switch {
case resp.Result.RoundFailed != nil:
e := resp.Result.RoundFailed
event = client.RoundFailedEvent{
ID: e.ID,
Reason: e.Reason,
}
case resp.Result.RoundFinalization != nil:
e := resp.Result.RoundFinalization
tree := treeFromProto{e.VtxoTree}.parse()

minRelayFeeRate, err := strconv.Atoi(e.MinRelayFeeRate)
if err != nil {
_err = err
break
}

event = client.RoundFinalizationEvent{
ID: e.ID,
Tx: e.RoundTx,
Tree: tree,
Connectors: e.Connectors,
MinRelayFeeRate: chainfee.SatPerKVByte(minRelayFeeRate),
}
case resp.Result.RoundFinalized != nil:
e := resp.Result.RoundFinalized
event = client.RoundFinalizedEvent{
ID: e.ID,
Txid: e.RoundTxid,
}
case resp.Result.RoundSigning != nil:
e := resp.Result.RoundSigning
event = client.RoundSigningStartedEvent{
ID: e.ID,
UnsignedTree: treeFromProto{e.UnsignedVtxoTree}.parse(),
UnsignedRoundTx: e.UnsignedRoundTx,
CosignersPubkeys: e.CosignersPubkeys,
}
case resp.Result.RoundSigningNoncesGenerated != nil:
e := resp.Result.RoundSigningNoncesGenerated
reader := hex.NewDecoder(strings.NewReader(e.TreeNonces))
nonces, err := bitcointree.DecodeNonces(reader)
if err != nil {
_err = err
break
}
event = client.RoundSigningNoncesGeneratedEvent{
ID: e.ID,
Nonces: nonces,
}
}
case resp.Result.RoundSigning != nil:
e := resp.Result.RoundSigning
event = client.RoundSigningStartedEvent{
ID: e.ID,
UnsignedTree: treeFromProto{e.UnsignedVtxoTree}.parse(),
UnsignedRoundTx: e.UnsignedRoundTx,
CosignersPubkeys: e.CosignersPubkeys,
}
case resp.Result.RoundSigningNoncesGenerated != nil:
e := resp.Result.RoundSigningNoncesGenerated
reader := hex.NewDecoder(strings.NewReader(e.TreeNonces))
nonces, err := bitcointree.DecodeNonces(reader)
if err != nil {
_err = err
break
}
event = client.RoundSigningNoncesGeneratedEvent{
ID: e.ID,
Nonces: nonces,
}
}

eventsCh <- client.RoundEventChannel{
Event: event,
Err: _err,
eventsCh <- client.RoundEventChannel{
Event: event,
Err: _err,
}
}
}
}(eventsCh)
}(eventsCh, closeCh)

return eventsCh, func() {}, nil
return eventsCh, func() { closeCh <- struct{}{} }, nil
}

func (a *restClient) Ping(
Expand Down
7 changes: 6 additions & 1 deletion server/internal/interface/grpc/handlers/arkservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ type handler struct {

eventsListenerHandler *listenerHanlder[*arkv1.GetEventStreamResponse]
transactionsListenerHandler *listenerHanlder[*arkv1.GetTransactionsStreamResponse]

stopCh <-chan struct{}
}

func NewHandler(service application.Service) service {
func NewHandler(service application.Service, stopCh <-chan struct{}) service {
h := &handler{
svc: service,
eventsListenerHandler: newListenerHandler[*arkv1.GetEventStreamResponse](),
transactionsListenerHandler: newListenerHandler[*arkv1.GetTransactionsStreamResponse](),
stopCh: stopCh,
}

go h.listenToEvents()
Expand Down Expand Up @@ -295,6 +298,8 @@ func (h *handler) GetEventStream(

for {
select {
case <-h.stopCh:
return nil
case <-stream.Context().Done():
return nil
case ev := <-listener.ch:
Expand Down
9 changes: 7 additions & 2 deletions server/internal/interface/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type service struct {
grpcServer *grpc.Server
macaroonSvc *macaroons.Service
otelShutdown func(context.Context) error

stopCh chan (struct{})
}

func NewService(
Expand Down Expand Up @@ -100,7 +102,9 @@ func NewService(
log.Debugf("generated TLS key pair at path: %s", svcConfig.tlsDatadir())
}

return &service{svcConfig, appConfig, nil, nil, macaroonSvc, nil}, nil
stopCh := make(chan struct{}, 1)

return &service{svcConfig, appConfig, nil, nil, macaroonSvc, nil, stopCh}, nil
}

func (s *service) Start() error {
Expand Down Expand Up @@ -159,6 +163,7 @@ func (s *service) stop(withAppSvc bool) {
s.server.Shutdown(context.Background())
log.Info("stopped grpc server")
if withAppSvc {
s.stopCh <- struct{}{}
appSvc, _ := s.appConfig.AppService()
if appSvc != nil {
appSvc.Stop()
Expand Down Expand Up @@ -202,7 +207,7 @@ func (s *service) newServer(tlsConfig *tls.Config, withAppSvc bool) error {
return err
}
appSvc = svc
appHandler := handlers.NewHandler(appSvc)
appHandler := handlers.NewHandler(appSvc, s.stopCh)
arkv1.RegisterArkServiceServer(grpcServer, appHandler)
arkv1.RegisterExplorerServiceServer(grpcServer, appHandler)
}
Expand Down

0 comments on commit 50c22e7

Please sign in to comment.