1
+ import collections
2
+ import json
3
+ import os
4
+
5
+ from loguru import logger
6
+
7
+ from modules import util
8
+
9
+ from . import campaigns_config
10
+ from .. import site_config
11
+
12
+
13
+ def generate_campaigns ():
14
+ """
15
+ Responsible for verifying campaign directory and starting off campaign markdown generation.
16
+ """
17
+
18
+ # Create content pages directory if does not already exist
19
+ util .buildhelpers .create_content_pages_dir ()
20
+
21
+ # Move templates to templates directory
22
+ util .buildhelpers .move_templates (campaigns_config .module_name , campaigns_config .campaigns_templates_path )
23
+
24
+ # Verify if directory exists
25
+ if not os .path .isdir (campaigns_config .campaign_markdown_path ):
26
+ os .mkdir (campaigns_config .campaign_markdown_path )
27
+
28
+ # Generate redirections
29
+ util .buildhelpers .generate_redirections (
30
+ redirections_filename = campaigns_config .campaigns_redirection_location , redirect_md = site_config .redirect_md
31
+ )
32
+
33
+ # Generates the markdown files to be used for page generation
34
+ campaigns_generated = generate_markdown_files ()
35
+
36
+ if not campaigns_generated :
37
+ util .buildhelpers .remove_module_from_menu (campaigns_config .module_name )
38
+
39
+
40
+ def generate_markdown_files ():
41
+ """
42
+ Responsible for generating campaign index page and getting shared data for all campaigns.
43
+ """
44
+
45
+ has_campaign = False
46
+
47
+ campaign_list = util .relationshipgetters .get_campaign_list ()
48
+
49
+ campaign_list_no_deprecated_revoked = util .buildhelpers .filter_deprecated_revoked (campaign_list )
50
+
51
+ if campaign_list_no_deprecated_revoked :
52
+ has_campaign = True
53
+
54
+ if has_campaign :
55
+ data = {}
56
+
57
+ # Amount of characters per category
58
+ group_by = 2
59
+
60
+ notes = util .relationshipgetters .get_objects_using_notes ()
61
+ side_menu_data = util .buildhelpers .get_side_menu_data ("Campaigns" , "/campaigns/" , campaign_list_no_deprecated_revoked )
62
+ data ["side_menu_data" ] = side_menu_data
63
+
64
+ side_menu_mobile_view_data = util .buildhelpers .get_side_menu_mobile_view_data (
65
+ "campaigns" , "/campaigns/" , campaign_list_no_deprecated_revoked , group_by
66
+ )
67
+ data ["side_menu_mobile_view_data" ] = side_menu_mobile_view_data
68
+
69
+ data ["campaigns_table" ] = get_campaigns_table_data (campaign_list_no_deprecated_revoked )
70
+ data ["campaigns_list_len" ] = str (len (campaign_list_no_deprecated_revoked ))
71
+
72
+ subs = campaigns_config .campaign_index_md + json .dumps (data )
73
+
74
+ with open (
75
+ os .path .join (campaigns_config .campaign_markdown_path , "overview.md" ), "w" , encoding = "utf8"
76
+ ) as md_file :
77
+ md_file .write (subs )
78
+
79
+ # Create the markdown for the enterprise campaigns in the STIX
80
+ for campaign in campaign_list :
81
+ generate_campaign_md (campaign , side_menu_data , side_menu_mobile_view_data , notes )
82
+
83
+ return has_campaign
84
+
85
+
86
+ def generate_campaign_md (campaign , side_menu_data , side_menu_mobile_view_data , notes ):
87
+ """Responsible for generating markdown of all campaigns."""
88
+
89
+ attack_id = util .buildhelpers .get_attack_id (campaign )
90
+
91
+ if attack_id :
92
+ data = {}
93
+
94
+ data ["attack_id" ] = attack_id
95
+
96
+ data ["side_menu_data" ] = side_menu_data
97
+ data ["side_menu_mobile_view_data" ] = side_menu_mobile_view_data
98
+ data ["notes" ] = notes .get (campaign ["id" ])
99
+
100
+ # External references
101
+ ext_ref = campaign ["external_references" ]
102
+
103
+ dates = util .buildhelpers .get_created_and_modified_dates (campaign )
104
+ if dates .get ("created" ):
105
+ data ["created" ] = dates ["created" ]
106
+ if dates .get ("modified" ):
107
+ data ["modified" ] = dates ["modified" ]
108
+ if campaign .get ("name" ):
109
+ data ["name" ] = campaign ["name" ]
110
+ if campaign .get ("x_mitre_version" ):
111
+ data ["version" ] = campaign ["x_mitre_version" ]
112
+
113
+ campaign_dates = util .buildhelpers .get_first_last_seen_dates (campaign )
114
+ if campaign_dates .get ("first_seen" ):
115
+ data ["first_seen" ] = campaign_dates ["first_seen" ]
116
+ if campaign_dates .get ("last_seen" ):
117
+ data ["last_seen" ] = campaign_dates ["last_seen" ]
118
+
119
+ campaign_date_citations = util .buildhelpers .get_first_last_seen_citations (campaign )
120
+ if campaign_date_citations .get ("first_seen_citation" ):
121
+ data ["first_seen_citation" ] = campaign_date_citations ["first_seen_citation" ]
122
+ if campaign_date_citations .get ("last_seen_citation" ):
123
+ data ["last_seen_citation" ] = campaign_date_citations ["last_seen_citation" ]
124
+
125
+ if isinstance (campaign .get ("x_mitre_contributors" ), collections .abc .Iterable ):
126
+ data ["contributors_list" ] = campaign ["x_mitre_contributors" ]
127
+
128
+ # Get initial reference list
129
+ reference_list = {"current_number" : 0 }
130
+
131
+ # Get initial reference list from campaign object
132
+ reference_list = util .buildhelpers .update_reference_list (reference_list , campaign )
133
+
134
+ if campaign .get ("description" ):
135
+ data ["descr" ] = campaign ["description" ]
136
+
137
+ if campaign .get ("x_mitre_deprecated" ):
138
+ data ["deprecated" ] = True
139
+
140
+ # Get technique data for techniques used table
141
+ data ["technique_table_data" ] = get_techniques_used_by_campaign_data (campaign , reference_list )
142
+
143
+ # Get navigator layers for this campaign
144
+ layers = util .buildhelpers .get_navigator_layers (
145
+ data ["name" ],
146
+ data ["attack_id" ],
147
+ "campaign" ,
148
+ data ["version" ] if "version" in data else None ,
149
+ data ["technique_table_data" ],
150
+ )
151
+
152
+ data ["layers" ] = []
153
+ for layer in layers :
154
+ with open (
155
+ os .path .join (
156
+ campaigns_config .campaign_markdown_path ,
157
+ "-" .join ([data ["attack_id" ], "techniques" , layer ["domain" ]]) + ".md" ,
158
+ ),
159
+ "w" ,
160
+ encoding = "utf8" ,
161
+ ) as layer_json :
162
+ subs = site_config .layer_md .substitute (
163
+ {"attack_id" : data ["attack_id" ], "path" : "campaigns/" + data ["attack_id" ], "domain" : layer ["domain" ]}
164
+ )
165
+ subs = subs + layer ["layer" ]
166
+ layer_json .write (subs )
167
+ data ["layers" ].append (
168
+ {
169
+ "domain" : layer ["domain" ],
170
+ "filename" : "-" .join ([data ["attack_id" ], layer ["domain" ], "layer" ]) + ".json" ,
171
+ "navigator_link" : site_config .navigator_link ,
172
+ }
173
+ )
174
+
175
+ # Get group data for Group table
176
+ data ["group_data" ] = get_group_table_data (campaign , reference_list )
177
+
178
+ # Grab software data for Software table
179
+ data ["software_data" ] = get_software_table_data (campaign , reference_list )
180
+
181
+ if campaign .get ("aliases" ):
182
+ data ["alias_descriptions" ] = util .buildhelpers .get_alias_data (campaign ["aliases" ][1 :], ext_ref )
183
+
184
+ data ["citations" ] = reference_list
185
+
186
+ if isinstance (campaign .get ("aliases" ), collections .abc .Iterable ):
187
+ data ["aliases_list" ] = campaign ["aliases" ][1 :]
188
+
189
+ data ["versioning_feature" ] = site_config .check_versions_module ()
190
+
191
+ subs = campaigns_config .campaign_md .substitute (data )
192
+ subs = subs + json .dumps (data )
193
+
194
+ # Write out the markdown file
195
+ with open (
196
+ os .path .join (campaigns_config .campaign_markdown_path , data ["attack_id" ] + ".md" ), "w" , encoding = "utf8"
197
+ ) as md_file :
198
+ md_file .write (subs )
199
+
200
+
201
+ def get_campaigns_table_data (campaign_list ):
202
+ """Responsible for generating campaign table data for the campaign index page"""
203
+ campaigns_table_data = []
204
+
205
+ # Now the table on the right, which is made up of campaign data
206
+ for campaign in campaign_list :
207
+ attack_id = util .buildhelpers .get_attack_id (campaign )
208
+ if attack_id :
209
+ campaign_dates = util .buildhelpers .get_first_last_seen_dates (campaign )
210
+ row = {
211
+ "id" : attack_id ,
212
+ "name" : campaign ["name" ] if campaign .get ("name" ) else attack_id ,
213
+ "first_seen" : campaign_dates ["first_seen" ] if campaign_dates .get ("first_seen" ) else '' ,
214
+ "last_seen" : campaign_dates ["last_seen" ] if campaign_dates .get ("last_seen" ) else '' ,
215
+ }
216
+
217
+ if campaign .get ("description" ):
218
+ row ["descr" ] = campaign ["description" ]
219
+
220
+ if campaign .get ("x_mitre_deprecated" ):
221
+ row ["deprecated" ] = True
222
+
223
+ if isinstance (campaign .get ("aliases" ), collections .abc .Iterable ):
224
+ row ["aliases_list" ] = campaign ["aliases" ][1 :]
225
+
226
+ campaigns_table_data .append (row )
227
+
228
+ return campaigns_table_data
229
+
230
+
231
+ def get_group_table_data (campaign , reference_list ):
232
+ """Given a campaign, get the group table data."""
233
+ group_list = {} # group stix_id => {attack_id, name, description}
234
+ groups_attributed_to_campaign = util .relationshipgetters .get_groups_attributed_to_campaigns ()
235
+
236
+ if groups_attributed_to_campaign .get (campaign .get ("id" )):
237
+ for group in groups_attributed_to_campaign [campaign ["id" ]]:
238
+ group_id = group ["object" ]["id" ]
239
+ if group_id not in group_list :
240
+ attack_id = util .buildhelpers .get_attack_id (group ["object" ])
241
+ group_list [group_id ] = {
242
+ "id" : attack_id ,
243
+ "name" : group ["object" ]["name" ]
244
+ }
245
+
246
+ if group ["relationship" ].get ("description" ):
247
+ group_list [group_id ]["desc" ] = group ["relationship" ]["description" ]
248
+
249
+ # update reference list
250
+ reference_list = util .buildhelpers .update_reference_list (reference_list , group ["relationship" ])
251
+
252
+
253
+ group_data = [group_list [item ] for item in group_list ]
254
+ group_data = sorted (group_data , key = lambda k : k ["name" ].lower ())
255
+ return group_data
256
+
257
+ def get_techniques_used_by_campaign_data (campaign , reference_list ):
258
+ """Given a campaign and its reference list, get the techniques used by the campaign.
259
+
260
+ Check the reference list for citations, if not found in list, add it.
261
+ """
262
+ technique_list = {}
263
+ techniques_used_by_campaigns = util .relationshipgetters .get_techniques_used_by_campaigns ()
264
+
265
+ if techniques_used_by_campaigns .get (campaign .get ("id" )):
266
+ for technique in techniques_used_by_campaigns [campaign ["id" ]]:
267
+ # Do not add if technique is deprecated
268
+ if not technique ["object" ].get ("x_mitre_deprecated" ):
269
+ technique_list = util .buildhelpers .technique_used_helper (technique_list , technique , reference_list )
270
+
271
+ technique_data = []
272
+ for item in technique_list :
273
+ technique_data .append (technique_list [item ])
274
+ # Sort by technique name
275
+ technique_data = sorted (technique_data , key = lambda k : k ["name" ].lower ())
276
+
277
+ # Sort by domain name
278
+ technique_data = sorted (
279
+ technique_data , key = lambda k : [site_config .custom_alphabet .index (c ) for c in k ["domain" ].lower ()]
280
+ )
281
+ return technique_data
282
+
283
+
284
+ def get_software_table_data (campaign , reference_list ):
285
+ """Given a campaign, get software table data."""
286
+ software_list = {} # software stix_id => {attack_id, name, description}
287
+
288
+ # Creating map for tools/malware used by campaigns
289
+ software_used_by_campaign = [
290
+ util .relationshipgetters .get_tools_used_by_campaigns (),
291
+ util .relationshipgetters .get_malware_used_by_campaigns ()
292
+ ]
293
+
294
+ for res in software_used_by_campaign :
295
+ if res .get (campaign .get ("id" )):
296
+ for software in res [campaign ["id" ]]:
297
+ software_id = software ["object" ]["id" ]
298
+ if software_id not in software_list :
299
+ attack_id = util .buildhelpers .get_attack_id (software ["object" ])
300
+ software_list [software_id ] = {
301
+ "id" : attack_id ,
302
+ "name" : software ["object" ]["name" ]
303
+ }
304
+ if software ["relationship" ].get ("description" ):
305
+ software_list [software_id ]["desc" ] = software ["relationship" ]["description" ]
306
+
307
+ # update reference list
308
+ reference_list = util .buildhelpers .update_reference_list (reference_list , software ["relationship" ])
309
+
310
+ software_data = [software_list [item ] for item in software_list ]
311
+ software_data = sorted (software_data , key = lambda k : k ["name" ].lower ())
312
+ return software_data
0 commit comments