-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstartrails.py
155 lines (127 loc) · 5.7 KB
/
startrails.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3
import os
import numpy
import argparse
import math
import logging
from pathlib import Path
from datetime import datetime
from PIL import Image
# Number of images selected for this run. Starts at 0 and is overwritten by the
# __main__ block once the input files have been filtered; it is read by the
# ImageFile log messages and by create_stacks_of_size().
TOTAL_IMAGE_COUNT = 0
# Directory that Stack.output() writes finished stacks into. Defaults to the
# current directory and is reassigned by the __main__ block.
OUTPUT_LOCATION = "."
class ImageFile():
    """A source frame loaded from disk as a floating-point pixel array.

    Every instance takes the current value of the class-wide
    ``IMAGE_COUNTER`` as its zero-based ``num`` and then increments the
    counter, so ``num`` reflects the order in which frames were loaded.

    Attributes:
        path: The path the image was loaded from.
        array: Pixel data as a float64 numpy array.
        size: The ``(width, height)`` tuple reported by PIL.
        num: Zero-based load order of this frame.
    """

    # Number of ImageFile instances created so far; also the next ``num``.
    IMAGE_COUNTER = 0

    def __init__(self, file_path: str):
        """Load the image at *file_path* and assign it the next sequence number.

        Args:
            file_path: Path of the image file to open with PIL.
        """
        self.path = file_path
        # The context manager closes the underlying file as soon as the pixel
        # data has been copied into the numpy array.
        with Image.open(self.path) as image:
            # ``numpy.float`` was removed in NumPy 1.24; float64 is the same type.
            self.array = numpy.array(image, dtype=numpy.float64)
            self.size = image.size
        self.num = ImageFile.IMAGE_COUNTER
        ImageFile.IMAGE_COUNTER += 1
        logging.info("loaded image {}/{}: '{}'".format(
            self.num + 1, TOTAL_IMAGE_COUNT, self.path))

    def __del__(self):
        """Log the release of this frame (debug level)."""
        # ``num`` is missing when __init__ failed before assigning it (for
        # example when the file could not be opened); nothing to report then.
        num = getattr(self, "num", None)
        if num is None:
            return
        # Reported one-based so it lines up with the "loaded image" message.
        logging.debug("deleted image {}/{}".format(num + 1, TOTAL_IMAGE_COUNT))
class Stack():
    """Accumulates the per-pixel maximum over a contiguous range of frames.

    A stack is responsible for the frames whose ``num`` falls inside
    ``img_range``. Once it has received ``len(img_range)`` frames it writes
    itself to disk and releases its pixel buffer.
    """

    def __init__(self, img_range: range):
        """Create an empty stack.

        Args:
            img_range: The ``range`` of frame numbers this stack covers. Its
                ``start`` and ``stop`` are used in the output file name.
        """
        self.img_range = img_range
        self.array = None
        self.width, self.height = (None, None)
        self._image_limit = len(img_range)
        self._n_images = 0

    def add_image(self, image: "ImageFile"):
        """Merge one frame into the stack, writing the result when complete.

        Args:
            image: An object exposing ``size`` as ``(width, height)``,
                ``array`` (pixel data) and ``num`` (frame number), as
                ``ImageFile`` does.

        Raises:
            RuntimeError: If the stack already received all of its frames.
            ValueError: If the frame size differs from the frames already
                in the stack.
        """
        if self._n_images == self._image_limit:
            # The original built this exception without raising it, so the
            # guard never fired.
            raise RuntimeError("Stack has already been processed & output!!!!")
        if self.array is None:
            self.width, self.height = image.size
            # ``numpy.float`` was removed in NumPy 1.24; float64 is the same type.
            self.array = numpy.zeros((self.height, self.width, 3), numpy.float64)
        elif image.size != (self.width, self.height):
            raise ValueError("{}: {} != {}".format(
                getattr(image, "path", "<unknown image>"),
                image.size, (self.width, self.height)))
        # Accumulate in place so no second full-frame buffer is allocated.
        numpy.maximum(self.array, image.array, out=self.array)
        self._n_images += 1
        logging.info("image {}, added to stack {}".format(image.num, self.img_range))
        if self._n_images == self._image_limit:
            self.output()

    def output(self):
        """Write the stack as a JPEG into ``OUTPUT_LOCATION`` and free the buffer.

        Does nothing when the stack holds no pixel data, which is the case
        for a stack that never received a frame or was already written.
        """
        if self.array is None:
            return  # maybe should log
        new_stack = numpy.array(numpy.round(self.array), dtype=numpy.uint8)
        output = Image.fromarray(new_stack, mode="RGB")
        path = "{}/stack_{}-{}.jpeg".format(OUTPUT_LOCATION, self.img_range.start, self.img_range.stop)
        output.save(path, "JPEG")
        logging.info("Saved stack to {}".format(path))
        # Drop the pixel buffer to reduce this object's memory footprint.
        self.array = None
def get_subset_of_files(files: list,
                        start_filename: str = None,
                        end_filename: str = None) -> list:
    """Return the slice of *files* bounded by two optional file names.

    Args:
        files: Ordered list of file names.
        start_filename: First name to keep; the slice starts at index 0 when
            this is falsy.
        end_filename: Name at which the slice stops; the slice runs to the
            end of *files* when this is falsy.

    Returns:
        *files* itself when both bounds are ``None``, otherwise a new list
        ``files[start:end]``.

    Raises:
        ValueError: If a supplied bound is not present in *files*.

    NOTE(review): the entry named by ``end_filename`` is itself excluded
    from the result - confirm that an exclusive upper bound is intended.
    """
    no_bounds_given = start_filename is None and end_filename is None
    if no_bounds_given:
        return files

    # list.index raises ValueError for a name that is not in the list.
    lower = files.index(start_filename) if start_filename else 0
    upper = files.index(end_filename) if end_filename else len(files)
    return files[lower:upper]
def create_stacks_of_size(stack_size: int) -> list:
    """Split the selected images into consecutive stacks of *stack_size* frames.

    The stacks cover frame numbers ``0 .. TOTAL_IMAGE_COUNT - 1``. The final
    stack is shortened when the image count is not a multiple of
    *stack_size*, so that its range (and therefore its frame limit and output
    file name) never extends past the last available frame.

    Args:
        stack_size: Number of frames per stack; must be at least 1.

    Returns:
        A list of ``Stack`` objects, empty when there are no images.

    Raises:
        ValueError: If *stack_size* is smaller than 1.
    """
    if stack_size < 1:
        raise ValueError("stack_size must be a positive integer, got {}".format(stack_size))
    return [
        Stack(range(start, min(start + stack_size, TOTAL_IMAGE_COUNT)))
        for start in range(0, TOTAL_IMAGE_COUNT, stack_size)
    ]
if __name__ == "__main__":
    # Command-line entry point: pick the input frames, build the stacks to
    # produce, then feed every frame to each stack whose range contains it.
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description="")
    parser.add_argument('--source_dir', type=str, required=True, help="")
    parser.add_argument('--start_filename', type=str, required=False, default=None, help="")
    parser.add_argument('--end_filename', type=str, required=False, default=None, help="")
    parser.add_argument('--output_dir', type=str, required=False, help="")
    parser.add_argument('--skip_num', type=int, required=False, default=None, help="")
    parser.add_argument('--filetype', type=str, required=False, default="jpg", help="")
    parser.add_argument('--exclude_filename', type=str, action='append')
    args = parser.parse_args()

    if args.output_dir:
        OUTPUT_LOCATION = args.output_dir
        if not os.path.isdir(OUTPUT_LOCATION):
            raise Exception("--output_dir path directory doesn't exist: {}".format(OUTPUT_LOCATION))

    # Stacks are written into a sub-directory named after today's date.
    # NOTE(review): this directory is created whether or not --output_dir was
    # given; confirm that matches the intended layout.
    date = datetime.now().strftime("%Y-%m-%d")
    export_directory = os.path.join(OUTPUT_LOCATION, date)
    Path(export_directory).mkdir(parents=True, exist_ok=True)
    OUTPUT_LOCATION = export_directory

    file_type = args.filetype.lower()
    accepted_filetypes = ["jpg", "raf"]  # jpeg
    if file_type not in accepted_filetypes:
        raise Exception("--filetype '{}' provided is not supported".format(args.filetype))

    # Sort so that --start_filename / --end_filename select a contiguous run.
    file_paths = os.listdir(args.source_dir)
    file_paths.sort()
    file_paths = get_subset_of_files(file_paths, args.start_filename, args.end_filename)

    if args.exclude_filename:
        for filename in args.exclude_filename:
            # list.remove raises ValueError for a name that is not in the selection.
            file_paths.remove(filename)
            logging.info("Skipping {} as provided by --exclude_filename".format(filename))

    # Build full paths. os.path.join inserts the separator that plain string
    # concatenation dropped when --source_dir had no trailing slash.
    file_paths = [os.path.join(args.source_dir, name) for name in file_paths]

    # Match the extension case-insensitively (".jpg", ".JPG", ".Jpg", ...).
    file_type_ending = "." + file_type
    image_paths = [p for p in file_paths if p.lower().endswith(file_type_ending)]
    if not image_paths:
        raise Exception("No images of type {} found in the --source_dir {}".format(file_type, args.source_dir))

    if args.skip_num:
        # Keep every skip_num-th image only.
        image_paths = image_paths[::args.skip_num]

    TOTAL_IMAGE_COUNT = len(image_paths)

    # Three stacks: first half, second half, and the full sequence.
    stacks_to_process = []
    #stacks_to_process.extend(create_stacks_of_size(40))
    half = TOTAL_IMAGE_COUNT // 2
    stacks_to_process.append(Stack(range(0, half)))
    stacks_to_process.append(Stack(range(half, TOTAL_IMAGE_COUNT)))
    stacks_to_process.append(Stack(range(0, TOTAL_IMAGE_COUNT)))

    # Each image is loaded once and offered to every stack covering its number.
    for path in image_paths:
        image = ImageFile(path)
        for stack in stacks_to_process:
            if image.num in stack.img_range:
                stack.add_image(image)

    # Flush stacks that did not reach their frame limit; output() is a no-op
    # for stacks that were already written or never received a frame.
    for stack in stacks_to_process:
        stack.output()