From 969cbe4d5ae5fb65093db0dd8da6b6a23f54ee7a Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 3 Jun 2021 14:30:10 +0530 Subject: [PATCH] store the count of unique receieved cids --- channelmonitor/channelmonitor_test.go | 4 +++ channels/channel_state.go | 7 ++++ channels/channels_fsm.go | 1 + channels/channels_test.go | 3 ++ channels/internal/internalchannel.go | 3 ++ channels/internal/internalchannel_cbor_gen.go | 33 ++++++++++++++++++- types.go | 3 ++ 7 files changed, 53 insertions(+), 1 deletion(-) diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go index eefe3fc7..496519c9 100644 --- a/channelmonitor/channelmonitor_test.go +++ b/channelmonitor/channelmonitor_test.go @@ -544,6 +544,10 @@ func (m *mockChannelState) Status() datatransfer.Status { return datatransfer.Ongoing } +func (m *mockChannelState) NReceivedCids() uint64 { + panic("implement me") +} + func (m *mockChannelState) TransferID() datatransfer.TransferID { panic("implement me") } diff --git a/channels/channel_state.go b/channels/channel_state.go index a8beb25d..c2cb645c 100644 --- a/channels/channel_state.go +++ b/channels/channel_state.go @@ -50,6 +50,8 @@ type channelState struct { voucherDecoder DecoderByTypeFunc channelCIDsReader ChannelCIDsReader + nReceivedCids uint64 + // stages tracks the timeline of events related to a data transfer, for // traceability purposes. stages *datatransfer.ChannelStages @@ -107,6 +109,10 @@ func (c channelState) ReceivedCids() []cid.Cid { return receivedCids } +func (c channelState) NReceivedCids() uint64 { + return c.nReceivedCids +} + // Sender returns the peer id for the node that is sending data func (c channelState) Sender() peer.ID { return c.sender } @@ -211,6 +217,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT voucherDecoder: voucherDecoder, channelCIDsReader: channelCIDsReader, stages: c.Stages, + nReceivedCids: c.NReceivedCids, } } diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 79711ebc..1128e11c 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -50,6 +50,7 @@ var ChannelEvents = fsm.Events{ fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange(). Action(func(chst *internal.ChannelState, delta uint64) error { chst.Received += delta + chst.NReceivedCids = chst.NReceivedCids + 1 chst.AddLog("received data") return nil }), diff --git a/channels/channels_test.go b/channels/channels_test.go index 6ad58fad..a09e4b89 100644 --- a/channels/channels_test.go +++ b/channels/channels_test.go @@ -143,6 +143,7 @@ func TestChannels(t *testing.T) { require.Equal(t, datatransfer.Requested, state.Status()) require.Equal(t, uint64(0), state.Received()) require.Equal(t, uint64(0), state.Sent()) + require.Empty(t, state.ReceivedCids()) isNew, err := channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50) @@ -153,6 +154,7 @@ func TestChannels(t *testing.T) { require.Equal(t, uint64(50), state.Received()) require.Equal(t, uint64(0), state.Sent()) require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids()) + require.Equal(t, uint64(1), state.NReceivedCids()) isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 100) require.NoError(t, err) @@ -180,6 +182,7 @@ func TestChannels(t *testing.T) { require.Equal(t, uint64(100), state.Received()) require.Equal(t, uint64(100), state.Sent()) require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids()) + require.Equal(t, uint64(2), state.NReceivedCids()) isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 25) require.NoError(t, err) diff --git a/channels/internal/internalchannel.go b/channels/internal/internalchannel.go index 5cbd56a0..3c0f83b1 100644 --- a/channels/internal/internalchannel.go +++ b/channels/internal/internalchannel.go @@ -61,6 +61,9 @@ type ChannelState struct { Vouchers []EncodedVoucher VoucherResults []EncodedVoucherResult + // NReceivedCids is the number of cids for which data has already been received by this node. + NReceivedCids uint64 + // Stages traces the execution fo a data transfer. // // EXPERIMENTAL; subject to change. diff --git a/channels/internal/internalchannel_cbor_gen.go b/channels/internal/internalchannel_cbor_gen.go index 0546d414..aaf712b1 100644 --- a/channels/internal/internalchannel_cbor_gen.go +++ b/channels/internal/internalchannel_cbor_gen.go @@ -19,7 +19,7 @@ func (t *ChannelState) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{177}); err != nil { + if _, err := w.Write([]byte{178}); err != nil { return err } @@ -341,6 +341,22 @@ func (t *ChannelState) MarshalCBOR(w io.Writer) error { } } + // t.NReceivedCids (uint64) (uint64) + if len("NReceivedCids") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"NReceivedCids\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("NReceivedCids"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("NReceivedCids")); err != nil { + return err + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.NReceivedCids)); err != nil { + return err + } + // t.Stages (datatransfer.ChannelStages) (struct) if len("Stages") > cbg.MaxLength { return xerrors.Errorf("Value in field \"Stages\" was too long") @@ -632,6 +648,21 @@ func (t *ChannelState) UnmarshalCBOR(r io.Reader) error { t.VoucherResults[i] = v } + // t.NReceivedCids (uint64) (uint64) + case "NReceivedCids": + + { + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.NReceivedCids = uint64(extra) + + } // t.Stages (datatransfer.ChannelStages) (struct) case "Stages": diff --git a/types.go b/types.go index 60a6e36c..d8939967 100644 --- a/types.go +++ b/types.go @@ -132,6 +132,9 @@ type ChannelState interface { // ReceivedCids returns the cids received so far on the channel ReceivedCids() []cid.Cid + // NReceivedCids returns the number of unique cids received on the channel. + NReceivedCids() uint64 + // Queued returns the number of bytes read from the node and queued for sending Queued() uint64