Skip to content

Commit

Permalink
Overhaul uploading process.
Browse files Browse the repository at this point in the history
  • Loading branch information
birdhouses committed Aug 3, 2024
1 parent f14e6bc commit e1c00d5
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 156 deletions.
311 changes: 156 additions & 155 deletions src/instabot/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import shutil
from humanfriendly import format_timespan
from instabot.timekeeper import TimeKeeper
import re

async def upload_media(cl, account):
utils.logger.info("Uploading media...")
Expand All @@ -17,193 +16,195 @@ async def upload_media(cl, account):
captions_file = account['upload_posts'].get('captions_file') # Captions file path
posts_dir = account['upload_posts']['posts_dir']
delete_after_upload = account['upload_posts']['delete_after_upload']
medias = os.listdir(posts_dir)

### MAKE THIS CONFIGURABLE ###
random.shuffle(medias)
##############################
post_folders = get_post_folders(posts_dir)

random.shuffle(post_folders)
captions = load_captions(captions_file)

for media in medias:
for folder in post_folders:
sleep_time = utils.calculate_sleep_time(amount)
TimeKeeper(username, 'upload_media', sleep_time)
await asyncio.sleep(sleep_time)

caption = captions.get(media, random.choice(list(captions.values()))) # Get caption or random
if is_post(media):
upload_post(cl, media, posts_dir, caption, delete_after_upload)
elif is_album(posts_dir, media):
upload_album(cl, media, posts_dir, caption, delete_after_upload)
elif is_video(media):
upload_video(cl, media, posts_dir, caption, delete_after_upload)
else:
utils.logger.warning(f"{media} is not a valid post or album.")
continue

def is_video(media: str) -> bool:
return media.endswith('.mp4')

def upload_video(cl, path, posts_dir, caption, delete_after_upload):
path_to_post = posts_dir + '/' + path

try:
cl.video_upload(path_to_post, caption)
except Exception as e:
utils.logger.error(f"Error uploading {path}: {e}")
return

if delete_after_upload:
os.remove(path_to_post)

utils.logger.info(f"Uploaded {path}")

def is_album(posts_dir: str, path: str) -> bool:
if not os.path.isdir(posts_dir + '/' + path):
return False
folder_path = os.path.join(posts_dir, folder)
caption = get_caption_from_folder(folder_path) or random.choice(list(captions.values())) # Get caption or random

posts = get_posts(posts_dir + '/' + path)
if is_album(folder_path):
upload_album(cl, folder_path, caption, delete_after_upload)
elif is_video_in_folder(folder_path):
upload_video(cl, folder_path, caption, delete_after_upload)
elif is_image_post(folder_path):
upload_post(cl, folder_path, caption, delete_after_upload)
else:
utils.logger.warning(f"{folder} is not a valid post or album.")

return len(posts) > 0
def is_image_file(file_path: str) -> bool:
"""Checks if a file is an image."""
return file_path.lower().endswith(('.jpg', '.jpeg', '.png'))

def is_post(path: str) -> bool:
valid_images = [".jpg", ".webp", ".png"]
ext = os.path.splitext(path)[1]
return ext.lower() in valid_images
def is_video(media: str) -> bool:
"""Checks if a file is a video."""
return media.lower().endswith('.mp4')

def upload_album(cl, album: str, posts_dir, caption: str, delete_after_upload: bool):
path_to_album = posts_dir + '/' + album
posts = get_posts(path_to_album)
def is_image_post(folder_path: str) -> bool:
"""Checks if a folder contains a single image post."""
return any(is_image_file(file) for file in os.listdir(folder_path))

paths = []
for path in posts:
paths.append(path_to_album + '/' + path)
def is_video_in_folder(folder_path: str) -> bool:
"""Checks if a folder contains a video file."""
return any(is_video(file) for file in os.listdir(folder_path))

def upload_album(cl, folder_path, caption, delete_after_upload):
"""Uploads an album to Instagram."""
posts = get_posts(folder_path)
new_paths = []

for image_path in paths:
if is_post(image_path):
output_path = image_path.replace(".webp", ".jpg")
utils.convert_webp_to_jpg(image_path, output_path)
new_paths.append(output_path)
elif is_video(image_path):
new_paths.append(image_path)

cl.album_upload(new_paths, caption)

if delete_after_upload:
shutil.rmtree(path_to_album)

utils.logger.info(f"Uploaded {album}")
for image_path in posts:
full_path = os.path.join(folder_path, image_path)
ext = os.path.splitext(image_path)[1].lower()
output_path = full_path

if ext in [".jpg", ".jpeg"]:
new_paths.append(full_path)
elif ext == ".png":
# Convert PNG to JPG using utility function
output_path = full_path.replace(".png", ".jpg")
try:
utils.convert_png_to_jpg(full_path, output_path)
new_paths.append(output_path)
except Exception as e:
utils.logger.error(f"Error converting PNG to JPG for {full_path}: {e}")
continue
elif ext == ".webp":
# Convert WEBP to JPG using utility function
output_path = full_path.replace(".webp", ".jpg")
try:
utils.convert_webp_to_jpg(full_path, output_path)
new_paths.append(output_path)
except Exception as e:
utils.logger.error(f"Error converting WEBP to JPG for {full_path}: {e}")
continue
elif ext == ".mp4":
new_paths.append(full_path)
else:
utils.logger.warning(f"Unsupported file format {ext} in {full_path}, skipping.")

if new_paths:
try:
cl.album_upload(new_paths, caption)
if delete_after_upload:
shutil.rmtree(folder_path)
utils.logger.info(f"Uploaded album from {folder_path}")
except instabot.exceptions.AlbumUnknownFormat as e:
utils.logger.error(f"Error uploading album {folder_path}: Unknown format. {e}")
except Exception as e:
utils.logger.error(f"Error uploading album {folder_path}: {e}")
else:
utils.logger.warning(f"No valid media found in album {folder_path}")

def is_album(folder_path: str) -> bool:
"""Determines if a folder contains an album (more than one media file)."""
valid_media = ['.jpg', '.jpeg', '.png', '.mp4', '.webp']
media_files = [f for f in os.listdir(folder_path) if os.path.splitext(f)[1].lower() in valid_media]
return len(media_files) > 1

def upload_album(cl, folder_path, caption, delete_after_upload):
"""Uploads an album to Instagram."""
posts = get_posts(folder_path)
new_paths = []

def upload_post(cl, path, posts_dir, caption, delete_after_upload):
path_to_post = posts_dir + '/' + path
for image_path in posts:
full_path = os.path.join(folder_path, image_path)
ext = os.path.splitext(image_path)[1].lower()
output_path = full_path

if ext in [".jpg", ".jpeg"]:
new_paths.append(full_path)
elif ext == ".png":
# Convert PNG to JPG
output_path = full_path.replace(".png", ".jpg")
try:
with Image.open(full_path) as img:
rgb_img = img.convert('RGB')
rgb_img.save(output_path)
new_paths.append(output_path)
except Exception as e:
utils.logger.error(f"Error converting PNG to JPG for {full_path}: {e}")
continue
elif ext == ".webp":
# Convert WEBP to JPG
output_path = full_path.replace(".webp", ".jpg")
try:
utils.convert_webp_to_jpg(full_path, output_path)
new_paths.append(output_path)
except Exception as e:
utils.logger.error(f"Error converting WEBP to JPG for {full_path}: {e}")
continue
elif ext == ".mp4":
new_paths.append(full_path)
else:
utils.logger.warning(f"Unsupported file format {ext} in {full_path}, skipping.")

if new_paths:
try:
cl.album_upload(new_paths, caption)
if delete_after_upload:
shutil.rmtree(folder_path)
utils.logger.info(f"Uploaded album from {folder_path}")
except instabot.exceptions.AlbumUnknownFormat as e:
utils.logger.error(f"Error uploading album {folder_path}: Unknown format. {e}")
except Exception as e:
utils.logger.error(f"Error uploading album {folder_path}: {e}")
else:
utils.logger.warning(f"No valid media found in album {folder_path}")

def upload_post(cl, folder_path, caption, delete_after_upload):
"""Uploads a single image post to Instagram."""
image_files = [f for f in os.listdir(folder_path) if is_image_file(os.path.join(folder_path, f))]
if not image_files:
utils.logger.warning(f"No image found in {folder_path}")
return
image_path = os.path.join(folder_path, image_files[0])
try:
cl.photo_upload(path_to_post, caption)
cl.photo_upload(image_path, caption)
except Exception as e:
utils.logger.error(f"Error uploading {path}: {e}")
utils.logger.error(f"Error uploading post {image_path}: {e}")
return

if delete_after_upload:
os.remove(path_to_post)

utils.logger.info(f"Uploaded {path}")
shutil.rmtree(folder_path)
utils.logger.info(f"Uploaded image post from {folder_path}")

def get_posts(directory: str) -> List[str]:
"""Retrieves a list of valid media files in a directory."""
valid_images = [".jpg", ".jpeg", ".png", ".webp", ".mp4"]
posts = []
pattern = r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}_UTC_(\d+)\.(jpg|jpeg|png|mp4|avi|mov)$'

for filename in os.listdir(directory):
if re.match(pattern, filename, re.IGNORECASE):
ext = os.path.splitext(filename)[1].lower()
if ext in valid_images:
posts.append(filename)
else:
# Log a warning or handle files with unexpected names as needed
utils.logger.warning(f"Ignoring file with unexpected name format: {filename}")

# Ensure the lambda only attempts to convert a well-formatted index number to integer
posts.sort(key=lambda filename: int(re.search(r'_(\d+)\.', filename).group(1)))

utils.logger.warning(f"Ignoring file with unexpected extension: {filename}")
return posts

def get_albums(post_dir: str) -> List[str]:
albums = []
for file in os.listdir(post_dir):
if os.path.isdir(post_dir + '/' + file):
albums.append(file)
return albums

def filter_posts(posts):
valid_images = [".jpg", ".webp", ".png", ".mp4"]
valid_posts = []
for post in posts:
ext = os.path.splitext(post)[1]
if ext.lower() not in valid_images:
continue
valid_posts.append(post)

return valid_posts

async def upload_stories(cl, account):
utils.logger.info("Uploading story...")
username = account['account_details']['username']
amount = account['upload_stories']['amount_per_day']
captions_file = account['upload_stories'].get('captions_file') # Captions file path
posts_dir = account['upload_stories']['posts_dir']
delete_after_upload = account['upload_stories']['delete_after_upload']
medias = os.listdir(posts_dir)

### MAKE THIS CONFIGURABLE ###
random.shuffle(medias)
##############################

captions = load_captions(captions_file)

for media in medias:
sleep_time = utils.calculate_sleep_time(amount)
TimeKeeper(username, 'upload_story', sleep_time)
await asyncio.sleep(sleep_time)

caption = captions.get(media, random.choice(list(captions.values()))) # Get caption or random
if is_post(media):
upload_story_post(cl, media, posts_dir, caption, delete_after_upload)
elif is_video(media):
upload_story_video(cl, media, posts_dir, caption, delete_after_upload)
elif is_album(posts_dir, media):
## invalid operation, skipping.
utils.logger.warning(f"{media} An album can't be uploaded as a story.")
else:
utils.logger.warning(f"{media} is not in a valid story format.")
continue

def upload_story_post(cl, path, posts_dir, caption, delete_after_upload):
path_to_post = posts_dir + '/' + path
def get_post_folders(posts_dir: str) -> List[str]:
"""Retrieves a list of folders in the given directory."""
return [f for f in os.listdir(posts_dir) if os.path.isdir(os.path.join(posts_dir, f))]

def get_caption_from_folder(folder_path: str) -> str:
"""Retrieves the caption from a caption.txt file in the given folder."""
caption_file = os.path.join(folder_path, 'caption.txt')
if not os.path.exists(caption_file):
return ""
try:
cl.photo_upload_to_story(path_to_post, caption)
with open(caption_file, 'r', encoding='utf-8') as f:
caption = f.read().strip()
return caption
except Exception as e:
utils.logger.error(f"Error uploading story post {path}: {e}")
return

if delete_after_upload:
os.remove(path_to_post)

utils.logger.info(f"Uploaded {path}")

def upload_story_video(cl, path, posts_dir, caption, delete_after_upload):
path_to_post = posts_dir + '/' + path

try:
cl.video_upload_to_story(path_to_post, caption)
except Exception as e:
utils.logger.error(f"Error uploading story video {path}: {e}")
return

if delete_after_upload:
os.remove(path_to_post)

utils.logger.info(f"Uploaded {path}")
utils.logger.error(f"Error reading caption file in {folder_path}: {e}")
return ""

def load_captions(captions_file: str) -> dict:
"""Loads fallback captions from a file."""
captions = {}
if not captions_file:
return captions
Expand Down
17 changes: 16 additions & 1 deletion src/instabot/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,22 @@ def get_approximate_sleep_time(days=0, hours=0, minutes=0) -> int:
return random.uniform(min_time, max_time)

def convert_webp_to_jpg(image_path, output_path):
Image.open(image_path).save(output_path, "JPEG")
"""Converts a WEBP image to JPG."""
try:
Image.open(image_path).convert("RGB").save(output_path, "JPEG")
logger.info(f"Converted WEBP to JPG: {output_path}")
except Exception as e:
logger.error(f"Error converting WEBP to JPG for {image_path}: {e}")
raise

def convert_png_to_jpg(image_path, output_path):
"""Converts a PNG image to JPG."""
try:
Image.open(image_path).convert("RGB").save(output_path, "JPEG")
logger.info(f"Converted PNG to JPG: {output_path}")
except Exception as e:
logger.error(f"Error converting PNG to JPG for {image_path}: {e}")
raise

def load_config(file_path: str) -> dict:
with open(file_path, 'r') as file:
Expand Down

0 comments on commit e1c00d5

Please sign in to comment.