From 47cd0fa58ee2fed91625175611e0e3d3c3a64635 Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Tue, 13 Jul 2021 18:35:12 +0000 Subject: [PATCH 01/11] heartbeat for debugging --- src/AzManagers.jl | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index 3f193010..8e490e9d 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -706,11 +706,21 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo sock = listen(interface, Distributed.LPROC.bind_port) end + local client + tsk_messages = nothing - @async while isopen(sock) + _tsk_messages = @async while isopen(sock) client = accept(sock) tsk_messages = Distributed.process_messages(client, client, true) end + + tsk_heartbeat = @async while true + @info "now=$(now())" + @info "sock=$sock" + @info "client=$client" + sleep(30) + end + print(out, "julia_worker:") # print header print(out, "$(string(Distributed.LPROC.bind_port))#") # print port print(out, Distributed.LPROC.bind_addr) @@ -731,6 +741,7 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo if tsk_messages != nothing try wait(tsk_messages) + @info "finished waiting for tsk_messages" #= We throw an error regardless of whether the tsk_messages task completes From eba18ef72a5e8cc08532e76bc6e6ba7f129c7e9e Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Tue, 13 Jul 2021 19:10:41 +0000 Subject: [PATCH 02/11] wip --- src/AzManagers.jl | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index 8e490e9d..ae3d1e2c 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -706,7 +706,7 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo sock = listen(interface, Distributed.LPROC.bind_port) end - local client + client = nothing tsk_messages = nothing _tsk_messages = @async while isopen(sock) @@ -715,9 +715,13 @@ function azure_worker_start(out::IO, 
cookie::AbstractString=readline(stdin); clo end tsk_heartbeat = @async while true - @info "now=$(now())" - @info "sock=$sock" - @info "client=$client" + try + @info "now=$(now())" + @info "sock=$sock" + @info "client=$client" + catch e + @warn "caught heartbeat error" + showerror(stdout, e) sleep(30) end From cde231194286591d11fdd0954c8d1d75209822fa Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Tue, 13 Jul 2021 19:12:01 +0000 Subject: [PATCH 03/11] Fixup --- src/AzManagers.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index ae3d1e2c..92125568 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -722,6 +722,7 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo catch e @warn "caught heartbeat error" showerror(stdout, e) + end sleep(30) end From bca4a51dba45957cca2e14ea6dc08da8b3ef2910 Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Tue, 13 Jul 2021 19:23:36 +0000 Subject: [PATCH 04/11] wip --- src/AzManagers.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index 92125568..9907abde 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -617,7 +617,7 @@ end function Distributed.manage(manager::AzManager, id::Integer, config::WorkerConfig, op::Symbol) if op == :register - remote_do(AzManagers.logging, id) + # remote_do(AzManagers.logging, id) end if op == :interrupt # TODO From fbee143d0cf6fd01105bed91fd9f6fa9b50d876c Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Tue, 13 Jul 2021 20:49:14 +0000 Subject: [PATCH 05/11] wip --- src/AzManagers.jl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index 9907abde..08d4b879 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -698,6 +698,8 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo Distributed.init_worker(cookie) interface = IPv4(Distributed.LPROC.bind_addr) + + local sock if 
Distributed.LPROC.bind_port == 0 port_hint = 9000 + (getpid() % 1000) (port, sock) = listenany(interface, UInt16(port_hint)) @@ -719,6 +721,7 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo @info "now=$(now())" @info "sock=$sock" @info "client=$client" + @info "out=$out" catch e @warn "caught heartbeat error" showerror(stdout, e) From 804ac68eaf945e1e24007bd7a3b9ce60cb46b7b0 Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Tue, 13 Jul 2021 21:12:52 +0000 Subject: [PATCH 06/11] add accelerated networking option --- src/templates.jl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/templates.jl b/src/templates.jl index 956a2c06..42922c89 100644 --- a/src/templates.jl +++ b/src/templates.jl @@ -37,6 +37,7 @@ to the `~/.azmanagers` folder. * `skutier = "Standard"` Azure SKU tier. * `datadisks=[]` list of data disks to create and attach [1] * `tempdisk = "sudo mkdir -m 777 /mnt/scratch\nln -s /mnt/scratch /scratch"` cloud-init commands used to mount or link to temporary disk +* `accelerated_networking = false` set to `true` to enable Azure accelerated networking (`enableAcceleratedNetworking`) on the scale-set network interface # Notes [1] Each datadisk is a Dictionary. 
For example, @@ -62,6 +63,7 @@ function build_sstemplate(name; imagename, vnet, subnet, + accelerated_networking = false, skutier="Standard", datadisks=[], tempdisk="sudo mkdir -m 777 /mnt/scratch\nln -s /mnt/scratch /scratch", @@ -129,6 +131,7 @@ function build_sstemplate(name; "name" => name, "properties" => Dict( "primary" => true, + "enableAcceleratedNetworking" => accelerated_networking, "ipConfigurations" => [ Dict( "name" => name, From d7650030cb60d131e11f3d6a97de1e5ba6719892 Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Tue, 13 Jul 2021 22:03:16 +0000 Subject: [PATCH 07/11] alex test --- src/AzManagers.jl | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index 08d4b879..9a64d3dd 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -710,18 +710,22 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo client = nothing - tsk_messages = nothing - _tsk_messages = @async while isopen(sock) - client = accept(sock) - tsk_messages = Distributed.process_messages(client, client, true) + tsk_messages = @async begin + while true + sleep(30) + end end + # _tsk_messages = @async while isopen(sock) + # client = accept(sock) + # tsk_messages = Distributed.process_messages(client, client, true) + # end tsk_heartbeat = @async while true try @info "now=$(now())" - @info "sock=$sock" - @info "client=$client" - @info "out=$out" + # @info "sock=$sock" + # @info "client=$client" + # @info "out=$out" catch e @warn "caught heartbeat error" showerror(stdout, e) From abbd0c60f489d4f55ecf0fe48dc6cd7ced79b99a Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Wed, 14 Jul 2021 13:34:54 +0000 Subject: [PATCH 08/11] debug statement for pruning --- src/AzManagers.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index 9a64d3dd..373b2a24 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -266,6 +266,7 @@ function prune() end for pid in 
keys(wrkrs) + @debug "pruning worker with pid=$pid" @async Distributed.deregister_worker(pid) end end From c4cbba369d724f1576153414953e522f38ca54af Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Wed, 14 Jul 2021 13:47:34 +0000 Subject: [PATCH 09/11] inspect the machines returned by scaleset_listvms --- src/AzManagers.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index 373b2a24..b0997da9 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -241,6 +241,7 @@ function delete_pending_down_vms() end function prune() + @debug "pruning workers that are registered with Julia, but are not registered in the Azure scale-set" manager = azmanager() wrkrs = Dict{Int,Dict}() for wrkr in Distributed.PGRP.workers @@ -255,6 +256,7 @@ function prune() for scaleset in scalesets(manager) vms = scaleset_listvms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, manager.nretry, manager.verbose) vm_names = get.(vms, "name", "") + @info "vm_names=$vm_names" for (id,wrkr) in wrkrs is_sub = get(wrkr, "subscriptionid", "") == scaleset.subscriptionid is_rg = get(wrkr, "resourcegroup", "") == scaleset.resourcegroup From d29bc23fd7f0f4c253173b53c16b44c06f1206ad Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Wed, 14 Jul 2021 14:09:45 +0000 Subject: [PATCH 10/11] print more diagnostic info --- src/AzManagers.jl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index b0997da9..c49cc927 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -256,7 +256,7 @@ function prune() for scaleset in scalesets(manager) vms = scaleset_listvms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, manager.nretry, manager.verbose) vm_names = get.(vms, "name", "") - @info "vm_names=$vm_names" + @debug "for scaleset $(scaleset.scalesetname), vms are $vm_names" for (id,wrkr) in wrkrs is_sub = get(wrkr, "subscriptionid", "") == scaleset.subscriptionid 
is_rg = get(wrkr, "resourcegroup", "") == scaleset.resourcegroup @@ -1505,6 +1505,8 @@ function scaleset_listvms(manager::AzManager, subscriptionid, resourcegroup, sca if i != nothing push!(vms, Dict("name"=>vm["name"], "host"=>vm["properties"]["osProfile"]["computerName"], "bindaddr"=>networkinterfaces[i]["properties"]["ipConfigurations"][1]["properties"]["privateIPAddress"], "instanceid"=>vm["instanceId"])) end + else + @debug "vm $(vm["name"]) in scaleset $scalesetname is in state $(vm["properties"]["provisioningState"])" end end @debug "done collating vms and nics" From 4542de3b40e9c2cc5e89e3999f9743e93d68d429 Mon Sep 17 00:00:00 2001 From: Sam Kaplan Date: Wed, 14 Jul 2021 14:30:53 +0000 Subject: [PATCH 11/11] specify states for filtering list of vms, and use that in the prune method to hopefully be more stable --- src/AzManagers.jl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/AzManagers.jl b/src/AzManagers.jl index c49cc927..44528263 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -254,7 +254,7 @@ function prune() sleep(10) for scaleset in scalesets(manager) - vms = scaleset_listvms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, manager.nretry, manager.verbose) + vms = scaleset_listvms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, manager.nretry, manager.verbose; allowed_states=("Creating", "Updating", "Succeeded")) vm_names = get.(vms, "name", "") @debug "for scaleset $(scaleset.scalesetname), vms are $vm_names" for (id,wrkr) in wrkrs @@ -1473,7 +1473,7 @@ function scaleset_capacity(manager::AzManager, subscriptionid, resourcegroup, sc r["sku"]["capacity"] end -function scaleset_listvms(manager::AzManager, subscriptionid, resourcegroup, scalesetname, nretry, verbose) +function scaleset_listvms(manager::AzManager, subscriptionid, resourcegroup, scalesetname, nretry, verbose; allowed_states=("Succeeded", "Updating")) scalesetnames = 
list_scalesets(manager, subscriptionid, resourcegroup, nretry, verbose) scalesetname ∉ scalesetnames && return String[] @@ -1500,7 +1500,7 @@ function scaleset_listvms(manager::AzManager, subscriptionid, resourcegroup, sca vms = Dict{String,String}[] for vm in _vms - if vm["properties"]["provisioningState"] ∈ ("Succeeded", "Updating") + if vm["properties"]["provisioningState"] ∈ allowed_states i = findfirst(id->id == vm["id"], networkinterfaces_vmids) if i != nothing push!(vms, Dict("name"=>vm["name"], "host"=>vm["properties"]["osProfile"]["computerName"], "bindaddr"=>networkinterfaces[i]["properties"]["ipConfigurations"][1]["properties"]["privateIPAddress"], "instanceid"=>vm["instanceId"]))