@@ -57,31 +57,37 @@ def fetch_comments_page(
57
57
response .raise_for_status ()
58
58
return response .json ()
59
59
60
- @staticmethod
61
- def format_comment (comment_data : dict ) -> dict :
60
+ @tracer .capture_method
61
+ def format_comment (
62
+ self ,
63
+ comment_data : dict ,
64
+ additional_data : dict = {},
65
+ ) -> dict :
62
66
"""Format a single comment for storage."""
63
67
64
- author_channel_id = (
65
- comment_data ["snippet" ].get ("authorChannelId" , {}).get ("value" , "none" )
66
- )
67
-
68
68
snippet_snake_case = {}
69
69
for key , value in comment_data ["snippet" ].items ():
70
70
new_key = inflection .underscore (key )
71
71
snippet_snake_case [new_key ] = value
72
72
73
+ author_channel_id = (
74
+ comment_data ["snippet" ].get ("authorChannelId" , {}).get ("value" , "none" )
75
+ )
76
+
73
77
data = {
74
78
"id" : comment_data ["id" ],
75
79
** snippet_snake_case ,
76
- "author_channel_id" : author_channel_id ,
80
+ ** additional_data ,
77
81
"parent_id" : comment_data .get ("parentId" , "none" ),
78
- "fetched_at " : datetime . now (). isoformat () ,
82
+ "author_channel_id " : author_channel_id ,
79
83
}
80
84
81
85
return data
82
86
83
87
@tracer .capture_method
84
- def retrieve_comments_from_youtube (self , video_id : str , api_key : str ) -> list [dict ]:
88
+ def retrieve_comments_from_youtube (
89
+ self , video_id : str , api_key : str , additional_data : dict = None
90
+ ) -> list [dict ]:
85
91
"""Retrieve all comments for a given video."""
86
92
87
93
comments = []
@@ -95,16 +101,16 @@ def retrieve_comments_from_youtube(self, video_id: str, api_key: str) -> list[di
95
101
for item in response_data ["items" ]:
96
102
# Process top-level comment
97
103
top_level_comment = self .format_comment (
98
- item ["snippet" ]["topLevelComment" ]
104
+ item ["snippet" ]["topLevelComment" ],
105
+ additional_data ,
99
106
)
100
107
101
108
comments .append (top_level_comment )
102
109
103
110
# Process replies
104
111
if "replies" in item :
105
112
for reply in item ["replies" ]["comments" ]:
106
- reply_comment = self .format_comment (reply )
107
-
113
+ reply_comment = self .format_comment (reply , additional_data )
108
114
comments .append (reply_comment )
109
115
110
116
next_page_token = response_data .get ("nextPageToken" )
@@ -191,6 +197,7 @@ class Action(str, Enum):
191
197
class Event (BaseModel ):
192
198
video_id : str
193
199
action : Action
200
+ execution_id : str
194
201
195
202
196
203
handler = YouTubeCommentsHandler ()
@@ -205,6 +212,7 @@ def lambda_handler(event: Event, context: LambdaContext) -> dict:
205
212
206
213
video_id = event .video_id
207
214
action = event .action
215
+ execution_id = event .execution_id
208
216
209
217
match action :
210
218
case Action .INSERT :
@@ -213,7 +221,13 @@ def lambda_handler(event: Event, context: LambdaContext) -> dict:
213
221
api_key = parameters .get_secret (YOUTUBE_API_KEY_SECRET_NAME )
214
222
215
223
# Retrieve and process comments
216
- comments = handler .retrieve_comments_from_youtube (video_id , api_key )
224
+ additional_data = {
225
+ "execution_id" : execution_id ,
226
+ "fetched_at" : datetime .now ().isoformat (),
227
+ }
228
+ comments = handler .retrieve_comments_from_youtube (
229
+ video_id , api_key , additional_data = additional_data
230
+ )
217
231
218
232
# Batch detect sentiment
219
233
comments_with_sentiment = handler .batch_detect_sentiment (comments )
0 commit comments