Skip to content

Commit

Permalink
Optimized stream proxy codebase
Browse files Browse the repository at this point in the history
- Moved stream proxy config from database to file based conf
- Optimized implementation for detecting proxy rule running
- Fixed #320 (hopefully)
  • Loading branch information
tobychui committed Oct 25, 2024
1 parent 6923f0d commit 528be69
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 22 deletions.
88 changes: 71 additions & 17 deletions src/mod/streamproxy/streamproxy.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package streamproxy

import (
"encoding/json"
"errors"
"log"
"net"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"imuslab.com/zoraxy/mod/database"
"imuslab.com/zoraxy/mod/info/logger"
"imuslab.com/zoraxy/mod/utils"
)

/*
Expand Down Expand Up @@ -48,9 +51,10 @@ type ProxyRelayConfig struct {
}

type Options struct {
Database *database.Database
DefaultTimeout int
AccessControlHandler func(net.Conn) bool
ConfigStore string //Folder to store the config files, will be created if not exists
Logger *logger.Logger //Logger for the stream proxy
}

type Manager struct {
Expand All @@ -63,13 +67,37 @@ type Manager struct {

}

func NewStreamProxy(options *Options) *Manager {
options.Database.NewTable("tcprox")
func NewStreamProxy(options *Options) (*Manager, error) {
if !utils.FileExists(options.ConfigStore) {
err := os.MkdirAll(options.ConfigStore, 0775)
if err != nil {
return nil, err
}
}

//Load relay configs from db
previousRules := []*ProxyRelayConfig{}
if options.Database.KeyExists("tcprox", "rules") {
options.Database.Read("tcprox", "rules", &previousRules)
streamProxyConfigFiles, err := filepath.Glob(options.ConfigStore + "/*.config")
if err != nil {
return nil, err
}

for _, configFile := range streamProxyConfigFiles {
//Read file into bytes
configBytes, err := os.ReadFile(configFile)
if err != nil {
options.Logger.PrintAndLog("stream-prox", "Read stream proxy config failed", err)
continue
}
thisRelayConfig := &ProxyRelayConfig{}
err = json.Unmarshal(configBytes, thisRelayConfig)
if err != nil {
options.Logger.PrintAndLog("stream-prox", "Unmarshal stream proxy config failed", err)
continue
}

//Append the config to the list
previousRules = append(previousRules, thisRelayConfig)
}

//Check if the AccessControlHandler is empty. If yes, set it to always allow access
Expand All @@ -91,14 +119,27 @@ func NewStreamProxy(options *Options) *Manager {
rule.parent = &thisManager
if rule.Running {
//This was previously running. Start it again
log.Println("[Stream Proxy] Resuming stream proxy rule " + rule.Name)
thisManager.logf("Resuming stream proxy rule "+rule.Name, nil)
rule.Start()
}
}

thisManager.Configs = previousRules

return &thisManager
return &thisManager, nil
}

// Wrapper function to log error
func (m *Manager) logf(message string, originalError error) {
if m.Options.Logger == nil {
//Print to fmt
if originalError != nil {
message += ": " + originalError.Error()
}
println(message)
return
}
m.Options.Logger.PrintAndLog("stream-prox", message, originalError)
}

func (m *Manager) NewConfig(config *ProxyRelayOptions) string {
Expand Down Expand Up @@ -190,8 +231,19 @@ func (m *Manager) RemoveConfig(configUUID string) error {
return errors.New("config not found")
}

// Save all configs to ConfigStore folder
func (m *Manager) SaveConfigToDatabase() {
m.Options.Database.Write("tcprox", "rules", m.Configs)
for _, config := range m.Configs {
configBytes, err := json.Marshal(config)
if err != nil {
m.logf("Failed to marshal stream proxy config", err)
continue
}
err = os.WriteFile(m.Options.ConfigStore+"/"+config.UUID+".config", configBytes, 0775)
if err != nil {
m.logf("Failed to save stream proxy config", err)
}
}
}

/*
Expand All @@ -217,9 +269,10 @@ func (c *ProxyRelayConfig) Start() error {
if err != nil {
if !c.UseTCP {
c.Running = false
c.udpStopChan = nil
c.parent.SaveConfigToDatabase()
}
log.Println("[TCP] Error starting stream proxy " + c.Name + "(" + c.UUID + "): " + err.Error())
c.parent.logf("[proto:udp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err)
}
}()
}
Expand All @@ -231,8 +284,9 @@ func (c *ProxyRelayConfig) Start() error {
err := c.Port2host(c.ListeningAddress, c.ProxyTargetAddr, tcpStopChan)
if err != nil {
c.Running = false
c.tcpStopChan = nil
c.parent.SaveConfigToDatabase()
log.Println("[TCP] Error starting stream proxy " + c.Name + "(" + c.UUID + "): " + err.Error())
c.parent.logf("[proto:tcp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err)
}
}()
}
Expand All @@ -253,27 +307,27 @@ func (c *ProxyRelayConfig) Restart() {
if c.IsRunning() {
c.Stop()
}
time.Sleep(300 * time.Millisecond)
time.Sleep(3000 * time.Millisecond)
c.Start()
}

// Stop a running proxy if running
func (c *ProxyRelayConfig) Stop() {
log.Println("[STREAM PROXY] Stopping Stream Proxy " + c.Name)
c.parent.logf("Stopping Stream Proxy "+c.Name, nil)

if c.udpStopChan != nil {
log.Println("[STREAM PROXY] Stopping UDP for " + c.Name)
c.parent.logf("Stopping UDP for "+c.Name, nil)
c.udpStopChan <- true
c.udpStopChan = nil
}

if c.tcpStopChan != nil {
log.Println("[STREAM PROXY] Stopping TCP for " + c.Name)
c.parent.logf("Stopping TCP for "+c.Name, nil)
c.tcpStopChan <- true
c.tcpStopChan = nil
}

log.Println("[STREAM PROXY] Stopped Stream Proxy " + c.Name)
c.parent.logf("Stopped Stream Proxy "+c.Name, nil)
c.Running = false

//Update the running status
Expand Down
8 changes: 6 additions & 2 deletions src/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,14 @@ func startupSequence() {
webSshManager = sshprox.NewSSHProxyManager()

//Create TCP Proxy Manager
streamProxyManager = streamproxy.NewStreamProxy(&streamproxy.Options{
Database: sysdb,
streamProxyManager, err = streamproxy.NewStreamProxy(&streamproxy.Options{
AccessControlHandler: accessController.DefaultAccessRule.AllowConnectionAccess,
ConfigStore: "./conf/streamproxy",
Logger: SystemWideLogger,
})
if err != nil {
panic(err)
}

//Create WoL MAC storage table
sysdb.NewTable("wolmac")
Expand Down
15 changes: 12 additions & 3 deletions src/web/components/streamprox.html
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ <h3>Add or Edit Stream Proxy</h3>
</div>
</div>
<button id="addStreamProxyButton" class="ui basic button" type="submit"><i class="ui green add icon"></i> Create</button>
<button id="editStreamProxyButton" class="ui basic button" onclick="confirmEditTCPProxyConfig(event);" style="display:none;"><i class="ui green check icon"></i> Update</button>
<button id="editStreamProxyButton" class="ui basic button" onclick="confirmEditTCPProxyConfig(event, this);" style="display:none;"><i class="ui green check icon"></i> Update</button>
<button class="ui basic red button" onclick="event.preventDefault(); cancelStreamProxyEdit(event);"><i class="ui red remove icon"></i> Cancel</button>
</form>
</div>
Expand All @@ -88,7 +88,7 @@ <h3>Add or Edit Stream Proxy</h3>

//Check if update mode
if ($("#editStreamProxyButton").is(":visible")){
confirmEditTCPProxyConfig(event);
confirmEditTCPProxyConfig(event,$("#editStreamProxyButton")[0]);
return;
}

Expand Down Expand Up @@ -274,13 +274,18 @@ <h3>Add or Edit Stream Proxy</h3>
}
}

function confirmEditTCPProxyConfig(event){
function confirmEditTCPProxyConfig(event, btn){
event.preventDefault();
event.stopImmediatePropagation();
var form = $("#streamProxyForm");
let originalButtonHTML = $(btn).html();
$(btn).html(`<i class="ui loading spinner icon"></i> Updating`);
$(btn).addClass("disabled");

var formValid = validateTCPProxyConfig(form);
if (!formValid){
$(btn).html(originalButtonHTML);
$(btn).removeClass("disabled");
return;
}

Expand All @@ -299,6 +304,8 @@ <h3>Add or Edit Stream Proxy</h3>
timeout: parseInt($("#streamProxyForm input[name=timeout]").val().trim()),
},
success: function(response) {
$(btn).html(originalButtonHTML);
$(btn).removeClass("disabled");
if (response.error) {
msgbox(response.error, false, 6000);
}else{
Expand All @@ -310,6 +317,8 @@ <h3>Add or Edit Stream Proxy</h3>

},
error: function() {
$(btn).html(originalButtonHTML);
$(btn).removeClass("disabled");
msgbox('An error occurred while processing the request', false);
}
});
Expand Down

0 comments on commit 528be69

Please sign in to comment.