Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace http gem with faraday #488

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Kubeclient release versioning follows [SemVer](https://semver.org/).
## Unreleased
### Changed
- `Kubeclient::Client.new` now always requires an api version, use for example: `Kubeclient::Client.new(uri, 'v1')`
- `socket_options` has been removed from `Kubeclient::Client.new`

## 4.9.1 — 2020-08-31
### Fixed
Expand Down
18 changes: 0 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,6 @@ namespace = File.read('/var/run/secrets/kubernetes.io/serviceaccount/namespace')
```
You can find information about tokens in [this guide](https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod) and in [this reference](http://kubernetes.io/docs/admin/authentication/).

### Non-blocking IO

You can also use kubeclient with non-blocking sockets such as Celluloid::IO, see [here](https://github.com/httprb/http/wiki/Parallel-requests-with-Celluloid%3A%3AIO)
for details. For example:

```ruby
require 'celluloid/io'
socket_options = {
socket_class: Celluloid::IO::TCPSocket,
ssl_socket_class: Celluloid::IO::SSLSocket
}
client = Kubeclient::Client.new(
'https://localhost:8443/api', 'v1', socket_options: socket_options
)
```

This affects only `.watch_*` sockets, not one-off actions like `.get_*`, `.delete_*` etc.

### Proxies

You can also use kubeclient with an http proxy server such as tinyproxy. It can be entered as a string or a URI object.
Expand Down
1 change: 0 additions & 1 deletion kubeclient.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,4 @@ Gem::Specification.new do |spec|
spec.add_dependency 'faraday_middleware', '~> 1.0'
spec.add_dependency 'jsonpath', '~> 1.0'
spec.add_dependency 'recursive-open-struct', '~> 1.1', '>= 1.1.1'
spec.add_dependency 'http', '>= 3.0', '< 5.0'
end
35 changes: 15 additions & 20 deletions lib/kubeclient/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ module ClientMixin
bearer_token_file: nil
}.freeze

DEFAULT_SOCKET_OPTIONS = {
socket_class: nil,
ssl_socket_class: nil
}.freeze

DEFAULT_TIMEOUTS = {
# These do NOT affect watch, watching never times out.
open: Net::HTTP.new('127.0.0.1').open_timeout, # depends on ruby version
Expand Down Expand Up @@ -68,7 +63,6 @@ def initialize_client(
version,
ssl_options: DEFAULT_SSL_OPTIONS,
auth_options: DEFAULT_AUTH_OPTIONS,
socket_options: DEFAULT_SOCKET_OPTIONS,
timeouts: DEFAULT_TIMEOUTS,
http_proxy_uri: DEFAULT_HTTP_PROXY_URI,
http_max_redirects: DEFAULT_HTTP_MAX_REDIRECTS,
Expand All @@ -83,7 +77,6 @@ def initialize_client(
@headers = {}
@ssl_options = ssl_options
@auth_options = auth_options
@socket_options = socket_options
# Allow passing partial timeouts hash, without unspecified
# @timeouts[:foo] == nil resulting in infinite timeout.
@timeouts = DEFAULT_TIMEOUTS.merge(timeouts)
Expand Down Expand Up @@ -348,7 +341,7 @@ def watch_entities(resource_name, options = {}, &block)

watcher = Kubeclient::Common::WatchStream.new(
uri,
http_options(uri),
watch_options(uri),
formatter: ->(value) { format_response(options[:as] || @as, value) }
)

Expand Down Expand Up @@ -501,7 +494,7 @@ def watch_pod_log(pod_name, namespace, container: nil, &block)
uri.query = URI.encode_www_form(params)

watcher = Kubeclient::Common::WatchStream.new(
uri, http_options(uri), formatter: ->(value) { value }
uri, watch_options(uri), formatter: ->(value) { value }
)
return_or_yield_to_watcher(watcher, &block)
end
Expand Down Expand Up @@ -653,28 +646,30 @@ def return_or_yield_to_watcher(watcher, &block)
end
end

def http_options(uri)
options = {
basic_auth_user: @auth_options[:username],
basic_auth_password: @auth_options[:password],
def watch_options(uri)
watch_options = {
auth_options: {
username: @auth_options[:username],
password: @auth_options[:password]
},
faraday_options: {
proxy: @http_proxy_uri
},
headers: @headers,
http_proxy_uri: @http_proxy_uri,
http_max_redirects: http_max_redirects
}

if uri.scheme == 'https'
options[:ssl] = {
watch_options[:faraday_options][:ssl] = {
ca_file: @ssl_options[:ca_file],
cert: @ssl_options[:client_cert],
cert_store: @ssl_options[:cert_store],
client_cert: @ssl_options[:client_cert],
key: @ssl_options[:client_key],
# ruby HTTP uses verify_mode instead of verify_ssl
# http://ruby-doc.org/stdlib-1.9.3/libdoc/openssl/rdoc/OpenSSL/SSL/SSLContext.html
verify_mode: @ssl_options[:verify_ssl]
verify: @ssl_options[:verify_ssl]
}
end

options.merge(@socket_options)
watch_options
end

def json_headers
Expand Down
105 changes: 39 additions & 66 deletions lib/kubeclient/watch_stream.rb
Original file line number Diff line number Diff line change
@@ -1,98 +1,71 @@
# frozen_string_literal: true

require 'json'
require 'http'

module Kubeclient
module Common
# HTTP Stream used to watch changes on entities
class WatchStream
def initialize(uri, http_options, formatter:)
def initialize(uri, options, formatter:)
@uri = uri
@http_client = nil
@http_options = http_options
@http_options[:http_max_redirects] ||= Kubeclient::Client::DEFAULT_HTTP_MAX_REDIRECTS
@options = options
@headers = options[:headers]
@options[:http_max_redirects] ||= Kubeclient::Client::DEFAULT_HTTP_MAX_REDIRECTS
@formatter = formatter

@faraday_client = build_client
end

def each
@finished = false

@http_client = build_client
response = @http_client.request(:get, @uri, build_client_options)
unless response.code < 300
raise Kubeclient::HttpError.new(response.code, response.reason, response)
end

buffer = +''
response.body.each do |chunk|
buffer << chunk
while (line = buffer.slice!(/.+\n/))
yield(@formatter.call(line.chomp))

begin
@faraday_client.get('', nil, @headers) do |request|
request.options.on_data = proc do |chunk|
buffer << chunk
while (line = buffer.slice!(/.+\n/))
yield(@formatter.call(line.chomp))
end
next if @finished
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main purpose of finish method was to allow interrupting a watch from another thread.

The previous implementation by @http_client&.close was kinda violent but worked — closing the client would cause the code in each to crash in various ways, depending on where exactly it was when it gets closed, and rescue StandardError would swallow the crash (when caused by finish).

A synchronous next if @finished here is much simpler & safer, but I think it'll only stop the loop after it had something to read, which can take unbounded time (until server sends next update)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm yes that's a very good observation. I'll try to confirm this with a test and then fix it, hopefully having come up with a solution. ;)

end
end
rescue Faraday::Error => e
err_message = build_http_error_message(e)
response_code = e.response ? (e.response[:status] || e.response&.env&.status) : nil
error_klass = (response_code == 404 ? ResourceNotFoundError : HttpError)
raise error_klass.new(response_code, err_message, e.response)
end
rescue StandardError
raise unless @finished
end

def finish
@finished = true
@http_client&.close
@faraday_client.close
end

private

def max_hops
@http_options[:http_max_redirects] + 1
end

def follow_option
if max_hops > 1
{ max_hops: max_hops }
else
# i.e. Do not follow redirects as we have set http_max_redirects to 0
# Setting `{ max_hops: 1 }` does not work FWIW
false
end
end

def build_client
client = HTTP::Client.new(follow: follow_option)
auth = @options[:auth_options] || {}
max_redirects = @options[:http_max_redirects]

if @http_options[:basic_auth_user] && @http_options[:basic_auth_password]
client = client.basic_auth(
user: @http_options[:basic_auth_user],
pass: @http_options[:basic_auth_password]
)
Faraday.new(@uri, @options[:faraday_options] || {}) do |connection|
if auth[:username] && auth[:password]
connection.basic_auth(auth[:username], auth[:password])
end
connection.use(FaradayMiddleware::FollowRedirects, limit: max_redirects)
connection.response(:raise_error)
end

client
end

def using_proxy
proxy = @http_options[:http_proxy_uri]
return nil unless proxy
p_uri = URI.parse(proxy)
{
proxy_address: p_uri.hostname,
proxy_port: p_uri.port,
proxy_username: p_uri.user,
proxy_password: p_uri.password
}
end

def build_client_options
client_options = {
headers: @http_options[:headers],
proxy: using_proxy
}
if @http_options[:ssl]
client_options[:ssl] = @http_options[:ssl]
socket_option = :ssl_socket_class
else
socket_option = :socket_class
end
client_options[socket_option] = @http_options[socket_option] if @http_options[socket_option]
client_options
def build_http_error_message(e)
json_error_msg =
begin
JSON.parse(e.response[:body] || '') || {}
rescue StandardError
{}
end
json_error_msg['message'] || e.message || ''
end
end
end
Expand Down
6 changes: 4 additions & 2 deletions test/test_kubeclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ def test_pass_proxy
assert_equal(proxy_uri.to_s, faraday_client.proxy.uri.to_s)

watch_client = client.watch_pods
assert_equal(watch_client.send(:build_client_options)[:proxy][:proxy_address], proxy_uri.host)
assert_equal(watch_client.send(:build_client_options)[:proxy][:proxy_port], proxy_uri.port)
assert_equal(
watch_client.instance_variable_get('@faraday_client').proxy.uri.to_s,
proxy_uri.to_s
)
end

def test_pass_max_redirects
Expand Down
28 changes: 24 additions & 4 deletions test/test_watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_watch_with_field_selector
assert_requested(:get, "#{api_host}/v1/watch/events?fieldSelector=#{selector}", times: 1)
end

def test_watch_with_finish_and_ebadf
def test_watch_with_finish
api_host = 'http://localhost:8080/api'

stub_core_api_list
Expand All @@ -172,10 +172,30 @@ def test_watch_with_finish_and_ebadf
client = Kubeclient::Client.new(api_host, 'v1')
watcher = client.watch_events

# explodes when StandardError is not caught
watcher.each do # rubocop:disable Lint/UnreachableLoop
watcher.each do
watcher.finish
raise StandardError
end

assert_requested(:get, "#{api_host}/v1/watch/events", times: 1)
end

def test_watch_with_finish_without_api_server_response
api_host = 'http://localhost:8080/api'

stub_core_api_list
stub_request(:get, %r{.*/watch/events})
.to_return(body: lambda { |request| sleep 3; open_test_file('watch_stream.json') }, status: 200)

client = Kubeclient::Client.new(api_host, 'v1')
watcher = client.watch_events

watch_killer = Thread.new(watcher) do |watcher|
sleep 1
watcher.finish
end

watcher.each do |event|
raise "Watching should finish before receiving an event"
end

assert_requested(:get, "#{api_host}/v1/watch/events", times: 1)
Expand Down