Skip to content

Commit

Permalink
ssh: add deadlines support for channels
Browse files Browse the repository at this point in the history
This is actually Nicola Murino's fix from google source: https://go-review.googlesource.com/c/crypto/+/562756

deadlines unblock reads waiting for data and writes waiting for window
capacity

Fixes golang/go#65930
Fixes golang/go#67152

Change-Id: Ica42573cdf11ddf58e48b51fa82466a14cc5e606
  • Loading branch information
Jeff Williams committed Jul 28, 2024
1 parent bb80217 commit 57a10b1
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 7 deletions.
48 changes: 47 additions & 1 deletion ssh/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package ssh

import (
"io"
"os"
"sync"
"time"
)

// buffer provides a linked list buffer for data exchange
Expand All @@ -19,7 +21,9 @@ type buffer struct {
head *element // the buffer that will be read first
tail *element // the buffer that will be read last

closed bool
closed bool
deadlineReached bool
timer *time.Timer
}

// An element represents a single link in a linked list.
Expand Down Expand Up @@ -55,6 +59,39 @@ func (b *buffer) write(buf []byte) {
func (b *buffer) eof() {
b.Cond.L.Lock()
b.closed = true
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
b.Cond.Signal()
b.Cond.L.Unlock()
}

func (b *buffer) setDeadline(deadline time.Time) {
if !deadline.IsZero() && deadline.Before(time.Now()) {
// Unblock read, if any.
b.deadline()
return
}
b.Cond.L.Lock()
defer b.Cond.L.Unlock()

b.deadlineReached = false
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
if deadline.IsZero() {
return
}
b.timer = time.AfterFunc(time.Until(deadline), func() {
b.deadline()
})
}

func (b *buffer) deadline() {
b.Cond.L.Lock()
b.deadlineReached = true
b.Cond.Signal()
b.Cond.L.Unlock()
}
Expand All @@ -65,6 +102,10 @@ func (b *buffer) Read(buf []byte) (n int, err error) {
b.Cond.L.Lock()
defer b.Cond.L.Unlock()

if b.deadlineReached {
return 0, os.ErrDeadlineExceeded
}

for len(buf) > 0 {
// if there is data in b.head, copy it
if len(b.head.buf) > 0 {
Expand All @@ -90,6 +131,11 @@ func (b *buffer) Read(buf []byte) (n int, err error) {
err = io.EOF
break
}
// check if the deadline was reached.
if b.deadlineReached {
err = os.ErrDeadlineExceeded
break
}
// out of buffers, wait for producer
b.Cond.Wait()
}
Expand Down
51 changes: 51 additions & 0 deletions ssh/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"log"
"sync"
"time"
)

const (
Expand Down Expand Up @@ -78,6 +79,26 @@ type Channel interface {
Stderr() io.ReadWriter
}

// ChannelWithDeadlines is a channel with deadlines support.
type ChannelWithDeadlines interface {
Channel

// SetDeadline sets the read and write deadlines associated with the
// channel. It is equivalent to calling both SetReadDeadline and
// SetWriteDeadline. Deadlines errors are not fatal, the Channel can be used
// again after resetting the deadlines.
SetDeadline(deadline time.Time) error

// SetReadDeadline sets the deadline for future Read calls and unblock Read
// calls waiting for data. A zero value for t means Read will not time out.
SetReadDeadline(deadline time.Time) error

// SetWriteDeadline sets the deadline for future Write calls and unblock
// Write calls waiting for window capacity. A zero value for t means Write
// will not time out.
SetWriteDeadline(deadline time.Time) error
}

// Request is a request sent outside of the normal stream of
// data. Requests can either be specific to an SSH channel, or they
// can be global.
Expand Down Expand Up @@ -205,6 +226,24 @@ type channel struct {
packetPool map[uint32][]byte
}

func (ch *channel) SetDeadline(deadline time.Time) error {
if err := ch.SetReadDeadline(deadline); err != nil {
return err
}
return ch.SetWriteDeadline(deadline)
}

func (ch *channel) SetReadDeadline(deadline time.Time) error {
ch.extPending.setDeadline(deadline)
ch.pending.setDeadline(deadline)
return nil
}

func (ch *channel) SetWriteDeadline(deadline time.Time) error {
ch.remoteWin.setDeadline(deadline)
return nil
}

// writePacket sends a packet. If the packet is a channel close, it updates
// sentClose. This method takes the lock c.writeMu.
func (ch *channel) writePacket(packet []byte) error {
Expand Down Expand Up @@ -492,6 +531,18 @@ type extChannel struct {
ch *channel
}

func (e *extChannel) SetDeadline(deadline time.Time) error {
return e.ch.SetDeadline(deadline)
}

func (e *extChannel) SetReadDeadline(deadline time.Time) error {
return e.ch.SetReadDeadline(deadline)
}

func (e *extChannel) SetWriteDeadline(deadline time.Time) error {
return e.ch.SetWriteDeadline(deadline)
}

func (e *extChannel) Write(data []byte) (n int, err error) {
return e.ch.WriteExtended(data, e.code)
}
Expand Down
57 changes: 51 additions & 6 deletions ssh/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"fmt"
"io"
"math"
"os"
"sync"
"time"

_ "crypto/sha1"
_ "crypto/sha256"
Expand Down Expand Up @@ -407,9 +409,11 @@ func newCond() *sync.Cond { return sync.NewCond(new(sync.Mutex)) }
// wishing to write to a channel.
type window struct {
*sync.Cond
win uint32 // RFC 4254 5.2 says the window size can grow to 2^32-1
writeWaiters int
closed bool
win uint32 // RFC 4254 5.2 says the window size can grow to 2^32-1
writeWaiters int
closed bool
deadlineReached bool
timer *time.Timer
}

// add adds win to the amount of window available
Expand Down Expand Up @@ -438,30 +442,71 @@ func (w *window) add(win uint32) bool {
func (w *window) close() {
w.L.Lock()
w.closed = true
if w.timer != nil {
w.timer.Stop()
w.timer = nil
}
w.Broadcast()
w.L.Unlock()
}

func (w *window) deadline() {
w.L.Lock()
w.deadlineReached = true
w.Broadcast()
w.L.Unlock()
}

func (w *window) setDeadline(deadline time.Time) {
if !deadline.IsZero() && deadline.Before(time.Now()) {
// Unblock reserve, if any.
w.deadline()
return
}
w.L.Lock()
defer w.L.Unlock()

w.deadlineReached = false
if w.timer != nil {
w.timer.Stop()
w.timer = nil
}
if deadline.IsZero() {
return
}
w.timer = time.AfterFunc(time.Until(deadline), func() {
w.deadline()
})
}

// reserve reserves win from the available window capacity.
// If no capacity remains, reserve will block. reserve may
// return less than requested.
func (w *window) reserve(win uint32) (uint32, error) {
var err error
w.L.Lock()
defer w.L.Unlock()

var err error
if w.deadlineReached {
return 0, os.ErrDeadlineExceeded
}
w.writeWaiters++
w.Broadcast()
for w.win == 0 && !w.closed {
for w.win == 0 && !w.closed && !w.deadlineReached {
w.Wait()
}
w.writeWaiters--
if w.win < win {
win = w.win
}
w.win -= win
if w.deadlineReached {
// If the window is also closed the error will be io.EOF.
err = os.ErrDeadlineExceeded
}
if w.closed {
err = io.EOF
}
w.L.Unlock()
return win, err
}

Expand Down
Loading

0 comments on commit 57a10b1

Please sign in to comment.