Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add event dispatcher #2908

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,21 @@ ALLOWLISTED_PLUGINS=
################################################################################
# CHAT_MESSAGES_ENABLED - Enable chat messages (Default: False)
# CHAT_MESSAGES_ENABLED=False

################################################################################
### MICROSERVICE INTEGRATION
################################################################################

### EVENT_DISPATCHER - Fires (and forgets) asynchronously prompt log output as HTTP POST events to defined endpoint

## EVENT_DISPATCHER_ENABLED - Enabling/Disabling (Example: True, Default: False)
## EVENT_DISPATCHER_PROTOCOL - Using protocol (Example: http, Default: http)
## EVENT_DISPATCHER_HOST - Target host (Example: events.acme.com, Default: localhost)
## EVENT_DISPATCHER_ENDPOINT - Target endpoint (Example: /other-microservice, Default: /events)
## EVENT_DISPATCHER_PORT - Target port (Example: 45000, Default: 45000)

#EVENT_DISPATCHER_ENABLED=True
#EVENT_DISPATCHER_PROTOCOL=http
#EVENT_DISPATCHER_HOST=localhost
#EVENT_DISPATCHER_ENDPOINT=/events
#EVENT_DISPATCHER_PORT=45000
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Please see the [documentation][docs] for full setup instructions and configurati
* [🧠 Memory](https://significant-gravitas.github.io/Auto-GPT/configuration/memory/)
* [🗣️ Voice (TTS)](https://significant-gravitas.github.io/Auto-GPT/configuration/voice/)
* [🖼️ Image Generation](https://significant-gravitas.github.io/Auto-GPT/configuration/imagegen/)
* [☄️ Event Dispatcher](/docs/configuration/eventdispatcher.md)

[docs/setup]: https://significant-gravitas.github.io/Auto-GPT/setup/
[docs/usage]: https://significant-gravitas.github.io/Auto-GPT/usage/
Expand Down
11 changes: 11 additions & 0 deletions autogpt/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ def __init__(self) -> None:
self.plugins_allowlist = []
self.plugins_denylist = []

# event dispatcher configuration
self.event_dispatcher_enabled = os.getenv("EVENT_DISPATCHER_ENABLED", "False")
self.event_dispatcher_protocol = os.getenv("EVENT_DISPATCHER_PROTOCOL", "http")
self.event_dispatcher_host = os.getenv(
"EVENT_DISPATCHER_HOST",
)
self.event_dispatcher_endpoint = os.getenv(
"EVENT_DISPATCHER_ENDPOINT", "/events"
)
self.event_dispatcher_port = os.getenv("EVENT_DISPATCHER_PORT", 45000)

def get_azure_deployment_id_for_model(self, model: str) -> str:
"""
Returns the relevant deployment id for the model specified.
Expand Down
62 changes: 62 additions & 0 deletions autogpt/event_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import json
import threading
import time
import urllib.request
from typing import Dict

from autogpt.config import Config

# Retrieve configuration
CFG = Config()

# Build URL for target
url = f"{CFG.event_dispatcher_protocol}://{CFG.event_dispatcher_host}:{CFG.event_dispatcher_port}{CFG.event_dispatcher_endpoint}"


# Fires and forgets the data to the configured endpoint
def fire_and_forget(url: str, data: Dict[str, any], headers: Dict[str, any]):
"""
Send a JSON payload to a RESTful endpoint asynchronously.

Args:
- url (str): the endpoint URL where the data is sent to
- data (dict): a dictionary containing the payload data to be sent as JSON
- headers (dict): a dictionary containing headers

Returns:
- None
"""

# ensure later import (dependency injection)
from autogpt.logs import logger

json_payload = json.dumps(data).encode("utf-8")

# Callback definition for thread
def send_request():
try:
# send the request
req = urllib.request.Request(
tanns2 marked this conversation as resolved.
Show resolved Hide resolved
url, data=json_payload, headers=headers, method="POST"
)
if CFG.debug_mode:
logger.debug(f"Sending: {req}", "[Event Dispatcher]")
with urllib.request.urlopen(req) as response:
pass # do nothing with the response
except Exception as e: # ensure catching all exceptions
if CFG.debug_mode:
logger.error("[Event Dispatcher]", f"Error: {e}")

# Start a new thread to send the request
t = threading.Thread(target=send_request)
t.start()


# Helper method for passing data and config
def fire(data: Dict[str, any]):
headers = {
"Content-type": "application/json",
"Event-time": str(int(time.time_ns())),
"Event-origin": "AutoGPT",
}
fire_and_forget(url, data, headers)
14 changes: 14 additions & 0 deletions autogpt/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@

from colorama import Fore, Style

import autogpt.event_dispatcher
from autogpt.config import Config
from autogpt.singleton import Singleton
from autogpt.speech import say_text

CFG = Config()


class Logger(metaclass=Singleton):
"""
Expand Down Expand Up @@ -92,6 +96,16 @@ def typewriter_log(
level, content, extra={"title": title, "color": title_color}
)

# Additionally dispatch output to defined event endpoint
if CFG.event_dispatcher_enabled == "True":
autogpt.event_dispatcher.fire(
{
level: level,
title: title,
content: content,
}
)

def debug(
self,
message,
Expand Down
13 changes: 13 additions & 0 deletions docs/configuration/eventdispatcher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
## Event Dispatcher

By default, Auto-GPT writes output to the prompt. The event dispatcher enables sending all console output to an endpoint where further actions, such as an enterprise service bus or 3rd party applications reacting to ongoing output, can take place. The event dispatcher asynchronously fires and forgets such events through the HTTP POST method.

To enable the event dispatcher, set these variables in your `.env`:

```bash
EVENT_DISPATCHER_ENABLED=True
EVENT_DISPATCHER_PROTOCOL=http
EVENT_DISPATCHER_HOST=localhost
EVENT_DISPATCHER_ENDPOINT=/events
EVENT_DISPATCHER_PORT=45000
```
67 changes: 67 additions & 0 deletions tests/unit/test_event_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import json
import urllib
from unittest import mock
from unittest.mock import MagicMock

import pytest

from autogpt.event_dispatcher import fire, fire_and_forget


@pytest.fixture
def mock_cfg():
cfg_mock = MagicMock()
cfg_mock.event_dispatcher_protocol = "http"
cfg_mock.event_dispatcher_host = "localhost"
cfg_mock.event_dispatcher_port = 8080
cfg_mock.event_dispatcher_endpoint = "/endpoint"
cfg_mock.debug_mode = True
return cfg_mock


def test_fire_and_forget(mock_cfg, monkeypatch, mocker):
mock_thread_class = mocker.MagicMock()
monkeypatch.setattr("autogpt.event_dispatcher.CFG", mock_cfg)
monkeypatch.setattr("urllib.request.urlopen", MagicMock())
monkeypatch.setattr("time.time_ns", lambda: 1234567890)
monkeypatch.setattr("threading.Thread", mock_thread_class)

data = {"key": "value"}
headers = {
"Content-type": "application/json",
"Event-time": "1234567890",
"Event-origin": "AutoGPT",
}
expected_url = "http://localhost:8080/endpoint"
fire_and_forget(expected_url, data, headers)
mock_thread_class.assert_called_once_with(target=mock.ANY)


def test_fire_and_forget_with_exception(mock_cfg, monkeypatch, mocker):
mock_thread_class = mocker.MagicMock()
monkeypatch.setattr("autogpt.event_dispatcher.CFG", mock_cfg)
monkeypatch.setattr("time.time_ns", lambda: 1234567890)
monkeypatch.setattr("threading.Thread", mock_thread_class)

data = {"key": "value"}
headers = {
"Content-type": "application/json",
"Event-time": "1234567890",
"Event-origin": "AutoGPT",
}
expected_url = "http://localhost:8080/endpoint"
with mock.patch("urllib.request.urlopen", side_effect=Exception()):
fire_and_forget(expected_url, data, headers)
mock_thread_class.assert_called_once_with(target=mock.ANY)


def test_fire(monkeypatch):
mock_fire_and_forget = MagicMock()
monkeypatch.setattr(
"autogpt.event_dispatcher.fire_and_forget", mock_fire_and_forget
)

data = {"key": "value"}
fire(data)

mock_fire_and_forget.assert_called_once()