Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async 1 #143

Merged
merged 5 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 96 additions & 62 deletions lua/netman/providers/ssh.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ local local_files = require("netman.tools.utils").files_dir
local logger = require("netman.tools.utils").get_provider_logger()

local HOST_MATCH_GLOB = "^[%s]*Host[%s=](.*)"

local find_pattern_globs = {
'^(MODE)=([%d%a]+),',
'^(BLOCKS)=([%d]+),',
'^(BLKSIZE)=([%d]+),',
'^(MTIME_SEC)=([%d]+),',
'^(USER)=([%w%-._]+),',
'^(GROUP)=([%w%-._]+),',
'^(INODE)=([%d]+),',
'^(PERMISSIONS)=([%d]+),',
'^(SIZE)=([%d]+),',
'^(TYPE)=([%l%s]+),',
'^(NAME)=(.*)$'
'^(MODE)=(>?)([%d%a]+),',
'^(BLOCKS)=(>?)([%d]+),',
'^(BLKSIZE)=(>?)([%d]+),',
'^(MTIME_SEC)=(>?)([%d]+),',
'^(USER)=(>?)([%w%-._]+),',
'^(GROUP)=(>?)([%w%-._]+),',
'^(INODE)=(>?)([%d]+),',
'^(PERMISSIONS)=(>?)([%d]+),',
'^(SIZE)=(>?)([%d]+),',
'^(TYPE)=(>?)([%l%s]+),',
'^(NAME)=(>?)(.*)$'
}

local M = {}
Expand Down Expand Up @@ -580,7 +581,6 @@ function SSH:extract(archive, target_dir, scheme, provider_cache, opts)
[command_flags.STDOUT_PIPE_LIMIT] = 0,
[command_flags.EXIT_CALLBACK] = finish_callback
}
-- if opts.async then command_options[command_flags.ASYNC] = true end
if not opts.remote_dump then
-- There is no way for us to do this command "asynchronously" as we have to perform the commands in sync
opts.async = false
Expand Down Expand Up @@ -641,7 +641,7 @@ end
--- Default: {}
--- If provided, a table of options that can be used to modify how copy works
--- Valid Options
--- - ignore_errors:
--- - ignore_errors:
--- If provided, we will not report any errors received while attempting copy
--- @return table
--- Returns a table that contains the following key/value pairs
Expand Down Expand Up @@ -907,6 +907,13 @@ end
--- - If provided, used to specify the minimum depth to traverse our search
--- - filesystems: boolean
--- - If provided, tells us to descend (or not) into other filesystems
--- - multi_item_exec: boolean
--- - Default: False
--- - If provided, tells us that you want find's exec to run individual shell instances
--- for each match. For more details on this, look at `man find`. Specifically `-exec`
--- - By not providing this, we will default to using finds `;` option which means
--- that exec will execute a new "command" for every match. If you can, you should consider
--- setting this to true as you will see great performance increases
--- - exec: string or function | Optional
--- - If provided, will be used as the `exec` flag with find.
--- Note: the `string` form of this needs to be a find compliant shell string. @see man find for details
Expand Down Expand Up @@ -950,6 +957,8 @@ function SSH:find(location, opts)
end
table.insert(find_command, string.format('"%s"', opts.search_param))
end
local op = ";"
if opts.multi_item_exec then op = "+" end
local command_options = {
[command_flags.STDERR_JOIN] = '',
[command_flags.ASYNC] = opts.async,
Expand All @@ -961,12 +970,13 @@ function SSH:find(location, opts)
assert(_ == 'string' or _== 'function', "Invalid Exec provided for SSH:find. Exec should be a shell command (string) or function!")
if type(opts.exec) == 'string' then
table.insert(find_command, "-exec")
table.insert(find_command, opts.exec .. " {} \\;")
table.insert(find_command, opts.exec .. " {} \\" .. op)
else
command_options[command_flags.STDOUT_CALLBACK] = opts.exec
command_options[command_flags.STDOUT_PIPE_LIMIT] = 0
end
end
-- Note, this will return a handle to the running command
-- if opts.async is provided
local output = self:run_command(table.concat(find_command, ' '), command_options)
if opts.async then return output end
if output.exit_code ~= 0 then
Expand Down Expand Up @@ -1250,7 +1260,7 @@ function SSH:stat(locations, target_flags)
local stat_flags = {
'-L',
'-c',
'MODE=%f,BLOCKS=%b,BLKSIZE=%B,MTIME_SEC=%X,USER=%U,GROUP=%G,INODE=%i,PERMISSIONS=%a,SIZE=%s,TYPE=%F,NAME=%n\\\\0'
'MODE=%f,BLOCKS=%b,BLKSIZE=%B,MTIME_SEC=%X,USER=%U,GROUP=%G,INODE=%i,PERMISSIONS=%a,SIZE=%s,TYPE=%F,NAME=%n'
}
local stat_command = { 'stat' }
for _, flag in ipairs(stat_flags) do
Expand Down Expand Up @@ -1297,15 +1307,17 @@ function SSH:_stat_parse(stat_output, target_flags)
local item = {}
local _type = nil
for _, pattern in ipairs(find_pattern_globs) do
local key, value = line:match(pattern)
local key, is_number, value = line:match(pattern)
key = key:gsub('(^\n)', '')
line = line:gsub(pattern, '')
if is_number:len() > 0 then
value = tonumber(value)
end
if target_flags[key:upper()] then
item[key:upper()] = value
end
if not _type and key:upper() == SSH.CONSTANTS.STAT_FLAGS.TYPE then
_type = value
end
end
_type = item.TYPE:upper()
if target_flags[SSH.CONSTANTS.STAT_FLAGS.URI] then
item[SSH.CONSTANTS.STAT_FLAGS.URI] = self:_create_uri(item.NAME)
if _type:upper() == 'DIRECTORY' and item.NAME:sub(-1, -1) ~= '/' then
Expand All @@ -1331,6 +1343,10 @@ function SSH:_stat_parse(stat_output, target_flags)
name = _
})
end
if not name or name:len() == 0 then
-- Little catch to deal with if the file is literally named '/'
name = item.NAME
end
path[#path] = {uri = item[SSH.CONSTANTS.STAT_FLAGS.URI], name = name}
local absolute_path = item.NAME
item[SSH.CONSTANTS.STAT_FLAGS.ABSOLUTE_PATH] = path
Expand Down Expand Up @@ -1643,7 +1659,6 @@ function M.ui.get_host_details(config, host, provider_cache)
table.insert(paths, {uri = URI:new(uri_as_string).uri, name = _})
end
end
logger.debug("Paths", {host = host, paths = paths})
return paths
end
return {
Expand Down Expand Up @@ -1691,27 +1706,60 @@ function M.internal.validate(uri, cache)
return { uri = uri, host = host }
end

function M.internal.read_directory(uri, host)
local children = M.internal.find(uri, host, {max_depth = 1})
if not children.success then
-- Something happened during find.
if children.error:match('[pP]ermission%s+[dD]enied') then
return {
success = false,
error = {
message = string.format("Permission Denied when accessing %s", uri:to_string())
function M.internal.read_directory(uri, host, callback)
local find_cmd = 'stat -L -c \\|MODE=%f,BLOCKS=\\>%b,BLKSIZE=\\>%B,MTIME_SEC=\\>%X,USER=%U,GROUP=%G,INODE=\\>%i,PERMISSIONS=\\>%a,SIZE=\\>%s,TYPE=%F,NAME=%n\\|'

local partial_output = {}
local children = {}
local incomplete = false
local stdout_callback = function(data, force)
data = table.concat(partial_output, '') .. data
partial_output = {}
incomplete = false
for line in data:gmatch('([^\n\r]+)') do
if not force and (incomplete or line:match('^|$') or not line:match('|\n?$')) then
-- The line is incomplete. Store it and wait for more?
table.insert(partial_output, line)
incomplete = true
goto continue
end
-- Conditionally stripping bars off start and end
if line:sub(1, 1) == '|' then line = line:sub(2, -1) end
if line:sub(-1, -1) == '|' then line = line:sub(1, -2) end
local raw_obj = host:_stat_parse(line)
for _, metadata in pairs(raw_obj) do
local obj = {
URI = metadata.URI,
FIELD_TYPE = metadata.FIELD_TYPE,
NAME = metadata.NAME,
ABSOLUTE_PATH = metadata.ABSOLUTE_PATH,
METADATA = metadata
}
}
if callback then
callback(obj)
else
table.insert(children, obj)
end
end
::continue::
end
-- Handle other errors as we find them
return {
success = false,
error = children.error
}

end
local exit_callback = function()
if #partial_output > 0 then
-- Force cleanup of any data left
stdout_callback(table.concat(partial_output, ''), true)
end
if callback then callback(nil, true) end
end
local async = callback and true or false
local handle = M.internal.find(uri, host, {max_depth = 1, exec = find_cmd, stdout_callback = stdout_callback, exit_callback = exit_callback, async = async})
-- Assuming callback means we are doing this asynchronously
if callback then return handle end
-- We can't get here until we are done if we aren't running asynchronously
return {
success = true,
data = children.data,
data = children,
type = api_flags.READ_TYPE.EXPLORE
}
end
Expand Down Expand Up @@ -1773,32 +1821,18 @@ end
function M.internal.find(uri, host, opts)
opts = opts or {}
if not opts.exec then
opts.exec = 'stat -L -c MODE=%f,BLOCKS=%b,BLKSIZE=%B,MTIME_SEC=%X,USER=%U,GROUP=%G,INODE=%i,PERMISSIONS=%a,SIZE=%s,TYPE=%F,NAME=%n'
end
local raw_children = host:find(uri, opts)
if raw_children.error then
logger.info("Received potential error during find", {error = raw_children.error})
raw_children = raw_children.output
end
-- if raw_children.error and not opts.ignore_errors then
-- return {success = false, error = raw_children.error}
-- end

local children = host:_stat_parse(raw_children)
local __ = {}
for _, metadata in pairs(children) do
__[metadata.URI] = {
URI = metadata.URI,
FIELD_TYPE = metadata.FIELD_TYPE,
NAME = metadata.NAME,
-- Child will always be the absolute path, and its ever so slightly cheaper to do a straight memory reference as opposed
-- to a hash lookup and memory reference
ABSOLUTE_PATH = metadata.ABSOLUTE_PATH,
METADATA = metadata
}
opts.exec = 'stat -L -c \\|MODE=%f,BLOCKS=\\>%b,BLKSIZE=\\>%B,MTIME_SEC=\\>%X,USER=%U,GROUP=%G,INODE=\\>%i,PERMISSIONS=\\>%a,SIZE=\\>%s,TYPE=%F,NAME=%n\\|'
end
local data_cache = nil
if opts.callback then
opts.stdout_callback = function(data)
if data_cache then data = data_cache .. data end
data_cache = nil
if opts.callback then opts.callback(data) end
end
end
return { success = true, data = __ }
end
return host:find(uri, opts)
end

function M.search(uri, cache, param, opts)
opts = opts or {}
Expand Down
Loading