refactor: reduce telemetry events count #4501

Merged (21 commits) on Mar 27, 2023
Changes from 20 commits
7 changes: 0 additions & 7 deletions haystack/__init__.py
@@ -25,10 +25,3 @@

pd.options.display.max_colwidth = 80
set_pytorch_secure_model_loading()

import os


from haystack.telemetry import send_event

send_event(event_name="Haystack imported")
28 changes: 28 additions & 0 deletions haystack/agents/base.py
@@ -2,11 +2,13 @@

import logging
import re
from hashlib import md5
from typing import List, Optional, Union, Dict, Any

from events import Events

from haystack import Pipeline, BaseComponent, Answer, Document
from haystack.telemetry import send_event
from haystack.agents.agent_step import AgentStep
from haystack.agents.types import Color
from haystack.agents.utils import print_text
@@ -177,6 +179,18 @@ def __init__(
self.tool_pattern = tool_pattern
self.final_answer_pattern = final_answer_pattern
self.add_default_logging_callbacks()
self.update_hash()

def update_hash(self):
"""
Used for telemetry. Hashes the tool classnames to send an event only when they change.
See haystack/telemetry.py::send_event
"""
try:
self.hash = md5(" ".join([tool.pipeline_or_node.__class__.__name__ for tool in self.tools.values()]).encode()).hexdigest()
except Exception as exc:
logger.debug("Telemetry exception: %s", str(exc))
self.hash = "[an exception occurred during hashing]"

def add_default_logging_callbacks(self, agent_color: Color = Color.GREEN) -> None:
def on_tool_finish(
@@ -243,6 +257,13 @@ def run(
`{"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}`.
You can only pass parameters to tools that are pipelines, but not nodes.
"""
try:
if not self.hash == self.last_hash:
self.last_hash = self.hash
send_event(event_name="Agent", event_properties={"llm.agent_hash": self.hash})
except Exception as exc:
logger.debug("Telemetry exception: %s", exc)

if not self.tools:
raise AgentError(
"An Agent needs tools to run. Add at least one tool using `add_tool()` or set the parameter `tools` "
@@ -314,6 +335,13 @@ def run_batch(
`{"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}`.
You can only pass parameters to tools that are pipelines but not nodes.
"""
try:
if not self.hash == self.last_hash:
self.last_hash = self.hash
send_event(event_name="Agent", event_properties={"llm.agent_hash": self.hash})
except Exception as exc:
logger.debug("Telemetry exception: %s", exc)

results: Dict = {"queries": [], "answers": [], "transcripts": []}
for query in queries:
result = self.run(query=query, max_steps=max_steps, params=params)
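
The `Agent` changes above replace per-call telemetry with change detection: `update_hash()` fingerprints the tool class names, and `run()`/`run_batch()` send an `Agent` event only when that fingerprint differs from the last one reported. Below is a minimal, self-contained sketch of the pattern; `DedupAgentTelemetry` and the stubbed `send_event` are illustrative stand-ins, not part of the PR.

```python
# Minimal sketch of the hash-based deduplication used in Agent above.
# `send_event` is stubbed here; in Haystack it comes from haystack.telemetry.
from hashlib import md5
from typing import Iterable, Optional


def send_event(event_name: str, event_properties: Optional[dict] = None) -> None:
    print(event_name, event_properties)  # stand-in for the real telemetry sender


class DedupAgentTelemetry:
    def __init__(self, tool_class_names: Iterable[str]):
        self.last_hash: Optional[str] = None
        self.update_hash(tool_class_names)

    def update_hash(self, tool_class_names: Iterable[str]) -> None:
        # Fingerprint the tool set; the same tools always produce the same hash.
        self.hash = md5(" ".join(tool_class_names).encode()).hexdigest()

    def maybe_send(self) -> None:
        # Emit the "Agent" event only when the tool fingerprint changed since the last send.
        if self.hash != self.last_hash:
            self.last_hash = self.hash
            send_event(event_name="Agent", event_properties={"llm.agent_hash": self.hash})


telemetry = DedupAgentTelemetry(["WebQAPipeline", "ExtractiveQAPipeline"])
telemetry.maybe_send()  # sends one "Agent" event
telemetry.maybe_send()  # suppressed: same tools as before
```

As in the diff, any hashing failure is caught and only logged at debug level, so telemetry can never break an Agent run.
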
9 changes: 7 additions & 2 deletions haystack/nodes/prompt/prompt_node.py
@@ -694,7 +694,13 @@ def __init__(
These parameters should be supplied in the `model_kwargs` dictionary.

"""
send_event("PromptNode initialized")
send_event(
event_name="PromptNode",
event_properties={
"llm.model_name_or_path": model_name_or_path,
"llm.default_prompt_template": default_prompt_template,
},
)
super().__init__()
self.prompt_templates: Dict[str, PromptTemplate] = {pt.name: pt for pt in get_predefined_prompt_templates()} # type: ignore
self.default_prompt_template: Union[str, PromptTemplate, None] = default_prompt_template
@@ -755,7 +761,6 @@ def prompt(self, prompt_template: Optional[Union[str, PromptTemplate]], *args, *
:param prompt_template: The name or object of the optional PromptTemplate to use.
:return: A list of strings as model responses.
"""
send_event("PromptNode.prompt()", event_properties={"template": str(prompt_template)})
results = []
# we pop the prompt_collector kwarg to avoid passing it to the model
prompt_collector: List[Union[str, List[Dict[str, str]]]] = kwargs.pop("prompt_collector", [])
51 changes: 31 additions & 20 deletions haystack/pipelines/base.py
@@ -4,6 +4,7 @@

import itertools
from functools import partial
from hashlib import md5
from time import time
from typing import Dict, List, Optional, Any, Set, Tuple, Union

@@ -20,7 +21,6 @@
from pathlib import Path

import yaml
import mmh3
import numpy as np
import pandas as pd
import networkx as nx
@@ -51,7 +51,7 @@
from haystack.nodes.retriever.base import BaseRetriever
from haystack.document_stores.base import BaseDocumentStore
from haystack.utils.experiment_tracking import MLflowTrackingHead, Tracker as tracker
from haystack.telemetry import send_pipeline_run_event, send_pipeline_event
from haystack.telemetry import send_event, send_pipeline_event


logger = logging.getLogger(__name__)
@@ -71,7 +71,8 @@ class Pipeline:

def __init__(self):
self.graph = DiGraph()
self.yaml_hash = False
self.config_hash = None
self.last_config_hash = None

@property
def root_node(self) -> Optional[str]:
@@ -425,14 +426,22 @@ def add_node(self, component: BaseComponent, name: str, inputs: List[str]):
node={"name": name, "inputs": inputs},
instance=component,
)
# TELEMETRY: Hash the config of the pipeline without node names
# to be able to cluster later by "pipeline type"
# (is any specific pipeline configuration very popular?)
fingerprint_config = copy.copy(self.get_config())
for comp in fingerprint_config["components"]:
del comp["name"]
fingerprint = json.dumps(fingerprint_config, default=str)
self.fingerprint = "{:02x}".format(mmh3.hash128(fingerprint, signed=False))
self.update_config_hash()

def update_config_hash(self):
"""
Used for telemetry. Hashes the config, except for the node names, to send an event only when the pipeline changes.
See haystack/telemetry.py::send_pipeline_event
"""
try:
config_to_hash = copy.copy(self.get_config())
for comp in config_to_hash["components"]:
del comp["name"]
config_hash = json.dumps(config_to_hash, default=str)
self.config_hash = md5(config_hash.encode()).hexdigest()
except Exception as exc:
logger.debug("Telemetry exception: %s", str(exc))
self.config_hash = "[an exception occurred during hashing]"

def get_node(self, name: str) -> Optional[BaseComponent]:
"""
@@ -483,10 +492,8 @@ def run( # type: ignore
about their execution. By default, this information includes the input parameters
the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`.
"""
send_pipeline_run_event(
send_pipeline_event(
pipeline=self,
classname=self.__class__.__name__,
function_name="run",
query=query,
file_paths=file_paths,
labels=labels,
@@ -633,10 +640,8 @@ def run_batch( # type: ignore
about their execution. By default, this information includes the input parameters
the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`.
"""
send_pipeline_run_event(
send_pipeline_event(
pipeline=self,
classname=self.__class__.__name__,
function_name="run_batch",
queries=queries,
file_paths=file_paths,
labels=labels,
@@ -1230,7 +1235,10 @@ def eval(
Additional information can be found here
https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained
"""
send_pipeline_event(pipeline=self, event_name="Evaluation", event_properties={"function_name": "eval"})
send_event(
event_name="Evaluation",
event_properties={"pipeline.classname": self.__class__.__name__, "pipeline.config_hash": self.config_hash},
)

eval_result = EvaluationResult()
if add_isolated_node_eval:
@@ -1348,7 +1356,10 @@ def eval_batch(
Additional information can be found here
https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained
"""
send_pipeline_event(pipeline=self, event_name=f"{self.__class__.__name__}.eval_batch()")
send_event(
event_name="Evaluation",
event_properties={"pipeline.classname": self.__class__.__name__, "pipeline.config_hash": self.config_hash},
)

eval_result = EvaluationResult()
if add_isolated_node_eval:
@@ -1985,7 +1996,6 @@ def load_from_yaml(
overwrite_with_env_variables=overwrite_with_env_variables,
strict_version_check=strict_version_check,
)
pipeline.yaml_hash = "{:02x}".format(mmh3.hash128(str(path), signed=False))
return pipeline

@classmethod
@@ -2056,6 +2066,7 @@ def load_from_config(
)
pipeline.add_node(component=component, name=node_config["name"], inputs=node_config["inputs"])

pipeline.update_config_hash()
return pipeline

@classmethod
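
`Pipeline` gets the same treatment: `update_config_hash()` serializes the config with the user-chosen node names stripped and hashes the result, replacing the old mmh3 fingerprint and `yaml_hash`. A rough sketch of that fingerprinting step, assuming a simplified `get_config()`-style dict (the example config below is illustrative, not taken from the PR):

```python
import copy
import json
from hashlib import md5

# Assumed, simplified shape of Pipeline.get_config(); real configs carry more fields.
config = {
    "components": [
        {"name": "MyRetriever", "type": "BM25Retriever", "params": {"top_k": 10}},
        {"name": "MyReader", "type": "FARMReader", "params": {}},
    ],
    "pipelines": [{"name": "query", "nodes": [{"name": "MyRetriever", "inputs": ["Query"]}]}],
}

config_to_hash = copy.deepcopy(config)
for comp in config_to_hash["components"]:
    del comp["name"]  # drop user-chosen node names so only component types and params matter

config_hash = md5(json.dumps(config_to_hash, default=str).encode()).hexdigest()
print(config_hash)  # identical for any two pipelines with the same components and params
```

Two pipelines that differ only in how their nodes are named therefore report the same `pipeline.config_hash`, which is what allows clustering by "pipeline type" on the telemetry side.
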
8 changes: 4 additions & 4 deletions haystack/pipelines/ray.py
@@ -4,6 +4,7 @@
from time import time
from typing import Any, Dict, List, Optional, Tuple, Union
from pathlib import Path

import networkx as nx

try:
@@ -23,7 +24,7 @@
from haystack.nodes.base import BaseComponent, RootNode
from haystack.pipelines.base import Pipeline
from haystack.schema import Document, MultiLabel
from haystack.telemetry import send_pipeline_run_event
from haystack.telemetry import send_pipeline_event


logger = logging.getLogger(__name__)
@@ -130,6 +131,7 @@ def load_from_config(
inputs=node_config.get("inputs", []),
)

pipeline.update_config_hash()
return pipeline

@classmethod
@@ -312,10 +314,8 @@ async def run_async( # type: ignore
about their execution. By default, this information includes the input parameters
the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`.
"""
send_pipeline_run_event(
send_pipeline_event(
pipeline=self,
classname=self.__class__.__name__,
function_name="run_async",
query=query,
file_paths=file_paths,
labels=labels,
68 changes: 18 additions & 50 deletions haystack/telemetry.py
@@ -4,7 +4,6 @@
import logging
from pathlib import Path
import json
import datetime

import yaml
import posthog
@@ -102,9 +101,7 @@ def send_event(self, event_name: str, event_properties: Optional[Dict[str, Any]]
logger.debug("Telemetry couldn't make a POST request to PostHog.", exc_info=e)


def send_pipeline_run_event( # type: ignore
classname: str,
function_name: str,
def send_pipeline_event( # type: ignore
pipeline: "Pipeline", # type: ignore
query: Optional[str] = None,
queries: Optional[List[str]] = None,
@@ -118,8 +115,6 @@ def send_pipeline_run_event( # type: ignore
"""
Sends a telemetry event about the execution of a pipeline, if telemetry is enabled.

:param classname: The name of the Pipeline class (Pipeline, RayPipeline, ...)
:param function_name: The name of the function that was invoked (run, run_batch, async_run, ...).
:param pipeline: the pipeline that is running
:param query: the value of the `query` input of the pipeline, if any
:param queries: the value of the `queries` input of the pipeline, if any
@@ -132,25 +127,26 @@
"""
try:
if telemetry:
event_properties: Dict[str, Optional[Union[str, bool, int, Dict[str, Any]]]] = {
"class": classname,
"function_name": function_name,
}

# Check if it's the public demo
exec_context = os.environ.get(HAYSTACK_EXECUTION_CONTEXT, "")
if exec_context == "public_demo":
event_properties["pipeline.is_public_demo"] = True
event_properties["pipeline.run_parameters.query"] = query
event_properties["pipeline.run_parameters.params"] = params
telemetry.send_event(event_name=function_name, event_properties=event_properties)
event_properties = {
"pipeline.is_public_demo": True,
"pipeline.run_parameters.query": query,
"pipeline.run_parameters.params": params,
}
telemetry.send_event(event_name="Public Demo", event_properties=event_properties)
return

# Collect pipeline profile
event_properties["pipeline.classname"] = pipeline.__class__.__name__
event_properties["pipeline.fingerprint"] = pipeline.fingerprint
if pipeline.yaml_hash:
event_properties["pipeline.yaml_hash"] = pipeline.yaml_hash
# Send this event only if the pipeline config has changed
if pipeline.last_config_hash == pipeline.config_hash:
return
pipeline.last_config_hash = pipeline.config_hash

event_properties: Dict[str, Optional[Union[str, bool, int, Dict[str, Any]]]] = {
"pipeline.classname": pipeline.__class__.__name__,
"pipeline.config_hash": pipeline.config_hash,
}

# Add document store
docstore = pipeline.get_document_store()
@@ -190,38 +186,10 @@
event_properties["pipeline.run_parameters.params"] = bool(params)
event_properties["pipeline.run_parameters.debug"] = bool(debug)

telemetry.send_event(event_name="Pipeline run", event_properties=event_properties)
except Exception as e:
# Never let telemetry break things
logger.debug("There was an issue sending a '%s' telemetry event", function_name, exc_info=e)


def send_pipeline_event(pipeline: "Pipeline", event_name: str, event_properties: Optional[Dict[str, Any]] = None): # type: ignore
"""
Send a telemetry event related to a pipeline which is not a call to run(), if telemetry is enabled.
"""
try:
if telemetry:
if not event_properties:
event_properties = {}
event_properties.update(
{
"pipeline.classname": pipeline.__class__.__name__,
"pipeline.fingerprint": pipeline.fingerprint,
"pipeline.yaml_hash": pipeline.yaml_hash,
}
)
now = datetime.datetime.now()
if pipeline.last_run:
event_properties["pipeline.since_last_run"] = (now - pipeline.last_run).total_seconds()
else:
event_properties["pipeline.since_last_run"] = 0
pipeline.last_run = now

telemetry.send_event(event_name=event_name, event_properties=event_properties)
telemetry.send_event(event_name="Pipeline", event_properties=event_properties)
except Exception as e:
# Never let telemetry break things
logger.debug("There was an issue sending a '%s' telemetry event", event_name, exc_info=e)
logger.debug("There was an issue sending a 'Pipeline' telemetry event", exc_info=e)


def send_event(event_name: str, event_properties: Optional[Dict[str, Any]] = None):
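
Net effect in telemetry.py: `send_pipeline_run_event` and the old generic `send_pipeline_event` collapse into a single `send_pipeline_event` that emits one "Pipeline" event per distinct config hash (plus a dedicated "Public Demo" event), and the `yaml_hash`/`since_last_run` bookkeeping goes away. A condensed sketch of the new gating logic; `PipelineStub` and the printed payloads are illustrative only:

```python
import os
from typing import Optional


class PipelineStub:
    """Illustrative stand-in for haystack.pipelines.Pipeline with only the hash fields."""

    def __init__(self, config_hash: str):
        self.config_hash = config_hash
        self.last_config_hash: Optional[str] = None


def send_pipeline_event(pipeline: PipelineStub, query: Optional[str] = None) -> None:
    # The public demo is always reported, including the query, as in the diff above.
    # The env var name is assumed to match the HAYSTACK_EXECUTION_CONTEXT constant.
    if os.environ.get("HAYSTACK_EXECUTION_CONTEXT", "") == "public_demo":
        print("Public Demo", {"pipeline.run_parameters.query": query})
        return
    # Otherwise, report at most one "Pipeline" event per distinct configuration.
    if pipeline.last_config_hash == pipeline.config_hash:
        return
    pipeline.last_config_hash = pipeline.config_hash
    print("Pipeline", {"pipeline.config_hash": pipeline.config_hash})


pipeline = PipelineStub(config_hash="3f2a...")
send_pipeline_event(pipeline, query="what is haystack?")   # reported
send_pipeline_event(pipeline, query="another question")    # suppressed: config unchanged
```

Compared to the previous behaviour of sending an event on every `run()`/`run_batch()` call, a long-running pipeline now produces a single event until its configuration actually changes.
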