-
Notifications
You must be signed in to change notification settings - Fork 900
/
Copy pathems_event.rb
421 lines (346 loc) · 13.3 KB
/
ems_event.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
class EmsEvent < EventStream
include Automate
# Event type of the synthetic "completed" event built for a CloneVM_Task chain
# (create_completed_event names such events "<original task type>_Complete").
CLONE_TASK_COMPLETE = "CloneVM_Task_Complete".freeze
# Task event types for which create_completed_event takes the source details
# from the first event of the chain and the destination details from the
# final event. Frozen so the shared list cannot be modified at runtime.
SOURCE_DEST_TASKS = %w[
  CloneVM_Task
  MarkAsTemplate
  MigrateVM_Task
  RelocateVM_Task
  Rename_Task
].freeze
# Levels returned by EmsEvent.class_group_levels.
CLASS_GROUP_LEVELS = %i[critical warning].freeze
# Returns the description string for this class, passed through `_()`
# (presumably the gettext translation helper).
def self.description
_("Management Events")
end
# Returns the CLASS_GROUP_LEVELS constant (a frozen array of level symbols).
def self.class_group_levels
CLASS_GROUP_LEVELS
end
# Fills the hash returned by default_group_names_and_levels with every
# configured event group and returns it.
#
# For each group, :group_names gets group key => configured :name. Every
# other key of a group is treated as a level and recorded under
# :group_levels as level => capitalized level name (first one wins).
def self.group_names_and_levels
  result = default_group_names_and_levels
  event_groups.each do |group_name, group_details|
    result[:group_names][group_name] = group_details[:name]
    group_details.each_key do |level|
      # :name is the display name of the group, not a level
      unless level == :name
        result[:group_levels] ||= {}
        result[:group_levels][level] ||= level.to_s.capitalize
      end
    end
  end
  result
end
# Returns the event group definitions: the core
# `Settings.event_handling.event_groups` hash with the
# `event_handling.event_groups` settings of every provider listed under
# `Settings.ems` deep-merged into it.
#
# The result is memoized in @event_groups; clear_event_groups_cache resets it.
def self.event_groups
  return @event_groups if @event_groups

  merged_groups = ::Settings.event_handling.event_groups.to_hash
  Settings.ems.each do |_provider_type, provider_settings|
    provider_event_groups = provider_settings.fetch_path(:event_handling, :event_groups)
    next unless provider_event_groups

    # Merge the provider's groups (source) into the accumulated groups (dest)
    DeepMerge.deep_merge!(
      provider_event_groups.to_hash,
      merged_groups,
      :preserve_unmergeables => false,
      :overwrite_arrays      => false
    )
  end
  @event_groups = merged_groups
end
# Builds (once) and returns two lookup tables derived from event_groups:
#
#   [literal_table, regex_table]
#
# literal_table maps an event type string to [group_name, group_level];
# regex_table maps a Regexp to [group_name, group_level]. An event type
# entry starting with "/" is treated as a regex written as "/.../": its
# first and last characters are stripped before compiling. For both tables
# the first entry seen for a key wins.
private_class_method def self.partition_group_and_level_by_event_type
  unless @literal_group_and_level_by_event_type
    literals = @literal_group_and_level_by_event_type = {}
    patterns = @regex_group_and_level_by_event_type = {}

    event_groups.each do |group_name, group_contents|
      group_contents.each do |group_level, event_types|
        # :name holds the group's display name, not a list of event types
        next if group_level == :name

        event_types.each do |event_type|
          if event_type.starts_with?("/")
            patterns[Regexp.new(event_type[1..-2])] ||= [group_name, group_level]
          else
            literals[event_type] ||= [group_name, group_level]
          end
        end
      end
    end
  end

  [@literal_group_and_level_by_event_type, @regex_group_and_level_by_event_type]
end
# Resets the memoized event groups and both derived lookup tables to nil so
# that they are rebuilt on next use.
def self.clear_event_groups_cache
@event_groups = @literal_group_and_level_by_event_type = @regex_group_and_level_by_event_type = nil
end
# Returns [group_name, group_level] for the given event type.
#
# An exact (literal) match takes precedence; otherwise the first regex that
# matches the event type is used; if nothing matches, the result is
# [DEFAULT_GROUP_NAME, DEFAULT_GROUP_LEVEL].
def self.group_and_level(event_type)
  literals, patterns = partition_group_and_level_by_event_type

  exact = literals[event_type]
  return exact if exact

  _regex, matched = patterns.find { |regex, _| regex.match?(event_type) }
  matched || [DEFAULT_GROUP_NAME, DEFAULT_GROUP_LEVEL]
end
# Returns the configured display name of the given event group, or
# DEFAULT_GROUP_NAME_STR when the group is unknown or has no :name.
# Returns nil when group is nil.
def self.group_name(group)
  unless group.nil?
    configured_name = event_groups.dig(group.to_sym, :name)
    configured_name || DEFAULT_GROUP_NAME_STR
  end
end
# Hands this event to EmsEventHelper for handling.
# Any StandardError raised while handling is logged with its backtrace and
# not re-raised, so a failing handler does not propagate to the caller.
def handle_event
EmsEventHelper.new(self).handle
rescue => err
_log.log_backtrace(err)
end
# Returns the `event_handling.task_final_events` setting as a Hash.
# Within this class it is keyed by the event type (as a symbol) that ends a
# task; each value is checked with include? against the type of the first
# event of the chain.
def self.task_final_events
::Settings.event_handling.task_final_events.to_hash
end
# Returns the `event_handling.bottleneck_event_groups` setting as a Hash.
def self.bottleneck_event_groups
::Settings.event_handling.bottleneck_event_groups.to_hash
end
# Queues an event for asynchronous processing.
#
# When the event handler's dequeue method is "drb", or no 'event_handler'
# messaging client is present, a job calling EmsEvent.<meth>(event) is
# submitted through MiqQueue. Otherwise the event is published to the
# event handler's topic through the messaging client.
#
# meth   - name of the EmsEvent class method the queued job should call
# ems_id - id of the EMS the event belongs to
# event  - event hash (must respond to [:event_type])
def self.add_queue(meth, ems_id, event)
  drb_dequeue = MiqEventHandler.worker_settings[:dequeue_method] == "drb"

  if drb_dequeue || !MiqQueue.messaging_client('event_handler').present?
    MiqQueue.submit_job(
      :service     => "event",
      :target_id   => ems_id,
      :class_name  => "EmsEvent",
      :method_name => meth,
      :args        => [event]
    )
  else
    MiqQueue.messaging_client('event_handler').publish_topic(
      :service => "manageiq.#{MiqEventHandler.default_queue_name}",
      :sender  => ems_id,
      :event   => event[:event_type],
      :payload => event
    )
  end
end
# Enriches a raw event hash with the ids of the inventory objects it
# references, stores it through create_event, and triggers follow-up
# processing.
#
# event_hash is modified in place: :ems_id is set and the
# process_*_in_event! helpers may add or remove keys.
#
# Raises MiqException::Error when event_hash has no :event_type.
# Returns the value of create_event, or nil when create_event returned nil.
def self.add(ems_id, event_hash)
event_type = event_hash[:event_type]
raise MiqException::Error, _("event_type must be set in event") if event_type.nil?
event_hash[:ems_id] = ems_id
# Resolve the objects referenced by the event. VM and host lookups run twice:
# once for the plain keys and once for the "dest_"-prefixed keys.
process_vm_in_event!(event_hash)
process_vm_in_event!(event_hash, :prefix => "dest_")
process_host_in_event!(event_hash)
process_host_in_event!(event_hash, :prefix => "dest_")
# Runs after the VM lookup because it reads :vm_or_template_id
process_availability_zone_in_event!(event_hash)
process_cluster_in_event!(event_hash)
process_container_entities_in_event!(event_hash)
process_physical_storage_in_event!(event_hash)
# Write the event
new_event = create_event(event_hash)
return if new_event.nil? # If the event is a duplicate skip further processing
# Create a 'completed task' event if this is the last in a series of events
create_completed_event(event_hash) if task_final_events.key?(event_type.to_sym)
# Also publish the event through the messaging client when syndication is enabled
syndicate_event(ems_id, event_hash) if syndicate_events?
new_event
end
# Resolves an object referenced by ems_ref in the event and stores its id
# (and name, unless already set) in the event hash.
#
# klass   - model class; the lookup is done on klass.base_class
# event   - event hash, modified in place
# options - optional overrides:
#   :prefix      - prepended to the derived key names (e.g. "dest_")
#   :key_prefix  - base of the key names (default: klass.name.underscore)
#   :id_key, :ems_ref_key, :name_key - explicit key names
#
# Nothing happens when the id key is already set, the ems_ref is missing,
# or no object with that ems_ref exists in the event's EMS.
def self.process_object_in_event!(klass, event, options = {})
  prefix     = options[:prefix]
  key_prefix = options[:key_prefix] || klass.name.underscore
  key_base   = "#{prefix}#{key_prefix}"

  id_key      = options[:id_key]      || :"#{key_base}_id"
  ems_ref_key = options[:ems_ref_key] || :"#{key_base}_ems_ref"
  name_key    = options[:name_key]    || :"#{key_base}_name"

  return unless event[id_key].nil?

  ems_ref = event[ems_ref_key]
  return if ems_ref.nil?

  object = klass.base_class.find_by(:ems_ref => ems_ref, :ems_id => event[:ems_id])
  return if object.nil?

  event[id_key] = object.id
  event[name_key] ||= object.name
end
# Resolves the VM or template referenced by the event and stores its id in
# "<prefix>vm_or_template_id" (and its name in "<prefix>vm_name").
#
# The lookup by ems_ref is delegated to process_object_in_event!. For the
# unprefixed case only, when that lookup found nothing, the VM is looked up
# by the :vm_uid_ems value of the event instead. :vm_uid_ems is always
# removed from the event.
#
# event   - event hash, modified in place
# options - forwarded to process_object_in_event!; :prefix selects the key
#           prefix and :id_key is always overridden. The caller's hash is
#           not modified (a merged copy is forwarded), so frozen or shared
#           option hashes are safe to pass.
def self.process_vm_in_event!(event, options = {})
  id_key  = :"#{options[:prefix]}vm_or_template_id"
  uid_ems = event.delete(:vm_uid_ems)

  process_object_in_event!(Vm, event, options.merge(:id_key => id_key))

  # The uid_ems fallback only applies to the unprefixed (source) VM
  return unless id_key == :vm_or_template_id && event[:vm_or_template_id].nil?
  return if uid_ems.nil?

  vm = VmOrTemplate.find_by(:uid_ems => uid_ems)
  return if vm.nil?

  event[:vm_or_template_id] = vm.id
  event[:vm_name] ||= vm.name
end
# Resolves the host referenced by the event and stores its id/name.
#
# The lookup by ems_ref is delegated to process_object_in_event!. When
# :host_id is still unset afterwards and the event carried a :host_uid_ems,
# the host is looked up by that uid instead. :host_uid_ems is always removed
# from the event.
#
# event   - event hash, modified in place
# options - forwarded unchanged to process_object_in_event!
def self.process_host_in_event!(event, options = {})
  uid_ems = event.delete(:host_uid_ems)
  process_object_in_event!(Host, event, options)

  return unless event[:host_id].nil? && uid_ems.present?

  # Attempt to find a host in the current EMS first, then fallback to archived hosts
  candidates = Host.where(:uid_ems => uid_ems, :ems_id => [event[:ems_id], nil])
  host = candidates.order("ems_id NULLS LAST").first
  return if host.nil?

  event[:host_id] = host.id
  event[:host_name] ||= host.name
end
# Resolves the PhysicalStorage referenced by the event via
# process_object_in_event!; event is modified in place.
def self.process_physical_storage_in_event!(event, options = {})
process_object_in_event!(PhysicalStorage, event, options)
end
# Resolves the container node, container group and container replicator
# referenced by the event via process_object_in_event!.
# The options argument is accepted but ignored.
def self.process_container_entities_in_event!(event, _options = {})
  [ContainerNode, ContainerGroup, ContainerReplicator].each { |entity| process_object_in_event!(entity, event) }
end
# Resolves the availability zone of the event and stores its id in
# :availability_zone_id.
#
# The lookup by ems_ref is delegated to process_object_in_event!. When that
# leaves :availability_zone_id unset and the event has a :vm_or_template_id,
# the zone of that VM is used instead (if the VM responds to
# availability_zone and has one). A :vm_or_template_id that no longer
# matches a record is skipped rather than raising, so a stale reference does
# not abort processing of the event.
#
# event   - event hash, modified in place
# options - forwarded unchanged to process_object_in_event!
def self.process_availability_zone_in_event!(event, options = {})
  process_object_in_event!(AvailabilityZone, event, options)

  if event[:availability_zone_id].nil? && event[:vm_or_template_id]
    # find_by returns nil for a missing record; nil does not respond to
    # availability_zone, so the fallback is simply skipped in that case.
    vm = VmOrTemplate.find_by(:id => event[:vm_or_template_id])
    availability_zone = vm.availability_zone if vm.respond_to?(:availability_zone)
    event[:availability_zone_id] = availability_zone.id unless availability_zone.nil?
  end

  # there's no "availability_zone_name" column in ems_event
  # availability_zone_name may be added by process_vm_in_event
  # prevent EmsEvent from trying to set the event attribute for availability_zone_name
  event.delete(:availability_zone_name)
end
# Resolves the EmsCluster referenced by the event via
# process_object_in_event!; event is modified in place.
def self.process_cluster_in_event!(event, options = {})
process_object_in_event!(EmsCluster, event, options)
end
# Returns the event with the lowest id among the events of the given EMS
# that share chain_id, or nil when chain_id is nil or no such event exists.
def self.first_chained_event(ems_id, chain_id)
  unless chain_id.nil?
    chain = EmsEvent.where(:ems_id => ems_id, :chain_id => chain_id)
    chain.order(:id).first
  end
end
# Returns a four-element array:
#   [message (only for "datawarehouse_alert" events, else nil),
#    full_data[:severity], full_data[:url], full_data[:resolved]]
# A nil full_data is treated as an empty hash.
def parse_event_metadata
  data = full_data || {}
  alert_message = message if event_type == "datawarehouse_alert"

  [alert_message, data[:severity], data[:url], data[:resolved]]
end
# Returns the first event of this event's chain as found by
# EmsEvent.first_chained_event, or self when that returns nil.
# The result is memoized in @first_chained_event.
def first_chained_event
@first_chained_event ||= EmsEvent.first_chained_event(ems_id, chain_id) || self
end
# Returns the object named by target_type for this event.
#
# target_type is converted to a string and used as the name of a method to
# call. A "first_chained_" prefix selects the first event of the chain as
# the receiver instead of self. "src_vm" and "dest_vm" are mapped to
# "src_vm_or_template" and "dest_vm_or_template". For events of type
# "datawarehouse_alert" the "target" method is always used.
def get_target(target_type)
  target_type = target_type.to_s

  chained = target_type.match(/^first_chained_(.+)$/)
  if chained
    target_type = chained[1]
    event = first_chained_event
  else
    event = self
  end

  vm_aliases = {"src_vm" => "src_vm_or_template", "dest_vm" => "dest_vm_or_template"}
  target_type = vm_aliases.fetch(target_type, target_type)
  target_type = "target" if event.event_type == "datawarehouse_alert"

  event.send(target_type)
end
# Returns the tenant identity of the associated VM/template, or of the
# EMS when the event has no VM/template.
def tenant_identity
(vm_or_template || ext_management_system).tenant_identity
end
# Returns what should be refreshed in response to this event.
#
# When the EMS allows targeted refresh, the event is parsed by the EMS
# class's EventTargetParser and the parse result is returned. Otherwise the
# EMS itself is returned.
def manager_refresh_targets
  ems = ext_management_system
  return ems unless ems.allow_targeted_refresh?

  # Loaded only when a targeted refresh is actually requested
  require "inventory_refresh"
  ems.class::EventTargetParser.new(self).parse
end
# Returns the display name of this class, choosing between the singular and
# plural form via `n_()` based on number (default 1).
def self.display_name(number = 1)
n_('Management Event', 'Management Events', number)
end
private
# Names of the "*_ems_ref" keys that create_event keeps in the event hash;
# every other key ending in "_ems_ref" is removed before the event is created.
def self.event_allowed_ems_ref_keys
%w[vm_ems_ref dest_vm_ems_ref]
end
private_class_method :event_allowed_ems_ref_keys
# Creates an EmsEvent from the event hash unless an identical one exists,
# then runs its handler.
#
# Keys ending in "_ems_ref" that are not listed in
# event_allowed_ems_ref_keys are removed from the hash (in place) first.
# An event counts as a duplicate when one with the same :event_type,
# :timestamp, :chain_id, :ems_id and :ems_ref already exists.
#
# Returns the new event, or nil for a duplicate.
def self.create_event(event)
  allowed_keys = event_allowed_ems_ref_keys
  event.delete_if do |key, _value|
    key_name = key.to_s
    key_name.ends_with?("_ems_ref") && !allowed_keys.include?(key_name)
  end

  duplicate = EmsEvent.exists?(
    :event_type => event[:event_type],
    :timestamp  => event[:timestamp],
    :chain_id   => event[:chain_id],
    :ems_id     => event[:ems_id],
    :ems_ref    => event[:ems_ref]
  )
  return nil if duplicate

  new_event = EmsEvent.create(event)
  new_event.handle_event if new_event
  new_event
end
private_class_method :create_event
# Builds and stores a synthetic "<task>_Complete" event for a finished task.
#
# event     - hash of the event that ended the task (needs :event_type,
#             :ems_id, :chain_id and the fields copied below)
# orig_task - first event of the chain; looked up via first_chained_event
#             when not given
#
# Nothing is created when the chain has no first event, or when the first
# event's type is not listed in task_final_events for this event's type.
# Returns the result of create_event in the success case.
def self.create_completed_event(event, orig_task = nil)
if orig_task.nil?
orig_task = first_chained_event(event[:ems_id], event[:chain_id])
return if orig_task.nil?
end
if task_final_events[event[:event_type].to_sym].include?(orig_task.event_type)
# Wrap the hash so its keys can be read with method-call syntax below
event = MiqHashStruct.new(event)
# Determine which event has the details for the source and dest
if SOURCE_DEST_TASKS.include?(orig_task.event_type)
source_event = orig_task
dest_event = event
else
source_event = event
dest_event = nil
end
# Build the 'completed task' event
new_event_type = "#{orig_task.event_type}_Complete"
new_event = {
:event_type => new_event_type,
:chain_id => event.chain_id,
:is_task => true,
:source => 'EVM',
:ems_id => event.ems_id,
:message => "#{orig_task.event_type} Completed",
:timestamp => event.timestamp,
:host_name => source_event.host_name,
:host_id => source_event.host_id,
:vm_name => source_event.vm_name,
:vm_location => source_event.vm_location,
:vm_ems_ref => source_event.vm_ems_ref,
:vm_or_template_id => source_event.vm_or_template_id
}
new_event[:username] = event.username if event.username.present?
# Fill in the dest information if we have it
unless dest_event.nil?
# Determine from which field to get the dest information
# (the dest_* fields when dest_vm_name is set, otherwise the plain vm_* fields)
dest_key = dest_event.dest_vm_name.nil? ? '' : 'dest_'
new_event.merge!(
:dest_host_name => dest_event.host_name,
:dest_host_id => dest_event.host_id,
:dest_vm_name => dest_event.send(:"#{dest_key}vm_name"),
:dest_vm_location => dest_event.send(:"#{dest_key}vm_location"),
:dest_vm_ems_ref => dest_event.send(:"#{dest_key}vm_ems_ref"),
:dest_vm_or_template_id => dest_event.send(:"#{dest_key}vm_or_template_id")
)
end
create_event(new_event)
end
end
private_class_method :create_completed_event
# Publishes a copy of the event, extended with :ems_uid and :ems_type of its
# EMS, to the "manageiq.ems-events" topic of the 'event_handler' messaging
# client (if there is one).
#
# The event passed in is not modified. Any StandardError is logged as a
# warning together with its backtrace and is not re-raised.
private_class_method def self.syndicate_event(ems_id, event)
  ems = ExtManagementSystem.find(ems_id)

  payload = event.dup
  payload[:ems_uid]  = ems&.uid_ems
  payload[:ems_type] = ems&.class&.ems_type

  client = MiqQueue.messaging_client('event_handler')
  client&.publish_topic(
    :service => "manageiq.ems-events",
    :sender  => ems_id,
    :event   => payload[:event_type],
    :payload => payload
  )
rescue => err
  _log.warn("Failed to publish event [#{ems_id}] [#{event[:event_type]}]: #{err}")
  _log.log_backtrace(err)
end
# True when the `event_streams.syndicate_events` setting is truthy and the
# MiqQueue messaging type is not "miq_queue".
private_class_method def self.syndicate_events?
Settings.event_streams.syndicate_events && MiqQueue.messaging_type != "miq_queue"
end
# Returns the refresh target for the given target type by calling the method
# named "<target_type>_refresh_target" through `try`.
# NOTE(review): this method and the *_refresh_target methods are defined
# after the bare `private` keyword. Confirm that `try` dispatches to private
# methods in the ActiveSupport version in use; if it does not, this
# presumably always returns nil.
def get_refresh_target(target_type)
m = "#{target_type}_refresh_target"
try(m)
end
# Returns the VM/template when it exists and is attached to an EMS;
# otherwise falls back to host_refresh_target.
def vm_refresh_target
  vm = vm_or_template
  return vm if vm && vm.ext_management_system

  host_refresh_target
end
alias_method :src_vm_refresh_target, :vm_refresh_target
# Returns vm_refresh_target when the event has a VM/template, otherwise
# dest_host_refresh_target.
def src_vm_or_dest_host_refresh_target
  return dest_host_refresh_target unless vm_or_template

  vm_refresh_target
end
# Returns the host when it exists and is attached to an EMS; otherwise
# falls back to ems_refresh_target.
def host_refresh_target
  source_host = host
  return source_host if source_host && source_host.ext_management_system

  ems_refresh_target
end
alias_method :src_host_refresh_target, :host_refresh_target
# Returns the destination VM/template when it exists and is attached to an
# EMS; otherwise falls back to dest_host_refresh_target.
def dest_vm_refresh_target
  dest_vm = dest_vm_or_template
  return dest_vm if dest_vm && dest_vm.ext_management_system

  dest_host_refresh_target
end
# Returns the destination host when it exists and is attached to an EMS;
# otherwise falls back to ems_refresh_target.
def dest_host_refresh_target
  target_host = dest_host
  return target_host if target_host && target_host.ext_management_system

  ems_refresh_target
end
# Refresh target for cluster-related events: the EMS itself.
def ems_cluster_refresh_target
ext_management_system
end
alias_method :src_ems_cluster_refresh_target, :ems_cluster_refresh_target
# Refresh target of last resort: the EMS the event belongs to.
def ems_refresh_target
ext_management_system
end
end