Skip to content

Commit

Permalink
[file] Ensure we do not heap-allocate on each async write
Browse files Browse the repository at this point in the history
Same reasoning as the previous commit, but for writes.
  • Loading branch information
sergiu128 committed Nov 14, 2024
1 parent a18cfe8 commit 08d3705
Showing 1 changed file with 41 additions and 21 deletions.
62 changes: 41 additions & 21 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
var _ File = &file{}

type file struct {
ioc *IO
slot internal.Slot
closed uint32
readReactor fileReadReactor
ioc *IO
slot internal.Slot
closed uint32
readReactor fileReadReactor
writeReactor fileWriteReactor
}

type fileReadReactor struct {
Expand Down Expand Up @@ -45,6 +46,32 @@ func (r *fileReadReactor) onRead(err error) {
}
}

type fileWriteReactor struct {
file *file

b []byte
writeAll bool
cb AsyncCallback
wroteSoFar int
}

func (r *fileWriteReactor) init(b []byte, writeAll bool, cb AsyncCallback) {
r.b = b
r.writeAll = writeAll
r.cb = cb

r.wroteSoFar = 0
}

func (r *fileWriteReactor) onWrite(err error) {
r.file.ioc.Deregister(&r.file.slot)
if err != nil {
r.cb(err, r.wroteSoFar)
} else {
r.file.asyncWriteNow(r.b, r.wroteSoFar, r.writeAll, r.cb)
}
}

func newFile(ioc *IO, fd int) *file {
f := &file{
ioc: ioc,
Expand All @@ -55,6 +82,9 @@ func newFile(ioc *IO, fd int) *file {
f.readReactor = fileReadReactor{file: f}
f.readReactor.init(nil, false, nil)

f.writeReactor = fileWriteReactor{file: f}
f.writeReactor.init(nil, false, nil)

return f
}

Expand Down Expand Up @@ -183,14 +213,16 @@ func (f *file) AsyncWriteAll(b []byte, cb AsyncCallback) {
}

func (f *file) asyncWrite(b []byte, writeAll bool, cb AsyncCallback) {
f.writeReactor.init(b, writeAll, cb)

if f.ioc.Dispatched < MaxCallbackDispatch {
f.asyncWriteNow(b, 0, writeAll, func(err error, n int) {
f.ioc.Dispatched++
cb(err, n)
f.ioc.Dispatched--
})
} else {
f.scheduleWrite(b, 0, writeAll, cb)
f.scheduleWrite(0 /* this is the starting point, we did not write anything yet */, cb)
}
}

Expand All @@ -207,20 +239,20 @@ func (f *file) asyncWriteNow(b []byte, wroteSoFar int, writeAll bool, cb AsyncCa

// Handles (writeAll == false) and (writeAll == true && wroteSoFar != len(b)).
if err == sonicerrors.ErrWouldBlock {
f.scheduleWrite(b, wroteSoFar, writeAll, cb)
f.scheduleWrite(wroteSoFar, cb)
} else {
cb(err, wroteSoFar)
}
}

func (f *file) scheduleWrite(b []byte, wroteSoFar int, writeAll bool, cb AsyncCallback) {
func (f *file) scheduleWrite(wroteSoFar int, cb AsyncCallback) {
if f.Closed() {
cb(io.EOF, 0)
return
}

handler := f.getWriteHandler(b, wroteSoFar, writeAll, cb)
f.slot.Set(internal.WriteEvent, handler)
f.writeReactor.wroteSoFar = wroteSoFar
f.slot.Set(internal.WriteEvent, f.writeReactor.onWrite)

if err := f.ioc.SetWrite(&f.slot); err != nil {
cb(err, wroteSoFar)
Expand All @@ -229,18 +261,6 @@ func (f *file) scheduleWrite(b []byte, wroteSoFar int, writeAll bool, cb AsyncCa
}
}

func (f *file) getWriteHandler(b []byte, wroteSoFar int, writeAll bool, cb AsyncCallback) internal.Handler {
return func(err error) {
f.ioc.Deregister(&f.slot)

if err != nil {
cb(err, wroteSoFar)
} else {
f.asyncWriteNow(b, wroteSoFar, writeAll, cb)
}
}
}

func (f *file) Close() error {
if !atomic.CompareAndSwapUint32(&f.closed, 0, 1) {
return io.EOF
Expand Down

0 comments on commit 08d3705

Please sign in to comment.