Skip to content

Commit

Permalink
feature(gbserver): 支持事件订阅与通知 (#23)
Browse files Browse the repository at this point in the history
* 支持事件与订阅

* 更新readme文件

---------

Co-authored-by: jianhao.chen <[email protected]>
  • Loading branch information
chenjianhao66 and jianhao.chen authored May 12, 2023
1 parent e49c3df commit 6f282b2
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 40 deletions.
18 changes: 7 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ go-GB28181是一个基于GB28181-2016标准实现的网络视频平台,用 Go

- [x] 注册和注销
- [x] 实时视音频点播
- [ ] 控制
- [ ] 设备控制
- [x] 控制
- [x] 设备控制
- [x] 云台控制
- [ ] 录像控制
- [x] 设备配置
- [ ] 信息查询
- [x] 设备目录查询
Expand All @@ -19,15 +18,12 @@ go-GB28181是一个基于GB28181-2016标准实现的网络视频平台,用 Go
- [ ] 报警查询
- [x] 设备配置查询
- [x] 设备信息查询
- [ ] 通知
- [x] 通知
- [x] 状态信息报送(心跳)
- [ ] 报警订阅
- [ ] 报警通知
- [ ] 目录订阅
- [ ] 目录通知
- [ ] 媒体通知
- [ ] 语音广播通知
- [ ] 语音广播和语音对讲
- [x] 报警订阅
- [x] 报警通知
- [x] 目录订阅
- [x] 目录通知


# 项目目录结构
Expand Down
42 changes: 42 additions & 0 deletions internal/gbserver/controller/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,45 @@ func (d *DeviceController) StatusQuery(ctx *gin.Context) {

newResponse(ctx).successWithAny(data)
}

func (d *DeviceController) AlarmSubscribe(ctx *gin.Context) {
deviceId := ctx.Param("deviceId")
device, ok := d.srv.Devices().GetByDeviceId(deviceId)
if !ok {
newResponse(ctx).fail(errDeviceNotFound.Error())
return
}
if err := gbsip.AlarmSubscribe(device); err != nil {
newResponse(ctx).fail("订阅失败")
return
}
newResponse(ctx).success()
}

func (d *DeviceController) CatalogSubscribe(ctx *gin.Context) {
deviceId := ctx.Param("deviceId")
device, ok := d.srv.Devices().GetByDeviceId(deviceId)
if !ok {
newResponse(ctx).fail(errDeviceNotFound.Error())
return
}
if err := gbsip.CatalogSubscribe(device); err != nil {
newResponse(ctx).fail("订阅失败")
return
}
newResponse(ctx).success()
}

func (d *DeviceController) MobilePositionSubscribe(ctx *gin.Context) {
deviceId := ctx.Param("deviceId")
device, ok := d.srv.Devices().GetByDeviceId(deviceId)
if !ok {
newResponse(ctx).fail(errDeviceNotFound.Error())
return
}
if err := gbsip.MobilePositionSubscribe(device); err != nil {
newResponse(ctx).fail("订阅失败")
return
}
newResponse(ctx).success()
}
14 changes: 11 additions & 3 deletions internal/gbserver/gb/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
var (
messageHandler = map[string]gosip.RequestHandler{
// 通知
"Notify:Keepalive": keepaliveHandler,
"Notify:Keepalive": keepaliveNotifyHandler,
"Notify:Alarm": alarmNotifyHandler,
"Notify:MobilePosition": mobilePositionNotifyHandler,

// 响应
// 查询设备信息响应
Expand All @@ -36,12 +38,18 @@ var (

// 查询设备配置信息响应
"Response:ConfigDownload": deviceConfigQueryHandler,

// 发起报警订阅信息响应
"Response:Alarm": subscribeAlarmResponseHandler,

// 发起设备移动位置订阅响应
"Response:MobilePosition": subscribeMobilePositionResponseHandler,
}
)

func MessageHandler(req sip.Request, tx sip.ServerTransaction) {
log.Debug("处理MESSAGE消息...")
log.Debugf("MESSAGE消息体:\n%s", req)
//log.Debug("处理MESSAGE消息...")
//log.Debugf("MESSAGE消息体:\n%s", req)
if l, ok := req.ContentLength(); !ok || l.Equals(0) {
log.Debug("该MESSAGE消息的消息体长度为0,返回OK")
_ = tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, http.StatusText(http.StatusOK), ""))
Expand Down
45 changes: 44 additions & 1 deletion internal/gbserver/gb/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type keepalive struct {
Info string `xml:"Info"`
}

func keepaliveHandler(req sip.Request, tx sip.ServerTransaction) {
func keepaliveNotifyHandler(req sip.Request, tx sip.ServerTransaction) {
keepalive := &keepalive{}
if err := xml.Unmarshal([]byte(req.Body()), keepalive); err != nil {
log.Debugf("keepalive 消息解析xml失败:%s", err)
Expand All @@ -45,3 +45,46 @@ func keepaliveHandler(req sip.Request, tx sip.ServerTransaction) {

_ = tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, http.StatusText(http.StatusOK), ""))
}

func alarmNotifyHandler(req sip.Request, tx sip.ServerTransaction) {
// 使用 gbsip.AlarmNotify 结构体解析
// 自行扩展

_ = responseAck(tx, req)
}

func mobilePositionNotifyHandler(req sip.Request, tx sip.ServerTransaction) {
// 自行扩展

_ = responseAck(tx, req)
}

func subscribeAlarmResponseHandler(req sip.Request, tx sip.ServerTransaction) {
r := parser.GetResultFromXML(req.Body())
if r == "" {
log.Error("获取不到响应信息中的Result字段")
return
}

if r == "ERROR" {
log.Error("订阅报警信息失败,请检查")
} else {
log.Debug("订阅报警信息成功")
}
_ = responseAck(tx, req)
}

func subscribeMobilePositionResponseHandler(req sip.Request, tx sip.ServerTransaction) {
r := parser.GetResultFromXML(req.Body())
if r == "" {
log.Error("获取不到响应信息中的Result字段")
return
}

if r == "ERROR" {
log.Error("订阅设备移动位置信息失败,请检查")
} else {
log.Debug("订阅设备移动位置信息信息成功")
}
_ = responseAck(tx, req)
}
10 changes: 9 additions & 1 deletion internal/gbserver/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,16 @@ func initDeviceRoute(group *gin.RouterGroup, factory storage.Factory) {
// 设备的基本配置
group.POST("/config/basic", d.BasicParamsConfig)
group.GET("/config/basic/:deviceId", d.BasicParamsQuery)

// 查询设备状态
group.GET("/status/:deviceId", d.StatusQuery)
// 查询设备文件目录
//group.GET("/catalog",)

// 订阅
group.POST("/subscribe/alarm:deviceId", d.AlarmSubscribe)
group.POST("/subscribe/catalog:deviceId", d.CatalogSubscribe)
group.POST("/subscribe/mobilePosition:deviceId", d.MobilePositionSubscribe)

}

func initChannelRoute(group *gin.RouterGroup, factory storage.Factory) {
Expand Down
68 changes: 68 additions & 0 deletions internal/pkg/gbsip/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,74 @@ func DeviceStatusQuery(d model.Device) error {
return nil
}

func AlarmSubscribe(device model.Device) error {
xml, err := parser.CreateQueryXML(parser.AlarmCmdType, device.DeviceId, parser.WithAlarmQuery())
if err != nil {
return errors.Wrap(err, "创建报警订阅请求body失败")
}
request, err := sipRequestFactory.createSubscribeRequest(device, xml, "presence")
if err != nil {
return errors.Wrap(err, "创建报警订阅请求失败")
}
log.Debug("报警订阅请求:\n", request)
tx, err := c.server.sendRequest(request)
if err != nil {
log.Error(err)
return errors.Wrap(err, "发送请求失败")
}
response := getResponse(tx)
if response == nil || !response.IsSuccess() {
log.Error("发送请求失败")
return errors.New("发送请求失败2")
}
log.Debug("报警订阅请求响应:\n", response)
return nil
}

func CatalogSubscribe(device model.Device) error {
xml, err := parser.CreateQueryXML(parser.CatalogCmdType, device.DeviceId)
if err != nil {
return errors.Wrap(err, "创建目录请订阅求body失败")
}
request, err := sipRequestFactory.createSubscribeRequest(device, xml, "Catalog")
if err != nil {
return errors.Wrap(err, "创建目录订阅请求失败")
}
tx, err := c.server.sendRequest(request)
if err != nil {
log.Error(err)
return errors.Wrap(err, "发送目录订阅请求失败")
}
response := getResponse(tx)
if response == nil || !response.IsSuccess() {
log.Error("接收目录订阅消息确认超时")
return errors.New("接收目录订阅消息确认超时")
}
return nil
}

func MobilePositionSubscribe(device model.Device) error {
xml, err := parser.CreateQueryXML(parser.MobilePositionCmdType, device.DeviceId, parser.WithCustomKV("Interval", "5"))
if err != nil {
return errors.Wrap(err, "创建设备移动位置订阅请求body失败")
}
request, err := sipRequestFactory.createSubscribeRequest(device, xml, "presence")
if err != nil {
return errors.Wrap(err, "创建设备移动位置订阅请求失败")
}
tx, err := c.server.sendRequest(request)
if err != nil {
log.Error(err)
return errors.Wrap(err, "发送设备移动位置订阅请求失败")
}
response := getResponse(tx)
if response == nil || !response.IsSuccess() {
log.Error("接收设备移动位置订阅确认超时")
return errors.New("接收设备移动位置订阅确认超时")
}
return nil
}

func Play(device model.Device, detail model.MediaDetail, streamId, ssrc string, channelId string, rtpPort int) (model.StreamInfo, error) {
log.Debugf("点播开始,流id: %c, 设备ip: %c, SSRC: %c, rtp端口: %d\n", streamId, device.Ip, ssrc, rtpPort)
request := sipRequestFactory.createInviteRequest(device, detail, channelId, ssrc, rtpPort)
Expand Down
64 changes: 51 additions & 13 deletions internal/pkg/gbsip/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,7 @@ package gbsip

import "encoding/xml"

// notify消息
type (
// Keepalive xml解析心跳包结构
Keepalive struct {
CmdType string `xml:"CmdType"`
SN int `xml:"SN"`
DeviceID string `xml:"DeviceID"`
Status string `xml:"Status"`
Info string `xml:"Info"`
}
)

// message消息
// common struct
type (
// CmdType 命令类型
CmdType struct {
Expand All @@ -39,7 +27,57 @@ type (
DeviceID
SN
}
)

// notify消息
type (
// Keepalive xml解析心跳包结构
Keepalive struct {
CmdType string `xml:"CmdType"`
SN int `xml:"SN"`
DeviceID string `xml:"DeviceID"`
Status string `xml:"Status"`
Info string `xml:"Info"`
}

// AlarmNotify 报警通知
AlarmNotify struct {
Mata
// 报警级别,1为一级警情、2为二级警情、3为三级警情、4为四级警情
AlarmPriority string `xml:"AlarmPriority"`
// !-报警方式(必选),取值1为电话报警,2为设备报警,3为短信报警,4为GPS报警
//5为视频报警,6为设备故障报警,7其他报警-〉
AlarmMethod string `xml:"AlarmMethod"`
// 报警时间
AlarmTime string `xml:"AlarmTime"`
// 报警内容描述
AlarmDescription string `xml:"AlarmDescription"`
// 经纬度信息
Longitude string `xml:"Longitude"`
Latitude string `xml:"Latitude"`
// 扩展信息
Info AlarmNotifyInfo `xml:"Info"`
}

AlarmNotifyInfo struct {
// 根据报警方式的不同,该字段的取值也有不同的含义
// 报警方式为2时,不携带AlarmType为默认的报警设备报警,但如果携带AlarmType取值,那么报警类型如下:1-视频丢失报警;2-设备防拆报警;3-存储设备磁盘满报警;4-设备高温报警;5-设备低温报警。
//
// 报警方式为5时,取值如下:1-人工视频报警;2-运动目标检测报警;3-遗留物检测报警;4-物体移除检测报警;5-绊线检测报警6-入侵检测报警;7-逆行检测报警;8-徘徊检测报警;9-流量统计报警;10-密度检测报警;11-视频异常检测报警;12-快速移动报警。
//
// 报警方式为6时,取值如下:1-存储设备磁盘故障报警;2-存储设备风扇故障报警。
AlarmType string `xml:"AlarmType"`
AlarmTypeParam AlarmTypeParam `xml:"AlarmTypeParam"`
}

AlarmTypeParam struct {
//(!一报警类型扩展参数。在人侵检测报警时可携带EventType〉事件类型(/Even- tType〉,事件类型取值:1-进入区域;2-离开区域。-〉
EventType string `xml:"EventType"`
}
)

// message消息
type (
// DeviceInfo 设备信息
DeviceInfo struct {
CmdType string `xml:"CmdType"`
Expand Down
Loading

0 comments on commit 6f282b2

Please sign in to comment.