Skip to content

Commit

Permalink
Make rest client close event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
altafan committed Feb 5, 2025
1 parent 4e937f0 commit 18109f3
Showing 1 changed file with 93 additions and 80 deletions.
173 changes: 93 additions & 80 deletions pkg/client-sdk/client/rest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,9 @@ func (c *restClient) GetEventStream(
ctx context.Context, requestID string,
) (<-chan client.RoundEventChannel, func(), error) {
eventsCh := make(chan client.RoundEventChannel)
closeCh := make(chan struct{})

go func(eventsCh chan client.RoundEventChannel) {
go func(eventsCh chan client.RoundEventChannel, closeCh chan struct{}) {
httpClient := &http.Client{Timeout: time.Second * 0}

resp, err := httpClient.Get(fmt.Sprintf("%s/v1/events", c.serverURL))
Expand All @@ -273,103 +274,115 @@ func (c *restClient) GetEventStream(
return
}

chunkCh := make(chan []byte)
reader := bufio.NewReader(resp.Body)

for {
chunk, err := reader.ReadBytes('\n')
if err != nil {
// Stream ended
if err == io.EOF {
go func() {
for {
chunk, err := reader.ReadBytes('\n')
if err != nil {
// Stream ended
if err == io.EOF {
return
}
log.WithError(err).Warn("failed to read from round event stream")
return
}
log.WithError(err).Warn("failed to read from round event stream")
return
}

chunk = bytes.Trim(chunk, "\n")
resp := ark_service.ArkServiceGetEventStreamOKBody{}
if err := json.Unmarshal(chunk, &resp); err != nil {
eventsCh <- client.RoundEventChannel{
Err: fmt.Errorf("failed to parse message from round event stream: %s", err),
}
return
}
chunk = bytes.Trim(chunk, "\n")

emptyResp := ark_service.ArkServiceGetEventStreamOKBody{}
if resp == emptyResp {
continue
chunkCh <- chunk
}
}()

if resp.Error != nil {
eventsCh <- client.RoundEventChannel{
Err: fmt.Errorf("received error %d: %s", resp.Error.Code, resp.Error.Message),
for {
select {
case <-closeCh:
return
case chunk := <-chunkCh:
resp := ark_service.ArkServiceGetEventStreamOKBody{}
if err := json.Unmarshal(chunk, &resp); err != nil {
eventsCh <- client.RoundEventChannel{
Err: fmt.Errorf("failed to parse message from round event stream: %s", err),
}
return
}
continue
}

// Handle different event types
var event client.RoundEvent
var _err error
switch {
case resp.Result.RoundFailed != nil:
e := resp.Result.RoundFailed
event = client.RoundFailedEvent{
ID: e.ID,
Reason: e.Reason,
emptyResp := ark_service.ArkServiceGetEventStreamOKBody{}
if resp == emptyResp {
continue
}
case resp.Result.RoundFinalization != nil:
e := resp.Result.RoundFinalization
tree := treeFromProto{e.VtxoTree}.parse()

minRelayFeeRate, err := strconv.Atoi(e.MinRelayFeeRate)
if err != nil {
_err = err
break
if resp.Error != nil {
eventsCh <- client.RoundEventChannel{
Err: fmt.Errorf("received error %d: %s", resp.Error.Code, resp.Error.Message),
}
continue
}

event = client.RoundFinalizationEvent{
ID: e.ID,
Tx: e.RoundTx,
Tree: tree,
Connectors: e.Connectors,
MinRelayFeeRate: chainfee.SatPerKVByte(minRelayFeeRate),
}
case resp.Result.RoundFinalized != nil:
e := resp.Result.RoundFinalized
event = client.RoundFinalizedEvent{
ID: e.ID,
Txid: e.RoundTxid,
// Handle different event types
var event client.RoundEvent
var _err error
switch {
case resp.Result.RoundFailed != nil:
e := resp.Result.RoundFailed
event = client.RoundFailedEvent{
ID: e.ID,
Reason: e.Reason,
}
case resp.Result.RoundFinalization != nil:
e := resp.Result.RoundFinalization
tree := treeFromProto{e.VtxoTree}.parse()

minRelayFeeRate, err := strconv.Atoi(e.MinRelayFeeRate)
if err != nil {
_err = err
break
}

event = client.RoundFinalizationEvent{
ID: e.ID,
Tx: e.RoundTx,
Tree: tree,
Connectors: e.Connectors,
MinRelayFeeRate: chainfee.SatPerKVByte(minRelayFeeRate),
}
case resp.Result.RoundFinalized != nil:
e := resp.Result.RoundFinalized
event = client.RoundFinalizedEvent{
ID: e.ID,
Txid: e.RoundTxid,
}
case resp.Result.RoundSigning != nil:
e := resp.Result.RoundSigning
event = client.RoundSigningStartedEvent{
ID: e.ID,
UnsignedTree: treeFromProto{e.UnsignedVtxoTree}.parse(),
UnsignedRoundTx: e.UnsignedRoundTx,
CosignersPubkeys: e.CosignersPubkeys,
}
case resp.Result.RoundSigningNoncesGenerated != nil:
e := resp.Result.RoundSigningNoncesGenerated
reader := hex.NewDecoder(strings.NewReader(e.TreeNonces))
nonces, err := bitcointree.DecodeNonces(reader)
if err != nil {
_err = err
break
}
event = client.RoundSigningNoncesGeneratedEvent{
ID: e.ID,
Nonces: nonces,
}
}
case resp.Result.RoundSigning != nil:
e := resp.Result.RoundSigning
event = client.RoundSigningStartedEvent{
ID: e.ID,
UnsignedTree: treeFromProto{e.UnsignedVtxoTree}.parse(),
UnsignedRoundTx: e.UnsignedRoundTx,
CosignersPubkeys: e.CosignersPubkeys,
}
case resp.Result.RoundSigningNoncesGenerated != nil:
e := resp.Result.RoundSigningNoncesGenerated
reader := hex.NewDecoder(strings.NewReader(e.TreeNonces))
nonces, err := bitcointree.DecodeNonces(reader)
if err != nil {
_err = err
break
}
event = client.RoundSigningNoncesGeneratedEvent{
ID: e.ID,
Nonces: nonces,
}
}

eventsCh <- client.RoundEventChannel{
Event: event,
Err: _err,
eventsCh <- client.RoundEventChannel{
Event: event,
Err: _err,
}
}
}
}(eventsCh)
}(eventsCh, closeCh)

return eventsCh, func() {}, nil
return eventsCh, func() { closeCh <- struct{}{} }, nil
}

func (a *restClient) Ping(
Expand Down

0 comments on commit 18109f3

Please sign in to comment.