Skip to content

Commit

Permalink
Query Basic Types in Binary Format (#223)
Browse files Browse the repository at this point in the history
Can now query basic postgres data types in binary format. Querying in text format is still the default options.
  • Loading branch information
Samuel Massinon authored Aug 12, 2021
1 parent 2517457 commit f1fe4c4
Show file tree
Hide file tree
Showing 8 changed files with 522 additions and 341 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "LibPQ"
uuid = "194296ae-ab2e-5f79-8cd4-7183a0a5a0d1"
license = "MIT"
version = "1.6.2"
version = "1.7.0"

[deps]
BinaryProvider = "b99e7846-7c00-51b0-8f62-c81ae34c0232"
Expand Down
24 changes: 17 additions & 7 deletions src/LibPQ.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
module LibPQ

export status, reset!, execute, prepare, async_execute, cancel,
num_columns, num_rows, num_params, num_affected_rows

export status,
reset!,
execute,
prepare,
async_execute,
cancel,
num_columns,
num_rows,
num_params,
num_affected_rows

using Base: Semaphore, acquire, release
using Base.Iterators: zip, product
Expand All @@ -21,17 +28,17 @@ using Memento: Memento, getlogger, warn, info, error, debug
using OffsetArrays
using TimeZones

const Parameter = Union{String, Missing}
const Parameter = Union{String,Missing}
const LOGGER = getlogger(@__MODULE__)

function __init__()
INTERVAL_REGEX[] = _interval_regex()
Memento.register(LOGGER)
return nothing
end

# Docstring template for types using DocStringExtensions
@template TYPES =
"""
@template TYPES = """
$(TYPEDEF)
$(DOCSTRING)
Expand All @@ -50,7 +57,7 @@ module libpq_c
include(joinpath(@__DIR__, "..", "deps", "deps.jl"))

function __init__()
check_deps()
return check_deps()
end
else
using LibPQ_jll
Expand Down Expand Up @@ -83,6 +90,9 @@ LibPQ.jl.
"""
const LIBPQ_CONVERSIONS = PQConversions()

const BINARY = true
const TEXT = false

include("connections.jl")
include("results.jl")
include("statements.jl")
Expand Down
91 changes: 59 additions & 32 deletions src/asyncresults.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"An asynchronous PostgreSQL query"
mutable struct AsyncResult
mutable struct AsyncResult{BinaryFormat}
"The LibPQ.jl Connection used for the query"
jl_conn::Connection

Expand All @@ -12,13 +12,15 @@ mutable struct AsyncResult
"Task which errors or returns a LibPQ.jl Result which is created once available"
result_task::Task

function AsyncResult(jl_conn::Connection, result_kwargs::Ref)
return new(jl_conn, result_kwargs, false)
function AsyncResult{BinaryFormat}(
jl_conn::Connection, result_kwargs::Ref
) where BinaryFormat
return new{BinaryFormat}(jl_conn, result_kwargs, false)
end
end

function AsyncResult(jl_conn::Connection; kwargs...)
return AsyncResult(jl_conn, Ref(kwargs))
function AsyncResult{BinaryFormat}(jl_conn::Connection; kwargs...) where BinaryFormat
return AsyncResult{BinaryFormat}(jl_conn, Ref(kwargs))
end

function Base.show(io::IO, async_result::AsyncResult)
Expand All @@ -31,7 +33,7 @@ function Base.show(io::IO, async_result::AsyncResult)
else
"in progress"
end
print(io, typeof(async_result), " (", status, ")")
return print(io, typeof(async_result), " (", status, ")")
end

"""
Expand All @@ -50,16 +52,16 @@ The result returned will be the [`Result`](@ref) of the last query run (the only
using parameters).
Any errors produced by the queries will be thrown together in a `CompositeException`.
"""
function handle_result(async_result::AsyncResult; throw_error=true)
function handle_result(
async_result::AsyncResult{BinaryFormat}; throw_error=true
) where BinaryFormat
errors = []
result = nothing
for result_ptr in _consume(async_result.jl_conn)
try
result = handle_result(
Result(
result_ptr,
async_result.jl_conn;
async_result.result_kwargs[]...
Result{BinaryFormat}(
result_ptr, async_result.jl_conn; async_result.result_kwargs[]...
);
throw_error=throw_error,
)
Expand Down Expand Up @@ -102,9 +104,11 @@ function _consume(jl_conn::Connection)
_cancel(jl_conn)
end

last_log == curr && debug(LOGGER, "Waiting to read from connection $(jl_conn.conn)")
last_log == curr &&
debug(LOGGER, "Waiting to read from connection $(jl_conn.conn)")
wait(watcher)
last_log == curr && debug(LOGGER, "Consuming input from connection $(jl_conn.conn)")
last_log == curr &&
debug(LOGGER, "Consuming input from connection $(jl_conn.conn)")
success = libpq_c.PQconsumeInput(jl_conn.conn) == 1
!success && error(LOGGER, Errors.PQConnectionError(jl_conn))

Expand All @@ -116,8 +120,8 @@ function _consume(jl_conn::Connection)
return result_ptrs
else
result_num = length(result_ptrs) + 1
debug(LOGGER,
"Saving result $result_num from connection $(jl_conn.conn)"
debug(
LOGGER, "Saving result $result_num from connection $(jl_conn.conn)"
)
push!(result_ptrs, result_ptr)
end
Expand All @@ -126,9 +130,12 @@ function _consume(jl_conn::Connection)
catch err
if err isa Base.IOError && err.code == -9 # EBADF
debug(() -> sprint(showerror, err), LOGGER)
error(LOGGER, Errors.JLConnectionError(
"PostgreSQL connection socket was unexpectedly closed"
))
error(
LOGGER,
Errors.JLConnectionError(
"PostgreSQL connection socket was unexpectedly closed"
),
)
else
rethrow(err)
end
Expand All @@ -147,7 +154,7 @@ function cancel(async_result::AsyncResult)
# the actual cancellation will be triggered in the main loop of _consume
# which will call `_cancel` on the `Connection`
async_result.should_cancel = true
return
return nothing
end

function _cancel(jl_conn::Connection)
Expand All @@ -158,9 +165,10 @@ function _cancel(jl_conn::Connection)
errbuf = zeros(UInt8, errbuf_size)
success = libpq_c.PQcancel(cancel_ptr, pointer(errbuf), errbuf_size) == 1
if !success
warn(LOGGER, Errors.JLConnectionError(
"Failed cancelling query: $(String(errbuf))"
))
warn(
LOGGER,
Errors.JLConnectionError("Failed cancelling query: $(String(errbuf))"),
)
else
debug(LOGGER, "Cancelled query for connection $(jl_conn.conn)")
end
Expand All @@ -180,6 +188,7 @@ Base.close(async_result::AsyncResult) = cancel(async_result)
jl_conn::Connection,
query::AbstractString,
[parameters::Union{AbstractVector, Tuple},]
binary_format::Bool=false,
kwargs...
) -> AsyncResult
Expand All @@ -198,6 +207,10 @@ If multiple `AsyncResult`s use the same `Connection`, they will execute serially
`async_execute` optionally takes a `parameters` vector which passes query parameters as
strings to PostgreSQL.
`binary_format`, when set to true, will transfer the data in binary format.
Support for binary transfer is currently limited to a subset of basic data types.
Queries without parameters can contain multiple SQL statements, and the result of the final
statement is returned.
Any errors which occur during executed statements will be bundled together in a
Expand All @@ -207,7 +220,15 @@ As is normal for `Task`s, any exceptions will be thrown when calling `wait` or `
"""
function async_execute end

function async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
function async_execute(conn, query; binary_format=false, kwargs...)
if binary_format
async_execute(conn, query, []; binary_format=binary_format, kwargs...)
else
_multi_async_execute(conn, query; kwargs...)
end
end

function _multi_async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
async_result = _async_execute(jl_conn; kwargs...) do jl_conn
_async_submit(jl_conn.conn, query)
end
Expand All @@ -218,23 +239,28 @@ end
function async_execute(
jl_conn::Connection,
query::AbstractString,
parameters::Union{AbstractVector, Tuple};
kwargs...
parameters::Union{AbstractVector,Tuple};
binary_format::Bool=false,
kwargs...,
)
string_params = string_parameters(parameters)
pointer_params = parameter_pointers(string_params)

async_result = _async_execute(jl_conn; kwargs...) do jl_conn
_async_submit(jl_conn.conn, query, pointer_params)
end
async_result = _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn
_async_submit(jl_conn.conn, query, pointer_params; binary_format=binary_format)
end

return async_result
end

function _async_execute(
submission_fn::Function, jl_conn::Connection; throw_error::Bool=true, kwargs...
submission_fn::Function,
jl_conn::Connection;
binary_format::Bool=false,
throw_error::Bool=true,
kwargs...,
)
async_result = AsyncResult(jl_conn; kwargs...)
async_result = AsyncResult{binary_format}(jl_conn; kwargs...)

async_result.result_task = @async lock(jl_conn) do
jl_conn.async_result = async_result
Expand Down Expand Up @@ -262,7 +288,8 @@ end
function _async_submit(
conn_ptr::Ptr{libpq_c.PGconn},
query::AbstractString,
parameters::Vector{Ptr{UInt8}},
parameters::Vector{Ptr{UInt8}};
binary_format::Bool=false,
)
num_params = length(parameters)

Expand All @@ -274,7 +301,7 @@ function _async_submit(
parameters,
C_NULL, # paramLengths is ignored for text format parameters
zeros(Cint, num_params), # all parameters in text format
zero(Cint), # return result in text format
Cint(binary_format), # return result in text or binary format
)

return send_status == 1
Expand Down
Loading

0 comments on commit f1fe4c4

Please sign in to comment.