diff --git a/app/transcript.py b/app/transcript.py index ef5a5b5..7c9c204 100644 --- a/app/transcript.py +++ b/app/transcript.py @@ -36,10 +36,12 @@ def __init__(self, source, test_mode=False, metadata_file=None): def process_source(self, tmp_dir=None): tmp_dir = tmp_dir if tmp_dir is not None else tempfile.mkdtemp() self.audio_file = self.source.process(tmp_dir) - self.title = self.source.title if self.source.title else os.path.basename( - self.audio_file)[:-4] return self.audio_file, tmp_dir + @property + def title(self): + return self.source.title + def __str__(self): excluded_fields = ['test_mode', 'logger'] fields = {key: value for key, value in self.__dict__.items() @@ -55,8 +57,9 @@ def to_json(self): "speakers": self.source.speakers, "loc": self.source.loc, "body": self.result, - "media": self.source.media } + if not self.source.local: + json_data["media"] = self.source.media if self.source.date: json_data['date'] = self.source.date @@ -76,7 +79,8 @@ def save_source(self, source_file, loc, local, title, tags, category, speakers, self.link = link # the url that will be used as `media` for the transcript. It contains more metadata than just the audio download link self.loc = loc.strip("/") self.local = local - self.title = title + self.title = title if title is not None else os.path.splitext( + os.path.basename(source_file))[0] self.tags = tags self.category = category self.speakers = speakers @@ -126,14 +130,9 @@ def __init__(self, source, description=None, chapters=[]): self.type = "audio" self.description = description self.chapters = chapters - self.__config_source() except Exception as e: raise Exception(f"Error during Audio creation: {e}") - def __config_source(self): - if self.title is None: - raise Exception("Please supply a title for the audio file") - def process(self, working_dir): """Process audio""" @@ -142,8 +141,6 @@ def download_audio(): # sanity checks if self.local: raise Exception(f"{self.source_file} is a local file") - if self.title is None: - raise Exception("Please supply a title for the audio file") self.logger.info(f"Downloading audio file: {self.source_file}") try: audio = requests.get(self.source_file, stream=True) @@ -205,7 +202,7 @@ def to_json(self): class Video(Source): - def __init__(self, source, youtube_metadata=None, chapters=None): + def __init__(self, source, youtube_metadata=None, chapters=[]): try: # initialize source using a base Source super().__init__(source_file=source.source_file, link=source.link, loc=source.loc, local=source.local, title=source.title, @@ -277,8 +274,7 @@ def convert_video_to_mp3(video_file): try: self.logger.info(f"Converting {video_file} to mp3...") clip = VideoFileClip(video_file) - output_file = os.path.join( - working_dir, os.path.basename(video_file)[:-4] + ".mp3") + output_file = os.path.join(working_dir, f"{self.title}.mp3") clip.audio.write_audiofile(output_file) clip.close() self.logger.info("Video converted to mp3") @@ -286,31 +282,10 @@ def convert_video_to_mp3(video_file): except Exception as e: raise Exception(f"Error converting video to mp3: {e}") - def extract_chapters_from_downloaded_video_metadata(): - try: - list_of_chapters = [] - with open(f"{working_dir}/videoFile.info.json", "r") as f: - info = json.load(f) - if "chapters" not in info: - self.logger.info("No chapters found for downloaded video") - return list_of_chapters - for index, x in enumerate(info["chapters"]): - name = x["title"] - start = x["start_time"] - list_of_chapters.append((str(index), start, str(name))) - - return list_of_chapters - except Exception as e: - self.logger.error( - f"Error reading downloaded video's metadata: {e}") - return [] - try: self.logger.info(f"Video processing: '{self.source_file}'") if not self.local: abs_path = download_video() - if self.chapters is None: - self.chapters = extract_chapters_from_downloaded_video_metadata() else: abs_path = os.path.abspath(self.source_file) diff --git a/app/transcription.py b/app/transcription.py index c622b07..9a209a9 100644 --- a/app/transcription.py +++ b/app/transcription.py @@ -117,16 +117,16 @@ def check_if_youtube(source: Source): # Invalid URL or video not found raise Exception(f"Invalid source: {e}") try: - if source.source_file.endswith(".mp3") or source.source_file.endswith(".wav") or source.source_file.endswith(".m4a"): + if source.source_file.endswith((".mp3", ".wav", ".m4a")): return Audio(source=source, chapters=chapters) - if source.source_file.endswith("rss") or source.source_file.endswith(".xml"): + if source.source_file.endswith(("rss", ".xml")): return RSS(source=source) if youtube_metadata is not None: # we have youtube metadata, this can only be true for videos source.preprocess = False return Video(source=source, youtube_metadata=youtube_metadata, chapters=chapters) - if source.source_file.endswith(".mp4"): + if source.source_file.endswith((".mp4", ".webm")): # regular remote video, not youtube source.preprocess = False return Video(source=source) @@ -283,14 +283,14 @@ def write_to_markdown_file(self, transcript: Transcript, output_dir): # Add metadata prefix meta_data = ( "---\n" - f"title: {transcript.title}\n" + f'title: "{transcript.title}"\n' f"transcript_by: {self.transcript_by} via TBTBTC v{__version__}\n" ) if not transcript.source.local: - meta_data += f"media: {transcript.source.source_file}\n" - meta_data += f"tags: {transcript.source.tags}\n" - meta_data += f"speakers: {transcript.source.speakers}\n" - meta_data += f"categories: {transcript.source.category}\n" + meta_data += f"media: {transcript.source.media}\n" + meta_data += f"tags: {str(transcript.source.tags)}\n" + meta_data += f"speakers: {str(transcript.source.speakers)}\n" + meta_data += f"categories: {str(transcript.source.category)}\n" if transcript.summary: meta_data += f"summary: {transcript.summary}\n" if transcript.source.event_date: @@ -298,10 +298,9 @@ def write_to_markdown_file(self, transcript: Transcript, output_dir): meta_data += "---\n" # Write to file markdown_file = f"{utils.configure_output_file_path(output_dir, transcript.title, add_timestamp=False)}.md" - with open(markdown_file, "a") as opf: + with open(markdown_file, "w") as opf: opf.write(meta_data + "\n") opf.write(transcript.result + "\n") - opf.close() self.logger.info(f"Markdown file stored at: {markdown_file}") return os.path.abspath(markdown_file) except Exception as e: diff --git a/test/test_audio.py b/test/test_audio.py index 4ba407b..b941d1e 100644 --- a/test/test_audio.py +++ b/test/test_audio.py @@ -37,23 +37,6 @@ def test_audio_with_title(): transcription.clean_up() -@pytest.mark.feature -def test_audio_without_title(): - with open(rel_path("testAssets/transcript.txt"), "r") as file: - result = file.read() - file.close() - - source = rel_path("test/testAssets/audio.mp3") - title = None - transcription = Transcription( - test_mode=True - ) - with pytest.raises(Exception) as error: - transcription.add_transcription_source(source_file=source, title=title) - assert "Please supply a title for the audio file" in str(error) - transcription.clean_up() - - @pytest.mark.feature def test_audio_with_all_data(): with open(rel_path("testAssets/transcript.txt"), "r") as file: diff --git a/test/test_cli.py b/test/test_cli.py index a37d63a..41f85c1 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -49,6 +49,6 @@ def test_download_video_file(): source_file="https://www.youtube.com/watch?v=B0HW_sJ503Y") audio_file, tmp_dir = transcription.transcripts[0].process_source( transcription.tmp_dir) - assert os.path.isfile(f"{audio_file[:-4]}.mp4") # video download + assert os.path.isfile(f"{tmp_dir}/videoFile.mp4") # video download assert os.path.isfile(audio_file) # mp3 convert application.clean_up(tmp_dir) diff --git a/test/test_helpers.py b/test/test_helpers.py index 1d6bfe0..bbebcb6 100644 --- a/test/test_helpers.py +++ b/test/test_helpers.py @@ -39,7 +39,7 @@ def check_md_file( if not local: assert fields["media"] == media - assert fields["title"] == title + assert fields["title"] == f'"{title}"' if date: assert fields["date"] == date diff --git a/transcriber.py b/transcriber.py index 022b6fb..114c47b 100644 --- a/transcriber.py +++ b/transcriber.py @@ -252,7 +252,7 @@ def transcribe( transcription.add_transcription_source_JSON(source) else: transcription.add_transcription_source( - source_file=source, loc=loc, title=title, date=date, tags=tags, category=category, speakers=speakers, + source_file=source, loc=loc, title=title, date=date, tags=list(tags), category=list(category), speakers=list(speakers), ) transcription.start() if nocleanup: @@ -390,7 +390,6 @@ def postprocess( ) # Finalize transcription service output transcript_to_postprocess = transcription.transcripts[0] - transcript_to_postprocess.title = metadata["title"] transcript_to_postprocess.transcription_service_output_file = metadata[ f"{service}_output"] transcript_to_postprocess.result = transcription.service.finalize_transcript(