Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for custom memory storage #2280

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions docs/concepts/custom_memory_storage.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Custom Memory Storage

CrewAI supports custom memory storage implementations for different memory types. You can provide your own storage implementation by extending the `Storage` interface and passing it to the memory instances or through the `memory_config` parameter.

## Implementing a Custom Storage

To create a custom storage implementation, you need to extend the `Storage` interface and implement the required methods:

```python
from typing import Any, Dict, List
from crewai.memory.storage.interface import Storage

class CustomStorage(Storage):
"""Custom storage implementation."""

def __init__(self):
# Initialize your storage backend
self.data = []

def save(self, value: Any, metadata: Dict[str, Any]) -> None:
"""Save a value with metadata to the storage."""
# Implement your save logic
self.data.append({"value": value, "metadata": metadata})

def search(
self, query: str, limit: int = 3, score_threshold: float = 0.35
) -> List[Any]:
"""Search for values in the storage."""
# Implement your search logic
return [{"context": item["value"], "metadata": item["metadata"]} for item in self.data]

def reset(self) -> None:
"""Reset the storage."""
# Implement your reset logic
self.data = []
```

## Using Custom Storage

There are two ways to provide custom storage implementations to CrewAI:

### 1. Pass Custom Storage to Memory Instances

You can create memory instances with custom storage and pass them to the Crew:

```python
from crewai import Crew, Agent
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.user.user_memory import UserMemory

# Create custom storage instances
short_term_storage = CustomStorage()
long_term_storage = CustomStorage()
entity_storage = CustomStorage()
user_storage = CustomStorage()

# Create memory instances with custom storage
short_term_memory = ShortTermMemory(storage=short_term_storage)
long_term_memory = LongTermMemory(storage=long_term_storage)
entity_memory = EntityMemory(storage=entity_storage)
user_memory = UserMemory(storage=user_storage)

# Create a crew with custom memory instances
crew = Crew(
agents=[Agent(role="researcher", goal="research", backstory="I am a researcher")],
memory=True,
short_term_memory=short_term_memory,
long_term_memory=long_term_memory,
entity_memory=entity_memory,
memory_config={"user_memory": user_memory},
)
```

### 2. Pass Custom Storage through Memory Config

You can also provide custom storage implementations through the `memory_config` parameter:

```python
from crewai import Crew, Agent

# Create a crew with custom storage in memory_config
crew = Crew(
agents=[Agent(role="researcher", goal="research", backstory="I am a researcher")],
memory=True,
memory_config={
"storage": {
"short_term": CustomStorage(),
"long_term": CustomStorage(),
"entity": CustomStorage(),
"user": CustomStorage(),
}
},
)
```

## Example: Redis Storage

Here's an example of a custom storage implementation using Redis:

```python
import json
import redis
from typing import Any, Dict, List
from crewai.memory.storage.interface import Storage

class RedisStorage(Storage):
"""Redis-based storage implementation."""

def __init__(self, redis_url="redis://localhost:6379/0", prefix="crewai"):
self.redis = redis.from_url(redis_url)
self.prefix = prefix

def save(self, value: Any, metadata: Dict[str, Any]) -> None:
"""Save a value with metadata to Redis."""
key = f"{self.prefix}:{len(self.redis.keys(f'{self.prefix}:*'))}"
data = {"value": value, "metadata": metadata}
self.redis.set(key, json.dumps(data))

def search(
self, query: str, limit: int = 3, score_threshold: float = 0.35
) -> List[Any]:
"""Search for values in Redis."""
# This is a simple implementation that returns all values
# In a real implementation, you would use Redis search capabilities
results = []
for key in self.redis.keys(f"{self.prefix}:*"):
data = json.loads(self.redis.get(key))
results.append({"context": data["value"], "metadata": data["metadata"]})
if len(results) >= limit:
break
return results

def reset(self) -> None:
"""Reset the Redis storage."""
for key in self.redis.keys(f"{self.prefix}:*"):
self.redis.delete(key)
```

## Benefits of Custom Storage

Using custom storage implementations allows you to:

1. Store memory data in external databases or services
2. Implement custom search algorithms
3. Share memory between different crews or applications
4. Persist memory across application restarts
5. Implement custom memory retention policies

By extending the `Storage` interface, you can integrate CrewAI with any storage backend that suits your needs.
20 changes: 18 additions & 2 deletions src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,21 +262,37 @@ def set_private_attrs(self) -> "Crew":
def create_crew_memory(self) -> "Crew":
"""Set private attributes."""
if self.memory:
from crewai.memory.storage.rag_storage import RAGStorage

# Create default storage instances for each memory type if needed
long_term_storage = RAGStorage(type="long_term", crew=self, embedder_config=self.embedder)
short_term_storage = RAGStorage(type="short_term", crew=self, embedder_config=self.embedder)
entity_storage = RAGStorage(type="entity", crew=self, embedder_config=self.embedder)

self._long_term_memory = (
self.long_term_memory if self.long_term_memory else LongTermMemory()
self.long_term_memory if self.long_term_memory else LongTermMemory(
crew=self,
embedder_config=self.embedder,
storage=long_term_storage
)
)
self._short_term_memory = (
self.short_term_memory
if self.short_term_memory
else ShortTermMemory(
crew=self,
embedder_config=self.embedder,
storage=short_term_storage
)
)
self._entity_memory = (
self.entity_memory
if self.entity_memory
else EntityMemory(crew=self, embedder_config=self.embedder)
else EntityMemory(
crew=self,
embedder_config=self.embedder,
storage=entity_storage
)
)
if (
self.memory_config and "user_memory" in self.memory_config
Expand Down
10 changes: 5 additions & 5 deletions src/crewai/memory/contextual/contextual_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _fetch_stm_context(self, query) -> str:
stm_results = self.stm.search(query)
formatted_results = "\n".join(
[
f"- {result['memory'] if self.memory_provider == 'mem0' else result['context']}"
f"- {result.get('memory', result.get('context', ''))}"
for result in stm_results
]
)
Expand All @@ -58,7 +58,7 @@ def _fetch_ltm_context(self, task) -> Optional[str]:
Fetches historical data or insights from LTM that are relevant to the task's description and expected_output,
formatted as bullet points.
"""
ltm_results = self.ltm.search(task, latest_n=2)
ltm_results = self.ltm.search(query=task, limit=2)
if not ltm_results:
return None

Expand All @@ -80,9 +80,9 @@ def _fetch_entity_context(self, query) -> str:
em_results = self.em.search(query)
formatted_results = "\n".join(
[
f"- {result['memory'] if self.memory_provider == 'mem0' else result['context']}"
f"- {result.get('memory', result.get('context', ''))}"
for result in em_results
] # type: ignore # Invalid index type "str" for "str"; expected type "SupportsIndex | slice"
]
)
return f"Entities:\n{formatted_results}" if em_results else ""

Expand All @@ -99,6 +99,6 @@ def _fetch_user_context(self, query: str) -> str:
return ""

formatted_memories = "\n".join(
f"- {result['memory']}" for result in user_memories
f"- {result.get('memory', result.get('context', ''))}" for result in user_memories
)
return f"User memories/preferences:\n{formatted_memories}"
88 changes: 56 additions & 32 deletions src/crewai/memory/entity/entity_memory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Dict, Optional

from pydantic import PrivateAttr

Expand All @@ -17,47 +17,71 @@ class EntityMemory(Memory):
_memory_provider: Optional[str] = PrivateAttr()

def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
memory_provider = None
memory_config = None

if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None

if memory_provider == "mem0":
memory_config = crew.memory_config
memory_provider = memory_config.get("provider")

# If no storage is provided, try to create one
if storage is None:
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
# Try to select storage using helper method
storage = self._select_storage(
storage=storage,
memory_config=memory_config,
storage_type="entity",
crew=crew,
path=path,
default_storage_factory=lambda path, crew: RAGStorage(
type="entities",
allow_reset=True,
crew=crew,
embedder_config=embedder_config,
path=path,
)
)
storage = Mem0Storage(type="entities", crew=crew)
else:
storage = (
storage
if storage
else RAGStorage(
except ValueError:
# Fallback to default storage
storage = RAGStorage(
type="entities",
allow_reset=True,
embedder_config=embedder_config,
crew=crew,
embedder_config=embedder_config,
path=path,
)
)

super().__init__(storage=storage)
self._memory_provider = memory_provider

# Initialize with parameters
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
)


def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""Saves an entity item into the SQLite storage."""
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Saves an entity item or value into the storage."""
if isinstance(value, EntityMemoryItem):
item = value
if self.memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
# Handle regular value and metadata
super().save(value, metadata, agent)

def reset(self) -> None:
try:
Expand Down
Loading
Loading