Skip to content

Commit

Permalink
add 3rd return value
Browse files Browse the repository at this point in the history
  • Loading branch information
linyows committed Aug 24, 2023
1 parent 20d9e41 commit 6953ea6
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Pipe struct {
afterConnHook func()
}

type Mediator func([]byte, int) ([]byte, int)
type Mediator func([]byte, int) ([]byte, int, bool)
type Flow int
type Data []byte
type Direction string
Expand Down Expand Up @@ -87,7 +87,7 @@ func (p *Pipe) Do() {

// Sender --- packet --> Proxy
go func() {
_, err := p.copy(upstream, func(b []byte, i int) ([]byte, int) {
_, err := p.copy(upstream, func(b []byte, i int) ([]byte, int, bool) {
if !p.tls || p.rMailAddr == nil {
p.pairing(b[0:i])
}
Expand All @@ -100,7 +100,11 @@ func (p *Pipe) Do() {
p.readytls = false
go p.afterCommHook(b[0:i], srcToPxy)
}
return b, i
if p.locked {
p.waitForTLSConn(buf, nr)
}
go p.afterCommHook(p.removeMailBody(buf[0:nr]), srcToDst)
return b, i, false
})
if err != nil {
go p.afterCommHook([]byte(fmt.Sprintf("io copy error: %s", err.Error())), pxyToDst)
Expand All @@ -110,7 +114,7 @@ func (p *Pipe) Do() {

// Proxy <--- packet -- Receiver
go func() {
_, err := p.copy(downstream, func(b []byte, i int) ([]byte, int) {
_, err := p.copy(downstream, func(b []byte, i int) ([]byte, int, bool) {
if p.isResponseOfEHLOWithStartTLS(b) {
go p.afterCommHook(b[0:i], dstToPxy)
b, i = p.removeStartTLSCommand(b, i)
Expand All @@ -121,12 +125,27 @@ func (p *Pipe) Do() {
go p.afterCommHook([]byte(fmt.Sprintf("TLS connection error: %s", er.Error())), dstToPxy)
}
}

// time before email input
list := bytes.Split(buf, []byte(crlf))
for _, v := range list {
if len(v) >= 3 && string(v[:3]) == fmt.Sprint(codeStartingMailInput) {
p.timeAtDataStarting = time.Now()
}
}

// remove buffering ready response
if bytes.Contains(buf, []byte("Ready to start TLS")) || bytes.Contains(buf, []byte("SMTP server ready")) || bytes.Contains(buf, []byte("Start TLS")) {
continue
}

if p.isResponseOfEHLOWithoutStartTLS(b) {
go p.afterCommHook(b[0:i], pxyToSrc)
} else {
go p.afterCommHook(b[0:i], dstToSrc)
}
return b, i

return b, i, true
})
if err != nil {
go p.afterCommHook([]byte(fmt.Sprintf("io copy error: %s", err.Error())), dstToPxy)
Expand Down Expand Up @@ -182,35 +201,18 @@ func (p *Pipe) copy(dr Flow, fn Mediator) (written int64, err error) {
buf := make([]byte, bufferSize)

for {
var isContinue bool
if p.locked {
continue
}

nr, er := p.src(dr).Read(buf)
if nr > 0 {
buf, nr = fn(buf, nr)
if dr == upstream && p.locked {
p.waitForTLSConn(buf, nr)
}
if nr == 0 {
// Run the Mediator!
buf, nr, isContinue = fn(buf, nr)
if nr == 0 || isContinue {
continue
}
if dr == upstream {
go p.afterCommHook(p.removeMailBody(buf[0:nr]), srcToDst)
} else {
// time before email input
list := bytes.Split(buf, []byte(crlf))
for _, v := range list {
if len(v) >= 3 && string(v[:3]) == fmt.Sprint(codeStartingMailInput) {
p.timeAtDataStarting = time.Now()
}
}
// remove buffering ready response
if bytes.Contains(buf, []byte("Ready to start TLS")) || bytes.Contains(buf, []byte("SMTP server ready")) || bytes.Contains(buf, []byte("Start TLS")) {
continue
}
go p.afterCommHook(buf[0:nr], dstToSrc)
}
nw, ew := p.dst(dr).Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
Expand Down

0 comments on commit 6953ea6

Please sign in to comment.