Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kernel API for publish binary message broadcast #13681

Merged
merged 2 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions kernel/api/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,29 @@ type Channel struct {
Count int `json:"count"`
}

type PublishMessage struct {
Type string `json:"type"` // "string" | "binary"
Size int `json:"size"` // message size
Filename string `json:"filename"` // empty string for string-message
}

type PublishResult struct {
Code int `json:"code"` // 0: success
Msg string `json:"msg"` // error message

Channel Channel `json:"channel"`
Message PublishMessage `json:"message"`
}

var (
BroadcastChannels = sync.Map{}
)

const (
StringMessageType = "string"
BinaryMessageType = "binary"
)

// broadcast create a broadcast channel WebSocket connection
//
// @param
Expand Down Expand Up @@ -136,7 +155,163 @@ func subscribe(c *gin.Context, broadcastChannel *melody.Melody, channel string)
}
}

// broadcastPublish push multiple binary messages to multiple broadcast channels
//
// @param
//
// MultipartForm: [name] -> [values]
// - name: string // channel name
// - values:
// - string[] // string-messages to the same channel
// - File[] // binary-messages to the same channel
// - filename: string // message key
//
// @returns
//
// {
// code: int,
// msg: string,
// data: {
// results: {
// code: int, // 0: success
// msg: string, // error message
// channel: {
// name: string, // channel name
// count: string, // subscriber count
// },
// message: {
// type: string, // "string" | "binary"
// size: int, // message size (Bytes)
// filename: string, // empty string for string-message
// },
// }[],
// },
// }
func broadcastPublish(c *gin.Context) {
ret := gulu.Ret.NewResult()
defer c.JSON(http.StatusOK, ret)

results := []*PublishResult{}

// Multipart form
form, err := c.MultipartForm()
if err != nil {
ret.Code = -2
ret.Msg = err.Error()
return
}

// Broadcast string messages
for name, values := range form.Value {
channel := Channel{
Name: name,
Count: 0,
}

// Get broadcast channel
_broadcastChannel, exist := BroadcastChannels.Load(name)
var broadcastChannel *melody.Melody
if exist {
broadcastChannel = _broadcastChannel.(*melody.Melody)
channel.Count = broadcastChannel.Len()
} else {
broadcastChannel = nil
channel.Count = 0
}

// Broadcast each string message to the same channel
for _, value := range values {
content := []byte(value)
result := &PublishResult{
Code: 0,
Msg: "",
Channel: channel,
Message: PublishMessage{
Type: StringMessageType,
Size: len(content),
Filename: "",
},
}
results = append(results, result)

if broadcastChannel != nil {
err := broadcastChannel.Broadcast(content)
if err != nil {
logging.LogErrorf("broadcast message failed: %s", err)
result.Code = -1
result.Msg = err.Error()
continue
}
}
}
}

// Broadcast binary message
for name, files := range form.File {
channel := Channel{
Name: name,
Count: 0,
}

// Get broadcast channel
_broadcastChannel, exist := BroadcastChannels.Load(name)
var broadcastChannel *melody.Melody
if exist {
broadcastChannel = _broadcastChannel.(*melody.Melody)
channel.Count = broadcastChannel.Len()
} else {
broadcastChannel = nil
channel.Count = 0
}

// Broadcast each binary message to the same channel
for _, file := range files {
result := &PublishResult{
Code: 0,
Msg: "",
Channel: channel,
Message: PublishMessage{
Type: BinaryMessageType,
Size: int(file.Size),
Filename: file.Filename,
},
}
results = append(results, result)

if broadcastChannel != nil {
value, err := file.Open()
if err != nil {
logging.LogErrorf("open multipart form file [%s] failed: %s", file.Filename, err)
result.Code = -2
result.Msg = err.Error()
continue
}

content := make([]byte, file.Size)
if _, err := value.Read(content); err != nil {
logging.LogErrorf("read multipart form file [%s] failed: %s", file.Filename, err)
result.Code = -3
result.Msg = err.Error()
continue
}

if err := broadcastChannel.BroadcastBinary(content); err != nil {
logging.LogErrorf("broadcast binary message failed: %s", err)
result.Code = -1
result.Msg = err.Error()
continue
}
}
}
}

ret.Data = map[string]interface{}{
"results": results,
}
}

// postMessage send string message to a broadcast channel
//
// @param
//
// {
Expand Down
1 change: 1 addition & 0 deletions kernel/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ func ServeAPI(ginServer *gin.Engine) {
ginServer.Handle("POST", "/api/network/forwardProxy", model.CheckAuth, model.CheckAdminRole, forwardProxy)

ginServer.Handle("GET", "/ws/broadcast", model.CheckAuth, model.CheckAdminRole, broadcast)
ginServer.Handle("POST", "/api/broadcast/publish", model.CheckAuth, model.CheckAdminRole, broadcastPublish)
ginServer.Handle("POST", "/api/broadcast/postMessage", model.CheckAuth, model.CheckAdminRole, postMessage)
ginServer.Handle("POST", "/api/broadcast/getChannels", model.CheckAuth, model.CheckAdminRole, getChannels)
ginServer.Handle("POST", "/api/broadcast/getChannelInfo", model.CheckAuth, model.CheckAdminRole, getChannelInfo)
Expand Down
Loading