opcode_data_preprocessing.py
import os
import yaml
import numpy as np
from collections import Counter
from tqdm import tqdm
from sklearn.feature_extraction.text import TfidfTransformer


def load_config():
    """Load configuration from a YAML file."""
    with open("config.yaml") as f:
        config = yaml.safe_load(f)
    return config["data"]["train_opcode_codesection"], config["data"]["test_opcode_codesection"]
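
# Expected layout of config.yaml (a minimal sketch inferred from the two keys read
# in load_config above; the example paths are placeholders, not the project's real ones):
#
#   data:
#     train_opcode_codesection: path/to/train_opcode_codesections
#     test_opcode_codesection: path/to/test_opcode_codesections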


def read_sample_opcodes(folder_path):
    """
    Reads samples from the given folder and extracts labels and content.

    Args:
        folder_path (str): Path to the folder containing class subfolders.

    Returns:
        list of str: List of file contents.
        list of int: Corresponding labels.
        list of str: Class names mapped to labels.
    """
    samples = []
    labels = []
    class_names = sorted(os.listdir(folder_path))
    class_to_label = {name: idx for idx, name in enumerate(class_names)}
    for class_name in class_names:
        class_folder = os.path.join(folder_path, class_name)
        for file_name in os.listdir(class_folder):
            file_path = os.path.join(class_folder, file_name)
            with open(file_path, 'r') as file:
                content = file.read()
            samples.append(content)
            labels.append(class_to_label[class_name])
    return samples, labels, class_names


# Build global opcode dictionary
def build_global_opcode_dict(train_folder, test_folder):
    """Build a global dictionary of all unique opcodes from train and test sets."""
    opcode_set = set()

    def extract_opcodes_from_folder(folder):
        for class_folder in os.listdir(folder):
            class_path = os.path.join(folder, class_folder)
            if os.path.isdir(class_path):
                for file_name in os.listdir(class_path):
                    file_path = os.path.join(class_path, file_name)
                    with open(file_path, 'r') as f:
                        opcodes = f.read().strip().split(',')
                    opcode_set.update(opcodes)

    # Extract opcodes from train and test folders
    extract_opcodes_from_folder(train_folder)
    extract_opcodes_from_folder(test_folder)

    # Sort opcodes alphabetically and assign indices
    sorted_opcodes = sorted(opcode_set)
    return {opcode: idx for idx, opcode in enumerate(sorted_opcodes)}
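
# Illustrative example (not real data): if the train and test files together contain
# only the opcodes "mov", "push" and "call", the resulting dictionary is
#   {"call": 0, "mov": 1, "push": 2}
# i.e. alphabetical order fixes the column index of each opcode in the feature matrix.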


# Map class labels to integers
def map_labels(train_folder):
    """Map class (folder) names to integer labels in alphabetical order."""
    class_names = sorted(os.listdir(train_folder))
    return {class_name: idx for idx, class_name in enumerate(class_names)}
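
# Illustrative example (hypothetical class folders, not the real dataset): a train
# directory with sub-folders "Adware", "Backdoor" and "Trojan" would map to
#   {"Adware": 0, "Backdoor": 1, "Trojan": 2}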


# Generate feature matrix and labels
def generate_features_and_labels(folder, opcode_dict, label_map):
    """Generate feature matrix and label array for a given folder."""
    features = []
    labels = []
    for class_name, label in label_map.items():
        class_path = os.path.join(folder, class_name)
        if os.path.isdir(class_path):
            for file_name in tqdm(os.listdir(class_path), desc=f"Processing {class_name}"):
                file_path = os.path.join(class_path, file_name)
                with open(file_path, 'r') as f:
                    opcodes = f.read().strip().split(',')
                opcode_counts = Counter(opcodes)
                # Create feature vector of raw opcode counts
                feature_vector = np.zeros(len(opcode_dict), dtype=np.float32)
                for opcode, count in opcode_counts.items():
                    if opcode in opcode_dict:
                        feature_vector[opcode_dict[opcode]] = count
                features.append(feature_vector)
                labels.append(label)
    return np.array(features), np.array(labels)
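
# Illustrative example (not real data): with opcode_dict = {"call": 0, "mov": 1, "push": 2},
# a file whose code section reads "mov,push,mov" produces the count vector
#   [0., 2., 1.]
# so each column holds the raw frequency of one opcode within that sample.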


def main():
    train_folder, test_folder = load_config()

    # Build global opcode dictionary.
    print("Building global opcode dictionary...")
    opcode_dict = build_global_opcode_dict(train_folder, test_folder)
    print(f"Total unique opcodes: {len(opcode_dict)}")

    # Map labels to integers.
    print("Mapping class labels to integers...")
    label_map = map_labels(train_folder)
    print(f"Class label mapping: {label_map}")

    # Generate features and labels for train set.
    print("Generating features and labels for train set...")
    X_train, y_train = generate_features_and_labels(train_folder, opcode_dict, label_map)
    print(f"Train set: {X_train.shape} features, {y_train.shape} labels")

    # Apply TF-IDF separately to each class: a fresh transformer is fitted on the
    # sub-matrix of samples belonging to that class, so IDF weights are computed
    # per class rather than over the whole training set.
    unique_classes = np.unique(y_train)
    X_train_submatrices = {
        label: X_train[y_train == label] for label in unique_classes
    }
    X_train_tfidf = np.zeros_like(X_train)
    for class_label, sub_matrix in X_train_submatrices.items():
        print(f"Class {class_label}: {sub_matrix.shape} samples")
        transformer = TfidfTransformer(norm=None, use_idf=True)
        X_train_tfidf[y_train == class_label] = transformer.fit_transform(sub_matrix).toarray()

    # Generate features and labels for test set.
    print("Generating features and labels for test set...")
    X_test, y_test = generate_features_and_labels(test_folder, opcode_dict, label_map)
    print(f"Test set: {X_test.shape} features, {y_test.shape} labels")

    # Same per-class TF-IDF scheme for the test set (note that the transformers are
    # re-fitted on the test sub-matrices rather than reusing the train-set fits).
    X_test_submatrices = {
        label: X_test[y_test == label] for label in unique_classes
    }
    X_test_tfidf = np.zeros_like(X_test)
    for class_label, sub_matrix in X_test_submatrices.items():
        print(f"Class {class_label}: {sub_matrix.shape} samples")
        transformer = TfidfTransformer(norm=None, use_idf=True)
        X_test_tfidf[y_test == class_label] = transformer.fit_transform(sub_matrix).toarray()

    # Save all the sets in a single .npz archive.
    print("Saving the processed data...")
    os.makedirs("data", exist_ok=True)  # ensure the output directory exists
    np.savez("data/opcode_data_tfidf.npz",
             X_train=X_train_tfidf, y_train=y_train,
             X_test=X_test_tfidf, y_test=y_test,
             label_map=label_map, opcode_dict=opcode_dict)
    print("Data preparation complete. Files saved.")


if __name__ == "__main__":
    main()
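
# Consuming the saved archive elsewhere (a sketch, not part of this script):
# label_map and opcode_dict are stored as pickled object arrays, so np.load needs
# allow_pickle=True, and .item() recovers the original dicts.
#
#   data = np.load("data/opcode_data_tfidf.npz", allow_pickle=True)
#   X_train, y_train = data["X_train"], data["y_train"]
#   X_test, y_test = data["X_test"], data["y_test"]
#   label_map = data["label_map"].item()
#   opcode_dict = data["opcode_dict"].item()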