Skip to content

Commit

Permalink
use a buffersize for cp to/from blob storage
Browse files Browse the repository at this point in the history
  • Loading branch information
samtkaplan committed Apr 8, 2024
1 parent f44e923 commit e47d497
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 39 deletions.
102 changes: 64 additions & 38 deletions src/AzStorage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ const _MAXBYTES_PER_BLOCK = 4_000 * 2^20
const _MAXBLOCKS_PER_BLOB = 50_000

nblocks_error() = error("data is too large for a block-blob")
function nblocks(nthreads::Integer, nbytes::Integer)
nblocks = ceil(Int, nbytes/_MAXBYTES_PER_BLOCK + eps(Float64))
function nblocks(nthreads::Integer, nbytes::Integer, max_bytes_per_block=_MAXBYTES_PER_BLOCK)
nblocks = ceil(Int, nbytes/min(_MAXBYTES_PER_BLOCK, max_bytes_per_block) + eps(Float64))
if nblocks < nthreads
nblocks = clamp(ceil(Int, nbytes/_MINBYTES_PER_BLOCK + eps(Float64)), 1, nthreads)
end
Expand Down Expand Up @@ -421,38 +421,29 @@ function putblocklist(c, o, blockids)
nothing
end

function writebytes_block(c, o, data, _nblocks)
# heuristic to increase probability that token is valid during the retry logic in AzSessions.c
function blockids(_nblocks)
l = ceil(Int, log10(_nblocks))
blockids = [base64encode(lpad(blockid-1, l, '0')) for blockid in 1:_nblocks]
_blockids = [HTTP.escapeuri(blockid) for blockid in blockids]
t = token(c.session)
[base64encode(lpad(blockid-1, l, '0')) for blockid in 1:_nblocks]
end

function writebytes_block(c, o, data, _blockids)
__blockids = [HTTP.escapeuri(_blockid) for _blockid in _blockids]
token(c.session)
_token,refresh_token,expiry,scope,resource,tenant,clientid,client_secret = authinfo(c.session)
r = @ccall libAzStorage.curl_writebytes_block_retry_threaded(_token::Ptr{UInt8}, refresh_token::Ptr{UInt8}, expiry::Ptr{Culong}, scope::Cstring, resource::Cstring, tenant::Cstring,
clientid::Cstring, client_secret::Cstring,c.storageaccount::Cstring, c.containername::Cstring, addprefix(c,o)::Cstring, _blockids::Ptr{Cstring}, data::Ptr{UInt8},
length(data)::Csize_t, c.nthreads::Cint, _nblocks::Cint, c.nretry::Cint, c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::ResponseCodes
clientid::Cstring, client_secret::Cstring, c.storageaccount::Cstring, c.containername::Cstring, addprefix(c,o)::Cstring, __blockids::Ptr{Cstring}, data::Ptr{UInt8},
length(data)::Csize_t, c.nthreads::Cint, length(__blockids)::Cint, c.nretry::Cint, c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::ResponseCodes
(r.http >= 300 || r.curl > 0) && error("writebytes_block error: http code $(r.http), curl code $(r.curl)")
authinfo!(c.session, _token, refresh_token, expiry)
putblocklist(c, o, blockids)
end

# function writebytes_single_block(c, o, data, blockid)
# t = token(c.session)
# _token,refresh_token,expiry,scope,resource,tenant,clientid,client_secret = authinfo(c.session)
# token_lock = @ccall
# r = @ccall libAzStorage.curl_writebytes_block_retry(_token::Ptr{UInt8}, refresh_token::Ptr{UInt8}, expiry::Ptr{Culong}, scope::Cstring, resource::Cstring, tenant::Cstring,
# clientid::Cstring, client_secret::Cstring,c.storageaccount::Cstring, c.containername::Cstring, addprefix(c,o)::Cstring, _blockids::Ptr{Cstring}, data::Ptr{UInt8},
# length(data)::Csize_t, c.nthreads::Cint, _nblocks::Cint, c.nretry::Cint, c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::ResponseCodes
# (r.http >= 300 || r.curl > 0) && error("writebytes_block error: http code $(r.http), curl code $(r.curl)")
# authinfo!(c.session, _token, refresh_token, expiry)

# end
function writebytes(c::AzContainer, o::AbstractString, data::DenseArray{UInt8}; contenttype="application/octet-stream")
_nblocks = nblocks(c.nthreads, length(data))
if Sys.iswindows()
writebytes_blob(c, o, data, contenttype)
else
writebytes_block(c, o, data, _nblocks)
_blockids = blockids(nblocks(c.nthreads, length(data)))
writebytes_block(c, o, data, _blockids)
putblocklist(c, o, _blockids)
end
nothing
end
Expand Down Expand Up @@ -740,30 +731,64 @@ cp("localfile.txt", AzContainer("mycontainer";storageaccount="mystorageaccount")
## blob to local file
```
cp(AzContainer("mycontainer";storageaccount="mystorageaccount"), "remoteblob.txt", "localfile.txt", chunksize=2_000_000)
cp(AzContainer("mycontainer";storageaccount="mystorageaccount"), "remoteblob.txt", "localfile.txt", buffersize=2_000_000_000)
```
`chunksize` can be modified to set the size of chunks being in memory at one time. Default is 2GB
`buffersize` is the memory buffer size (in bytes) used in the copy algorithm, and defaults to `2_000_000_000` bytes (2GB).
## blob to blob
```
cp(AzContainer("mycontainer";storageaccount="mystorageaccount"), "remoteblob_in.txt", AzContainer("mycontainer";storageaccount="mystorageaccount"), "remoteblob_out.txt")
```
"""
function Base.cp(in::AbstractString, outc::AzContainer, outb::AbstractString, chunksize=2_000_000)
bytes = read!(in, Vector{UInt8}(undef,filesize(in)))
write(outc, outb, bytes)
function Base.cp(in::AbstractString, outc::AzContainer, outb::AbstractString; buffersize=2_000_000_000)
if Sys.iswindows()
bytes = read!(in, Vector{UInt8}(undef, filesize(in)))
write(outc, outb, bytes)
else
n = filesize(in)
_nblocks = nblocks(outc.nthreads, n, div(buffersize, outc.nthreads))
_blockids = blockids(_nblocks)
nominal_bytes_per_block,remaining_bytes_per_block = divrem(n, _nblocks)
nblocks_per_buffer,remaining_blocks_per_buffer = divrem(buffersize, nominal_bytes_per_block)
nblocks_per_buffer += remaining_blocks_per_buffer > 0 ? 1 : 0

buffer = Vector{UInt8}(undef, nblocks_per_buffer*(nominal_bytes_per_block + 1))

i2byte = nbytes_buffer = 0
i1block = 1
io = open(in, "r")
for iblock = 1:_nblocks
i1byte = i2byte + 1

if iblock <= remaining_bytes_per_block
i2byte = min(n, i1byte + nominal_bytes_per_block)
else
i2byte = min(n, i1byte + nominal_bytes_per_block - 1)
end

nbytes_buffer += i2byte - i1byte + 1

if iblock == _nblocks || nbytes_buffer >= buffersize
_buffer = @view buffer[1:nbytes_buffer]
read!(io, _buffer)
writebytes_block(outc, outb, _buffer, _blockids[i1block:iblock])
i1block = iblock + 1
nbytes_buffer = 0
end
end
putblocklist(outc, outb, _blockids)
end
end

```
```
function Base.cp(inc::AzContainer, inb::AbstractString, out::AbstractString; chunksize=2_000_000)
function Base.cp(inc::AzContainer, inb::AbstractString, out::AbstractString; buffersize=2_000_000_000)
n = filesize(inc, inb)
io = open(out, "w")
for i1=0:chunksize:n-1
i2 = min(i1+chunksize-1,n-1)
_chunksize = i2 - i1 + 1
bytes = read!(inc, inb, Vector{UInt8}(undef, _chunksize), offset=i1)
write(io, bytes)
buffer = Vector{UInt8}(undef, min(buffersize, n))
for i1 = 0:buffersize:n-1
_buffersize = min(buffersize, n - i1)
_buffer = buffersize == _buffersize ? buffer : view(buffer, 1:_buffersize)
read!(inc, inb, _buffer, offset=i1)
write(io, _buffer)
end
close(io)
end
Expand All @@ -787,15 +812,16 @@ cp("localfile.txt", open(AzContainer("mycontainer";storageaccount="mystorageacco
## blob to local file
```
cp(open(AzContainer("mycontainer";storageaccount="mystorageaccount"), "remoteblob.txt"), "localfile.txt")
cp(open(AzContainer("mycontainer";storageaccount="mystorageaccount"), "remoteblob.txt"), "localfile.txt"; buffersize=2_000_000_000)
```
`buffersize` is the memory buffer size (in bytes) used in the copy algorithm, and defaults to `2_000_000_000` bytes (2GB).
## blob to blob
```
cp(open(AzContainer("mycontainer";storageaccount="mystorageaccount"), "remoteblob_in.txt"), open(AzContainer("mycontainer";storageaccount="mystorageaccount"), "remoteblob_out.txt"))
```
"""
Base.cp(in::AbstractString, out::AzObject) = cp(in, out.container, out.name)
Base.cp(in::AbstractString, out::AzObject; kwargs...) = cp(in, out.container, out.name; kwargs...)
Base.cp(in::AzObject, out::AbstractString; kwargs...) = cp(in.container, in.name, out; kwargs...)
Base.cp(in::AzObject, out::AzObject) = cp(in.container, in.name, out.container, out.name)

Expand Down
15 changes: 14 additions & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ end
n = 100
x = rand(UInt8, n)
write(c, "foo.txt", x)
cp(c, "foo.txt", "foolocal.txt", chunksize=11)
cp(c, "foo.txt", "foolocal.txt", buffersize=11)
@test read!("foolocal.txt", Vector{UInt8}(undef, n)) == x
rm(c)
rm("foolocal.txt")
Expand All @@ -496,6 +496,19 @@ end
rm("foolocal.txt")
end

@testset "Container, copy large local file to blob" begin
n = 100
x = rand(UInt8, n)
write("foolocal.bin", x)
r = uuid4()
c = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=2, nretry=10)
c = robust_mkpath(c)
cp("foolocal.bin", c, "foo.bin", buffersize=11)
@test read!(c, "foo.bin", Vector{UInt8}(undef, n)) == x
rm(c)
rm("foolocal.bin")
end

@testset "Container, copy blob to blob" begin
r = uuid4()
c = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=2, nretry=10)
Expand Down

0 comments on commit e47d497

Please sign in to comment.