forked from OpenShopChannel/RepositoryManager
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.py
181 lines (147 loc) · 6.9 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import importlib
import os
import random
import stat
import sys
import pygit2
from flask import url_for
from models import SettingsModel, db, ModeratedBinariesModel
def get_settings():
settings = {}
for setting in SettingsModel.query.all():
settings[setting.key] = setting.value
return settings
def update_setting(key: str, value: str):
# create if it doesn't exist
if SettingsModel.query.filter_by(key=key).first() is None:
settings = SettingsModel(key=key, value=value)
db.session.add(settings)
else:
settings = SettingsModel.query.filter_by(key=key).first()
settings.value = value
db.session.commit()
# delete a directory and all its contents, even if read-only files are present
def rmtree(top):
for root, dirs, files in os.walk(top, topdown=False):
for name in files:
filename = os.path.join(root, name)
os.chmod(filename, stat.S_IWUSR)
os.remove(filename)
for name in dirs:
os.rmdir(os.path.join(root, name))
os.rmdir(top)
def pull(repo, remote_name='origin', branch='master'):
"""
Pull changes for the specified remote (defaults to origin).
Code originally from Michael Boselowitz at: https://github.com/MichaelBoselowitz/pygit2-examples/blob/master/examples.py#L54-L91
Licensed under the MIT license.
"""
for remote in repo.remotes:
if remote.name == remote_name:
remote.fetch()
remote_master_id = repo.lookup_reference('refs/remotes/origin/%s' % (branch)).target
merge_result, _ = repo.merge_analysis(remote_master_id)
# Up to date, do nothing
if merge_result & pygit2.GIT_MERGE_ANALYSIS_UP_TO_DATE:
return
# We can just fastforward
elif merge_result & pygit2.GIT_MERGE_ANALYSIS_FASTFORWARD:
repo.checkout_tree(repo.get(remote_master_id))
try:
master_ref = repo.lookup_reference('refs/heads/%s' % (branch))
master_ref.set_target(remote_master_id)
except KeyError:
repo.create_branch(branch, repo.get(remote_master_id))
repo.head.set_target(remote_master_id)
elif merge_result & pygit2.GIT_MERGE_ANALYSIS_NORMAL:
repo.merge(remote_master_id)
if repo.index.conflicts is not None:
for conflict in repo.index.conflicts:
print('Conflicts found in:', conflict[0].path)
raise AssertionError('Conflicts, ahhhhh!!')
user = repo.default_signature
tree = repo.index.write_tree()
commit = repo.create_commit('HEAD',
user,
user,
'Merge!',
tree,
[repo.head.target, remote_master_id])
# We need to do this or git CLI will think we are still merging.
repo.state_cleanup()
else:
raise AssertionError('Unknown merge analysis result')
def app_index_directory_location(slug):
return os.path.join('data', 'contents', slug)
def notifications():
return {
"pending_moderation": ModeratedBinariesModel.query.filter_by(status='pending').count()
}
def load_source_downloader(source_type):
module_path = os.path.join(sys.path[0], "sources", f"{source_type}.py")
if not os.path.isfile(module_path):
raise Exception(f"Unsupported source type: {source_type}")
spec = importlib.util.spec_from_file_location(source_type, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
SourceDownloader = getattr(module, "SourceDownloader", None)
if SourceDownloader is None:
raise Exception(f"Unsupported source type: {source_type}")
return SourceDownloader
def get_available_source_downloader_details():
source_directory = "sources"
source_files = [f for f in os.listdir(source_directory) if f.endswith(".py") and f != "base_source_downloader.py"]
source_details = []
for source_file in source_files:
source_type = source_file[:-3] # Remove the ".py" extension
SourceDownloader = load_source_downloader(source_type)
name = getattr(SourceDownloader, "name")
description = getattr(SourceDownloader, "description")
source_details.append({
"type": source_type,
"name": name,
"description": description
})
return source_details
def generate_title_id():
random_value = random.randint(0x000000, 0xFFFFFF)
# Convert to hexadecimal and format it as a string
random_hex = format(random_value, 'X')
# Ensure the hexadecimal value is represented with 6 characters by adding leading zeros if necessary
random_hex = random_hex.zfill(6)
id_prefix = "000100014E"
final_id = f"{id_prefix}{random_hex}"
return final_id
def describe_app(package):
return {
"author": package["metaxml"]["app"].get("coder", package["information"]["author"]),
"category": package["information"]["category"],
"description": {
"long": package["metaxml"]["app"].get("long_description", "No description provided."),
"short": package["metaxml"]["app"].get("short_description", "No description provided.")
},
"file_size": {
"binary": package["index_computed_info"]["binary_size"],
"icon": package["index_computed_info"]["icon_size"],
"zip_compressed": package["index_computed_info"]["compressed_size"],
"zip_uncompressed": package["index_computed_info"]["uncompressed_size"]
},
"flags": package["information"].get("flags", []),
"name": package["metaxml"]["app"].get("name", package["information"]["name"]),
"package_type": package["index_computed_info"]["package_type"],
"peripherals": package["information"]["peripherals"],
"release_date": package["index_computed_info"].get("release_date", 0),
"shop": {
"title_id": package["index_computed_info"].get("title_id"),
"title_version": package["index_computed_info"].get("title_version")
},
"slug": package["information"]["slug"],
"subdirectories": package["index_computed_info"]["subdirectories"],
"supported_platforms": package["information"].get("supported_platforms", []),
"url": {
"icon": url_for('api.get_content_icon', slug=package["information"]["slug"], _external=True),
"zip": url_for('api.get_content_zip', slug=package["information"]["slug"],
_slug=package["information"]["slug"], _external=True),
},
"version": package["metaxml"]["app"].get("version", "Unknown")
}