Merge pull request #17914 from tumido/fine-fog-google-upgrade
[FINE] Fog google upgrade (to 1.3.3)
simaishi authored Sep 10, 2018
2 parents ebe8ceb + 9133e8a commit 37317d7
Showing 21 changed files with 66,232 additions and 8,358 deletions.
3 changes: 1 addition & 2 deletions Gemfile
@@ -47,10 +47,9 @@ gem "dalli", "=2.7.6", :require => false
 gem "default_value_for", "~>3.0.2"
 gem "elif", "=0.1.0", :require => false
 gem "fast_gettext", "~>1.2.0"
-gem "fog-google", "=0.5.2", :require => false
+gem "fog-google", "~>1.3.3", :require => false
 gem "gettext_i18n_rails", "~>1.7.2"
 gem "gettext_i18n_rails_js", "~>1.1.0"
-gem "google-api-client", "~>0.8.6", :require => false
 gem "hamlit", "~>2.7.0"
 gem "hashie", "~>3.4.6", :require => false
 gem "high_voltage", "~>3.0.0"
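A note on the constraint change: "=0.5.2" pinned an exact release, while the pessimistic operator "~>1.3.3" accepts any 1.3.x release at or above 1.3.3 but refuses 1.4.0. The explicit google-api-client pin is dropped, presumably because fog-google 1.x pulls in a compatible google-api-client on its own. A quick plain-Ruby illustration of the operator's semantics, using RubyGems' own classes:

    # "~> 1.3.3" expands to the two comparisons ">= 1.3.3" and "< 1.4.0"
    req = Gem::Requirement.new("~> 1.3.3")
    req.satisfied_by?(Gem::Version.new("1.3.9")) # => true
    req.satisfied_by?(Gem::Version.new("1.4.0")) # => false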
@@ -23,7 +23,7 @@ def stop
   # Poll for events (blocks forever until #stop is called)
   def each_batch
     while @collecting_events
-      yield events.map { |e| JSON.parse(e.message['data']) }
+      yield events.map { |e| JSON.parse(e[:message][:data]) }
     end
   end
@@ -38,23 +38,24 @@ def each_batch
   def events
     # For now, return immediately with up to 10 messages
     @ems.with_provider_connection(:service => 'pubsub') do |google|
-      subscription = get_or_create_subscription(google)
-      subscription.pull(:return_immediately => true, :max_messages => 10).tap do |msgs|
-        subscription.acknowledge(msgs)
-      end
+      get_or_create_subscription(google)
+      # FIXME: Change once https://github.com/fog/fog-google/issues/349 is resolved
+      # In the normal case we would use the return value of the previous call and invoke :pull on it.
+      # Due to a Google API inconsistency we have to implement our own pull and go directly to the service.
+      # The current subscription.pull() in Fog follows the Google API specification and Base64-decodes the payload.
+      # This is consistent with the Google API documentation for PubsubMessage
+      # (see https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
+      # In the real world, however, the data arrives as plain text, so the Base64 decoding turns it into gibberish.
+      # Pulling directly from the PubSub service works around that.
+      pull_subscription(google).tap { |msgs| acknowledge_messages(google, msgs) }
     end
   rescue Fog::Errors::Error
     raise ProviderUnreachable, "Error when contacting Google Pubsub for events; this may be a temporary failure."
   end

   def get_or_create_subscription(google)
     # If event catcher is not yet setup, then we'll get a fog error
-    google.subscriptions.get(subscription_name) ||
-      google.subscriptions.create(:name => subscription_name,
-                                  # add empty config - workaround for https://github.com/fog/fog-google/issues/214
-                                  # TODO: remove once the above is resolved
-                                  :push_config => {},
-                                  :topic => topic_name)
+    google.subscriptions.get(subscription_name) || google.subscriptions.create(:name => subscription_name, :topic => topic_name)
   rescue Fog::Errors::NotFound
     # Rather than expose the notfound error, we expose our own exception
     # indicating that the worker thread should back off
@@ -64,6 +65,19 @@ def get_or_create_subscription(google)
     raise TopicNotFound, msg
   end

+  def pull_subscription(google)
+    options = {:return_immediately => true, :max_messages => 10}
+    data = google.pull_subscription(subscription_name, options).to_h
+
+    data[:received_messages].to_a
+  end
+
+  def acknowledge_messages(google, messages)
+    return if messages.empty?
+    ack_ids = messages.collect { |m| m[:ack_id] }
+    google.acknowledge_subscription(subscription_name, ack_ids)
+  end
+
   def subscription_name
     "projects/#{@ems.project}/subscriptions/manageiq-eventcatcher-#{@ems.guid}"
   end
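The new pull path hands each_batch raw Pub/Sub messages as symbol-keyed hashes, so the JSON payload is parsed straight out of e[:message][:data]. A minimal sketch of that flow; the ack id and payload are hypothetical, and the hash shape is an assumption modeled on the Pub/Sub REST API with fog-google 1.x returning symbol keys:

    require 'json'

    # Hypothetical result of google.pull_subscription(...).to_h[:received_messages]
    received_messages = [
      {
        :ack_id  => "ack-123",
        :message => {
          :data       => '{"jsonPayload":{"event_type":"GCE_OPERATION_DONE"}}',
          :message_id => "42"
        }
      }
    ]

    events = received_messages.map { |e| JSON.parse(e[:message][:data]) }
    events.first["jsonPayload"]["event_type"] # => "GCE_OPERATION_DONE"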
@@ -5,15 +5,14 @@ def self.event_to_hash(event, ems_id)
     log_header = "ems_id: [#{ems_id}] " unless ems_id.nil?

     event_type = parse_event_type(event)
-    timestamp = event.fetch_path('metadata', 'timestamp')

     _log.debug { "#{log_header}event: [#{event_type}]" }

     event_hash = {
       :event_type => event_type,
       :source     => "GOOGLE",
       :message    => event_type,
-      :timestamp  => timestamp,
+      :timestamp  => event['timestamp'],
       :full_data  => event,
       :ems_id     => ems_id
     }
@@ -37,46 +37,45 @@ class ManageIQ::Providers::Google::CloudManager::MetricsCapture < ManageIQ::Prov
   VIM_COUNTER_SCHEMAS = [
     {
       # Name of the VIM_STYLE_COUNTER this schema describes
-      :vim_counter_name => "cpu_usage_rate_average",
+      :vim_counter_name     => "cpu_usage_rate_average",

       # List of metric names in GCP that should be retrieved to calculate the
       # vim-style metric
-      :google_metric_names => ["compute.googleapis.com/instance/cpu/utilization"],
-
-      # A function that maps a target to a list of google labels to be applied
-      # to the request. Only results matching the label are returned.
-      :target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
+      # https://cloud.google.com/monitoring/api/metrics_gcp#gcp-compute
+      :google_metric_names  => %w(compute.googleapis.com/instance/cpu/utilization),

       # Function that maps a point returned by Google's monitoring api (which
       # is a hash data structure; see
-      # https://cloud.google.com/monitoring/v2beta2/timeseries) to our counter
-      # value. Any unit transformations are applied as well.
-      :point_to_val => ->(point) { point["doubleValue"].to_f * 100 },
+      # https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries)
+      # to our counter value. Any unit transformations are applied as well.
+      :point_to_val         => ->(point) { point[:double_value].to_f * 100 },

       # Function that takes two points and reduces them to one. This is used
       # when multiple points are found for the same data point in the same
       # query (e.g. if we are querying disk usage and the host has multiple
       # disks, this method is used to combine the points into a single metric)
-      :reducer => lambda do |x, _|
+      :reducer              => lambda do |x, _|
         _log.warn("Received multiple values for cpu_usage; ignoring duplicates")
         x
       end
     },
     {
-      :vim_counter_name => "disk_usage_rate_average",
-      :google_metric_names => ["compute.googleapis.com/instance/disk/read_bytes_count",
-                               "compute.googleapis.com/instance/disk/write_bytes_count"],
-      :target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
-      :point_to_val => ->(point) { point["int64Value"].to_i / (60.0 * 1024.0) }, # convert from b/m to Kb/s
-      :reducer => ->(x, y) { x + y },
+      :vim_counter_name     => "disk_usage_rate_average",
+      :google_metric_names  => %w(
+        compute.googleapis.com/instance/disk/read_bytes_count
+        compute.googleapis.com/instance/disk/write_bytes_count
+      ),
+      :point_to_val         => ->(point) { point[:int64_value].to_i / (60.0 * 1024.0) }, # convert from b/m to Kb/s
+      :reducer              => ->(x, y) { x + y },
     },
     {
-      :vim_counter_name => "net_usage_rate_average",
-      :google_metric_names => ["compute.googleapis.com/instance/network/received_bytes_count",
-                               "compute.googleapis.com/instance/network/sent_bytes_count"],
-      :target_to_google_labels => ->(target) { ["compute.googleapis.com/resource_id==#{target.ems_ref}"] },
-      :point_to_val => ->(point) { point["int64Value"].to_i / (60.0 * 1024.0) }, # convert from b/m to Kb/s
-      :reducer => ->(x, y) { x + y },
+      :vim_counter_name     => "net_usage_rate_average",
+      :google_metric_names  => %w(
+        compute.googleapis.com/instance/network/received_bytes_count
+        compute.googleapis.com/instance/network/sent_bytes_count
+      ),
+      :point_to_val         => ->(point) { point[:int64_value].to_i / (60.0 * 1024.0) }, # convert from b/m to Kb/s
+      :reducer              => ->(x, y) { x + y },
     }
   ].freeze

@@ -127,15 +126,19 @@ def perf_collect_metrics(_interval_name, start_time = nil, end_time = nil)
   #   aggregate metrics onto (will be modified by method)
   # @return nil
   def collect_metrics(google, start_time, end_time, schema, counter_values_by_ts)
+    interval = {
+      :start_time => start_time.to_datetime.rfc3339,
+      :end_time   => end_time.to_datetime.rfc3339
+    }
+
     schema[:google_metric_names].each do |google_metric_name|
-      options = {
-        :labels => schema[:target_to_google_labels].call(target),
-        :oldest => start_time.to_datetime.rfc3339,
-      }
+      # For filter creation and entity selection see https://cloud.google.com/monitoring/api/v3/filters
+      filter = "metric.type = \"#{google_metric_name}\" AND resource.labels.instance_id = \"#{target.ems_ref}\""

       # Make our service call for metrics; Note that we might get multiple
       # time series back (for example, if the host has multiple disks/network
       # cards)
-      tss = google.timeseries_collection.all(google_metric_name, end_time.to_datetime.rfc3339, options)
+      tss = google.timeseries_collection.all(:filter => filter, :interval => interval)

       tss.each do |ts|
         collect_time_series_metrics(ts, schema, counter_values_by_ts)
@@ -147,7 +150,8 @@ def collect_metrics(google, start_time, end_time, schema, counter_values_by_ts)
   #   provided 'counter_values_by_ts' hash, using the provided schema.
   #
   # @param time_series [Hash] resource returned by GCP describing a metric
-  #   lookup result (see https://cloud.google.com/monitoring/v2beta2/timeseries)
+  #   lookup result (see
+  #   https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries)
   # @param schema [Hash] schema describing the metric to query (see
   #   VIM_STYLE_COUNTERS definition for a description)
   # @param counter_values_by_ts [Hash{Time => Hash{String => Number}}] hash to
@@ -159,8 +163,8 @@ def collect_time_series_metrics(time_series, schema, counter_values_by_ts)
       # minute; this allows us to sum up points across time series that may
       # have landed on different seconds. Note this only holds true for
       # 1-minute metrics.
-      timestamp = Time.zone.parse(point["start"]).beginning_of_minute
-      val = schema[:point_to_val].call(point)
+      timestamp = Time.zone.parse(point[:interval][:start_time]).beginning_of_minute
+      val = schema[:point_to_val].call(point[:value])

       # If we already have a value, reduce using our reduction function
       prev_val = counter_values_by_ts.fetch_path(timestamp, schema[:vim_counter_name])
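The rewritten collect_metrics targets the Monitoring v3 API: instead of label/oldest options it builds a filter string plus an interval hash. A rough sketch of what gets composed, with a hypothetical metric and instance id; the commented-out call mirrors the diff, and the "one time series per disk/NIC" behavior follows the v3 docs:

    require 'date'

    google_metric_name = "compute.googleapis.com/instance/cpu/utilization"
    ems_ref            = "1234567890"   # hypothetical instance id
    start_time         = Time.now - 600
    end_time           = Time.now

    # Filter syntax per https://cloud.google.com/monitoring/api/v3/filters
    filter = "metric.type = \"#{google_metric_name}\" AND " \
             "resource.labels.instance_id = \"#{ems_ref}\""
    interval = {
      :start_time => start_time.to_datetime.rfc3339,
      :end_time   => end_time.to_datetime.rfc3339
    }

    # google.timeseries_collection.all(:filter => filter, :interval => interval)
    # would then return one time series per matching resource (e.g. per disk or NIC).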
@@ -18,11 +18,6 @@ def prepare_for_clone_task
     clone_options[:machine_type] = instance_type.ems_ref
     clone_options[:zone_name] = dest_availability_zone.ems_ref
     clone_options[:preemptible] = get_option(:is_preemptible)
-    # fog-google specifies a default value that's incompatible with
-    # :preemptible; until this is fixed we need to be explicit about the host
-    # behavior on maintenance
-    # issue: https://github.com/fog/fog-google/issues/136
-    clone_options[:on_host_maintenance] = "TERMINATE" if clone_options[:preemptible]

     clone_options
   end
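The deleted block forced :on_host_maintenance to "TERMINATE" for preemptible instances because older fog-google sent a default that GCE rejects for them (fog/fog-google#136); the upgrade assumes the library now picks a compatible value on its own. A sketch of the resulting options hash, with hypothetical values:

    clone_options = {
      :machine_type => "n1-standard-1", # hypothetical instance_type.ems_ref
      :zone_name    => "us-central1-a", # hypothetical zone ems_ref
      :preemptible  => true
      # :on_host_maintenance no longer has to be forced to "TERMINATE";
      # fog-google >= 1.0 is assumed to default it compatibly
    }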
@@ -43,9 +43,12 @@ def get_zones
   end

   def get_flavors
-    # connection.flavors returns a duplicate flavor for every zone
+    # Google API returns a duplicate flavor for every zone
     # so build a unique list of flavors using the flavor id
-    flavors = @connection.flavors.to_a.uniq(&:id)
+    flavors = @connection.list_aggregated_machine_types.items.values.each_with_object([]) do |zone, arr|
+      arr.concat(zone.machine_types) if zone.machine_types
+    end
+    flavors.uniq!(&:id)
     process_collection(flavors, :flavors) { |flavor| parse_flavor(flavor) }
   end
@@ -128,7 +131,7 @@ def parse_volume(volume)
       zone_id = parse_uid_from_url(volume.zone)

       new_result = {
-        :ems_ref       => volume.id,
+        :ems_ref       => volume.id.to_s,
         :name          => volume.name,
         :status        => volume.status,
         :creation_time => volume.creation_timestamp,
@@ -149,7 +152,7 @@ def parse_volume(volume)

     def parse_snapshot(snapshot)
       new_result = {
-        :ems_ref => snapshot.id,
+        :ems_ref => snapshot.id.to_s,
         :type    => "ManageIQ::Providers::Google::CloudManager::CloudVolumeSnapshot",
         :name    => snapshot.name,
         :status  => snapshot.status,
@@ -163,7 +166,7 @@ def parse_snapshot(snapshot)
     end

     def parse_storage_as_template(storage)
-      uid = storage.id
+      uid = storage.id.to_s
       name = storage.name
       name ||= uid
       type = ManageIQ::Providers::Google::CloudManager::Template.name
@@ -208,20 +211,20 @@ def parse_ssh_key(ssh_key)
     end

     def parse_instance(instance)
-      uid = instance.id
+      uid = instance.id.to_s
       name = instance.name
       name ||= uid

       flavor_uid = parse_uid_from_url(instance.machine_type)
       flavor = @data_index.fetch_path(:flavors, flavor_uid)

-      # If the flavor isn't found in our index, check if it is a custom flavor
-      # that we have to get directly
-      flavor = query_and_add_flavor(flavor_uid) if flavor.nil?
-
       zone_uid = parse_uid_from_url(instance.zone)
       zone = @data_index.fetch_path(:availability_zones, zone_uid)

+      # If the flavor isn't found in our index, check if it is a custom flavor
+      # that we have to get directly
+      flavor = query_and_add_flavor(flavor_uid, zone_uid) if flavor.nil?
+
       parent_image_uid = parse_instance_parent_image(instance)
       parent_image = @data_index.fetch_path(:vms, parent_image_uid)

@@ -235,7 +238,7 @@ def parse_instance(instance)
         :name              => name,
         :description       => instance.description,
         :vendor            => "google",
-        :raw_power_state   => instance.state,
+        :raw_power_state   => instance.status,
         :flavor            => flavor,
         :availability_zone => zone,
         :parent_vm         => parent_image,
@@ -252,7 +255,7 @@ def parse_instance(instance)
           :display_name => N_("Is VM Preemptible"),
           :description  => N_("Whether or not the VM is 'preemptible'. See"\
                               " https://cloud.google.com/compute/docs/instances/preemptible for more details."),
-          :value        => instance.scheduling["preemptible"].to_s,
+          :value        => instance.scheduling[:preemptible].to_s,
           :read_only    => true
         }
       ]
@@ -267,13 +270,13 @@ def parse_instance(instance)
     def populate_hardware_hash_with_disks(hardware_disks_array, instance)
       instance.disks.each do |attached_disk|
         # lookup the full disk information from the data_index by source link
-        d = @data_index.fetch_path(:cloud_volumes, attached_disk["source"])
+        d = @data_index.fetch_path(:cloud_volumes, attached_disk[:source])

         next if d.nil?

         disk_size = d[:size]
-        disk_name = attached_disk["deviceName"]
-        disk_location = attached_disk["index"]
+        disk_name = attached_disk[:device_name]
+        disk_location = attached_disk[:index]

         disk = add_instance_disk(hardware_disks_array, disk_size, disk_name, disk_location)
         # Link the disk and the instance together
@@ -290,7 +293,7 @@ def parse_instance_parent_image(instance)
       parent_image_uid = nil

       instance.disks.each do |disk|
-        parent_image_uid = @disk_to_source_image_id[disk["source"]]
+        parent_image_uid = @disk_to_source_image_id[disk[:source]]
         next if parent_image_uid.nil?
         break
       end
@@ -310,8 +313,8 @@ def populate_key_pairs_with_ssh_keys(result_key_pairs, instance)
     end

     def parse_compute_metadata(metadata, key)
-      metadata_item = metadata["items"].to_a.detect { |x| x["key"] == key }
-      metadata_item.to_h["value"]
+      metadata_item = metadata[:items].to_a.detect { |x| x[:key] == key }
+      metadata_item.to_h[:value]
     end

     def parse_compute_metadata_ssh_keys(metadata)
@@ -350,8 +353,8 @@ def link_volumes_to_base_snapshots
       end
     end

-    def query_and_add_flavor(flavor_uid)
-      flavor = @connection.flavors.get(flavor_uid)
+    def query_and_add_flavor(flavor_uid, zone_uid)
+      flavor = @connection.get_machine_type(flavor_uid, zone_uid)
       process_collection(flavor.to_miq_a, :flavors) { |f| parse_flavor(f) }
       @data_index.fetch_path(:flavors, flavor_uid)
     end
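The new get_flavors walks the aggregated machine-types listing, which groups machine types by zone scope, so the same flavor appears once per zone and some scopes carry none. A self-contained sketch of that dedup step; the response layout is an assumption modeled on the GCE aggregatedList API, with Structs standing in for fog models:

    MachineType = Struct.new(:id, :name)
    Scope       = Struct.new(:machine_types)

    items = {
      "zones/us-central1-a" => Scope.new([MachineType.new(1, "n1-standard-1")]),
      "zones/us-central1-b" => Scope.new([MachineType.new(1, "n1-standard-1")]),
      "zones/us-central1-f" => Scope.new(nil) # scope with no machine types
    }

    flavors = items.values.each_with_object([]) do |zone, arr|
      arr.concat(zone.machine_types) if zone.machine_types
    end
    flavors.uniq!(&:id)
    flavors.map(&:name) # => ["n1-standard-1"]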
6 changes: 3 additions & 3 deletions app/models/manageiq/providers/google/event_catcher_mixin.rb
@@ -1,7 +1,7 @@
 module ManageIQ::Providers::Google::EventCatcherMixin
   def parse_event_type(event)
-    event_type = event.fetch_path('structPayload', 'event_type')
-    event_subtype = event.fetch_path('structPayload', 'event_subtype')
+    event_type = event.fetch_path('jsonPayload', 'event_type')
+    event_subtype = event.fetch_path('jsonPayload', 'event_subtype')

     event_type = "unknown" if event_type.blank?
     event_subtype = "unknown" if event_subtype.blank?
@@ -12,7 +12,7 @@ def parse_event_type(event)
   end

   def parse_resource_id(event)
-    resource_id = event.fetch_path('structPayload', 'resource', 'id')
+    resource_id = event.fetch_path('jsonPayload', 'resource', 'id')
    resource_id = "unknown" if resource_id.blank?

     resource_id
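The payload key moves from 'structPayload' to 'jsonPayload', the field name used by Stackdriver activity-log entries in the v2 logging API. A hypothetical log entry illustrating the new lookups; fetch_path is a ManageIQ core extension, so the stdlib Hash#dig stands in here, and all field values are made up:

    event = {
      'timestamp'   => '2018-09-10T12:00:00Z',
      'jsonPayload' => {
        'event_type'    => 'GCE_API_CALL',
        'event_subtype' => 'compute.instances.start',
        'resource'      => { 'id' => '1234567890' }
      }
    }

    event.dig('jsonPayload', 'event_type')     # => "GCE_API_CALL"
    event.dig('jsonPayload', 'resource', 'id') # => "1234567890"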