diff --git a/haystack/__init__.py b/haystack/__init__.py index 1ce8f176e3..9eeaa7db7b 100644 --- a/haystack/__init__.py +++ b/haystack/__init__.py @@ -25,10 +25,3 @@ pd.options.display.max_colwidth = 80 set_pytorch_secure_model_loading() - -import os - - -from haystack.telemetry import send_event - -send_event(event_name="Haystack imported") diff --git a/haystack/agents/base.py b/haystack/agents/base.py index fd86d8f9bb..92243b0398 100644 --- a/haystack/agents/base.py +++ b/haystack/agents/base.py @@ -2,11 +2,13 @@ import logging import re +from hashlib import md5 from typing import List, Optional, Union, Dict, Any from events import Events from haystack import Pipeline, BaseComponent, Answer, Document +from haystack.telemetry import send_event from haystack.agents.agent_step import AgentStep from haystack.agents.types import Color from haystack.agents.utils import print_text @@ -177,6 +179,20 @@ def __init__( self.tool_pattern = tool_pattern self.final_answer_pattern = final_answer_pattern self.add_default_logging_callbacks() + self.hash = None + self.last_hash = None + self.update_hash() + + def update_hash(self): + """ + Used for telemetry. Hashes the tool classnames to send an event only when they change. + See haystack/telemetry.py::send_event + """ + try: + self.hash = md5(" ".join([tool.pipeline_or_node.__class__.__name__ for tool in self.tools.values()]).encode("utf-8")).hexdigest() + except Exception as exc: + logger.debug("Telemetry exception: %s", str(exc)) + self.hash = "[an exception occurred during hashing]" def add_default_logging_callbacks(self, agent_color: Color = Color.GREEN) -> None: def on_tool_finish( @@ -243,6 +259,13 @@ def run( `{"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}`. You can only pass parameters to tools that are pipelines, but not nodes. 
""" + try: + if not self.hash == self.last_hash: + self.last_hash = self.hash + send_event(event_name="Agent", event_properties={"llm.agent_hash": self.hash}) + except Exception as exc: + logger.debug("Telemetry exception: %s", exc) + if not self.tools: raise AgentError( "An Agent needs tools to run. Add at least one tool using `add_tool()` or set the parameter `tools` " @@ -314,6 +337,13 @@ def run_batch( `{"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}`. You can only pass parameters to tools that are pipelines but not nodes. """ + try: + if not self.hash == self.last_hash: + self.last_hash = self.hash + send_event(event_name="Agent", event_properties={"llm.agent_hash": self.hash}) + except Exception as exc: + logger.debug("Telemetry exception: %s", exc) + results: Dict = {"queries": [], "answers": [], "transcripts": []} for query in queries: result = self.run(query=query, max_steps=max_steps, params=params) diff --git a/haystack/nodes/prompt/prompt_node.py b/haystack/nodes/prompt/prompt_node.py index a14d672d17..026837f80c 100644 --- a/haystack/nodes/prompt/prompt_node.py +++ b/haystack/nodes/prompt/prompt_node.py @@ -694,7 +694,13 @@ def __init__( These parameters should be supplied in the `model_kwargs` dictionary. """ - send_event("PromptNode initialized") + send_event( + event_name="PromptNode", + event_properties={ + "llm.model_name_or_path": model_name_or_path, + "llm.default_prompt_template": default_prompt_template, + }, + ) super().__init__() self.prompt_templates: Dict[str, PromptTemplate] = {pt.name: pt for pt in get_predefined_prompt_templates()} # type: ignore self.default_prompt_template: Union[str, PromptTemplate, None] = default_prompt_template @@ -755,7 +761,6 @@ def prompt(self, prompt_template: Optional[Union[str, PromptTemplate]], *args, * :param prompt_template: The name or object of the optional PromptTemplate to use. :return: A list of strings as model responses. 
""" - send_event("PromptNode.prompt()", event_properties={"template": str(prompt_template)}) results = [] # we pop the prompt_collector kwarg to avoid passing it to the model prompt_collector: List[Union[str, List[Dict[str, str]]]] = kwargs.pop("prompt_collector", []) diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index 5881004243..29923580ae 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -4,6 +4,7 @@ import itertools from functools import partial +from hashlib import md5 from time import time from typing import Dict, List, Optional, Any, Set, Tuple, Union @@ -20,7 +21,6 @@ from pathlib import Path import yaml -import mmh3 import numpy as np import pandas as pd import networkx as nx @@ -51,7 +51,7 @@ from haystack.nodes.retriever.base import BaseRetriever from haystack.document_stores.base import BaseDocumentStore from haystack.utils.experiment_tracking import MLflowTrackingHead, Tracker as tracker -from haystack.telemetry import send_pipeline_run_event, send_pipeline_event +from haystack.telemetry import send_event, send_pipeline_event logger = logging.getLogger(__name__) @@ -71,7 +71,8 @@ class Pipeline: def __init__(self): self.graph = DiGraph() - self.yaml_hash = False + self.config_hash = None + self.last_config_hash = None @property def root_node(self) -> Optional[str]: @@ -425,14 +426,22 @@ def add_node(self, component: BaseComponent, name: str, inputs: List[str]): node={"name": name, "inputs": inputs}, instance=component, ) - # TELEMETRY: Hash the config of the pipeline without node names - # to be able to cluster later by "pipeline type" - # (is any specific pipeline configuration very popular?) 
- fingerprint_config = copy.copy(self.get_config()) - for comp in fingerprint_config["components"]: - del comp["name"] - fingerprint = json.dumps(fingerprint_config, default=str) - self.fingerprint = "{:02x}".format(mmh3.hash128(fingerprint, signed=False)) + self.update_config_hash() + + def update_config_hash(self): + """ + Used for telemetry. Hashes the config, except for the node names, to send an event only when the pipeline changes. + See haystack/telemetry.py::send_pipeline_event + """ + try: + config_to_hash = copy.copy(self.get_config()) + for comp in config_to_hash["components"]: + del comp["name"] + config_hash = json.dumps(config_to_hash, default=str) + self.config_hash = md5(config_hash.encode("utf-8")).hexdigest() + except Exception as exc: + logger.debug("Telemetry exception: %s", str(exc)) + self.config_hash = "[an exception occurred during hashing]" def get_node(self, name: str) -> Optional[BaseComponent]: """ @@ -483,10 +492,8 @@ def run( # type: ignore about their execution. By default, this information includes the input parameters the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`. """ - send_pipeline_run_event( + send_pipeline_event( pipeline=self, - classname=self.__class__.__name__, - function_name="run", query=query, file_paths=file_paths, labels=labels, @@ -633,10 +640,8 @@ def run_batch( # type: ignore about their execution. By default, this information includes the input parameters the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`. 
""" - send_pipeline_run_event( + send_pipeline_event( pipeline=self, - classname=self.__class__.__name__, - function_name="run_batch", queries=queries, file_paths=file_paths, labels=labels, @@ -1230,7 +1235,10 @@ def eval( Additional information can be found here https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained """ - send_pipeline_event(pipeline=self, event_name="Evaluation", event_properties={"function_name": "eval"}) + send_event( + event_name="Evaluation", + event_properties={"pipeline.classname": self.__class__.__name__, "pipeline.config_hash": self.config_hash}, + ) eval_result = EvaluationResult() if add_isolated_node_eval: @@ -1348,7 +1356,10 @@ def eval_batch( Additional information can be found here https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained """ - send_pipeline_event(pipeline=self, event_name=f"{self.__class__.__name__}.eval_batch()") + send_event( + event_name="Evaluation", + event_properties={"pipeline.classname": self.__class__.__name__, "pipeline.config_hash": self.config_hash}, + ) eval_result = EvaluationResult() if add_isolated_node_eval: @@ -1985,7 +1996,6 @@ def load_from_yaml( overwrite_with_env_variables=overwrite_with_env_variables, strict_version_check=strict_version_check, ) - pipeline.yaml_hash = "{:02x}".format(mmh3.hash128(str(path), signed=False)) return pipeline @classmethod @@ -2056,6 +2066,7 @@ def load_from_config( ) pipeline.add_node(component=component, name=node_config["name"], inputs=node_config["inputs"]) + pipeline.update_config_hash() return pipeline @classmethod diff --git a/haystack/pipelines/ray.py b/haystack/pipelines/ray.py index 72025eba12..891e276094 100644 --- a/haystack/pipelines/ray.py +++ b/haystack/pipelines/ray.py @@ -4,6 +4,7 @@ from time import time from typing import Any, Dict, List, Optional, Tuple, Union from pathlib import Path + import networkx as nx try: @@ -23,7 +24,7 @@ from 
haystack.nodes.base import BaseComponent, RootNode from haystack.pipelines.base import Pipeline from haystack.schema import Document, MultiLabel -from haystack.telemetry import send_pipeline_run_event +from haystack.telemetry import send_pipeline_event logger = logging.getLogger(__name__) @@ -130,6 +131,7 @@ def load_from_config( inputs=node_config.get("inputs", []), ) + pipeline.update_config_hash() return pipeline @classmethod @@ -312,10 +314,8 @@ async def run_async( # type: ignore about their execution. By default, this information includes the input parameters the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`. """ - send_pipeline_run_event( + send_pipeline_event( pipeline=self, - classname=self.__class__.__name__, - function_name="run_async", query=query, file_paths=file_paths, labels=labels, diff --git a/haystack/telemetry.py b/haystack/telemetry.py index f61c75217d..5118ddb44c 100644 --- a/haystack/telemetry.py +++ b/haystack/telemetry.py @@ -4,7 +4,6 @@ import logging from pathlib import Path import json -import datetime import yaml import posthog @@ -102,9 +101,7 @@ def send_event(self, event_name: str, event_properties: Optional[Dict[str, Any]] logger.debug("Telemetry couldn't make a POST request to PostHog.", exc_info=e) -def send_pipeline_run_event( # type: ignore - classname: str, - function_name: str, +def send_pipeline_event( # type: ignore pipeline: "Pipeline", # type: ignore query: Optional[str] = None, queries: Optional[List[str]] = None, @@ -118,8 +115,6 @@ def send_pipeline_run_event( # type: ignore """ Sends a telemetry event about the execution of a pipeline, if telemetry is enabled. - :param classname: The name of the Pipeline class (Pipeline, RayPipeline, ...) - :param function_name: The name of the function that was invoked (run, run_batch, async_run, ...). 
:param pipeline: the pipeline that is running :param query: the value of the `query` input of the pipeline, if any :param queries: the value of the `queries` input of the pipeline, if any @@ -132,25 +127,26 @@ def send_pipeline_run_event( # type: ignore """ try: if telemetry: - event_properties: Dict[str, Optional[Union[str, bool, int, Dict[str, Any]]]] = { - "class": classname, - "function_name": function_name, - } - # Check if it's the public demo exec_context = os.environ.get(HAYSTACK_EXECUTION_CONTEXT, "") if exec_context == "public_demo": - event_properties["pipeline.is_public_demo"] = True - event_properties["pipeline.run_parameters.query"] = query - event_properties["pipeline.run_parameters.params"] = params - telemetry.send_event(event_name=function_name, event_properties=event_properties) + event_properties: Dict[str, Optional[Union[str, bool, int, Dict[str, Any]]]] = { + "pipeline.is_public_demo": True, + "pipeline.run_parameters.query": query, + "pipeline.run_parameters.params": params, + } + telemetry.send_event(event_name="Public Demo", event_properties=event_properties) + return + + # Send this event only if the pipeline config has changed + if pipeline.last_config_hash == pipeline.config_hash: return + pipeline.last_config_hash = pipeline.config_hash - # Collect pipeline profile - event_properties["pipeline.classname"] = pipeline.__class__.__name__ - event_properties["pipeline.fingerprint"] = pipeline.fingerprint - if pipeline.yaml_hash: - event_properties["pipeline.yaml_hash"] = pipeline.yaml_hash + event_properties = { + "pipeline.classname": pipeline.__class__.__name__, + "pipeline.config_hash": pipeline.config_hash, + } # Add document store docstore = pipeline.get_document_store() @@ -190,38 +186,10 @@ def send_pipeline_run_event( # type: ignore event_properties["pipeline.run_parameters.params"] = bool(params) event_properties["pipeline.run_parameters.debug"] = bool(debug) - telemetry.send_event(event_name="Pipeline run", 
event_properties=event_properties) - except Exception as e: - # Never let telemetry break things - logger.debug("There was an issue sending a '%s' telemetry event", function_name, exc_info=e) - - -def send_pipeline_event(pipeline: "Pipeline", event_name: str, event_properties: Optional[Dict[str, Any]] = None): # type: ignore - """ - Send a telemetry event related to a pipeline which is not a call to run(), if telemetry is enabled. - """ - try: - if telemetry: - if not event_properties: - event_properties = {} - event_properties.update( - { - "pipeline.classname": pipeline.__class__.__name__, - "pipeline.fingerprint": pipeline.fingerprint, - "pipeline.yaml_hash": pipeline.yaml_hash, - } - ) - now = datetime.datetime.now() - if pipeline.last_run: - event_properties["pipeline.since_last_run"] = (now - pipeline.last_run).total_seconds() - else: - event_properties["pipeline.since_last_run"] = 0 - pipeline.last_run = now - - telemetry.send_event(event_name=event_name, event_properties=event_properties) + telemetry.send_event(event_name="Pipeline", event_properties=event_properties) except Exception as e: # Never let telemetry break things - logger.debug("There was an issue sending a '%s' telemetry event", event_name, exc_info=e) + logger.debug("There was an issue sending a 'Pipeline' telemetry event", exc_info=e) def send_event(event_name: str, event_properties: Optional[Dict[str, Any]] = None):