From e1c00d53e353f40489d3aaff146797711cc1a9be Mon Sep 17 00:00:00 2001 From: birdhouses Date: Sat, 3 Aug 2024 21:17:40 +0200 Subject: [PATCH] Overhaul uploading process. --- src/instabot/upload.py | 311 +++++++++++++++++++++-------------------- src/instabot/utils.py | 17 ++- 2 files changed, 172 insertions(+), 156 deletions(-) diff --git a/src/instabot/upload.py b/src/instabot/upload.py index dd2a47c..6ed1db2 100644 --- a/src/instabot/upload.py +++ b/src/instabot/upload.py @@ -7,7 +7,6 @@ import shutil from humanfriendly import format_timespan from instabot.timekeeper import TimeKeeper -import re async def upload_media(cl, account): utils.logger.info("Uploading media...") @@ -17,193 +16,195 @@ async def upload_media(cl, account): captions_file = account['upload_posts'].get('captions_file') # Captions file path posts_dir = account['upload_posts']['posts_dir'] delete_after_upload = account['upload_posts']['delete_after_upload'] - medias = os.listdir(posts_dir) - - ### MAKE THIS CONFIGURABLE ### - random.shuffle(medias) - ############################## + post_folders = get_post_folders(posts_dir) + random.shuffle(post_folders) captions = load_captions(captions_file) - for media in medias: + for folder in post_folders: sleep_time = utils.calculate_sleep_time(amount) TimeKeeper(username, 'upload_media', sleep_time) await asyncio.sleep(sleep_time) - caption = captions.get(media, random.choice(list(captions.values()))) # Get caption or random - if is_post(media): - upload_post(cl, media, posts_dir, caption, delete_after_upload) - elif is_album(posts_dir, media): - upload_album(cl, media, posts_dir, caption, delete_after_upload) - elif is_video(media): - upload_video(cl, media, posts_dir, caption, delete_after_upload) - else: - utils.logger.warning(f"{media} is not a valid post or album.") - continue - -def is_video(media: str) -> bool: - return media.endswith('.mp4') - -def upload_video(cl, path, posts_dir, caption, delete_after_upload): - path_to_post = posts_dir + '/' + path - - try: - cl.video_upload(path_to_post, caption) - except Exception as e: - utils.logger.error(f"Error uploading {path}: {e}") - return - - if delete_after_upload: - os.remove(path_to_post) - - utils.logger.info(f"Uploaded {path}") - -def is_album(posts_dir: str, path: str) -> bool: - if not os.path.isdir(posts_dir + '/' + path): - return False + folder_path = os.path.join(posts_dir, folder) + caption = get_caption_from_folder(folder_path) or random.choice(list(captions.values())) # Get caption or random - posts = get_posts(posts_dir + '/' + path) + if is_album(folder_path): + upload_album(cl, folder_path, caption, delete_after_upload) + elif is_video_in_folder(folder_path): + upload_video(cl, folder_path, caption, delete_after_upload) + elif is_image_post(folder_path): + upload_post(cl, folder_path, caption, delete_after_upload) + else: + utils.logger.warning(f"{folder} is not a valid post or album.") - return len(posts) > 0 +def is_image_file(file_path: str) -> bool: + """Checks if a file is an image.""" + return file_path.lower().endswith(('.jpg', '.jpeg', '.png')) -def is_post(path: str) -> bool: - valid_images = [".jpg", ".webp", ".png"] - ext = os.path.splitext(path)[1] - return ext.lower() in valid_images +def is_video(media: str) -> bool: + """Checks if a file is a video.""" + return media.lower().endswith('.mp4') -def upload_album(cl, album: str, posts_dir, caption: str, delete_after_upload: bool): - path_to_album = posts_dir + '/' + album - posts = get_posts(path_to_album) +def is_image_post(folder_path: str) -> bool: + """Checks if a folder contains a single image post.""" + return any(is_image_file(file) for file in os.listdir(folder_path)) - paths = [] - for path in posts: - paths.append(path_to_album + '/' + path) +def is_video_in_folder(folder_path: str) -> bool: + """Checks if a folder contains a video file.""" + return any(is_video(file) for file in os.listdir(folder_path)) +def upload_album(cl, folder_path, caption, delete_after_upload): + """Uploads an album to Instagram.""" + posts = get_posts(folder_path) new_paths = [] - for image_path in paths: - if is_post(image_path): - output_path = image_path.replace(".webp", ".jpg") - utils.convert_webp_to_jpg(image_path, output_path) - new_paths.append(output_path) - elif is_video(image_path): - new_paths.append(image_path) - - cl.album_upload(new_paths, caption) - - if delete_after_upload: - shutil.rmtree(path_to_album) - - utils.logger.info(f"Uploaded {album}") + for image_path in posts: + full_path = os.path.join(folder_path, image_path) + ext = os.path.splitext(image_path)[1].lower() + output_path = full_path + + if ext in [".jpg", ".jpeg"]: + new_paths.append(full_path) + elif ext == ".png": + # Convert PNG to JPG using utility function + output_path = full_path.replace(".png", ".jpg") + try: + utils.convert_png_to_jpg(full_path, output_path) + new_paths.append(output_path) + except Exception as e: + utils.logger.error(f"Error converting PNG to JPG for {full_path}: {e}") + continue + elif ext == ".webp": + # Convert WEBP to JPG using utility function + output_path = full_path.replace(".webp", ".jpg") + try: + utils.convert_webp_to_jpg(full_path, output_path) + new_paths.append(output_path) + except Exception as e: + utils.logger.error(f"Error converting WEBP to JPG for {full_path}: {e}") + continue + elif ext == ".mp4": + new_paths.append(full_path) + else: + utils.logger.warning(f"Unsupported file format {ext} in {full_path}, skipping.") + + if new_paths: + try: + cl.album_upload(new_paths, caption) + if delete_after_upload: + shutil.rmtree(folder_path) + utils.logger.info(f"Uploaded album from {folder_path}") + except instabot.exceptions.AlbumUnknownFormat as e: + utils.logger.error(f"Error uploading album {folder_path}: Unknown format. {e}") + except Exception as e: + utils.logger.error(f"Error uploading album {folder_path}: {e}") + else: + utils.logger.warning(f"No valid media found in album {folder_path}") + +def is_album(folder_path: str) -> bool: + """Determines if a folder contains an album (more than one media file).""" + valid_media = ['.jpg', '.jpeg', '.png', '.mp4', '.webp'] + media_files = [f for f in os.listdir(folder_path) if os.path.splitext(f)[1].lower() in valid_media] + return len(media_files) > 1 + +def upload_album(cl, folder_path, caption, delete_after_upload): + """Uploads an album to Instagram.""" + posts = get_posts(folder_path) + new_paths = [] -def upload_post(cl, path, posts_dir, caption, delete_after_upload): - path_to_post = posts_dir + '/' + path + for image_path in posts: + full_path = os.path.join(folder_path, image_path) + ext = os.path.splitext(image_path)[1].lower() + output_path = full_path + + if ext in [".jpg", ".jpeg"]: + new_paths.append(full_path) + elif ext == ".png": + # Convert PNG to JPG + output_path = full_path.replace(".png", ".jpg") + try: + with Image.open(full_path) as img: + rgb_img = img.convert('RGB') + rgb_img.save(output_path) + new_paths.append(output_path) + except Exception as e: + utils.logger.error(f"Error converting PNG to JPG for {full_path}: {e}") + continue + elif ext == ".webp": + # Convert WEBP to JPG + output_path = full_path.replace(".webp", ".jpg") + try: + utils.convert_webp_to_jpg(full_path, output_path) + new_paths.append(output_path) + except Exception as e: + utils.logger.error(f"Error converting WEBP to JPG for {full_path}: {e}") + continue + elif ext == ".mp4": + new_paths.append(full_path) + else: + utils.logger.warning(f"Unsupported file format {ext} in {full_path}, skipping.") + + if new_paths: + try: + cl.album_upload(new_paths, caption) + if delete_after_upload: + shutil.rmtree(folder_path) + utils.logger.info(f"Uploaded album from {folder_path}") + except instabot.exceptions.AlbumUnknownFormat as e: + utils.logger.error(f"Error uploading album {folder_path}: Unknown format. {e}") + except Exception as e: + utils.logger.error(f"Error uploading album {folder_path}: {e}") + else: + utils.logger.warning(f"No valid media found in album {folder_path}") + +def upload_post(cl, folder_path, caption, delete_after_upload): + """Uploads a single image post to Instagram.""" + image_files = [f for f in os.listdir(folder_path) if is_image_file(os.path.join(folder_path, f))] + if not image_files: + utils.logger.warning(f"No image found in {folder_path}") + return + image_path = os.path.join(folder_path, image_files[0]) try: - cl.photo_upload(path_to_post, caption) + cl.photo_upload(image_path, caption) except Exception as e: - utils.logger.error(f"Error uploading {path}: {e}") + utils.logger.error(f"Error uploading post {image_path}: {e}") return - if delete_after_upload: - os.remove(path_to_post) - - utils.logger.info(f"Uploaded {path}") + shutil.rmtree(folder_path) + utils.logger.info(f"Uploaded image post from {folder_path}") def get_posts(directory: str) -> List[str]: + """Retrieves a list of valid media files in a directory.""" + valid_images = [".jpg", ".jpeg", ".png", ".webp", ".mp4"] posts = [] - pattern = r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}_UTC_(\d+)\.(jpg|jpeg|png|mp4|avi|mov)$' - for filename in os.listdir(directory): - if re.match(pattern, filename, re.IGNORECASE): + ext = os.path.splitext(filename)[1].lower() + if ext in valid_images: posts.append(filename) else: - # Log a warning or handle files with unexpected names as needed - utils.logger.warning(f"Ignoring file with unexpected name format: {filename}") - - # Ensure the lambda only attempts to convert a well-formatted index number to integer - posts.sort(key=lambda filename: int(re.search(r'_(\d+)\.', filename).group(1))) - + utils.logger.warning(f"Ignoring file with unexpected extension: {filename}") return posts -def get_albums(post_dir: str) -> List[str]: - albums = [] - for file in os.listdir(post_dir): - if os.path.isdir(post_dir + '/' + file): - albums.append(file) - return albums - -def filter_posts(posts): - valid_images = [".jpg", ".webp", ".png", ".mp4"] - valid_posts = [] - for post in posts: - ext = os.path.splitext(post)[1] - if ext.lower() not in valid_images: - continue - valid_posts.append(post) - - return valid_posts - -async def upload_stories(cl, account): - utils.logger.info("Uploading story...") - username = account['account_details']['username'] - amount = account['upload_stories']['amount_per_day'] - captions_file = account['upload_stories'].get('captions_file') # Captions file path - posts_dir = account['upload_stories']['posts_dir'] - delete_after_upload = account['upload_stories']['delete_after_upload'] - medias = os.listdir(posts_dir) - - ### MAKE THIS CONFIGURABLE ### - random.shuffle(medias) - ############################## - - captions = load_captions(captions_file) - - for media in medias: - sleep_time = utils.calculate_sleep_time(amount) - TimeKeeper(username, 'upload_story', sleep_time) - await asyncio.sleep(sleep_time) - - caption = captions.get(media, random.choice(list(captions.values()))) # Get caption or random - if is_post(media): - upload_story_post(cl, media, posts_dir, caption, delete_after_upload) - elif is_video(media): - upload_story_video(cl, media, posts_dir, caption, delete_after_upload) - elif is_album(posts_dir, media): - ## invalid operation, skipping. - utils.logger.warning(f"{media} An album can't be uploaded as a story.") - else: - utils.logger.warning(f"{media} is not in a valid story format.") - continue - -def upload_story_post(cl, path, posts_dir, caption, delete_after_upload): - path_to_post = posts_dir + '/' + path +def get_post_folders(posts_dir: str) -> List[str]: + """Retrieves a list of folders in the given directory.""" + return [f for f in os.listdir(posts_dir) if os.path.isdir(os.path.join(posts_dir, f))] +def get_caption_from_folder(folder_path: str) -> str: + """Retrieves the caption from a caption.txt file in the given folder.""" + caption_file = os.path.join(folder_path, 'caption.txt') + if not os.path.exists(caption_file): + return "" try: - cl.photo_upload_to_story(path_to_post, caption) + with open(caption_file, 'r', encoding='utf-8') as f: + caption = f.read().strip() + return caption except Exception as e: - utils.logger.error(f"Error uploading story post {path}: {e}") - return - - if delete_after_upload: - os.remove(path_to_post) - - utils.logger.info(f"Uploaded {path}") - -def upload_story_video(cl, path, posts_dir, caption, delete_after_upload): - path_to_post = posts_dir + '/' + path - - try: - cl.video_upload_to_story(path_to_post, caption) - except Exception as e: - utils.logger.error(f"Error uploading story video {path}: {e}") - return - - if delete_after_upload: - os.remove(path_to_post) - - utils.logger.info(f"Uploaded {path}") + utils.logger.error(f"Error reading caption file in {folder_path}: {e}") + return "" def load_captions(captions_file: str) -> dict: + """Loads fallback captions from a file.""" captions = {} if not captions_file: return captions diff --git a/src/instabot/utils.py b/src/instabot/utils.py index 3b033f0..eda5c2c 100644 --- a/src/instabot/utils.py +++ b/src/instabot/utils.py @@ -46,7 +46,22 @@ def get_approximate_sleep_time(days=0, hours=0, minutes=0) -> int: return random.uniform(min_time, max_time) def convert_webp_to_jpg(image_path, output_path): - Image.open(image_path).save(output_path, "JPEG") + """Converts a WEBP image to JPG.""" + try: + Image.open(image_path).convert("RGB").save(output_path, "JPEG") + logger.info(f"Converted WEBP to JPG: {output_path}") + except Exception as e: + logger.error(f"Error converting WEBP to JPG for {image_path}: {e}") + raise + +def convert_png_to_jpg(image_path, output_path): + """Converts a PNG image to JPG.""" + try: + Image.open(image_path).convert("RGB").save(output_path, "JPEG") + logger.info(f"Converted PNG to JPG: {output_path}") + except Exception as e: + logger.error(f"Error converting PNG to JPG for {image_path}: {e}") + raise def load_config(file_path: str) -> dict: with open(file_path, 'r') as file: