Skip to content
This repository was archived by the owner on Nov 15, 2019. It is now read-only.

Commit

Permalink
Merge pull request #27 from israel-hdez/worker-update-avails
Browse files Browse the repository at this point in the history
Extending event catcher and adding targeted refresh for availabilities
  • Loading branch information
Jirka Kremser authored Jun 27, 2017
2 parents 2552a73 + 7e68ff0 commit 8bb5611
Show file tree
Hide file tree
Showing 17 changed files with 832 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module ManageIQ::Providers
class Hawkular::Inventory::AvailabilityUpdates
attr_reader :targets

delegate :select, :to => :targets
delegate :<<, :to => :targets

def initialize(targets)
@targets = targets
end

def name
"Collection of availabilities to update on inventory entities"
end

def id
"Collection: #{@targets.map(&:manager_ref)}"
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module ManageIQ::Providers
class Hawkular::Inventory::Collector::AvailabilityUpdates < ManagerRefresh::Inventory::Collector
def deployment_updates
@target.select { |item| item.association == :middleware_deployments }
end

def server_updates
@target.select { |item| item.association == :middleware_servers }
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module ManageIQ::Providers
class Hawkular::Inventory::Parser::AvailabilityUpdates < ManagerRefresh::Inventory::Parser
def parse
fetch_server_availabilities
fetch_deployment_availabilities
end

private

def fetch_server_availabilities
collector.server_updates.each do |item|
server = persister.middleware_servers.find_or_build(item.manager_ref[:ems_ref])
server.properties = item.options
end
end

def fetch_deployment_availabilities
collector.deployment_updates.each do |item|
deployment = persister.middleware_deployments.find_or_build(item.manager_ref[:ems_ref])
deployment.status = item.options[:status]
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ class Inventory::Parser::MiddlewareManager < ManagerRefresh::Inventory::Parser
include ::Hawkular::ClientUtils

def initialize
super
@data_index = {}
end

Expand Down Expand Up @@ -164,19 +165,19 @@ def fetch_availability
end

def fetch_deployment_availabilities(feeds)
metrics_type_id = 'Deployment%20Status~Deployment%20Status'
fetch_availabilities_for(feeds, persister.middleware_deployments, metrics_type_id) do |deployment, availability|
collection = persister.middleware_deployments
fetch_availabilities_for(feeds, collection, collection.model_class::AVAIL_TYPE_ID) do |deployment, availability|
deployment.status = process_deployment_availability(availability.try(:[], 'data').try(:first))
end
end

def fetch_server_availabilities(feeds)
metrics_type_id = 'Server%20Availability~Server%20Availability'
fetch_availabilities_for(feeds, persister.middleware_servers, metrics_type_id) do |server, availability|
collection = persister.middleware_servers
fetch_availabilities_for(feeds, collection, collection.model_class::AVAIL_TYPE_ID) do |server, availability|
props = server.properties

props['Availability'] = availability.try(:[], 'data').try { first['value'] } || 'unknown'
props['Calculated Server State'] = props['Availability'] == 'up' ? props['Server State'] : props['Availability']
props['Availability'], props['Calculated Server State'] =
process_server_availability(props['Server State'], availability.try(:[], 'data').try(:first))
end
end

Expand All @@ -201,7 +202,10 @@ def fetch_availabilities_for(feeds, collection, metric_type_id)
collection.each do |item|
yield item, nil

path = URI.decode(item.model_class.try(:resource_path_for_metrics, item) || item.manager_uuid)
path = URI.decode(item.try(:resource_path_for_metrics) ||
item.try(:model_class).try(:resource_path_for_metrics, item) ||
item.try(:ems_ref) ||
item.manager_uuid)
next unless metric_id_by_resource_path.key? path
metric_id = metric_id_by_resource_path[path]
resources_by_metric_id[metric_id] = [] unless resources_by_metric_id.key? metric_id
Expand Down Expand Up @@ -245,6 +249,11 @@ def process_server_entity(server, entity)
inventory_object.middleware_server_group = server.middleware_server_group if inventory_object.respond_to?(:middleware_server_group=)
end

def process_server_availability(server_state, availability = nil)
avail = availability.try(:[], 'value') || 'unknown'
[avail, avail == 'up' ? server_state : avail]
end

def process_deployment_availability(availability = nil)
if availability.blank? || availability['value'].casecmp('unknown').zero?
'Unknown'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module ManageIQ::Providers
class Hawkular::Inventory::Persister::AvailabilityUpdates < Hawkular::Inventory::Persister::MiddlewareManager
def self.save_deployments(ems, collection)
::ActiveRecord::Base.transaction do
collection.to_a.each do |item|
deployment = ems.middleware_deployments.find_by(:ems_ref => item.manager_uuid)
next unless deployment # if deployment is not found in the database, it is ignored.

$mw_log.debug("EMS_#{ems.id}(Persister::AvailabilityUpdates): " \
"Updating status #{deployment.status} -> #{item.status} for deployment #{deployment.ems_ref}")

deployment.status = item.status
deployment.save!
end
end
end

def self.save_servers(ems, collection)
::ActiveRecord::Base.transaction do
collection.to_a.each do |item|
data_to_update = item.properties.try(:slice, 'Server State', 'Availability', 'Calculated Server State')
next if data_to_update.blank?

server = ems.middleware_servers.find_by(:ems_ref => item.manager_uuid)
next unless server # if no matching server is in the database, there is nothing to update

$mw_log.debug("EMS_#{ems.id}(Persister::AvailabilityUpdates): " \
"Updating status to #{item.properties} for server #{server.ems_ref}")

server.properties = {} if server.properties.blank?
server.properties.merge!(data_to_update)
server.save!
end
end
end

has_middleware_manager_deployments(:custom_save_block => method(:save_deployments))
has_middleware_manager_servers(:custom_save_block => method(:save_servers))
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,30 @@ def monitor_events
event_monitor_handle.start
event_monitor_handle.each_batch do |events|
event_monitor_running

# Separate alerts from avail updates
avail_updates, events = events.partition { |e| !e.kind_of?(::Hawkular::Alerts::Alert) }

# Filter and queue events for processing
new_events = events.select { |e| whitelist?(e) }
$mw_log.debug("#{log_prefix} Discarding events #{events - new_events}") if new_events.length < events.length
if new_events.any?
$mw_log.debug "#{log_prefix} Queueing events #{new_events}"
@queue.enq new_events
end

# Queue avail updates for processing
if avail_updates.any?
targets = avail_updates.map do |item|
ManagerRefresh::Target.new(:manager => @ems,
:association => item[:association],
:manager_ref => { :ems_ref => item[:ems_ref] },
:options => item[:data])
end
$mw_log.debug "#{log_prefix} Queueing refresh of #{targets.count} availabilities."
EmsRefresh.queue_refresh(targets)
end

# invoke the configured sleep before the next event fetch
sleep_poll_normal
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ class ManageIQ::Providers::Hawkular::MiddlewareManager::EventCatcher::Stream
def initialize(ems)
@ems = ems
@alerts_client = ems.alerts_client
@metrics_client = ems.metrics_client
@inventory_client = ems.inventory_client
@collecting_events = false
end

Expand All @@ -21,20 +23,113 @@ def each_batch

private

def fetch
events = []
events = fetch_events
events.concat(fetch_availabilities)
rescue => err
$mw_log.warn "#{log_prefix} Error capturing events #{err}"
events
end

# Each fetch is performed from the time of the most recently caught event or 1 minute back for the first poll.
# This gives us some slack if hawkular events are timestamped behind the miq server time.
# Note: This assumes all Hawkular events at max-time T are fetched in one call. It is unlikely that there
# would be more than one for the same millisecond, and that the query would be performed in the midst of
# writes for the same ms. It may be a feasible scenario but I think it's unnecessary to handle it at this time.
def fetch
def fetch_events
@start_time ||= (Time.current - 1.minute).to_i * 1000
$mw_log.debug "Catching Events since [#{@start_time}]"
$mw_log.debug "#{log_prefix} Catching Events since [#{@start_time}]"

new_events = @alerts_client.list_events("startTime" => @start_time, "tags" => "miq.event_type|*", "thin" => true)
@start_time = new_events.max_by(&:ctime).ctime + 1 unless new_events.empty? # add 1 ms to avoid dups with GTE filter
new_events
rescue => err
$mw_log.info "Error capturing events #{err}"
[]
end

def fetch_availabilities
parser = ManageIQ::Providers::Hawkular::Inventory::Parser::MiddlewareManager.new
parser.collector = ManageIQ::Providers::Hawkular::Inventory::Collector::MiddlewareManager.new(@ems, nil)

server_avails = fetch_server_availabilities(parser)
deploy_avails = fetch_deployment_availabilities(parser)

server_avails.concat(deploy_avails)
end

def fetch_server_availabilities(parser)
# For servers, it's also needed to refresh server states from inventory.
$mw_log.debug("#{log_prefix} Retrieving server states from Hawkular inventory")

server_states = {}
@ems.middleware_servers.reload.each do |server|
inventoried_server = @inventory_client.get_resource(server.ems_ref, true)
server_states[server.id] = inventoried_server.try(:properties).try(:[], 'Server State') || ''
end

# Fetch availabilities and process them together with server state updates.
fetch_entities_availabilities(parser, @ems.middleware_servers) do |item, avail|
server_state = server_states[item.id]
avail_data, calculated_status = parser.process_server_availability(server_state, avail)

props = item.try(:properties)
stored_avail = props.try(:[], 'Availability')
stored_state = props.try(:[], 'Server State')
stored_calculated = props.try(:[], 'Calculated Server State')

next nil if stored_avail == avail_data && stored_calculated == calculated_status && stored_state == server_state

{
:ems_ref => item.ems_ref,
:association => :middleware_servers,
:data => {
'Availability' => avail_data,
'Server State' => server_state,
'Calculated Server State' => calculated_status
}
}
end
end

def fetch_deployment_availabilities(parser)
fetch_entities_availabilities(parser, @ems.middleware_deployments.reload) do |item, avail|
status = parser.process_deployment_availability(avail)
next nil if item.status == status

{
:ems_ref => item.ems_ref,
:association => :middleware_deployments,
:data => {
:status => status
}
}
end
end

def fetch_entities_availabilities(parser, entities)
return [] if entities.blank?
log_name = entities.first.class.name.demodulize

# Get feeds where availabilities should be looked in.
feeds = entities.map(&:feed).uniq

$mw_log.debug("#{log_prefix} Retrieving availabilities for #{entities.count} " \
"#{log_name.pluralize(entities.count)} in #{feeds.count} feeds.")

# Get availabilities
avails = {}
parser.fetch_availabilities_for(feeds, entities, entities.first.class::AVAIL_TYPE_ID) do |item, avail|
avail_data = avail.try(:[], 'data').try(:first)
avails[item.id] = yield(item, avail_data)

# Filter out if availability is unchanged. This way, no refresh is triggered if unnecessary.
avails.delete(item.id) unless avails[item.id]
end

$mw_log.debug("#{log_prefix} Availability has changed for #{avails.length} #{log_name.pluralize(avails.length)}.")
avails.values
end

def log_prefix
@_log_prefix ||= "EMS_#{@ems.id}(Hawkular::EventCatcher::Stream)"
end
end
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
module ManageIQ::Providers
class Hawkular::MiddlewareManager::MiddlewareDeployment < MiddlewareDeployment
AVAIL_TYPE_ID = 'Deployment%20Status~Deployment%20Status'.freeze

def resource_path_for_metrics
self.class.resource_path_for_metrics(self)
end

def self.resource_path_for_metrics(item)
path = ::Hawkular::Inventory::CanonicalPath.parse(item.ems_ref)
# for subdeployments use it's parent deployment availability.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module ManageIQ::Providers
class Hawkular::MiddlewareManager::MiddlewareServer < MiddlewareServer
AVAIL_TYPE_ID = 'Server%20Availability~Server%20Availability'.freeze

def feed
CGI.unescape(super)
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
module ManageIQ::Providers::Hawkular
class MiddlewareManager::Refresher < ManageIQ::Providers::BaseManager::Refresher
include ::EmsRefresh::Refreshers::EmsRefresherMixin

def preprocess_targets
@targets_by_ems_id.each do |ems_id, targets|
if targets.any? { |t| t.kind_of?(ExtManagementSystem) }
# If the EMS is in the list of targets, full graph refresh is done.
ems = @ems_by_ems_id[ems_id]
_log.info "Defaulting to full refresh for EMS: [#{ems.name}], id: [#{ems.id}]." if targets.length > 1
targets.clear << ems
elsif targets.any?
# Assuming availabilities are being refreshed (since there is no other
# kind of refresh for Hawkular)

# Filter out duplicated entities
# The reverse is to keep the most up-to-date data
uniq_targets = targets.reverse.uniq do |item|
{
:association => item.association,
:ems_ref => item.manager_ref[:ems_ref]
}
end

# Compact all availability updates into one target
targets.clear
targets << ::ManageIQ::Providers::Hawkular::Inventory::AvailabilityUpdates.new(uniq_targets)
end
end
end
end
end
12 changes: 12 additions & 0 deletions spec/contexts/targeted_avail_updates.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
RSpec.shared_context 'targeted_avail_updates' do
let(:ems_hawkular) { FactoryGirl.create(:ems_hawkular) }
let(:target) { ::ManageIQ::Providers::Hawkular::Inventory::AvailabilityUpdates.new([]) }
let(:persister) { ::ManageIQ::Providers::Hawkular::Inventory::Persister::AvailabilityUpdates.new(ems_hawkular, target) }
let(:collector) { ::ManageIQ::Providers::Hawkular::Inventory::Collector::AvailabilityUpdates.new(ems_hawkular, target) }
let(:parser) do
parser = described_class.new
parser.collector = collector
parser.persister = persister
parser
end
end
Loading

0 comments on commit 8bb5611

Please sign in to comment.