-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess_cook_files_parallel.py.run
executable file
·186 lines (154 loc) · 6.97 KB
/
process_cook_files_parallel.py.run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python3
import json
import os
import random
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Get API keys from .env file; multiple keys let each request pick one at random
API_KEYS = [
    os.getenv("API_KEY_1"),
    os.getenv("API_KEY_2"),
    os.getenv("API_KEY_3"),
]
# Ensure all API keys are loaded (fail fast at import time rather than mid-batch)
if not all(API_KEYS):
    raise ValueError("One or more API keys are missing in the .env file.")
# Configuration
API_URL = "https://api.perplexity.ai/chat/completions"  # chat-completions endpoint
MODEL = "llama-3.1-sonar-huge-128k-online"
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROMPT_FILE = os.path.join(SCRIPT_DIR, "prompt.txt")  # shared transformation prompt
REFERENCE_FILE = os.path.join(SCRIPT_DIR, "reference_file.txt")  # context sent before each file
LOG_FILE = os.path.join(SCRIPT_DIR, "debug.log")
MAX_WORKERS = 10  # thread-pool size for parallel API calls
# Ensure required files exist before any work starts
if not os.path.exists(PROMPT_FILE):
    raise FileNotFoundError(f"Prompt file not found: {PROMPT_FILE}")
if not os.path.exists(REFERENCE_FILE):
    raise FileNotFoundError(f"Reference file not found: {REFERENCE_FILE}")
# Read shared prompt and reference content once; reused for every file
with open(PROMPT_FILE, "r") as f:
    shared_prompt = f.read().strip()
with open(REFERENCE_FILE, "r") as f:
    reference_content = f.read().strip()
# Logging function. process_file runs in a thread pool, so writers must be
# serialized: unsynchronized appends from multiple threads can interleave
# partial lines in the log file.
_LOG_LOCK = threading.Lock()

def log_message(message):
    """Append *message* (plus a newline) to LOG_FILE and echo it to stdout.

    Thread-safe: a module-level lock serializes concurrent writers so log
    lines from parallel workers never interleave.
    """
    with _LOG_LOCK:
        with open(LOG_FILE, "a") as log_file:
            log_file.write(message + "\n")
        print(message)
# Pull the assistant's final reply text out of a raw chat-completions response.
def extract_agent_message(response_text):
    """Return the assistant's message content from a JSON API response.

    Expects the standard chat-completions shape and reads
    choices[0].message.content. Returns None (after logging) when the
    payload cannot be parsed or holds no usable content.
    """
    try:
        payload = json.loads(response_text)
        candidates = payload.get("choices", [])
        if not candidates:
            raise ValueError("No choices found in the response.")
        text = candidates[0].get("message", {}).get("content", "").strip()
        if not text:
            raise ValueError("No valid content found in the agent's message.")
    except (json.JSONDecodeError, ValueError) as err:
        log_message(f"Error extracting agent message: {str(err)}")
        return None
    # Turn escaped "\n" sequences into real newlines for readability.
    return text.replace("\\n", "\n").strip()
# Function to extract the fenced (triple-backtick) region from a model reply.
def extract_content(content):
    """Return the text inside the first triple-backtick fence in *content*.

    Returns None (after logging a warning) when no fenced region exists;
    the caller then falls back to the raw agent message. (The previous
    docstring claimed the full content was returned on fallback — it is
    not; callers explicitly check for None.)
    """
    # The optional "(?:.*?\n)?" skips an info string like "```python";
    # DOTALL lets the captured body span multiple lines.
    match = re.search(r"```(?:.*?\n)?(.*?)\n?```", content, re.DOTALL)
    if match:
        return match.group(1).strip()
    # No exception-as-control-flow needed: just log and signal "not found".
    log_message("Warning: Failed to extract content inside backticks: No triple backtick region found.")
    return None
# Function to process a single file: send it to the API, write "<stem>.cook".
def process_file(file_path):
    """Convert one input file to a ".cook" output file via the chat API.

    Sends the file's content in a multi-turn payload (reference text,
    then the file, then the shared prompt), writes the cleaned reply to
    "<stem>.cook" next to the input, and — when the reply contains a
    triple-backtick fence — overwrites the output with just the fenced
    region. Existing .cook outputs are never overwritten. All errors are
    logged rather than raised, so one bad file cannot stop the batch.
    """
    # Output path: same directory and stem as the input, ".cook" extension.
    final_output_file_path = f"{os.path.splitext(file_path)[0]}.cook"
    # Skip processing if the .cook file already exists
    if os.path.exists(final_output_file_path):
        log_message(f"Skipping {file_path}: Converted version already exists.")
        return
    # Read input file content
    with open(file_path, "r") as f:
        file_content = f.read().strip()
    # Multi-turn payload: prime the model with the reference text and the
    # input file before issuing the actual transformation prompt.
    payload = {
        "model": MODEL,
        "temperature": 0,
        "messages": [
            { "role": "user", "content": reference_content },
            { "role": "assistant", "content": "I understand the file reference. Now provide the file to process." },
            { "role": "user", "content": file_content },
            { "role": "assistant", "content": "I understand the file reference and the input file. What transformation(s) would you like performed?" },
            { "role": "user", "content": shared_prompt }
        ],
    }
    try:
        api_key = random.choice(API_KEYS)  # Spread load across the available keys
        headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
        # timeout is essential here: without it a stalled connection hangs
        # this worker thread (and eventually the whole batch) forever.
        response = requests.post(API_URL, headers=headers, json=payload, timeout=300)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx and 5xx)
        # Extract only the agent's message content from the API response
        agent_message = extract_agent_message(response.text)
        if not agent_message:
            log_message(f"Error: No valid agent message extracted for {file_path}.")
            return
        # Write the raw reply first so something useful survives even if
        # fence extraction below finds nothing.
        with open(final_output_file_path, "w") as final_output_file:
            final_output_file.write(agent_message + "\n")
        log_message(f"Agent message written to {final_output_file_path}")
        # Prefer just the fenced region when one exists; overwrite in place.
        extracted_content = extract_content(agent_message)
        if extracted_content is not None:
            with open(final_output_file_path, "w") as final_output_file:
                final_output_file.write(extracted_content + "\n")
            log_message(f"Final stripped content written to {final_output_file_path}")
        else:
            log_message(f"No triple backtick region found for {file_path}. Raw agent message retained.")
    except requests.exceptions.RequestException as e:
        log_message(f"Error processing {file_path}: {str(e)}")
    except Exception as e:
        log_message(f"Unexpected error for {file_path}: {str(e)}")
# Walk a directory tree, queue eligible inputs, and convert them in parallel.
def main(directory):
    """Process every eligible .json file under *directory* concurrently.

    Starts a fresh debug log, gathers candidate files recursively, and
    fans them out to a thread pool of MAX_WORKERS workers.
    """
    if not os.path.exists(directory):
        raise FileNotFoundError(f"Directory not found: {directory}")
    # Fresh debug log per run ("w" truncates any previous log).
    with open(LOG_FILE, "w") as log_file:
        log_file.write(f"Debug log started at {os.path.abspath(LOG_FILE)}\n")
    # Collect .json inputs, skipping anything already marked ".cook".
    pending = [
        os.path.join(root, name)
        for root, _, names in os.walk(directory)
        for name in names
        if name.endswith(".json") and ".cook" not in name
    ]
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_file, path): path for path in pending}
        for done in as_completed(futures):
            try:
                done.result()
            except Exception as exc:
                log_message(f"Error processing a file: {str(exc)}")
if __name__ == "__main__":
    import sys
    # Get target directory from command-line arguments or use script directory by default
    target_directory = sys.argv[1] if len(sys.argv) > 1 else SCRIPT_DIR
    try:
        main(target_directory)
    except Exception as e:
        # Last-resort catch: record the failure in the debug log instead of
        # dumping a traceback, matching the script's logging convention.
        log_message(f"Fatal error: {str(e)}")