Skip to content

Commit

Permalink
refactor: move ClientInfo and ACLLogEntry from message.go
Browse files Browse the repository at this point in the history
  • Loading branch information
blueBlue0102 committed Feb 9, 2025
1 parent 6b4ba38 commit b1c175e
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 324 deletions.
316 changes: 0 additions & 316 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,24 +478,6 @@ func (r RedisResult) ToAny() (v any, err error) {
return
}

func (r RedisResult) AsClientInfo() (v *ClientInfo, err error) {
if r.err != nil {
err = r.err
} else {
v, err = r.val.AsClientInfo()
}
return
}

func (r RedisResult) AsACLLogEntry() (v []*ACLLogEntry, err error) {
if r.err != nil {
err = r.err
} else {
v, err = r.val.AsACLLogEntry()
}
return
}

// IsCacheHit delegates to RedisMessage.IsCacheHit
func (r RedisResult) IsCacheHit() bool {
return r.val.IsCacheHit()
Expand Down Expand Up @@ -1335,304 +1317,6 @@ func (m *RedisMessage) AsGeosearch() ([]GeoLocation, error) {
return geoLocations, nil
}

// ClientFlags is redis-server client flags, copy from redis/src/server.h (redis 7.0)
type ClientFlags uint64

const (
ClientSlave ClientFlags = 1 << 0 /* This client is a replica */
ClientMaster ClientFlags = 1 << 1 /* This client is a master */
ClientMonitor ClientFlags = 1 << 2 /* This client is a slave monitor, see MONITOR */
ClientMulti ClientFlags = 1 << 3 /* This client is in a MULTI context */
ClientBlocked ClientFlags = 1 << 4 /* The client is waiting in a blocking operation */
ClientDirtyCAS ClientFlags = 1 << 5 /* Watched keys modified. EXEC will fail. */
ClientCloseAfterReply ClientFlags = 1 << 6 /* Close after writing entire reply. */
ClientUnBlocked ClientFlags = 1 << 7 /* This client was unblocked and is stored in server.unblocked_clients */
ClientScript ClientFlags = 1 << 8 /* This is a non-connected client used by Lua */
ClientAsking ClientFlags = 1 << 9 /* Client issued the ASKING command */
ClientCloseASAP ClientFlags = 1 << 10 /* Close this client ASAP */
ClientUnixSocket ClientFlags = 1 << 11 /* Client connected via Unix domain socket */
ClientDirtyExec ClientFlags = 1 << 12 /* EXEC will fail for errors while queueing */
ClientMasterForceReply ClientFlags = 1 << 13 /* Queue replies even if is master */
ClientForceAOF ClientFlags = 1 << 14 /* Force AOF propagation of current cmd. */
ClientForceRepl ClientFlags = 1 << 15 /* Force replication of current cmd. */
ClientPrePSync ClientFlags = 1 << 16 /* Instance don't understand PSYNC. */
ClientReadOnly ClientFlags = 1 << 17 /* Cluster client is in read-only state. */
ClientPubSub ClientFlags = 1 << 18 /* Client is in Pub/Sub mode. */
ClientPreventAOFProp ClientFlags = 1 << 19 /* Don't propagate to AOF. */
ClientPreventReplProp ClientFlags = 1 << 20 /* Don't propagate to slaves. */
ClientPreventProp ClientFlags = ClientPreventAOFProp | ClientPreventReplProp
ClientPendingWrite ClientFlags = 1 << 21 /* Client has output to send but a-write handler is yet not installed. */
ClientReplyOff ClientFlags = 1 << 22 /* Don't send replies to client. */
ClientReplySkipNext ClientFlags = 1 << 23 /* Set ClientREPLY_SKIP for next cmd */
ClientReplySkip ClientFlags = 1 << 24 /* Don't send just this reply. */
ClientLuaDebug ClientFlags = 1 << 25 /* Run EVAL in debug mode. */
ClientLuaDebugSync ClientFlags = 1 << 26 /* EVAL debugging without fork() */
ClientModule ClientFlags = 1 << 27 /* Non connected client used by some module. */
ClientProtected ClientFlags = 1 << 28 /* Client should not be freed for now. */
ClientExecutingCommand ClientFlags = 1 << 29 /* Indicates that the client is currently in the process of handling
a command. usually this will be marked only during call()
however, blocked clients might have this flag kept until they
will try to reprocess the command. */
ClientPendingCommand ClientFlags = 1 << 30 /* Indicates the client has a fully * parsed command ready for execution. */
ClientTracking ClientFlags = 1 << 31 /* Client enabled keys tracking in order to perform client side caching. */
ClientTrackingBrokenRedir ClientFlags = 1 << 32 /* Target client is invalid. */
ClientTrackingBCAST ClientFlags = 1 << 33 /* Tracking in BCAST mode. */
ClientTrackingOptIn ClientFlags = 1 << 34 /* Tracking in opt-in mode. */
ClientTrackingOptOut ClientFlags = 1 << 35 /* Tracking in opt-out mode. */
ClientTrackingCaching ClientFlags = 1 << 36 /* CACHING yes/no was given, depending on optin/optout mode. */
ClientTrackingNoLoop ClientFlags = 1 << 37 /* Don't send invalidation messages about writes performed by myself.*/
ClientInTimeoutTable ClientFlags = 1 << 38 /* This client is in the timeout table. */
ClientProtocolError ClientFlags = 1 << 39 /* Protocol error chatting with it. */
ClientCloseAfterCommand ClientFlags = 1 << 40 /* Close after executing commands * and writing entire reply. */
ClientDenyBlocking ClientFlags = 1 << 41 /* Indicate that the client should not be blocked. currently, turned on inside MULTI, Lua, RM_Call, and AOF client */
ClientReplRDBOnly ClientFlags = 1 << 42 /* This client is a replica that only wants RDB without replication buffer. */
ClientNoEvict ClientFlags = 1 << 43 /* This client is protected against client memory eviction. */
ClientAllowOOM ClientFlags = 1 << 44 /* Client used by RM_Call is allowed to fully execute scripts even when in OOM */
ClientNoTouch ClientFlags = 1 << 45 /* This client will not touch LFU/LRU stats. */
ClientPushing ClientFlags = 1 << 46 /* This client is pushing notifications. */
)

// ClientInfo is redis-server ClientInfo
type ClientInfo struct {
Addr string // address/port of the client
LAddr string // address/port of local address client connected to (bind address)
Name string // the name set by the client with CLIENT SETNAME
Events string // file descriptor events (see below)
LastCmd string // cmd, last command played
User string // the authenticated username of the client
LibName string // redis version 7.2, client library name
LibVer string // redis version 7.2, client library version
ID int64 // redis version 2.8.12, a unique 64-bit client ID
FD int64 // file descriptor corresponding to the socket
Age time.Duration // total duration of the connection in seconds
Idle time.Duration // idle time of the connection in seconds
Flags ClientFlags // client flags (see below)
DB int // current database ID
Sub int // number of channel subscriptions
PSub int // number of pattern matching subscriptions
SSub int // redis version 7.0.3, number of shard channel subscriptions
Multi int // number of commands in a MULTI/EXEC context
Watch int // redis version 7.4 RC1, number of keys this client is currently watching.
QueryBuf int // qbuf, query buffer length (0 means no query pending)
QueryBufFree int // qbuf-free, free space of the query buffer (0 means the buffer is full)
ArgvMem int // incomplete arguments for the next command (already extracted from query buffer)
MultiMem int // redis version 7.0, memory is used up by buffered multi commands
BufferSize int // rbs, usable size of buffer
BufferPeak int // rbp, peak used size of buffer in last 5 sec interval
OutputBufferLength int // obl, output buffer length
OutputListLength int // oll, output list length (replies are queued in this list when the buffer is full)
OutputMemory int // omem, output buffer memory usage
TotalMemory int // tot-mem, total memory consumed by this client in its various buffers
Redir int64 // client id of current client tracking redirection
Resp int // redis version 7.0, client RESP protocol version
}

// AsClientInfo check if message is a redis RESP3 client info response, and return it
func (m *RedisMessage) AsClientInfo() (*ClientInfo, error) {
txt, err := m.ToString()
if err != nil {
return nil, err
}
info := &ClientInfo{}

for _, s := range strings.Split(strings.TrimPrefix(strings.TrimSpace(txt), "txt:"), " ") {
kv := strings.Split(s, "=")
if len(kv) != 2 {
return nil, fmt.Errorf("redis: unexpected client info data (%s)", s)
}
key, val := kv[0], kv[1]

switch key {
case "id":
info.ID, err = strconv.ParseInt(val, 10, 64)
case "addr":
info.Addr = val
case "laddr":
info.LAddr = val
case "fd":
info.FD, err = strconv.ParseInt(val, 10, 64)
case "name":
info.Name = val
case "age":
var age int
if age, err = strconv.Atoi(val); err == nil {
info.Age = time.Duration(age) * time.Second
}
case "idle":
var idle int
if idle, err = strconv.Atoi(val); err == nil {
info.Idle = time.Duration(idle) * time.Second
}
case "flags":
if val == "N" {
break
}

for i := 0; i < len(val); i++ {
switch val[i] {
case 'S':
info.Flags |= ClientSlave
case 'O':
info.Flags |= ClientSlave | ClientMonitor
case 'M':
info.Flags |= ClientMaster
case 'P':
info.Flags |= ClientPubSub
case 'x':
info.Flags |= ClientMulti
case 'b':
info.Flags |= ClientBlocked
case 't':
info.Flags |= ClientTracking
case 'R':
info.Flags |= ClientTrackingBrokenRedir
case 'B':
info.Flags |= ClientTrackingBCAST
case 'd':
info.Flags |= ClientDirtyCAS
case 'c':
info.Flags |= ClientCloseAfterCommand
case 'u':
info.Flags |= ClientUnBlocked
case 'A':
info.Flags |= ClientCloseASAP
case 'U':
info.Flags |= ClientUnixSocket
case 'r':
info.Flags |= ClientReadOnly
case 'e':
info.Flags |= ClientNoEvict
case 'T':
info.Flags |= ClientNoTouch
default:
return nil, fmt.Errorf("redis: unexpected client info flags(%s)", string(val[i]))
}
}
case "db":

info.DB, err = strconv.Atoi(val)
case "sub":
info.Sub, err = strconv.Atoi(val)
case "psub":
info.PSub, err = strconv.Atoi(val)
case "ssub":
info.SSub, err = strconv.Atoi(val)
case "multi":
info.Multi, err = strconv.Atoi(val)
case "watch":
info.Watch, err = strconv.Atoi(val)
case "qbuf":
info.QueryBuf, err = strconv.Atoi(val)
case "qbuf-free":
info.QueryBufFree, err = strconv.Atoi(val)
case "argv-mem":
info.ArgvMem, err = strconv.Atoi(val)
case "multi-mem":
info.MultiMem, err = strconv.Atoi(val)
case "rbs":
info.BufferSize, err = strconv.Atoi(val)
case "rbp":
info.BufferPeak, err = strconv.Atoi(val)
case "obl":
info.OutputBufferLength, err = strconv.Atoi(val)
case "oll":
info.OutputListLength, err = strconv.Atoi(val)
case "omem":
info.OutputMemory, err = strconv.Atoi(val)
case "tot-mem":
info.TotalMemory, err = strconv.Atoi(val)
case "events":
info.Events = val
case "cmd":
info.LastCmd = val
case "user":
info.User = val
case "redir":
info.Redir, err = strconv.ParseInt(val, 10, 64)
case "resp":
info.Resp, err = strconv.Atoi(val)
case "lib-name":
info.LibName = val
case "lib-ver":
info.LibVer = val
}

if err != nil {
return nil, err
}

}

return info, nil
}

type ACLLogEntry struct {
Count int64
Reason string
Context string
Object string
Username string
AgeSeconds float64
ClientInfo *ClientInfo
EntryID int64
TimestampCreated int64
TimestampLastUpdated int64
}

func (m *RedisMessage) AsACLLogEntry() ([]*ACLLogEntry, error) {
arr, err := m.ToArray()
if err != nil {
return nil, err

}
logEntries := make([]*ACLLogEntry, 0, len(arr))

for _, msg := range arr {
log, err := msg.AsMap()
if err != nil {
return nil, err
}
entry := ACLLogEntry{}

if attr, ok := log["count"]; ok {
entry.Count, err = attr.AsInt64()
}
if attr, ok := log["reason"]; ok {
entry.Reason, err = attr.ToString()
}
if attr, ok := log["context"]; ok {
entry.Context, err = attr.ToString()
}
if attr, ok := log["object"]; ok {
entry.Object, err = attr.ToString()
}
if attr, ok := log["username"]; ok {
entry.Username, err = attr.ToString()
}
if attr, ok := log["age-seconds"]; ok {
entry.AgeSeconds, err = attr.AsFloat64()
}
if attr, ok := log["client-info"]; ok {
entry.ClientInfo, err = attr.AsClientInfo()
}
if attr, ok := log["entry-id"]; ok {
entry.EntryID, err = attr.AsInt64()
}
if attr, ok := log["timestamp-created"]; ok {
entry.TimestampCreated, err = attr.AsInt64()
}
if attr, ok := log["timestamp-last-updated"]; ok {
entry.TimestampLastUpdated, err = attr.AsInt64()
}

if err != nil {
return nil, err
}
logEntries = append(logEntries, &entry)
}
return logEntries, nil
}

// ToMap check if message is a redis RESP3 map response, and return it
func (m *RedisMessage) ToMap() (map[string]RedisMessage, error) {
if m.IsMap() {
Expand Down
Loading

0 comments on commit b1c175e

Please sign in to comment.