Skip to content

Completed python side implementation of sentiment engine #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions querent/common/types/querent_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class EventType:
Vector = "Vector"
Terminate="Terminate"
QueryResult="QueryResult"
Sentiment="Sentiment"


class EventState:
Expand Down
73 changes: 73 additions & 0 deletions querent/config/core/sentiment_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from typing import Optional
from pydantic import BaseModel, Field
import os

class Sentiment_Config(BaseModel):
id: str = ""
name: str = "Sentiment Configuration"
openai_model_name: str = "gpt-3.5-turbo"
requests_per_minute: int = 3
openai_api_key: str = ""
user_context: str = None
huggingface_token: Optional[str] = None
huggingface_api_url: Optional[str] = None
spacy_model_path: str = 'en_core_web_lg'
nltk_path: str = '/model/nltk_data'
sentiment_model_name: str = "mr8488/distilroberta-finetuned-financial-news-sentiment-analysis-v2"
topic_type_model_name: str = "nickmuchi/finbert-tone-finetuned-finance-topic-classification"
target_companies: Optional[dict]= {
'Nvidia Corporation': 'NVDA',
'Advanced Micro Devices': 'AMD',
'Meta Platforms, Inc.': 'META',
'Alphabet Inc.': 'GOOGL',
'Amazon.com, Inc.': 'AMZN',
'Intel Corporation': 'INTC',
'Microsoft Corporation': 'MSFT',
'Apple Inc.': 'AAPL',
'International Business Machines Corporation': 'IBM',
'Oracle Corporation': 'ORCL',
'Salesforce.com, inc.': 'CRM',
'Tesla, Inc.': 'TSLA',
'Uber Technologies, Inc.': 'UBER',
'Baidu, Inc.': 'BIDU',
'Qualcomm Incorporated': 'QCOM',
'Square, Inc.': 'SQ',
'Palantir Technologies Inc.': 'PLTR',
'Adobe Inc.': 'ADBE',
'Zoom Video Communications, Inc.': 'ZM',
'Splunk Inc.': 'SPLK',
'Shopify Inc.': 'SHOP',
'ServiceNow, Inc.': 'NOW',
'Snowflake Inc.': 'SNOW',
'Twilio Inc.': 'TWLO',
'DocuSign, Inc.': 'DOCU',
'CrowdStrike Holdings, Inc.': 'CRWD',
'Okta, Inc.': 'OKTA',
'Pinterest, Inc.': 'PINS',
'Broadcom Inc.': 'AVGO',
'Texas Instruments Incorporated': 'TXN'
}



def __init__(self, config_source=None, **kwargs):
config_data = {}
config_data.update(kwargs)
if config_source:
config_data = self.load_config(config_source)
if "config" in config_data:
config_data.update(config_data["config"])
super().__init__(**config_data)


@classmethod
def load_config(cls, config_source) -> dict:
if isinstance(config_source, dict):
# If config source is a dictionary, return a dictionary
cls.config_data = config_source
else:
raise ValueError("Invalid config. Must be a valid dictionary")

env_vars = dict(os.environ)
cls.config_data.update(env_vars)
return cls.config_data
103 changes: 103 additions & 0 deletions querent/core/transformers/sentiment_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@

import ast
from querent.common.types.ingested_images import IngestedImages
from querent.common.types.querent_event import EventState, EventType
from querent.kg.ner_helperfunctions.ner_llm_transformer import NER_LLM
from querent.core.base_engine import BaseEngine
from typing import Any, Dict, List
from querent.kg.sentiment_helperfunctions.sentiment_analyzer import SentimentAnalyzer
from querent.kg.sentiment_helperfunctions.topic_classifier import TopicClassifier
from querent.logging.logger import setup_logger
from openai import OpenAI
from transformers import pipeline
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
wait_fixed
)
from dotenv import load_dotenv, find_dotenv
import json

_ = load_dotenv(find_dotenv())

class Sentiment_Graph(BaseEngine):
def __init__(self, input_queue, config):
super().__init__(input_queue)
self.logger = setup_logger(__name__, "Sentiment_Graph")
self.user_context = config.user_context
self.nlp_model = NER_LLM.set_nlp_model(config.spacy_model_path)
self.nlp_model = NER_LLM.get_class_variable()
self.target_companies = config.target_companies
self.sentiment_analyzer = SentimentAnalyzer(config)
self.topic_classifier = TopicClassifier(config)

def validate(self):
return True

def process_messages(self, data):
return super().process_messages(data)

def process_images(self, data):
return super().process_messages(data)

async def process_code(self, data):
return super().process_messages(data)

@staticmethod
def validate_ingested_tokens(data):
return not data.is_error()

def preprocess_data(self, data)-> Dict[str, Any]:
if not data or len(data) != 1 or not isinstance(data[0], str):
raise ValueError("Data must be a list containing a single string element.")
try:
dict_data = ast.literal_eval(data[0])
except (ValueError, SyntaxError) as e:
raise ValueError("Could not convert string to dictionary.") from e

return dict_data

async def process_tokens(self, data):
try:
if not Sentiment_Graph.validate_ingested_tokens(data):
self.set_termination_event()
return
else:
if data.data[0].startswith('{\'source\''):
parsed_data = self.preprocess_data(data.data)
company_name = data.file.split('_')[0]
file = company_name
sentiment_result = self.sentiment_analyzer.analyze_sentiment(parsed_data["title"], self.user_context)
topic_result = self.topic_classifier.classify_topic(parsed_data["title"])
topic_result['topic_type'] = topic_result.pop('sentiment')
topic_result.pop('sentiment_score')
sentiment_result.update(topic_result)
sentiment_result['title'] = parsed_data["title"]
sentiment_result['author'] = parsed_data["author"]
sentiment_result['url'] = parsed_data["url"]
sentiment_result['publishedAt'] = parsed_data["publishedAt"]
sentiment_result['entity'] = company_name.lower()
sentiment_result['entity_type'] = "company"
sentiment_result['predicate'] = "has_sentiment"
sentiment_result['symbol'] = self.target_companies[company_name]
else:
single_string = data.data
file = data.get_file_path()
sentiment_result = self.sentiment_analyzer.analyze_sentiment(single_string, self.user_context)
topic_result = self.topic_classifier.classify_topic(single_string)
topic_result['topic_type'] = topic_result.pop('sentiment')
topic_result.pop('sentiment_score')
sentiment_result.update(topic_result)
sentiment_result['title'] = single_string
sentiment_result['entity'] = file
sentiment_result['entity_type'] = "text"
sentiment_result['predicate'] = "has_sentiment"

current_state = EventState(EventType.Sentiment, 1.0, json.dumps(sentiment_result), file)
await self.set_state(new_state=current_state)
except Exception as e:
self.logger.debug(f"Invalid {self.__class__.__name__} configuration. Unable to extract sentiment. {e}")

async def process_messages(self, data):
raise NotImplementedError
119 changes: 119 additions & 0 deletions querent/kg/sentiment_helperfunctions/sentiment_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from typing import Any, Dict, List, Tuple
from querent.logging.logger import setup_logger
from openai import OpenAI
from transformers import pipeline
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
wait_fixed
)
from dotenv import load_dotenv, find_dotenv
import json

_ = load_dotenv(find_dotenv())
import requests

class SentimentAnalyzer:
def __init__(self, config):
self.openai_model_name = config.openai_model_name
self.sentiment_model_name = config.sentiment_model_name
self.user_context = config.user_context
self.openai_api_key = config.openai_api_key
self.huggingface_token = config.huggingface_token
self.api_url = config.huggingface_api_url
self.gpt_llm = OpenAI(api_key=self.openai_api_key) if self.openai_api_key else None
self.sentiment_classifier = self.initialize_classifier(self.sentiment_model_name)

def initialize_classifier(self, model_name):
if self.api_url:
return lambda text: self.classify_text_with_hf_api(text, model_name)
else:
return self.load_model(model_name)

def classify_text_with_hf_api(self, text, model_name):
headers = {"Authorization": f"Bearer {self.huggingface_token}"}
payload = {"inputs": text}
url = f"{self.api_url}/{model_name}"
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
return response.json()
else:
return None


@staticmethod
def load_model(model_name, huggingface_token=None):
token = huggingface_token if huggingface_token else None
return pipeline("text-classification", model=model_name, token=token)

def extract_sentiment_info(self, output):
if isinstance(output, list):
if all(isinstance(sublist, list) for sublist in output):
flat_list = [item for sublist in output for item in sublist]
if all(isinstance(item, dict) for item in flat_list):
max_score_dict = max(flat_list, key=lambda x: x['score'])
return {"sentiment": max_score_dict['label'].lower(), "sentiment_score": round(max_score_dict['score'], 1)}
elif all(isinstance(item, dict) for item in output):
max_score_dict = max(output, key=lambda x: x['score'])
return {"sentiment": max_score_dict['label'].lower(), "sentiment_score": round(max_score_dict['score'], 1)}
else:
return {"sentiment": output[0]['label'].lower(), "sentiment_score": round(output[0]['score'], 1)}
elif hasattr(output, 'choices'):
try:
sentiment_info = json.loads(output.choices[0].message.content)
return {"sentiment": sentiment_info["sentiment"].lower(), "sentiment_score": sentiment_info["sentiment_score"]}
except Exception as e:
return None
return None

def analyze_sentiment(self, text, user_context=None):
if self.openai_api_key:
result = self.process_triples(text, user_context)
else:
result = self.sentiment_classifier(text)

return self.extract_sentiment_info(result)

def process_triples(self, data, company_name):
try:
if not self.user_context:
identify_entity_message = f"""Forget all your previous instructions. I want you to act as an experienced financial engineer. I will offer you
financial news headlines in one day. Your task is to:
1. Identify whether the target company will be impacted by the news headline.
2. Determine the sentiments of the affected company: positive, negative, or neutral.
3. Determine the sentiment score based on the potential impact of news headline on the target company stock price. With 0 being no impact and 1 being the highest.
4. Only provide responses in JSON format.
5. Example output: {{sentiment: “positive”, sentiment_score:0.9}}
News Headline:
"""
else:
identify_entity_message = self.user_context
if isinstance (data,dict):
messages_classify_entity = [
{"role": "user", "content": identify_entity_message},
{"role": "user", "content": data["title"]},
{"role": "user", "content": "Target Company: " + company_name},
]
else:
messages_classify_entity = [
{"role": "user", "content": identify_entity_message},
{"role": "user", "content": data}
]
response = self.generate_response(messages_classify_entity)
return response
except Exception as e:
return

# @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def completion_with_backoff(self, **kwargs):
return self.gpt_llm.chat.completions.create(**kwargs)

def generate_response(self, messages):
response = self.completion_with_backoff(
model=self.openai_model_name,
messages=messages,
temperature=0
)
return response

67 changes: 67 additions & 0 deletions querent/kg/sentiment_helperfunctions/topic_classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Any, Dict, List, Tuple
from transformers import pipeline
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
wait_fixed
)
from dotenv import load_dotenv, find_dotenv
import json

_ = load_dotenv(find_dotenv())
import requests

class TopicClassifier:
def __init__(self, config):
self.topic_type_model_name = config.topic_type_model_name
self.huggingface_token = config.huggingface_token
self.api_url = config.huggingface_api_url
self.topic_classifier = self.initialize_classifier(self.topic_type_model_name)


def initialize_classifier(self, model_name):
if self.api_url:
return lambda text: self.classify_text_with_hf_api(text, model_name)
else:
return self.load_model(model_name)

def classify_text_with_hf_api(self, text, model_name):
headers = {"Authorization": f"Bearer {self.huggingface_token}"}
payload = {"inputs": text}
url = f"{self.api_url}/{model_name}"
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
return response.json()
else:
return None


@staticmethod
def load_model(model_name, huggingface_token=None):
token = huggingface_token if huggingface_token else None
return pipeline("text-classification", model=model_name, token=token)

def extract_topic_info(self, output):
if isinstance(output, list):
if all(isinstance(sublist, list) for sublist in output):
flat_list = [item for sublist in output for item in sublist]
if all(isinstance(item, dict) for item in flat_list):
max_score_dict = max(flat_list, key=lambda x: x['score'])
return {"sentiment": max_score_dict['label'].lower(), "sentiment_score": round(max_score_dict['score'], 1)}
elif all(isinstance(item, dict) for item in output):
max_score_dict = max(output, key=lambda x: x['score'])
return {"sentiment": max_score_dict['label'].lower(), "sentiment_score": round(max_score_dict['score'], 1)}
else:
return {"sentiment": output[0]['label'].lower(), "sentiment_score": round(output[0]['score'], 1)}
elif hasattr(output, 'choices'):
try:
sentiment_info = json.loads(output.choices[0].message.content)
return {"sentiment": sentiment_info["sentiment"].lower(), "sentiment_score": sentiment_info["sentiment_score"]}
except Exception as e:
return None
return None

def classify_topic(self, text):
result = self.topic_classifier(text)
return self.extract_topic_info(result)
Loading
Loading