informer library for shared watching with retries and updated list #494

Merged 1 commit on Mar 30, 2021
2 changes: 2 additions & 0 deletions .rubocop.yml
@@ -48,3 +48,5 @@ Style/AccessorGrouping:
EnforcedStyle: separated
Style/NegatedIfElseCondition:
Enabled: false
Style/Semicolon:
Exclude: ["test/**/*.rb"]
14 changes: 14 additions & 0 deletions README.md
@@ -812,6 +812,20 @@ to be substituted. Note that for a required parameter that does not provide a ge
client.process_template template
```

### Informer

A list that is always up to date because it is kept in sync by a watch running in the background.
A single list+watch can also be shared by multiple threads.

```ruby
client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
informer = Kubeclient::Informer.new(client, "pods", reconcile_timeout: 15 * 60, logger: Logger.new(STDOUT))
informer.start_worker

informer.list # all current pods
informer.watch { |notice| } # watch for changes (hides restarts and errors)
```
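
The pattern behind `Informer` can be sketched without an API server: one background worker keeps a cache in sync and fans each event out to every consumer. This is an illustrative stand-in only — `MiniInformer` and its `[type, object]` event format are made up for the sketch and are not kubeclient API.

```ruby
require 'thread'

# Stand-in for the informer pattern: a worker thread consumes an event
# stream, keeps a uid-indexed cache current, and copies each event into
# one Queue per consumer so all consumers see all events.
class MiniInformer
  def initialize(source)
    @source = source # Queue of [type, object] pairs, standing in for the watch stream
    @cache = {}      # uid => object; the always-current "list"
    @watching = []   # one Queue per consumer
  end

  def list
    @cache.values
  end

  def watch
    queue = Queue.new
    @watching << queue
    loop { yield(queue.pop) }
  ensure
    @watching.delete(queue)
  end

  def start_worker
    @worker = Thread.new do
      loop do
        type, object = @source.pop
        if type == :deleted
          @cache.delete(object[:uid])
        else # :added or :modified
          @cache[object[:uid]] = object
        end
        @watching.each { |q| q << [type, object] }
      end
    end
  end

  def stop_worker
    @worker&.kill
  end
end
```

Feeding the queue updates every reader: after `source << [:added, { uid: '1', name: 'a' }]`, `list` returns the new object and every registered `watch` block receives the event.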

## Contributing

1. Fork it ( https://github.com/[my-github-username]/kubeclient/fork )
1 change: 1 addition & 0 deletions lib/kubeclient.rb
@@ -9,6 +9,7 @@
require_relative 'kubeclient/exec_credentials'
require_relative 'kubeclient/gcp_auth_provider'
require_relative 'kubeclient/http_error'
require_relative 'kubeclient/informer'
require_relative 'kubeclient/missing_kind_compatibility'
require_relative 'kubeclient/oidc_auth_provider'
require_relative 'kubeclient/resource'
93 changes: 93 additions & 0 deletions lib/kubeclient/informer.rb
@@ -0,0 +1,93 @@
module Kubeclient
# caches results for multiple consumers to share and keeps them updated with a watch
class Informer
def initialize(client, resource_name, reconcile_timeout: 15 * 60, logger: nil)
@client = client
@resource_name = resource_name
@reconcile_timeout = reconcile_timeout
@logger = logger
@cache = nil
@started = nil
@watching = []
end

def list
@cache.values
end

def watch(&block)
with_watching(&block)
end

# not implicit so users know they have to `stop`
def start_worker
@worker = Thread.new do
loop do
fill_cache
watch_to_update_cache
rescue StandardError => e
# need to keep retrying since we work in the background
@logger&.error("ignoring error during background work #{e}")
ensure
sleep(1) # do not overwhelm the api-server if we are somehow broken
end
end
sleep(0.01) until @cache
end

def stop_worker
@worker&.kill # TODO: be nicer ?
end

private

def with_watching
queue = Queue.new
@watching << queue
loop do
x = queue.pop
yield(x)
end
ensure
@watching.delete(queue)
end

def cache_key(resource)
resource.dig(:metadata, :uid)
end

def fill_cache
reply = @client.get_entities(nil, @resource_name, raw: true, resource_version: '0')
> **Collaborator:** Will version "0" result in a `.metadata.resourceVersion` we can watch from? It means "any version is OK", which allows the server to return a cached response, but I vaguely remember hearing that it might allow a version so old that it returns 410 when watched (?)

> **Collaborator:** Additionally, when the List+Watch restarts, this might lead to replaying a state + events that are older than what we already observed?
>
> https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter doesn't exactly answer this, but it does say about supplying resourceVersion="0" to watch directly:
>
> > Get State and Start at Any: Warning: Watches initialized this way may return arbitrarily stale data! Please review this semantic before using it, and favor the other semantics where possible. Start a watch at any resource version; the most recent resource version available is preferred, but not required; any starting resource version is allowed. It is possible for the watch to start at a much older resource version that the client has previously observed, particularly in high availability configurations, due to partitions or stale caches. Clients that cannot tolerate this should not start a watch with this semantic. To establish initial state, the watch begins with synthetic "Added" events for all resource instances that exist at the starting resource version. All following watch events are for all changes that occurred after the resource version the watch started at.

> **Contributor (author):**
>
> - If the version was too old, it would result in a restart of the list/watch loop, so we should be good (I never saw this happen).
> - Watch does not receive the 0, but what the list returned ... but it could end up in a replay since the list might have gotten old data ... so ideally we'd keep track of the last resourceVersion we saw and then do something with that (with a "latest" approach we'd lose some events though).

@cache = reply[:items].each_with_object({}) do |item, h|
h[cache_key(item)] = item
end
@started = reply.dig(:metadata, :resourceVersion)
end

def watch_to_update_cache
watcher = @client.watch_entities(@resource_name, watch: true, resource_version: @started)
stop_reason = 'disconnect'

# stop watcher without using timeout
Thread.new do
sleep(@reconcile_timeout)
stop_reason = 'reconcile'
watcher.finish
end

watcher.each do |notice|
case notice[:type]
when 'ADDED', 'MODIFIED' then @cache[cache_key(notice[:object])] = notice[:object]
when 'DELETED' then @cache.delete(cache_key(notice[:object]))
when 'ERROR'
stop_reason = 'error'
break
else
@logger&.error("Unsupported event type #{notice[:type]}")
end
@watching.each { |q| q << notice }
end
@logger&.info("watch restarted: #{stop_reason}")
end
end
end
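
The review thread above leaves open how to avoid replaying stale state after a watch restart. One direction, sketched here as a hypothetical helper that is not part of this PR or of kubeclient, is to remember the newest resourceVersion observed in watch notices and resume from it rather than from the initial list's (possibly stale) version:

```ruby
# Hypothetical helper (not kubeclient API): walk a stream of watch notices
# and return the resourceVersion of the last one that carried an object,
# falling back to the version the initial list reported. A restart loop
# could pass the result to the next watch call instead of @started.
def last_seen_version(notices, initial_version)
  notices.reduce(initial_version) do |seen, notice|
    notice.dig(:object, :metadata, :resourceVersion) || seen
  end
end
```

Note that resourceVersions are opaque strings per the Kubernetes API contract, so this sketch only carries the latest value forward; it deliberately avoids comparing versions numerically.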
163 changes: 163 additions & 0 deletions test/test_informer.rb
@@ -0,0 +1,163 @@
# frozen_string_literal: true

require_relative 'test_helper'

# tests Kubeclient::Informer from lib/kubeclient/informer.rb
class InformerTest < MiniTest::Test
def setup
super
@slept = []
stub_core_api_list
end

# prevent leftover threads from causing trouble
def teardown
(Thread.list - [Thread.current]).each(&:kill)
super
end

def test_lists_at_start
list = stub_list
watch = stub_request(:get, %r{/v1/watch/pods}).to_return(body: '', status: 200)
with_worker do
assert_equal(['a'], informer.list.map { |p| p.metadata.name })
end
assert_requested(list, times: 1)
assert_requested(watch, times: 1)
end

def test_watches_for_updates
lock = Mutex.new
lock.lock
list = stub_list
watch = stub_request(:get, %r{/v1/watch/pods}).with { lock.lock }.to_return(
body: {
type: 'MODIFIED', object: { metadata: { name: 'b', uid: 'id1' } }
}.to_json << "\n",
status: 200
)

with_worker do
assert_equal(['a'], informer.list.map { |p| p.metadata.name })
lock.unlock # trigger watch
sleep(0.02) # wait for watch to finish
assert_equal(['b'], informer.list.map { |p| p.metadata.name })
end

assert_requested(list, times: 1)
assert_requested(watch, times: 1)
end

def test_watches_for_add
stub_list
stub_request(:get, %r{/v1/watch/pods}).to_return(
body: {
type: 'ADDED', object: { metadata: { name: 'b', uid: 'id2' } }
}.to_json << "\n",
status: 200
)

with_worker do
assert_equal(['a', 'b'], informer.list.map { |p| p.metadata.name })
end
end

def test_watches_for_delete
stub_list
stub_request(:get, %r{/v1/watch/pods}).to_return(
body: {
type: 'DELETED', object: { metadata: { name: 'b', uid: 'id1' } }
}.to_json << "\n",
status: 200
)

with_worker do
assert_equal([], informer.list.map { |p| p.metadata.name })
end
end

def test_restarts_on_error
list = stub_list
watch = stub_request(:get, %r{/v1/watch/pods}).to_return(
body: { type: 'ERROR' }.to_json << "\n",
status: 200
)
slept = []
informer.stubs(:sleep).with { |x| slept << x; sleep(0.01) }

with_worker do
assert_equal(['a'], informer.list.map { |p| p.metadata.name })
sleep(0.05)
end

assert slept.size >= 2, slept
assert_requested(list, at_least_times: 2)
assert_requested(watch, at_least_times: 2)
end

def test_can_watch_watches
skip if RUBY_ENGINE == 'truffleruby' # TODO: some race condition in truffle-ruby

list = stub_list
watch = stub_request(:get, %r{/v1/watch/pods}).to_return(
body: {
type: 'ADDED', object: { metadata: { name: 'b', uid: 'id2' } }
}.to_json << "\n",
status: 200
)

seen1 = []
seen2 = []
seeer1 = Thread.new { informer.watch { |n| seen1 << n; break } }
seeer2 = Thread.new { informer.watch { |n| seen2 << n; break } }
sleep(0.01) until informer.instance_variable_get(:@watching).size == 2

with_worker do
assert_equal([['ADDED'], ['ADDED']], [seen1.map(&:type), seen2.map(&:type)])
end

assert_requested(list, times: 1)
assert_requested(watch, times: 1)
ensure
seeer1&.kill
seeer2&.kill
end

def test_timeout
timeout = 0.1
informer.instance_variable_set(:@reconcile_timeout, timeout)
stub_list
Kubeclient::Common::WatchStream.any_instance.expects(:finish)
stub_request(:get, %r{/v1/watch/pods})
with_worker { sleep(timeout * 1.9) }
end

private

def with_worker
informer.start_worker
sleep(0.03) # wait for worker to watch
yield
ensure
informer.stop_worker
end

def stub_list
stub_request(:get, %r{/v1/pods}).to_return(body: pods_reply.to_json, status: 200)
end

def client
@client ||= Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
end

def informer
@informer ||= Kubeclient::Informer.new(client, 'pods')
end

def pods_reply
@pods_reply ||= {
metadata: { resourceVersion: 1 },
items: [{ metadata: { name: 'a', uid: 'id1' } }]
}
end
end
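
`test_watches_for_updates` holds the stubbed watch response behind a `Mutex` so the test decides exactly when the MODIFIED notice is delivered. The gating trick in isolation, as a plain-Ruby sketch with no kubeclient or webmock involved:

```ruby
require 'thread'

# A Mutex used as a one-shot gate: the main thread locks it first, so the
# worker (standing in for the stubbed watch request) parks until the test
# explicitly opens the gate with unlock.
lock = Mutex.new
lock.lock # gate starts closed, held by the main thread

events = []
worker = Thread.new do
  # blocks here until the main thread unlocks, like the stub's .with block
  lock.synchronize { events << :modified } # stands in for the MODIFIED notice
end

sleep(0.05)                # give the worker time to reach the gate
before_unlock = events.dup # still empty: nothing delivered while gated
lock.unlock                # "trigger watch": let the response through
worker.join
```

This makes the test deterministic: the assertion on the pre-update state can run with no risk of the update racing ahead of it.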