This repository was archived by the owner on Nov 15, 2019. It is now read-only.

Extending event catcher and adding targeted refresh for availabilities #27

Merged
merged 3 commits into from
Jun 27, 2017
Changes from 2 commits
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module ManageIQ::Providers
class Hawkular::Inventory::AvailabilityUpdates
delegate :select, :to => :@targets

def initialize(targets)
@targets = targets
end

def name
"Collection of availabilities to update on inventory entities"
Member
is the name displayed somewhere? if yes, it needs translation

Member Author
Its only use is in log messages.

end

def id
"Collection: #{@targets.map(&:manager_ref)}"
Member
should the id be such a long string?

Member Author
It doesn't need to be long. But, like the name, it's only used when writing logs here; I found no other usage of it. I'm trying to be verbose for logging.

end
end
end
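The wrapper above can be approximated outside ManageIQ to see how it behaves. This is only a sketch: `SketchTarget` is a hypothetical stand-in for `ManagerRefresh::Target`, and the standard library's `Forwardable` replaces ActiveSupport's `delegate`.

```ruby
require 'forwardable'

# Hypothetical stand-in for ManagerRefresh::Target; only the fields used here.
SketchTarget = Struct.new(:association, :manager_ref, :options)

# Plain-Ruby approximation of the wrapper class. Forwardable stands in for
# ActiveSupport's `delegate :select, :to => :@targets`.
class AvailabilityUpdatesSketch
  extend Forwardable
  def_delegator :@targets, :select

  def initialize(targets)
    @targets = targets
  end

  # Only used in log messages, per the review discussion.
  def name
    "Collection of availabilities to update on inventory entities"
  end

  def id
    "Collection: #{@targets.map(&:manager_ref)}"
  end
end

updates = AvailabilityUpdatesSketch.new([
  SketchTarget.new(:middleware_servers, { :ems_ref => "server-1" }, {}),
  SketchTarget.new(:middleware_deployments, { :ems_ref => "deploy-1" }, { :status => "Enabled" })
])

# `select` is forwarded to the wrapped array, so callers can filter by association.
servers = updates.select { |t| t.association == :middleware_servers }
puts servers.length
puts updates.id
```

Because `select` is the only delegated method, the wrapper exposes just enough of the array for the collector to filter targets by association.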
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module ManageIQ::Providers
class Hawkular::Inventory::Collector::AvailabilityUpdates < ManagerRefresh::Inventory::Collector
def deployment_updates
@target.select { |item| item.association == :middleware_deployments }
end

def server_updates
@target.select { |item| item.association == :middleware_servers }
Member
did you test your code for the WF running in domain mode?

Member Author
Yes, I tested domain mode. There is no need to make any distinction.

end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module ManageIQ::Providers
class Hawkular::Inventory::Parser::AvailabilityUpdates < ManagerRefresh::Inventory::Parser
def parse
fetch_server_availabilities
fetch_deployment_availabilities
end

def fetch_server_availabilities
Member
it looks like there's an opportunity here for code reuse.
because this method and the one below look exactly the same

Member Author
Hmmm, I'll try to think of a way to make it reusable.

Member Author
I couldn't think of a way to refactor it :(

collector.server_updates.each do |item|
server = persister.middleware_servers.find_or_build(item.manager_ref[:ems_ref])
server.properties = item.options
end
end

def fetch_deployment_availabilities
collector.deployment_updates.each do |item|
deployment = persister.middleware_deployments.find_or_build(item.manager_ref[:ems_ref])
deployment.status = item.options[:status]
end
end
end
end
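On the code-reuse question raised in the review: one possible shape, not something the PR adopted, is a shared loop that yields each record together with its update and lets the caller set the attribute. The sketch below assumes hypothetical stand-ins (`SketchItem`, `SketchRecord`, `SketchCollection`, `apply_updates`) so that it runs without ManageIQ.

```ruby
# Hypothetical stand-ins so the sketch runs outside ManageIQ.
SketchItem = Struct.new(:manager_ref, :options)
SketchRecord = Struct.new(:ems_ref, :properties, :status)

# Minimal in-memory collection exposing find_or_build, loosely modelled on
# the inventory collections the parser uses.
class SketchCollection
  def initialize
    @records = {}
  end

  def find_or_build(ems_ref)
    @records[ems_ref] ||= SketchRecord.new(ems_ref)
  end
end

# Shared loop: look up (or build) the record for each update, then let the
# caller's block decide which attribute to set.
def apply_updates(updates, collection)
  updates.each do |item|
    record = collection.find_or_build(item.manager_ref[:ems_ref])
    yield record, item
  end
end

servers = SketchCollection.new
deployments = SketchCollection.new

server_updates = [SketchItem.new({ :ems_ref => "s1" }, { "Availability" => "up" })]
deployment_updates = [SketchItem.new({ :ems_ref => "d1" }, { :status => "Enabled" })]

apply_updates(server_updates, servers) { |server, item| server.properties = item.options }
apply_updates(deployment_updates, deployments) { |deployment, item| deployment.status = item.options[:status] }
```

Whether the indirection is worth it for two three-line methods is a judgment call; the duplication in the PR is small.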
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ class Inventory::Parser::MiddlewareManager < ManagerRefresh::Inventory::Parser
include ::Hawkular::ClientUtils

def initialize
super
@data_index = {}
end

@@ -164,19 +165,19 @@ def fetch_availability
end

def fetch_deployment_availabilities(feeds)
- metrics_type_id = 'Deployment%20Status~Deployment%20Status'
- fetch_availabilities_for(feeds, persister.middleware_deployments, metrics_type_id) do |deployment, availability|
+ collection = persister.middleware_deployments
+ fetch_availabilities_for(feeds, collection, collection.model_class::AVAIL_TYPE_ID) do |deployment, availability|
deployment.status = process_deployment_availability(availability.try(:[], 'data').try(:first))
end
end

def fetch_server_availabilities(feeds)
- metrics_type_id = 'Server%20Availability~Server%20Availability'
- fetch_availabilities_for(feeds, persister.middleware_servers, metrics_type_id) do |server, availability|
+ collection = persister.middleware_servers
+ fetch_availabilities_for(feeds, collection, collection.model_class::AVAIL_TYPE_ID) do |server, availability|
props = server.properties

- props['Availability'] = availability.try(:[], 'data').try { first['value'] } || 'unknown'
- props['Calculated Server State'] = props['Availability'] == 'up' ? props['Server State'] : props['Availability']
+ props['Availability'], props['Calculated Server State'] =
+ process_server_availability(props['Server State'], availability.try(:[], 'data').try(:first))
end
end

@@ -201,7 +202,10 @@ def fetch_availabilities_for(feeds, collection, metric_type_id)
collection.each do |item|
yield item, nil

- path = URI.decode(item.model_class.try(:resource_path_for_metrics, item) || item.manager_uuid)
+ path = URI.decode(item.try(:resource_path_for_metrics) ||
+ item.try(:model_class).try(:resource_path_for_metrics, item) ||
+ item.try(:ems_ref) ||
+ item.manager_uuid)
next unless metric_id_by_resource_path.key? path
metric_id = metric_id_by_resource_path[path]
resources_by_metric_id[metric_id] = [] unless resources_by_metric_id.key? metric_id
@@ -245,6 +249,11 @@ def process_server_entity(server, entity)
inventory_object.middleware_server_group = server.middleware_server_group if inventory_object.respond_to?(:middleware_server_group=)
end

def process_server_availability(server_state, availability = nil)
avail = availability.try(:[], 'value') || 'unknown'
[avail, avail == 'up' ? server_state : avail]
end
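The rule encoded by `process_server_availability` is easy to check in isolation. The following is a plain-Ruby rendering in which an explicit nil check stands in for ActiveSupport's `try`; the sample state and availability values are illustrative only.

```ruby
# Returns [availability, calculated_state]. The calculated state is the
# reported server state only while the server is 'up'; otherwise it mirrors
# the availability itself.
def process_server_availability(server_state, availability = nil)
  avail = (availability && availability['value']) || 'unknown'
  [avail, avail == 'up' ? server_state : avail]
end

p process_server_availability('running', { 'value' => 'up' })   # => ["up", "running"]
p process_server_availability('running', { 'value' => 'down' }) # => ["down", "down"]
p process_server_availability('running')                        # => ["unknown", "unknown"]
```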

def process_deployment_availability(availability = nil)
if availability.blank? || availability['value'].casecmp('unknown').zero?
'Unknown'
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
module ManageIQ::Providers
class Hawkular::Inventory::Persister::AvailabilityUpdates < Hawkular::Inventory::Persister::MiddlewareManager
def self.save_deployments(ems, collection)
::ActiveRecord::Base.transaction do
collection.to_a.each do |item|
deployment = ems.middleware_deployments.find_by(:ems_ref => item.manager_uuid)
next unless deployment # if deployment is not found in the database, it is ignored.

$mw_log.debug("EMS_#{ems.id}(Persister::AvailabilityUpdates): " \
"Updating status #{deployment.status} -> #{item.status} for deployment #{deployment.ems_ref}")

deployment.status = item.status
deployment.save!
end
end
end

def self.save_servers(ems, collection)
::ActiveRecord::Base.transaction do
collection.to_a.each do |item|
server = ems.middleware_servers.find_by(:ems_ref => item.manager_uuid)
next unless server # if no matching server is in the database, there is nothing to update

$mw_log.debug("EMS_#{ems.id}(Persister::AvailabilityUpdates): " \
"Updating status to #{item.properties} for server #{server.ems_ref}")

server.properties = {} unless server.properties
server.properties.merge!(item.properties)
server.save!
end
end
end

# has_middleware_manager_servers
has_middleware_manager_deployments(:custom_save_block => method(:save_deployments))
has_middleware_manager_servers(:custom_save_block => method(:save_servers))
end
end
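The server save block follows a "find, skip if missing, merge" pattern. A minimal in-memory sketch of that pattern, assuming hypothetical `StoredServer` and `ParsedServer` structs and a hash lookup in place of the ActiveRecord `find_by` call (the transaction and `save!` are omitted), could look like this:

```ruby
# Hypothetical in-memory stand-in for a stored middleware server.
StoredServer = Struct.new(:ems_ref, :properties)
# Hypothetical stand-in for a parsed inventory item.
ParsedServer = Struct.new(:manager_uuid, :properties)

# Merge parsed properties into stored records, skipping servers that are
# not already stored.
def save_servers_sketch(stored_by_ref, parsed_items)
  parsed_items.each do |item|
    server = stored_by_ref[item.manager_uuid]
    next unless server # nothing stored for this ems_ref, so nothing to update

    server.properties ||= {}
    server.properties.merge!(item.properties)
  end
end

stored = {
  "s1" => StoredServer.new("s1", { "Server State" => "running", "Version" => "10.1" })
}
parsed = [
  ParsedServer.new("s1", { "Availability" => "up" }),
  ParsedServer.new("missing", { "Availability" => "down" })
]

save_servers_sketch(stored, parsed)
p stored["s1"].properties
```

Merging rather than assigning keeps properties that the availability update does not carry (here the illustrative "Version" key), which appears to be why the PR uses `merge!` instead of replacing the hash.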
Original file line number Diff line number Diff line change
@@ -40,12 +40,30 @@ def monitor_events
event_monitor_handle.start
event_monitor_handle.each_batch do |events|
event_monitor_running

# Separate alerts from avail updates
avail_updates, events = events.partition { |e| !e.kind_of?(::Hawkular::Alerts::Alert) }

# Filter and queue events for processing
new_events = events.select { |e| whitelist?(e) }
$mw_log.debug("#{log_prefix} Discarding events #{events - new_events}") if new_events.length < events.length
if new_events.any?
$mw_log.debug "#{log_prefix} Queueing events #{new_events}"
@queue.enq new_events
end

# Queue avail updates for processing
if avail_updates.any?
targets = avail_updates.map do |item|
ManagerRefresh::Target.new(:manager => @ems,
:association => item[:association],
:manager_ref => { :ems_ref => item[:ems_ref] },
:options => item[:data])
end
$mw_log.debug "#{log_prefix} Queueing refresh of #{targets.count} availabilities."
EmsRefresh.queue_refresh(targets)
end

# invoke the configured sleep before the next event fetch
sleep_poll_normal
end
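The split between alerts and availability updates relies on `Enumerable#partition`. A small sketch, using a hypothetical `SketchAlert` class in place of `::Hawkular::Alerts::Alert` and a hash shaped like the ones the stream builds:

```ruby
# Hypothetical stand-in for ::Hawkular::Alerts::Alert.
class SketchAlert
  attr_reader :id

  def initialize(id)
    @id = id
  end
end

batch = [
  SketchAlert.new("alert-1"),
  { :association => :middleware_servers, :ems_ref => "s1", :data => { "Availability" => "up" } },
  SketchAlert.new("alert-2")
]

# partition returns [matching, non-matching]. The block matches everything
# that is NOT an alert, so availability updates come first in the result.
avail_updates, events = batch.partition { |e| !e.kind_of?(SketchAlert) }

puts avail_updates.length
puts events.map(&:id).inspect
```

Note the negation in the block: anything that is not an alert is treated as an availability update, so any third kind of item added to the batch later would also land in `avail_updates`.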
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@ class ManageIQ::Providers::Hawkular::MiddlewareManager::EventCatcher::Stream
def initialize(ems)
@ems = ems
@alerts_client = ems.alerts_client
@metrics_client = ems.metrics_client
@inventory_client = ems.inventory_client
@collecting_events = false
end

@@ -21,20 +23,113 @@ def each_batch

private

def fetch
events = []
events = fetch_events
events.concat(fetch_availabilities)
rescue => err
$mw_log.warn "#{log_prefix} Error capturing events #{err}"
events
end

# Each fetch is performed from the time of the most recently caught event or 1 minute back for the first poll.
# This gives us some slack if hawkular events are timestamped behind the miq server time.
# Note: This assumes all Hawkular events at max-time T are fetched in one call. It is unlikely that there
# would be more than one for the same millisecond, and that the query would be performed in the midst of
# writes for the same ms. It may be a feasible scenario but I think it's unnecessary to handle it at this time.
- def fetch
+ def fetch_events
@start_time ||= (Time.current - 1.minute).to_i * 1000
- $mw_log.debug "Catching Events since [#{@start_time}]"
+ $mw_log.debug "#{log_prefix} Catching Events since [#{@start_time}]"

new_events = @alerts_client.list_events("startTime" => @start_time, "tags" => "miq.event_type|*", "thin" => true)
@start_time = new_events.max_by(&:ctime).ctime + 1 unless new_events.empty? # add 1 ms to avoid dups with GTE filter
new_events
- rescue => err
- $mw_log.info "Error capturing events #{err}"
- []
end
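The start-time bookkeeping described in the comment above `fetch_events` can be isolated as a small function. In this sketch, `SketchEvent` and `next_start_time` are hypothetical names; only the "latest ctime plus one millisecond" rule comes from the diff.

```ruby
# Hypothetical event stand-in; ctime is a creation time in epoch milliseconds.
SketchEvent = Struct.new(:id, :ctime)

# Returns the start time to use for the next poll.
def next_start_time(current_start, new_events)
  return current_start if new_events.empty?

  # +1 ms so that a greater-than-or-equal filter skips events already seen.
  new_events.max_by(&:ctime).ctime + 1
end

start_time = 1_000
start_time = next_start_time(start_time, [SketchEvent.new("a", 1_200), SketchEvent.new("b", 1_500)])
puts start_time # 1501

# An empty poll leaves the window where it was.
start_time = next_start_time(start_time, [])
puts start_time # 1501
```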

def fetch_availabilities
parser = ManageIQ::Providers::Hawkular::Inventory::Parser::MiddlewareManager.new
parser.collector = ManageIQ::Providers::Hawkular::Inventory::Collector::MiddlewareManager.new(@ems, nil)

server_avails = fetch_server_availabilities(parser)
deploy_avails = fetch_deployment_availabilities(parser)

server_avails.concat(deploy_avails)
end

def fetch_server_availabilities(parser)
# For servers, the server state must also be refreshed from inventory.
$mw_log.debug("#{log_prefix} Retrieving server state from Hawkular inventory")
Member
nit: perhaps, s/state/states/g would be better

Member Author
It is now changed :)


server_states = {}
@ems.middleware_servers.reload.each do |server|
inventoried_server = @inventory_client.get_resource(server.ems_ref, true)
server_states[server.id] = inventoried_server.try(:properties).try(:[], 'Server State') || ''
end

# Fetch availabilities and process them together with server state updates.
fetch_entities_availabilities(parser, @ems.middleware_servers) do |item, avail|
server_state = server_states[item.id]
avail_data, calculated_status = parser.process_server_availability(server_state, avail)

props = item.try(:properties)
stored_avail = props.try(:[], 'Availability')
stored_state = props.try(:[], 'Server State')
stored_calculated = props.try(:[], 'Calculated Server State')

next nil if stored_avail == avail_data && stored_calculated == calculated_status && stored_state == server_state

{
:ems_ref => item.ems_ref,
:association => :middleware_servers,
:data => {
'Availability' => avail_data,
'Server State' => server_state,
'Calculated Server State' => calculated_status
}
}
end
end

def fetch_deployment_availabilities(parser)
fetch_entities_availabilities(parser, @ems.middleware_deployments.reload) do |item, avail|
status = parser.process_deployment_availability(avail)
next nil if item.status == status

{
:ems_ref => item.ems_ref,
:association => :middleware_deployments,
:data => {
:status => status
}
}
end
end

def fetch_entities_availabilities(parser, entities)
return [] if entities.blank? # callers concat the result, so return an Array like avails.values does
log_name = entities.first.class.name.demodulize

# Collect the feeds in which to look for availabilities.
feeds = entities.map(&:feed).uniq

$mw_log.debug("#{log_prefix} Retrieving availabilities for #{entities.count} " \
"#{log_name.pluralize(entities.count)} in #{feeds.count} feeds.")

# Get availabilities
avails = {}
parser.fetch_availabilities_for(feeds, entities, entities.first.class::AVAIL_TYPE_ID) do |item, avail|
avail_data = avail.try(:[], 'data').try(:first)
avails[item.id] = yield(item, avail_data)

# Filter out if availability is unchanged. This way, no refresh is triggered if unnecessary.
avails.delete(item.id) unless avails[item.id]
end

$mw_log.debug("#{log_prefix} Availability has changed for #{avails.length} #{log_name.pluralize(avails.length)}.")
avails.values
end

def log_prefix
@_log_prefix ||= "EMS_#{@ems.id}(Hawkular::EventCatcher::Stream)"
end
end
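The change-detection idea in `fetch_entities_availabilities` (the block returns `nil` for unchanged entities, and only non-nil results are queued) can be sketched without Hawkular. `SketchDeployment` and `changed_availabilities` are hypothetical names, and the fresh statuses are supplied as a plain hash instead of being fetched from the metrics service.

```ruby
# Hypothetical stored deployment carrying the last known status.
SketchDeployment = Struct.new(:id, :ems_ref, :status)

# Yields each entity with its freshly fetched status and keeps only the
# non-nil block results, so unchanged entities never trigger a refresh.
def changed_availabilities(entities, fresh_status_by_id)
  avails = {}
  entities.each do |item|
    update = yield(item, fresh_status_by_id[item.id])
    avails[item.id] = update if update
  end
  avails.values
end

deployments = [
  SketchDeployment.new(1, "d1", "Enabled"),
  SketchDeployment.new(2, "d2", "Enabled")
]
fresh = { 1 => "Enabled", 2 => "Disabled" }

updates = changed_availabilities(deployments, fresh) do |item, status|
  next nil if item.status == status # unchanged, nothing to queue

  { :ems_ref => item.ems_ref, :association => :middleware_deployments, :data => { :status => status } }
end

p updates
```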
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
module ManageIQ::Providers
class Hawkular::MiddlewareManager::MiddlewareDeployment < MiddlewareDeployment
AVAIL_TYPE_ID = 'Deployment%20Status~Deployment%20Status'.freeze

def resource_path_for_metrics
self.class.resource_path_for_metrics(self)
end

def self.resource_path_for_metrics(item)
path = ::Hawkular::Inventory::CanonicalPath.parse(item.ems_ref)
# For subdeployments, use the parent deployment's availability.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module ManageIQ::Providers
class Hawkular::MiddlewareManager::MiddlewareServer < MiddlewareServer
AVAIL_TYPE_ID = 'Server%20Availability~Server%20Availability'.freeze

def feed
CGI.unescape(super)
end
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
module ManageIQ::Providers::Hawkular
class MiddlewareManager::Refresher < ManageIQ::Providers::BaseManager::Refresher
include ::EmsRefresh::Refreshers::EmsRefresherMixin

def preprocess_targets
@targets_by_ems_id.each do |ems_id, targets|
if targets.any? { |t| t.kind_of?(ExtManagementSystem) }
# If the EMS is in the list of targets, a full graph refresh is done.
ems = @ems_by_ems_id[ems_id]
_log.info "Defaulting to full refresh for EMS: [#{ems.name}], id: [#{ems.id}]." if targets.length > 1
targets.clear << ems
elsif targets.any?
# Assuming availabilities are being refreshed (since there is no other
# kind of refresh for Hawkular)

# Filter out duplicated entities
# The reverse is to keep the most up-to-date data
uniq_targets = targets.reverse.uniq do |item|
{
:association => item.association,
:ems_ref => item.manager_ref[:ems_ref]
}
end

# Compact all availability updates into one target
targets.clear
targets << ::ManageIQ::Providers::Hawkular::Inventory::AvailabilityUpdates.new(uniq_targets)
end
end
end
end
end
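The `reverse.uniq` step in `preprocess_targets` keeps the most recently queued update for each entity, because `uniq` retains the first element it sees for each key. A self-contained sketch, with `QueuedTarget` as a hypothetical stand-in for `ManagerRefresh::Target`:

```ruby
# Hypothetical stand-in for ManagerRefresh::Target.
QueuedTarget = Struct.new(:association, :manager_ref, :options)

# Targets in queue order: the server "s1" was reported down, then up.
targets = [
  QueuedTarget.new(:middleware_servers, { :ems_ref => "s1" }, { "Availability" => "down" }),
  QueuedTarget.new(:middleware_deployments, { :ems_ref => "d1" }, { :status => "Enabled" }),
  QueuedTarget.new(:middleware_servers, { :ems_ref => "s1" }, { "Availability" => "up" })
]

# uniq keeps the first element per key, so reversing first means the newest
# update for each (association, ems_ref) pair is the one that survives.
uniq_targets = targets.reverse.uniq do |item|
  { :association => item.association, :ems_ref => item.manager_ref[:ems_ref] }
end

uniq_targets.each { |t| puts "#{t.association} #{t.manager_ref[:ems_ref]} #{t.options}" }
```

The result is in newest-first order; that is harmless here because the targets are wrapped into a single `AvailabilityUpdates` object and later filtered by association rather than processed in sequence.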