Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Heartbeat #87

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
44 changes: 36 additions & 8 deletions src/AzManagers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ function delete_pending_down_vms()
end

function prune()
@debug "pruning workers that are registered with Julia, but are not registered in the Azure scale-set"
manager = azmanager()
wrkrs = Dict{Int,Dict}()
for wrkr in Distributed.PGRP.workers
Expand All @@ -253,8 +254,9 @@ function prune()

sleep(10)
for scaleset in scalesets(manager)
vms = scaleset_listvms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, manager.nretry, manager.verbose)
vms = scaleset_listvms(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, manager.nretry, manager.verbose; allowed_states=("Creating", "Updating", "Succeeded"))
vm_names = get.(vms, "name", "")
@debug "for scaleset $(scaleset.scalesetname), vms are $vm_names"
for (id,wrkr) in wrkrs
is_sub = get(wrkr, "subscriptionid", "") == scaleset.subscriptionid
is_rg = get(wrkr, "resourcegroup", "") == scaleset.resourcegroup
Expand All @@ -266,6 +268,7 @@ function prune()
end

for pid in keys(wrkrs)
@debug "pruning worker with pid=$pid"
@async Distributed.deregister_worker(pid)
end
end
Expand Down Expand Up @@ -617,7 +620,7 @@ end

function Distributed.manage(manager::AzManager, id::Integer, config::WorkerConfig, op::Symbol)
if op == :register
remote_do(AzManagers.logging, id)
# remote_do(AzManagers.logging, id)
end
if op == :interrupt
# TODO
Expand Down Expand Up @@ -698,6 +701,8 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo

Distributed.init_worker(cookie)
interface = IPv4(Distributed.LPROC.bind_addr)

local sock
if Distributed.LPROC.bind_port == 0
port_hint = 9000 + (getpid() % 1000)
(port, sock) = listenany(interface, UInt16(port_hint))
Expand All @@ -706,11 +711,31 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo
sock = listen(interface, Distributed.LPROC.bind_port)
end

tsk_messages = nothing
@async while isopen(sock)
client = accept(sock)
tsk_messages = Distributed.process_messages(client, client, true)
client = nothing

tsk_messages = @async begin
while true
sleep(30)
end
end
# _tsk_messages = @async while isopen(sock)
# client = accept(sock)
# tsk_messages = Distributed.process_messages(client, client, true)
# end

tsk_heartbeat = @async while true
try
@info "now=$(now())"
# @info "sock=$sock"
# @info "client=$client"
# @info "out=$out"
catch e
@warn "caught heartbeat error"
showerror(stdout, e)
end
sleep(30)
end

print(out, "julia_worker:") # print header
print(out, "$(string(Distributed.LPROC.bind_port))#") # print port
print(out, Distributed.LPROC.bind_addr)
Expand All @@ -731,6 +756,7 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo
if tsk_messages != nothing
try
wait(tsk_messages)
@info "finished waiting for tsk_messages"

#=
We throw an error regardless of whether the tsk_messages task completes
Expand Down Expand Up @@ -1447,7 +1473,7 @@ function scaleset_capacity(manager::AzManager, subscriptionid, resourcegroup, sc
r["sku"]["capacity"]
end

function scaleset_listvms(manager::AzManager, subscriptionid, resourcegroup, scalesetname, nretry, verbose)
function scaleset_listvms(manager::AzManager, subscriptionid, resourcegroup, scalesetname, nretry, verbose; allowed_states=("Succeeded", "Updating"))
scalesetnames = list_scalesets(manager, subscriptionid, resourcegroup, nretry, verbose)
scalesetname ∉ scalesetnames && return String[]

Expand All @@ -1474,11 +1500,13 @@ function scaleset_listvms(manager::AzManager, subscriptionid, resourcegroup, sca
vms = Dict{String,String}[]

for vm in _vms
if vm["properties"]["provisioningState"] ∈ ("Succeeded", "Updating")
if vm["properties"]["provisioningState"] ∈ allowed_states
i = findfirst(id->id == vm["id"], networkinterfaces_vmids)
if i != nothing
push!(vms, Dict("name"=>vm["name"], "host"=>vm["properties"]["osProfile"]["computerName"], "bindaddr"=>networkinterfaces[i]["properties"]["ipConfigurations"][1]["properties"]["privateIPAddress"], "instanceid"=>vm["instanceId"]))
end
else
@debug "vm $(vm["name"]) in scaleset $scalesetname is in state $(vm["properties"]["provisioningState"])"
end
end
@debug "done collating vms and nics"
Expand Down
3 changes: 3 additions & 0 deletions src/templates.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ to the `~/.azmanagers` folder.
* `skutier = "Standard"` Azure SKU tier.
* `datadisks=[]` list of data disks to create and attach [1]
* `tempdisk = "sudo mkdir -m 777 /mnt/scratch\nln -s /mnt/scratch /scratch"` cloud-init commands used to mount or link to temporary disk
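* `accelerated_networking = false` value of `enableAcceleratedNetworking` on the scale-set's network interface configuration; set to `true` to enable Azure accelerated networking.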

# Notes
[1] Each datadisk is a Dictionary. For example,
Expand All @@ -62,6 +63,7 @@ function build_sstemplate(name;
imagename,
vnet,
subnet,
accelerated_networking = false,
skutier="Standard",
datadisks=[],
tempdisk="sudo mkdir -m 777 /mnt/scratch\nln -s /mnt/scratch /scratch",
Expand Down Expand Up @@ -129,6 +131,7 @@ function build_sstemplate(name;
"name" => name,
"properties" => Dict(
"primary" => true,
"enableAcceleratedNetworking" => accelerated_networking,
"ipConfigurations" => [
Dict(
"name" => name,
Expand Down