Add Elasticsearch Document Store (#13)
tanaysoni authored Jan 24, 2020
1 parent a0293cc commit f83a164
Showing 15 changed files with 164 additions and 63 deletions.
23 changes: 14 additions & 9 deletions README.rst
@@ -65,21 +65,26 @@ Usage
.. image:: https://raw.githubusercontent.com/deepset-ai/haystack/master/docs/img/code_snippet_usage.png


Configuration
-------------
The configuration can be supplied in a :code:`qa_config.py` placed in the PYTHONPATH. Alternatively, the :code:`DATABASE_URL` can also be set as an environment variable.


Deployment
==========

SQL Backend
-----------
The database ORM layer is implemented using SQLAlchemy library. By default, it uses the file-based SQLite database. For large scale deployments, the configuration can be changed to use other compatible databases like PostgreSQL or MySQL.
Haystack has an extensible document store layer.
There are currently implementations of Elasticsearch and SQL (see :code:`haystack.database.elasticsearch.ElasticsearchDocumentStore` and :code:`haystack.database.sql.SQLDocumentStore`).

Elasticsearch Backend
----------------------
(Coming soon)
---------------------
Elasticsearch is recommended for large-scale deployments. Documents can optionally be chunked into smaller units (e.g., paragraphs) before indexing to make the results returned by the Retriever more granular and accurate.
Retrievers can access an Elasticsearch index to find the relevant paragraphs (or documents) for a query. The default :code:`ElasticsearchRetriever` uses Elasticsearch's native BM25 scoring, but can easily be extended with custom implementations.

You can get started by running a single Elasticsearch node using Docker::

docker run -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.5.1
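
Once the container is up, the new document store and retriever can be combined roughly as follows (a minimal sketch; host, index name, and the example question are placeholders)::

    from haystack.database.elasticsearch import ElasticsearchDocumentStore
    from haystack.retriever.elasticsearch import ElasticsearchRetriever

    # connect to the single-node container started above
    document_store = ElasticsearchDocumentStore(host="localhost", index="document")
    retriever = ElasticsearchRetriever(document_store=document_store)

    # returns BM25-ranked paragraphs plus their metadata
    paragraphs, meta_data = retriever.retrieve("Who founded the company?", top_k=5)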

SQL Backend
-----------
The SQL backend layer is mainly meant to simplify the first development steps. By default, a local file-based SQLite database is initialized.
However, if you prefer a PostgreSQL or MySQL backend for production, you can easily configure this since our implementation is based on SQLAlchemy.
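
For example, a PostgreSQL backend can be configured by passing a SQLAlchemy connection URL (a minimal sketch; credentials and database name are placeholders)::

    from haystack.database.sql import SQLDocumentStore

    # any SQLAlchemy-compatible connection URL should work here
    document_store = SQLDocumentStore(url="postgresql://user:password@localhost:5432/haystack")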

REST API
--------
Binary file modified docs/img/code_snippet_usage.png
7 changes: 2 additions & 5 deletions haystack/__init__.py
@@ -1,8 +1,7 @@
from haystack.retriever.tfidf import TfidfRetriever
from haystack.reader.farm import FARMReader
import logging

import pandas as pd

pd.options.display.max_colwidth = 80

logger = logging.getLogger(__name__)
@@ -21,8 +20,6 @@ class Finder:

def __init__(self, reader, retriever):
self.retriever = retriever
self.retriever.fit()

self.reader = reader

def get_answers(self, question, top_k_reader=1, top_k_retriever=10, filters=None):
@@ -39,7 +36,7 @@ def get_answers(self, question, top_k_reader=1, top_k_retriever=10, filters=None

# 1) Optional: reduce the search space via document tags
if filters:
candidate_doc_ids = self.retriever.datastore.get_document_ids_by_tags(filters)
candidate_doc_ids = self.retriever.document_store.get_document_ids_by_tags(filters)
else:
candidate_doc_ids = None

2 changes: 1 addition & 1 deletion haystack/database/base.py
@@ -3,7 +3,7 @@

class BaseDocumentStore:
"""
Base class for implementing DataStores.
Base class for implementing Document Stores.
"""

@abstractmethod
85 changes: 85 additions & 0 deletions haystack/database/elasticsearch.py
@@ -0,0 +1,85 @@
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Document as ESDoc, Text, connections
from haystack.database.base import BaseDocumentStore


class Document(ESDoc):
    name = Text()
    text = Text()
    tags = Text()

    class Index:
        name = "document"


class ElasticsearchDocumentStore(BaseDocumentStore):
    def __init__(self, host="localhost", username="", password="", index="document"):
        self.client = Elasticsearch(hosts=[{"host": host}], http_auth=(username, password))
        self.connections = connections.create_connection(hosts=[{"host": host}], http_auth=(username, password))
        Document.init()  # create mapping if not exists.
        self.index = index

    def get_document_by_id(self, id):
        query = {"query": {"term": {"_id": id}}}
        result = self.client.search(index=self.index, body=query)["hits"]["hits"]
        if result:
            hit = result[0]
            document = {"id": hit["_id"], "name": hit["_source"]["name"], "text": hit["_source"]["text"]}
        else:
            document = None
        return document

    def get_document_ids_by_tags(self, tags):
        query = {
            "query": {
                "bool": {
                    "should": [
                        {
                            "terms": {
                                "tags": tags
                            }
                        }
                    ]
                }
            }
        }
        result = self.client.search(index=self.index, body=query)["hits"]["hits"]
        documents = []
        for hit in result:
            documents.append({"id": hit["_id"], "name": hit["_source"]["name"], "text": hit["_source"]["text"]})
        return documents

    def write_documents(self, documents):
        for doc in documents:
            d = Document(
                name=doc["name"],
                text=doc["text"],
                document_id=doc.get("document_id", None),
                tags=doc.get("tags", None),
            )
            d.save()

    def get_document_count(self):
        s = Search(using=self.client, index=self.index)
        return s.count()

    def get_all_documents(self):
        search = Search(using=self.client, index=self.index)
        documents = []
        for hit in search:
            documents.append(
                {
                    "id": hit.meta["id"],
                    "name": hit["name"],
                    "text": hit["text"],
                }
            )
        return documents

    def query(self, query, top_k=10):
        search = Search(using=self.client, index=self.index).query("match", text=query)[:top_k].execute()
        paragraphs = []
        meta_data = []
        for hit in search:
            paragraphs.append(hit["text"])
            meta_data.append({"paragraph_id": hit.meta["id"], "document_id": hit["document_id"]})
        return paragraphs, meta_data
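
For reference, a minimal usage sketch of the new store (the document contents and index settings below are only illustrative)::

    from haystack.database.elasticsearch import ElasticsearchDocumentStore

    document_store = ElasticsearchDocumentStore(host="localhost", index="document")

    # each dict needs "name" and "text"; "document_id" and "tags" are optional
    document_store.write_documents([
        {"name": "doc_1.txt", "text": "Haystack is a framework for question answering.", "document_id": 1},
    ])

    print(document_store.get_document_count())
    paragraphs, meta_data = document_store.query("question answering", top_k=5)
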
5 changes: 2 additions & 3 deletions haystack/database/sql.py
@@ -83,9 +83,8 @@ def get_document_ids_by_tags(self, tags):
GROUP BY dt.document_id
"""
tag_filters = []
for tag, value in tags.items():
if value:
tag_filters.append(f"SUM(CASE WHEN t.value='{value}' THEN 1 ELSE 0 END) > 0")
for tag in tags:
tag_filters.append(f"SUM(CASE WHEN t.value='{tag}' THEN 1 ELSE 0 END) > 0")

final_query = f"{query} HAVING {' AND '.join(tag_filters)});"
query_results = self.session.execute(final_query)
36 changes: 25 additions & 11 deletions haystack/indexing/io.py
@@ -8,7 +8,7 @@
logger = logging.getLogger(__name__)


def write_documents_to_db(datastore, document_dir, clean_func=None, only_empty_db=False):
def write_documents_to_db(document_store, document_dir, clean_func=None, only_empty_db=False, split_paragraphs=False):
"""
Write all text files (.txt) in the sub-directories of the given path to the connected database.
@@ -22,28 +22,42 @@ def write_documents_to_db(datastore, document_dir, clean_func=None, only_empty_d

# check if db has already docs
if only_empty_db:
n_docs = datastore.get_document_count()
n_docs = document_store.get_document_count()
if n_docs > 0:
logger.info(f"Skip writing documents since DB already contains {n_docs} docs ... "
"(Disable `only_empty_db`, if you want to add docs anyway.)")
return None

# read and add docs
documents_to_write = []
docs_to_index = []
doc_id = 1
for path in file_paths:
with open(path) as doc:
text = doc.read()
if clean_func:
text = clean_func(text)

documents_to_write.append(
{
"name": path.name,
"text": text,
}
)
datastore.write_documents(documents_to_write)
logger.info(f"Wrote {len(documents_to_write)} docs to DB")
if split_paragraphs:
for para in text.split("\n\n"):
if not para.strip(): # skip empty paragraphs
continue
docs_to_index.append(
{
"name": path.name,
"text": para,
"document_id": doc_id
}
)
doc_id += 1
else:
docs_to_index.append(
{
"name": path.name,
"text": text,
}
)
document_store.write_documents(docs_to_index)
logger.info(f"Wrote {len(docs_to_index)} docs to DB")


def fetch_archive_from_http(url, output_dir, proxies=None):
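
For reference, a minimal call of the updated indexing helper (a sketch; the directory path is a placeholder and the Elasticsearch store settings are assumptions)::

    from haystack.database.elasticsearch import ElasticsearchDocumentStore
    from haystack.indexing.io import write_documents_to_db

    document_store = ElasticsearchDocumentStore(host="localhost", index="document")

    # split_paragraphs=True stores one entry per paragraph instead of one per file
    write_documents_to_db(document_store=document_store, document_dir="data/docs",
                          only_empty_db=True, split_paragraphs=True)
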
7 changes: 7 additions & 0 deletions haystack/retriever/base.py
@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod


class BaseRetriever(ABC):
    @abstractmethod
    def retrieve(self, query, candidate_doc_ids=None, top_k=1):
        pass
9 changes: 9 additions & 0 deletions haystack/retriever/elasticsearch.py
@@ -0,0 +1,9 @@
from haystack.retriever.base import BaseRetriever


class ElasticsearchRetriever(BaseRetriever):
    def __init__(self, document_store):
        self.document_store = document_store

    def retrieve(self, query, candidate_doc_ids=None, top_k=10):
        return self.document_store.query(query, top_k)
26 changes: 6 additions & 20 deletions haystack/retriever/tfidf.py
@@ -1,7 +1,8 @@
from abc import ABC, abstractmethod
from collections import OrderedDict, namedtuple
import logging
from collections import OrderedDict, namedtuple

import pandas as pd
from haystack.retriever.base import BaseRetriever
from sklearn.feature_extraction.text import TfidfVectorizer


@@ -11,20 +12,6 @@
Paragraph = namedtuple("Paragraph", ["paragraph_id", "document_id", "text"])


class BaseRetriever(ABC):
@abstractmethod
def _get_all_paragraphs(self):
pass

@abstractmethod
def retrieve(self, query, candidate_doc_ids=None, top_k=1):
pass

@abstractmethod
def fit(self):
pass


class TfidfRetriever(BaseRetriever):
"""
Read all documents from a SQL backend.
@@ -35,15 +22,15 @@ class TfidfRetriever(BaseRetriever):
It uses sklearn's TfidfVectorizer to compute a tf-idf matrix.
"""

def __init__(self, datastore):
def __init__(self, document_store):
self.vectorizer = TfidfVectorizer(
lowercase=True,
stop_words=None,
token_pattern=r"(?u)\b\w\w+\b",
ngram_range=(1, 1),
)

self.datastore = datastore
self.document_store = document_store
self.paragraphs = self._get_all_paragraphs()
self.df = None
self.fit()
@@ -52,12 +39,11 @@ def _get_all_paragraphs(self):
"""
Split the list of documents into paragraphs
"""
documents = self.datastore.get_all_documents()
documents = self.document_store.get_all_documents()

paragraphs = []
p_id = 0
for doc in documents:
_pgs = [d for d in doc["text"].splitlines() if d.strip()]
for p in doc["text"].split("\n\n"):
if not p.strip(): # skip empty paragraphs
continue
1 change: 0 additions & 1 deletion qa_config.py

This file was deleted.

8 changes: 4 additions & 4 deletions test/test_db.py
@@ -3,9 +3,9 @@


def test_db_write_read():
sql_datastore = SQLDocumentStore()
write_documents_to_db(datastore=sql_datastore, document_dir="samples/docs")
documents = sql_datastore.get_all_documents()
sql_document_store = SQLDocumentStore()
write_documents_to_db(document_store=sql_document_store, document_dir="samples/docs")
documents = sql_document_store.get_all_documents()
assert len(documents) == 2
doc = sql_datastore.get_document_by_id("1")
doc = sql_document_store.get_document_by_id("1")
assert doc.keys() == {"id", "name", "text", "tags"}
6 changes: 3 additions & 3 deletions tutorials/Tutorial1_Basic_QA_Pipeline.ipynb
@@ -86,12 +86,12 @@
"# The documents can be stored in different types of \"DocumentStores\".\n",
"# For dev we suggest a light-weight SQL DB\n",
"# For production we suggest elasticsearch\n",
"datastore = SQLDocumentStore(url=\"sqlite:///qa.db\")\n",
"document_store = SQLDocumentStore(url=\"sqlite:///qa.db\")\n",
"\n",
"# Now, let's write the docs to our DB.\n",
"# You can optionally supply a cleaning function that is applied to each doc (e.g. to remove footers)\n",
"# It must take a str as input, and return a str.\n",
"write_documents_to_db(datastore=datastore, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)"
"write_documents_to_db(document_store=document_store, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)"
]
},
{
@@ -122,7 +122,7 @@
"source": [
"# A retriever identifies the k most promising chunks of text that might contain the answer for our question\n",
"# Retrievers use some simple but fast algorithm, here: TF-IDF\n",
"retriever = TfidfRetriever(datastore=datastore)"
"retriever = TfidfRetriever(document_store=document_store)"
]
},
{
6 changes: 3 additions & 3 deletions tutorials/Tutorial1_Basic_QA_Pipeline.py
@@ -20,18 +20,18 @@
# The documents can be stored in different types of "DocumentStores".
# For dev we suggest a light-weight SQL DB
# For production we suggest elasticsearch
datastore = SQLDocumentStore(url="sqlite:///qa.db")
document_store = SQLDocumentStore(url="sqlite:///qa.db")

# Now, let's write the docs to our DB.
# You can optionally supply a cleaning function that is applied to each doc (e.g. to remove footers)
# It must take a str as input, and return a str.
write_documents_to_db(datastore=datastore, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)
write_documents_to_db(document_store=document_store, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)

## Initialize Reader, Retriever & Finder

# A retriever identifies the k most promising chunks of text that might contain the answer for our question
# Retrievers use some simple but fast algorithm, here: TF-IDF
retriever = TfidfRetriever(datastore=datastore)
retriever = TfidfRetriever(document_store=document_store)

# A reader scans the text chunks in detail and extracts the k best answers
# Readers use more powerful but slower deep learning models
6 changes: 3 additions & 3 deletions tutorials/Tutorial2_Finetune_a_model_on_your_data.py
@@ -27,14 +27,14 @@


# Init Document store & write docs to it
datastore = SQLDocumentStore(url="sqlite:///qa.db")
write_documents_to_db(datastore=datastore, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)
document_store = SQLDocumentStore(url="sqlite:///qa.db")
write_documents_to_db(document_store=document_store, document_dir=doc_dir, clean_func=clean_wiki_text, only_empty_db=True)

## Initialize Reader, Retriever & Finder

# A retriever identifies the k most promising chunks of text that might contain the answer for our question
# Retrievers use some simple but fast algorithm, here: TF-IDF
retriever = TfidfRetriever(datastore=datastore)
retriever = TfidfRetriever(document_store=document_store)

# The Finder sticks together reader and retriever in a pipeline to answer our actual questions
finder = Finder(reader, retriever)
