Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Basic Types in Binary Format #223

Merged
merged 26 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/pages/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
```@docs
LibPQ.Connection
execute
execute_params
prepare
status(::LibPQ.Connection)
Base.close(::LibPQ.Connection)
Expand Down Expand Up @@ -49,6 +50,7 @@ execute(::LibPQ.Connection, ::LibPQ.CopyIn)

```@docs
async_execute
async_execute_params
LibPQ.AsyncResult
cancel
```
Expand Down
8 changes: 4 additions & 4 deletions docs/src/pages/type-conversions.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ end

Currently all types are printed to strings and given to LibPQ as such, with no special treatment.
Expect this to change in a future release.
For now, you can convert the data to strings yourself before passing to [`execute`](@ref).
For now, you can convert the data to strings yourself before passing to [`execute`](@ref) or [`execute_params`](@ref).
This should only be necessary for data types whose Julia string representation is not valid in
PostgreSQL, such as arrays.

```jldoctest
julia> A = collect(12:15);

julia> nt = columntable(execute(conn, "SELECT \$1 = ANY(\$2) AS result", Any[13, string("{", join(A, ","), "}")]));
julia> nt = columntable(execute_params(conn, "SELECT \$1 = ANY(\$2) AS result", Any[13, string("{", join(A, ","), "}")]));

julia> nt[:result][1]
true
Expand Down Expand Up @@ -57,7 +57,7 @@ By default, data streamed using the Tables interface is `Union{T, Missing}`, and
`Vector{Union{T, Missing}}`.
While `libpq` does not provide an interface for checking whether a result column contains `NULL`,
it's possible to assert that columns do not contain `NULL` using the `not_null` keyword argument to
[`execute`](@ref).
[`execute`](@ref) or [`execute_params`](@ref).
This will result in data retrieved as `T`/`Vector{T}` instead.
`not_null` accepts a list of column names or column positions, or a `Bool` asserting that all
columns do or do not have the possibility of `NULL`.
Expand All @@ -72,7 +72,7 @@ Refer to the [Implementation](@ref) section for more detailed information.

#### Query-level

There are three arguments to [`execute`](@ref) for this:
There are three arguments to [`execute`](@ref) or [`execute_params`](@ref) for this:

* `column_types` argument to set the desired types for given columns.
This is accepted as a dictionary mapping column names (as `Symbol`s or `String`s) and/or positions
Expand Down
7 changes: 5 additions & 2 deletions src/LibPQ.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module LibPQ

export status, reset!, execute, prepare, async_execute, cancel,
num_columns, num_rows, num_params, num_affected_rows
export status, reset!, execute, execute_params, prepare, async_execute,
async_execute_params, cancel, num_columns, num_rows, num_params, num_affected_rows


using Base: Semaphore, acquire, release
Expand Down Expand Up @@ -83,6 +83,9 @@ LibPQ.jl.
"""
const LIBPQ_CONVERSIONS = PQConversions()

const BINARY = true
const TEXT = false

include("connections.jl")
include("results.jl")
include("statements.jl")
Expand Down
119 changes: 84 additions & 35 deletions src/asyncresults.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"An asynchronous PostgreSQL query"
mutable struct AsyncResult
mutable struct AsyncResult{BinaryFormat}
"The LibPQ.jl Connection used for the query"
jl_conn::Connection

Expand All @@ -12,13 +12,15 @@ mutable struct AsyncResult
"Task which errors or returns a LibPQ.jl Result which is created once available"
result_task::Task

function AsyncResult(jl_conn::Connection, result_kwargs::Ref)
return new(jl_conn, result_kwargs, false)
function AsyncResult{BinaryFormat}(
jl_conn::Connection, result_kwargs::Ref
) where {BinaryFormat}
return new{BinaryFormat}(jl_conn, result_kwargs, false)
end
end

function AsyncResult(jl_conn::Connection; kwargs...)
return AsyncResult(jl_conn, Ref(kwargs))
function AsyncResult{BinaryFormat}(jl_conn::Connection; kwargs...) where {BinaryFormat}
iamed2 marked this conversation as resolved.
Show resolved Hide resolved
return AsyncResult{BinaryFormat}(jl_conn, Ref(kwargs))
end

function Base.show(io::IO, async_result::AsyncResult)
Expand All @@ -31,7 +33,7 @@ function Base.show(io::IO, async_result::AsyncResult)
else
"in progress"
end
print(io, typeof(async_result), " (", status, ")")
return print(io, typeof(async_result), " (", status, ")")
end

"""
Expand All @@ -50,16 +52,16 @@ The result returned will be the [`Result`](@ref) of the last query run (the only
using parameters).
Any errors produced by the queries will be thrown together in a `CompositeException`.
"""
function handle_result(async_result::AsyncResult; throw_error=true)
function handle_result(
async_result::AsyncResult{BinaryFormat}; throw_error=true
) where {BinaryFormat}
errors = []
result = nothing
for result_ptr in _consume(async_result.jl_conn)
try
result = handle_result(
Result(
result_ptr,
async_result.jl_conn;
async_result.result_kwargs[]...
Result{BinaryFormat}(
result_ptr, async_result.jl_conn; async_result.result_kwargs[]...
);
throw_error=throw_error,
)
Expand Down Expand Up @@ -102,9 +104,11 @@ function _consume(jl_conn::Connection)
_cancel(jl_conn)
end

last_log == curr && debug(LOGGER, "Waiting to read from connection $(jl_conn.conn)")
last_log == curr &&
debug(LOGGER, "Waiting to read from connection $(jl_conn.conn)")
wait(watcher)
last_log == curr && debug(LOGGER, "Consuming input from connection $(jl_conn.conn)")
last_log == curr &&
debug(LOGGER, "Consuming input from connection $(jl_conn.conn)")
success = libpq_c.PQconsumeInput(jl_conn.conn) == 1
!success && error(LOGGER, Errors.PQConnectionError(jl_conn))

Expand All @@ -116,8 +120,8 @@ function _consume(jl_conn::Connection)
return result_ptrs
else
result_num = length(result_ptrs) + 1
debug(LOGGER,
"Saving result $result_num from connection $(jl_conn.conn)"
debug(
LOGGER, "Saving result $result_num from connection $(jl_conn.conn)"
)
push!(result_ptrs, result_ptr)
end
Expand All @@ -126,9 +130,12 @@ function _consume(jl_conn::Connection)
catch err
if err isa Base.IOError && err.code == -9 # EBADF
debug(() -> sprint(showerror, err), LOGGER)
error(LOGGER, Errors.JLConnectionError(
"PostgreSQL connection socket was unexpectedly closed"
))
error(
LOGGER,
Errors.JLConnectionError(
"PostgreSQL connection socket was unexpectedly closed"
),
)
else
rethrow(err)
end
Expand All @@ -147,7 +154,7 @@ function cancel(async_result::AsyncResult)
# the actual cancellation will be triggered in the main loop of _consume
# which will call `_cancel` on the `Connection`
async_result.should_cancel = true
return
return nothing
end

function _cancel(jl_conn::Connection)
Expand All @@ -158,9 +165,10 @@ function _cancel(jl_conn::Connection)
errbuf = zeros(UInt8, errbuf_size)
success = libpq_c.PQcancel(cancel_ptr, pointer(errbuf), errbuf_size) == 1
if !success
warn(LOGGER, Errors.JLConnectionError(
"Failed cancelling query: $(String(errbuf))"
))
warn(
LOGGER,
Errors.JLConnectionError("Failed cancelling query: $(String(errbuf))"),
)
else
debug(LOGGER, "Cancelled query for connection $(jl_conn.conn)")
end
Expand All @@ -179,7 +187,6 @@ Base.close(async_result::AsyncResult) = cancel(async_result)
async_execute(
jl_conn::Connection,
query::AbstractString,
[parameters::Union{AbstractVector, Tuple},]
kwargs...
) -> AsyncResult

Expand All @@ -196,14 +203,14 @@ If multiple `AsyncResult`s use the same `Connection`, they will execute serially

`async_execute` does not yet support [`Statement`](@ref)s.

`async_execute` optionally takes a `parameters` vector which passes query parameters as
strings to PostgreSQL.
Queries without parameters can contain multiple SQL statements, and the result of the final
statement is returned.
Any errors which occur during executed statements will be bundled together in a
`CompositeException` and thrown.

As is normal for `Task`s, any exceptions will be thrown when calling `wait` or `fetch`.

Also see `async_execute_params`.
"""
function async_execute end

Expand All @@ -215,26 +222,67 @@ function async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
return async_result
end

function async_execute(
"""
async_execute_params(
jl_conn::Connection,
query::AbstractString,
[parameters::Union{AbstractVector, Tuple},]
binary_format=TEXT,
kwargs...
) -> AsyncResult

Run a query on the PostgreSQL database and return an [`AsyncResult`](@ref).

The `AsyncResult` contains a `Task` which processes a query asynchronously.
Calling `fetch` on the `AsyncResult` will return a [`Result`](@ref).

All keyword arguments are the same as [`execute`](@ref) and are passed to the created
`Result`.

Only one `AsyncResult` can be active on a [`Connection`](@ref) at once.
If multiple `AsyncResult`s use the same `Connection`, they will execute serially.

`async_execute_params` does not yet support [`Statement`](@ref)s.

`async_execute_params` optionally takes a `parameters` vector which passes query parameters
as strings to PostgreSQL.
Queries without parameters can contain multiple SQL statements, and the result of the final
statement is returned.
Any errors which occur during executed statements will be bundled together in a
`CompositeException` and thrown.
iamed2 marked this conversation as resolved.
Show resolved Hide resolved

As is normal for `Task`s, any exceptions will be thrown when calling `wait` or `fetch`.

Also see `async_execute`.
"""
function async_execute_params end

function async_execute_params(
jl_conn::Connection,
query::AbstractString,
parameters::Union{AbstractVector, Tuple};
kwargs...
parameters::Union{AbstractVector,Tuple}=[];
binary_format=TEXT,
iamed2 marked this conversation as resolved.
Show resolved Hide resolved
kwargs...,
)
string_params = string_parameters(parameters)
pointer_params = parameter_pointers(string_params)

async_result = _async_execute(jl_conn; kwargs...) do jl_conn
_async_submit(jl_conn.conn, query, pointer_params)
end
async_result =
_async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn
_async_submit(jl_conn.conn, query, pointer_params; binary_format=binary_format)
end

return async_result
end

function _async_execute(
submission_fn::Function, jl_conn::Connection; throw_error::Bool=true, kwargs...
submission_fn::Function,
jl_conn::Connection;
binary_format::Bool=TEXT,
throw_error::Bool=true,
kwargs...,
)
async_result = AsyncResult(jl_conn; kwargs...)
async_result = AsyncResult{binary_format}(jl_conn; kwargs...)

async_result.result_task = @async lock(jl_conn) do
jl_conn.async_result = async_result
Expand Down Expand Up @@ -262,7 +310,8 @@ end
function _async_submit(
conn_ptr::Ptr{libpq_c.PGconn},
query::AbstractString,
parameters::Vector{Ptr{UInt8}},
parameters::Vector{Ptr{UInt8}};
binary_format::Bool=TEXT,
)
num_params = length(parameters)

Expand All @@ -274,7 +323,7 @@ function _async_submit(
parameters,
C_NULL, # paramLengths is ignored for text format parameters
zeros(Cint, num_params), # all parameters in text format
zero(Cint), # return result in text format
Cint(binary_format), # return result in text or binary format
)

return send_status == 1
Expand Down
Loading