Skip to content

Commit

Permalink
updating rabbitmq looping to add more retries
Browse files Browse the repository at this point in the history
  • Loading branch information
its-a-feature committed Mar 5, 2025
1 parent 86c6488 commit bbfcb19
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 71 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.3.1-rc48] - 2025-03-05

### Changed

- Updated the create tasking webhook to return sooner before sending to containers
- Updated some of the rabbitmq connections with loops to be a bit more robust to errors
- Updated some of the tasking status messages to try to be a bit more informative

## [3.3.1-rc47] - 2025-02-26

### Changed
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.3.1-rc47
3.3.1-rc48
2 changes: 1 addition & 1 deletion mythic-docker/src/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.3.1-rc47
3.3.1-rc48
10 changes: 5 additions & 5 deletions mythic-docker/src/rabbitmq/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,12 @@ type PT_TASK_FUNCTION_STATUS = string

const (
PT_TASK_FUNCTION_STATUS_OPSEC_PRE PT_TASK_FUNCTION_STATUS = "OPSEC Pre Check Running..."
PT_TASK_FUNCTION_STATUS_OPSEC_PRE_ERROR = "Error: processing arguments - click cog to check stderr"
PT_TASK_FUNCTION_STATUS_OPSEC_PRE_ERROR = "Error: processing arguments"
PT_TASK_FUNCTION_STATUS_OPSEC_PRE_BLOCKED = "OPSEC Pre Blocked"
PT_TASK_FUNCTION_STATUS_PREPROCESSING = "creating task..."
PT_TASK_FUNCTION_STATUS_PREPROCESSING_ERROR = "Error: creating task - click cog to check stderr"
PT_TASK_FUNCTION_STATUS_PREPROCESSING = "preparing task for agent..."
PT_TASK_FUNCTION_STATUS_PREPROCESSING_ERROR = "Error: creating task "
PT_TASK_FUNCTION_STATUS_OPSEC_POST = "OPSEC Post Check Running..."
PT_TASK_FUNCTION_STATUS_OPSEC_POST_ERROR = "Error: opsec check - click cog to check stderr"
PT_TASK_FUNCTION_STATUS_OPSEC_POST_ERROR = "Error: opsec check failed to run"
PT_TASK_FUNCTION_STATUS_OPSEC_POST_BLOCKED = "OPSEC Post Blocked"
PT_TASK_FUNCTION_STATUS_SUBMITTED = "submitted"
PT_TASK_FUNCTION_STATUS_PROCESSING = "agent processing"
Expand All @@ -169,7 +169,7 @@ const (
PT_TASK_FUNCTION_STATUS_GROUP_COMPLETED_FUNCTION = "Group Completion Function Running..."
PT_TASK_FUNCTION_STATUS_GROUP_COMPLETED_FUNCTION_ERROR = "Error: group completion function - click cog to check stderr"
PT_TASK_FUNCTION_STATUS_COMPLETED = "success"
PT_TASK_FUNCTION_STATUS_PROCESSED = "processed, waiting for more messages..."
PT_TASK_FUNCTION_STATUS_PROCESSED = "processed, agent sending responses..."
PT_TASK_FUNCTION_STATUS_INTERCEPTED = "intercepted for custom checks"
PT_TASK_FUNCTION_STATUS_INTERCEPTED_ERROR = "Error: Task Interception Failed"
)
Expand Down
24 changes: 10 additions & 14 deletions mythic-docker/src/rabbitmq/util_create_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,12 @@ func CreateTask(createTaskInput CreateTaskInput) CreateTaskResponse {
TaskDisplayID: task.DisplayID,
}
}
return submitTaskToContainer(task.ID)
go submitTaskToContainer(task.ID)
return CreateTaskResponse{
Status: "success",
TaskID: task.ID,
TaskDisplayID: task.DisplayID,
}
}

func associateUploadedFilesWithTask(task *databaseStructs.Task, files []string) {
Expand Down Expand Up @@ -804,24 +809,15 @@ func handleHelpCommand(createTaskInput CreateTaskInput, callback databaseStructs

}

func submitTaskToContainer(taskID int) CreateTaskResponse {
output := CreateTaskResponse{
Status: "error",
Error: "not implemented",
}
func submitTaskToContainer(taskID int) {
taskMessage := GetTaskConfigurationForContainer(taskID)
if err := RabbitMQConnection.SendPtTaskOPSECPre(taskMessage); err != nil {
err := RabbitMQConnection.SendPtTaskOPSECPre(taskMessage)
if err != nil {
logging.LogError(err, "Failed to send task to payload type")
output.Error = err.Error()
if _, err := database.DB.Exec(`UPDATE task SET status=$1 WHERE id=$2`,
TASK_STATUS_CONTAINER_DOWN, taskID); err != nil {
logging.LogError(err, "Failed to update task status")
}
} else {
output.Status = "success"
output.Error = ""
output.TaskID = taskID
output.TaskDisplayID = taskMessage.Task.DisplayID
}
return output
return
}
112 changes: 62 additions & 50 deletions mythic-docker/src/rabbitmq/utils_rabbitmq_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,28 @@ func (r *rabbitMQConnection) GetConnection() (*amqp.Connection, error) {
defer r.mutex.Unlock()
if r.conn != nil && !r.conn.IsClosed() {
return r.conn, nil
} else {
for {
logging.LogInfo("Attempting to connect to rabbitmq")
conn, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s:%d/%s",
utils.MythicConfig.RabbitmqUser,
utils.MythicConfig.RabbitmqPassword,
utils.MythicConfig.RabbitmqHost,
utils.MythicConfig.RabbitmqPort,
utils.MythicConfig.RabbitmqVHost),
amqp.Config{
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 10*time.Second)
},
}
for {
logging.LogInfo("Attempting to connect to rabbitmq")
conn, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s:%d/%s",
utils.MythicConfig.RabbitmqUser,
utils.MythicConfig.RabbitmqPassword,
utils.MythicConfig.RabbitmqHost,
utils.MythicConfig.RabbitmqPort,
utils.MythicConfig.RabbitmqVHost),
amqp.Config{
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 10*time.Second)
},
)
if err != nil {
logging.LogError(err, "Failed to connect to rabbitmq")
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
r.conn = conn
return conn, nil
},
)
if err != nil {
logging.LogError(err, "Failed to connect to rabbitmq")
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
r.conn = conn
return conn, nil
}
}
func (r *rabbitMQConnection) SendStructMessage(exchange string, queue string, correlationId string, body interface{}, ignoreErrorMessage bool) error {
Expand Down Expand Up @@ -255,7 +254,6 @@ func (r *rabbitMQConnection) SendMessage(exchange string, queue string, correlat
}
logging.LogError(err, "failed 3 times")
return err

}
func (r *rabbitMQConnection) SendRPCMessage(exchange string, queue string, body []byte, exclusiveQueue bool) ([]byte, error) {
conn, err := r.GetConnection()
Expand Down Expand Up @@ -359,78 +357,90 @@ func (r *rabbitMQConnection) ReceiveFromMythicDirectExchange(exchange string, qu
// routingKey is the specific direct topic we're interested in for the exchange
// handler processes the messages we get on our queue
for {
if conn, err := r.GetConnection(); err != nil {
conn, err := r.GetConnection()
if err != nil {
logging.LogError(err, "Failed to connect to rabbitmq", "retry_wait_time", RETRY_CONNECT_DELAY)
time.Sleep(RETRY_CONNECT_DELAY)
continue
} else if ch, err := conn.Channel(); err != nil {
}
ch, err := conn.Channel()
if err != nil {
logging.LogError(err, "Failed to open rabbitmq channel", "retry_wait_time", RETRY_CONNECT_DELAY)
time.Sleep(RETRY_CONNECT_DELAY)
continue
} else if err = ch.ExchangeDeclare(
}
err = ch.ExchangeDeclare(
exchange, // exchange name
"direct", // type of exchange, ex: topic, fanout, direct, etc
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
); err != nil {
)
if err != nil {
logging.LogError(err, "Failed to declare exchange", "exchange", exchange, "exchange_type", "direct", "retry_wait_time", RETRY_CONNECT_DELAY)
time.Sleep(RETRY_CONNECT_DELAY)
continue
} else if q, err := ch.QueueDeclare(
}
q, err := ch.QueueDeclare(
queue, // name, queue
false, // durable
true, // delete when unused
exclusiveQueue, // exclusive
false, // no-wait
nil, // arguments
); err != nil {
)
if err != nil {
logging.LogError(err, "Failed to declare queue", "retry_wait_time", RETRY_CONNECT_DELAY)
ch.Close()
time.Sleep(RETRY_CONNECT_DELAY)
continue
} else if err = ch.QueueBind(
}
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exchange, // exchange name
false, // nowait
nil, // arguments
); err != nil {
)
if err != nil {
logging.LogError(err, "Failed to bind to queue to receive messages", "retry_wait_time", RETRY_CONNECT_DELAY)
ch.Close()
time.Sleep(RETRY_CONNECT_DELAY)
continue
} else if msgs, err := ch.Consume(
}
msgs, err := ch.Consume(
q.Name, // queue name
"", // consumer
false, // auto-ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
); err != nil {
)
if err != nil {
logging.LogError(err, "Failed to start consuming on queue", "queue", q.Name)
ch.Close()
} else {
forever := make(chan bool)
go func() {
for d := range msgs {
//logging.LogDebug("got direct message", "queue", q.Name, "msg", d.Body)
go handler(d)
if err = ch.Ack(d.DeliveryTag, false); err != nil {
logging.LogError(err, "Failed to Ack message")
}
}
forever <- true
}()
logging.LogInfo("Started listening for messages", "exchange", exchange, "queue", queue, "routingKey", routingKey)
<-forever
ch.Close()
logging.LogError(nil, "Stopped listening for messages", "exchange", exchange, "queue", queue, "routingKey", routingKey)
time.Sleep(RETRY_CONNECT_DELAY)
continue
}

forever := make(chan bool)
go func() {
for d := range msgs {
//logging.LogDebug("got direct message", "queue", q.Name, "msg", d.Body)
err = ch.Ack(d.DeliveryTag, false)
if err != nil {
logging.LogError(err, "Failed to Ack message")
}
go handler(d)
}
forever <- true
}()
logging.LogInfo("Started listening for messages", "exchange", exchange, "queue", queue, "routingKey", routingKey)
<-forever
ch.Close()
logging.LogError(nil, "Stopped listening for messages", "exchange", exchange, "queue", queue, "routingKey", routingKey)
}
}
func (r *rabbitMQConnection) ReceiveFromRPCQueue(exchange string, queue string, routingKey string, handler RPCQueueHandler, exclusiveQueue bool) {
Expand Down Expand Up @@ -458,6 +468,7 @@ func (r *rabbitMQConnection) ReceiveFromRPCQueue(exchange string, queue string,
)
if err != nil {
logging.LogError(err, "Failed to declare exchange", "exchange", exchange, "exchange_type", "direct", "retry_wait_time", RETRY_CONNECT_DELAY)
ch.Close()
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
Expand All @@ -484,8 +495,8 @@ func (r *rabbitMQConnection) ReceiveFromRPCQueue(exchange string, queue string,
)
if err != nil {
logging.LogError(err, "Failed to bind to queue to receive messages", "retry_wait_time", RETRY_CONNECT_DELAY)
time.Sleep(RETRY_CONNECT_DELAY)
ch.Close()
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
msgs, err := ch.Consume(
Expand All @@ -500,6 +511,7 @@ func (r *rabbitMQConnection) ReceiveFromRPCQueue(exchange string, queue string,
if err != nil {
logging.LogError(err, "Failed to start consuming messages on queue", "queue", q.Name)
ch.Close()
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
forever := make(chan bool)
Expand Down

0 comments on commit bbfcb19

Please sign in to comment.