Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

[WIP] remove waitpub, export publish #47

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,6 @@ func FlushPath(rt *Root, pth string) error {
return err
}

rt.repub.WaitPub()
rt.repub.PublishNow()
return nil
}
53 changes: 19 additions & 34 deletions repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ type Republisher struct {
TimeoutShort time.Duration
valueHasBeenUpdated chan struct{}
pubfunc PubFunc
immediatePublish chan chan struct{}

valueLock sync.Mutex
valueToPublish cid.Cid
lastValuePublished cid.Cid
valueToPublish *cid.Cid
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a pointer ? Can the CID be modified (elsewhere) while we are waiting to publish ?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I think I see how this handled in PublishNow() with the extractedValue == nil check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the CID be modified (elsewhere) while we are waiting to publish ?

It shouldn't be, the Update API hasn't changed, we still make a local copy.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should just be a cid.Cid and should be set to cid.Undef when "nil". (probably should have been cid.Nil but I didn't win that argument).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I thought of that at first but we do cid.Undef to issue publish orders, at least in some tests,

go-mfs/repub_test.go

Lines 43 to 45 in 4fb6dc4

go func() {
for {
rp.Update(cid.Undef)

so I think it violates the semantics I would expect of a nil value.

I agree that we should fix that and provide a cid.Nil but in the meanwhile I don't see the harm in implementing the nil with a pointer for an internal variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pointer is fine but shouldn't be necessary. Really, we should fix that test, passing the "Undef" CID is should be equivalent to a "nil" CID (and passing a nil CID to rp.Update doesn't make sense).


ctx context.Context
cancel func()
Expand All @@ -37,29 +35,13 @@ func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration
TimeoutLong: tlong,
valueHasBeenUpdated: make(chan struct{}, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
}
}

// WaitPub waits for the current value to be published (or returns early
// if it already has).
func (rp *Republisher) WaitPub() {
rp.valueLock.Lock()
valueHasBeenPublished := rp.lastValuePublished == rp.valueToPublish
rp.valueLock.Unlock()
if valueHasBeenPublished {
return
}

wait := make(chan struct{})
rp.immediatePublish <- wait
<-wait
}

func (rp *Republisher) Close() error {
err := rp.publish(rp.ctx)
err := rp.PublishNow()
rp.cancel()
return err
}
Expand All @@ -69,7 +51,7 @@ func (rp *Republisher) Close() error {
// the next publish occurs in order to more efficiently batch updates.
func (rp *Republisher) Update(c cid.Cid) {
rp.valueLock.Lock()
rp.valueToPublish = c
rp.valueToPublish = &c
rp.valueLock.Unlock()

select {
Expand Down Expand Up @@ -106,8 +88,6 @@ func (rp *Republisher) Run() {
longer := time.After(rp.TimeoutLong)

wait:
var valueHasBeenPublished chan struct{}

select {
case <-rp.ctx.Done():
return
Expand All @@ -121,15 +101,9 @@ func (rp *Republisher) Run() {

case <-quick:
case <-longer:
case valueHasBeenPublished = <-rp.immediatePublish:
}

err := rp.publish(rp.ctx)
if valueHasBeenPublished != nil {
// The user is waiting in `WaitPub` with this channel, signal
// that the `publish` has happened.
valueHasBeenPublished <- struct{}{}
}
err := rp.PublishNow()
if err != nil {
log.Errorf("republishRoot error: %s", err)
}
Expand All @@ -139,17 +113,28 @@ func (rp *Republisher) Run() {

// Wrapper function around the user-defined `pubfunc`. It publishes
// the (last) `valueToPublish` set and registers it in `lastValuePublished`.
func (rp *Republisher) publish(ctx context.Context) error {
// TODO: Allow passing a value to `PublishNow` which supersedes the
// internal `valueToPublish`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we want to allow this. Users shouldn't swap out the MFS root using the republisher.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the comment, what do you mean by swap out?

The TODO (that I don't think I worded correctly) was aiming at adding an optional argument that would replace the Update(newCid); PublishNow(); call pair with just a PublishNow(newCid) call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Yeah, that makes sense.

(context: I keep thinking we're exposing the republisher to the user)

func (rp *Republisher) PublishNow() error {

rp.valueLock.Lock()
topub := rp.valueToPublish
extractedValue := rp.valueToPublish
rp.valueLock.Unlock()

err := rp.pubfunc(ctx, topub)
if extractedValue == nil {
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A concurrent call won't actually wait. We may need a RwMutex here.

// If this value is `nil` it means we've done a publish
// since the last `Update`.
}

err := rp.pubfunc(rp.ctx, *extractedValue);
if err != nil {
return err
}

rp.valueLock.Lock()
rp.lastValuePublished = topub
rp.valueToPublish = nil
rp.valueLock.Unlock()

return nil
}
9 changes: 7 additions & 2 deletions repub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestRepublisher(t *testing.T) {
go func() {
for {
rp.Update(cid.Undef)
// TODO: Updating always the same value will be ignored now,
// change to an incrementing integer.
time.Sleep(time.Millisecond * 10)
select {
case <-cctx.Done():
Expand Down Expand Up @@ -72,6 +74,9 @@ func TestRepublisher(t *testing.T) {
}
}()

// final pub from closing
<-pub
//// final pub from closing
//<-pub
// TODO: Document this in the commit: `Close` doesn't guarantee
// that a value will be actually published (in this case we're
// always updating the same repeated `cid.Undef` value).
}
3 changes: 2 additions & 1 deletion root.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func NewRoot(parent context.Context, ds ipld.DAGService, node *dag.ProtoNode, pf
if pf != nil {
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)

repub.valueToPublish = node.Cid()
initialCid := node.Cid()
repub.valueToPublish = &initialCid
// No need to take the lock here since we just created
// the `Republisher` and no one has access to it yet.

Expand Down