-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxznicer.py
executable file
·94 lines (77 loc) · 3.01 KB
/
xznicer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/python3
import os
import subprocess
import threading
import argparse
results = []
print_lock = threading.Lock()
def xznicer_test_nices(nice_values, thread_id):
command_template = f"xz --format=xz -9 --extreme --lzma2=preset=9,lc=0,lp=0,pb=0,nice={{}} --keep --stdout {{}} > /tmp/out{thread_id}.xz"
total_steps = len(nice_values)
if total_steps == 0:
return
tick = 100.0 / total_steps
progress = 0.0
for idx, nice in enumerate(nice_values):
command = command_template.format(nice, input_file)
result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
try:
size = os.path.getsize(f"/tmp/out{thread_id}.xz")
results.append((size, str(nice)))
except OSError as e:
with print_lock:
print(f"Error accessing file: {e}")
finally:
os.remove(f"/tmp/out{thread_id}.xz")
else:
with print_lock:
print(f"Warning: 'xz' command failed for nice={nice}, skipping.")
progress += tick
with print_lock:
print(f"Thread {thread_id}: Finding optimal LZMA2-nice-parameter {int(progress)}%", end="\r")
def xznicer():
num_cores = os.cpu_count() or 1
nice_values = list(range(2, 274))
chunk_size = (len(nice_values) + num_cores - 1) // num_cores
threads = []
for i in range(num_cores):
start = i * chunk_size
end = min(start + chunk_size, len(nice_values))
chunk = nice_values[start:end]
if not chunk:
continue
thread = threading.Thread(target=xznicer_test_nices, args=(chunk, i+1))
threads.append(thread)
thread.start()
for t in threads:
t.join()
results.sort(key=lambda x: x[0])
if verbose:
for row in results:
print(row)
print("Best choice:")
best = results[0]
print(f"nice={best[1]}, Uses {best[0]} bytes")
final_command = f"xz --format=xz -9 --extreme --lzma2=preset=9,lc=0,lp=0,pb=0,nice={best[1]} --keep --stdout {input_file} > {output_file}"
print(final_command)
subprocess.run(final_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def main():
parser = argparse.ArgumentParser(description="Optimize LZMA2 compression using the 'xz' tool by finding the best 'nice' parameter.")
parser.add_argument("input_file", help="Input file to compress")
parser.add_argument("-o", "--output", help="Output file name", required=True)
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
args = parser.parse_args()
global input_file, output_file, verbose
input_file = args.input_file
output_file = args.output
verbose = args.verbose
if not input_file:
print("No input file given")
return
elif not output_file:
print("No output file given")
return
xznicer()
if __name__ == "__main__":
main()