Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Basic Types in Binary Format #223

Merged
merged 26 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "LibPQ"
uuid = "194296ae-ab2e-5f79-8cd4-7183a0a5a0d1"
license = "MIT"
version = "1.6.2"
version = "1.7.0"

[deps]
BinaryProvider = "b99e7846-7c00-51b0-8f62-c81ae34c0232"
Expand Down
24 changes: 17 additions & 7 deletions src/LibPQ.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
module LibPQ

export status, reset!, execute, prepare, async_execute, cancel,
num_columns, num_rows, num_params, num_affected_rows

export status,
reset!,
execute,
prepare,
async_execute,
cancel,
num_columns,
num_rows,
num_params,
num_affected_rows

using Base: Semaphore, acquire, release
using Base.Iterators: zip, product
Expand All @@ -21,17 +28,17 @@ using Memento: Memento, getlogger, warn, info, error, debug
using OffsetArrays
using TimeZones

const Parameter = Union{String, Missing}
const Parameter = Union{String,Missing}
const LOGGER = getlogger(@__MODULE__)

function __init__()
INTERVAL_REGEX[] = _interval_regex()
Memento.register(LOGGER)
return nothing
end

# Docstring template for types using DocStringExtensions
@template TYPES =
"""
@template TYPES = """
$(TYPEDEF)

$(DOCSTRING)
Expand All @@ -50,7 +57,7 @@ module libpq_c
include(joinpath(@__DIR__, "..", "deps", "deps.jl"))

function __init__()
check_deps()
return check_deps()
end
else
using LibPQ_jll
Expand Down Expand Up @@ -83,6 +90,9 @@ LibPQ.jl.
"""
const LIBPQ_CONVERSIONS = PQConversions()

const BINARY = true
const TEXT = false

include("connections.jl")
include("results.jl")
include("statements.jl")
Expand Down
91 changes: 59 additions & 32 deletions src/asyncresults.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"An asynchronous PostgreSQL query"
mutable struct AsyncResult
mutable struct AsyncResult{BinaryFormat}
"The LibPQ.jl Connection used for the query"
jl_conn::Connection

Expand All @@ -12,13 +12,15 @@ mutable struct AsyncResult
"Task which errors or returns a LibPQ.jl Result which is created once available"
result_task::Task

function AsyncResult(jl_conn::Connection, result_kwargs::Ref)
return new(jl_conn, result_kwargs, false)
function AsyncResult{BinaryFormat}(
jl_conn::Connection, result_kwargs::Ref
) where BinaryFormat
return new{BinaryFormat}(jl_conn, result_kwargs, false)
end
end

function AsyncResult(jl_conn::Connection; kwargs...)
return AsyncResult(jl_conn, Ref(kwargs))
function AsyncResult{BinaryFormat}(jl_conn::Connection; kwargs...) where BinaryFormat
return AsyncResult{BinaryFormat}(jl_conn, Ref(kwargs))
end

function Base.show(io::IO, async_result::AsyncResult)
Expand All @@ -31,7 +33,7 @@ function Base.show(io::IO, async_result::AsyncResult)
else
"in progress"
end
print(io, typeof(async_result), " (", status, ")")
return print(io, typeof(async_result), " (", status, ")")
end

"""
Expand All @@ -50,16 +52,16 @@ The result returned will be the [`Result`](@ref) of the last query run (the only
using parameters).
Any errors produced by the queries will be thrown together in a `CompositeException`.
"""
function handle_result(async_result::AsyncResult; throw_error=true)
function handle_result(
async_result::AsyncResult{BinaryFormat}; throw_error=true
) where BinaryFormat
errors = []
result = nothing
for result_ptr in _consume(async_result.jl_conn)
try
result = handle_result(
Result(
result_ptr,
async_result.jl_conn;
async_result.result_kwargs[]...
Result{BinaryFormat}(
result_ptr, async_result.jl_conn; async_result.result_kwargs[]...
);
throw_error=throw_error,
)
Expand Down Expand Up @@ -102,9 +104,11 @@ function _consume(jl_conn::Connection)
_cancel(jl_conn)
end

last_log == curr && debug(LOGGER, "Waiting to read from connection $(jl_conn.conn)")
last_log == curr &&
debug(LOGGER, "Waiting to read from connection $(jl_conn.conn)")
wait(watcher)
last_log == curr && debug(LOGGER, "Consuming input from connection $(jl_conn.conn)")
last_log == curr &&
debug(LOGGER, "Consuming input from connection $(jl_conn.conn)")
success = libpq_c.PQconsumeInput(jl_conn.conn) == 1
!success && error(LOGGER, Errors.PQConnectionError(jl_conn))

Expand All @@ -116,8 +120,8 @@ function _consume(jl_conn::Connection)
return result_ptrs
else
result_num = length(result_ptrs) + 1
debug(LOGGER,
"Saving result $result_num from connection $(jl_conn.conn)"
debug(
LOGGER, "Saving result $result_num from connection $(jl_conn.conn)"
)
push!(result_ptrs, result_ptr)
end
Expand All @@ -126,9 +130,12 @@ function _consume(jl_conn::Connection)
catch err
if err isa Base.IOError && err.code == -9 # EBADF
debug(() -> sprint(showerror, err), LOGGER)
error(LOGGER, Errors.JLConnectionError(
"PostgreSQL connection socket was unexpectedly closed"
))
error(
LOGGER,
Errors.JLConnectionError(
"PostgreSQL connection socket was unexpectedly closed"
),
)
else
rethrow(err)
end
Expand All @@ -147,7 +154,7 @@ function cancel(async_result::AsyncResult)
# the actual cancellation will be triggered in the main loop of _consume
# which will call `_cancel` on the `Connection`
async_result.should_cancel = true
return
return nothing
end

function _cancel(jl_conn::Connection)
Expand All @@ -158,9 +165,10 @@ function _cancel(jl_conn::Connection)
errbuf = zeros(UInt8, errbuf_size)
success = libpq_c.PQcancel(cancel_ptr, pointer(errbuf), errbuf_size) == 1
if !success
warn(LOGGER, Errors.JLConnectionError(
"Failed cancelling query: $(String(errbuf))"
))
warn(
LOGGER,
Errors.JLConnectionError("Failed cancelling query: $(String(errbuf))"),
)
else
debug(LOGGER, "Cancelled query for connection $(jl_conn.conn)")
end
Expand All @@ -180,6 +188,7 @@ Base.close(async_result::AsyncResult) = cancel(async_result)
jl_conn::Connection,
query::AbstractString,
[parameters::Union{AbstractVector, Tuple},]
binary_format::Bool=false,
kwargs...
) -> AsyncResult

Expand All @@ -198,6 +207,10 @@ If multiple `AsyncResult`s use the same `Connection`, they will execute serially

`async_execute` optionally takes a `parameters` vector which passes query parameters as
strings to PostgreSQL.

`binary_format`, when set to true, will transfer the data in binary format.
Support for binary transfer is currently limited to a subset of basic data types.

Queries without parameters can contain multiple SQL statements, and the result of the final
statement is returned.
Any errors which occur during executed statements will be bundled together in a
Expand All @@ -207,7 +220,15 @@ As is normal for `Task`s, any exceptions will be thrown when calling `wait` or `
"""
function async_execute end

function async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
function async_execute(conn, query; binary_format=false, kwargs...)
if binary_format
async_execute(conn, query, []; binary_format=binary_format, kwargs...)
else
_multi_async_execute(conn, query; kwargs...)
end
end

function _multi_async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
async_result = _async_execute(jl_conn; kwargs...) do jl_conn
_async_submit(jl_conn.conn, query)
end
Expand All @@ -218,23 +239,28 @@ end
function async_execute(
jl_conn::Connection,
query::AbstractString,
parameters::Union{AbstractVector, Tuple};
kwargs...
parameters::Union{AbstractVector,Tuple};
binary_format::Bool=false,
kwargs...,
)
string_params = string_parameters(parameters)
pointer_params = parameter_pointers(string_params)

async_result = _async_execute(jl_conn; kwargs...) do jl_conn
_async_submit(jl_conn.conn, query, pointer_params)
end
async_result = _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn
_async_submit(jl_conn.conn, query, pointer_params; binary_format=binary_format)
end

return async_result
end

function _async_execute(
submission_fn::Function, jl_conn::Connection; throw_error::Bool=true, kwargs...
submission_fn::Function,
jl_conn::Connection;
binary_format::Bool=false,
throw_error::Bool=true,
kwargs...,
)
async_result = AsyncResult(jl_conn; kwargs...)
async_result = AsyncResult{binary_format}(jl_conn; kwargs...)

async_result.result_task = @async lock(jl_conn) do
jl_conn.async_result = async_result
Expand Down Expand Up @@ -262,7 +288,8 @@ end
function _async_submit(
conn_ptr::Ptr{libpq_c.PGconn},
query::AbstractString,
parameters::Vector{Ptr{UInt8}},
parameters::Vector{Ptr{UInt8}};
binary_format::Bool=false,
)
num_params = length(parameters)

Expand All @@ -274,7 +301,7 @@ function _async_submit(
parameters,
C_NULL, # paramLengths is ignored for text format parameters
zeros(Cint, num_params), # all parameters in text format
zero(Cint), # return result in text format
Cint(binary_format), # return result in text or binary format
)

return send_status == 1
Expand Down
Loading