Skip to content

Commit

Permalink
[4.3] SUPP-33: fix import/delete rates (#6677)
Browse files Browse the repository at this point in the history
* fix kzd_rates accessors return empty binary

Tasks app CSV has empty binary for all fields that are missing from uploaded CSV.
This will cause kzd_rates to always return empty string and doc save with them
in db.

This will make sure all accessros has correct type ne_binary and handle
situation when the value is empty binary so we save a clean JSON in db.

* try to update rate doc on conflict

When a user deletes the rates from ratedeck db, it is soft deleted by default.
This commit will try to update and ensure save the doc even if it is deleted.

* fix import/delete rates task

During testing the rate import and delete tasks for SUPP-33, some errors were found.

* fix import where ratedeck database was incorrectly formatted (like `ratedeck/`) because of
  `ratedeck_id` being an empty binary from CSV
* fix delete returning result which causing crashes in `kt_task_worker`
* in delete, group objects by db, also add better logging
* convert import state to map
* make a clean json to save to db: CSV missing fields are empty binary. Together with changes in kzd_rates make
sure we are generating a clean JSON to be saved to db.

* edoc

* ensure update doc on soft deleted and pvt_deleted

open_doc will ignore if pvt_deleted so update_doc will result in conflict again.
Adding option to allow pvt_deleted doc.

Also remove any previous pvt_deleted from doc on conflict save

* allow to open soft-deleted crossbar if requested in options, always remove pvt_deleted

* merge with old doc when uploading CSV

* bulk merge docs in kt_rates

* routes is binary
  • Loading branch information
icehess authored Dec 18, 2020
1 parent 3b1f127 commit ed9bc87
Show file tree
Hide file tree
Showing 5 changed files with 438 additions and 192 deletions.
44 changes: 29 additions & 15 deletions applications/crossbar/src/crossbar_doc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
,load_view/3, load_view/4, load_view/5, load_view/6
,load_attachment/4, load_docs/2
,save/1, save/2, save/3
,update/3, update/4
,update/3, update/4, update/5
,delete/1, delete/2
,save_attachment/4, save_attachment/5
,delete_attachment/3
Expand Down Expand Up @@ -122,7 +122,7 @@ load(DocId, Context, Options, _RespStatus) when is_binary(DocId) ->
handle_datamgr_errors(Error, DocId, Context);
{'ok', JObj} ->
case check_document_type(Context, JObj, Options) of
'true' -> handle_successful_load(Context, JObj);
'true' -> handle_successful_load(Context, JObj, Options);
'false' ->
ErrorCause = kz_json:from_list([{<<"cause">>, DocId}]),
cb_context:add_system_error('bad_identifier', ErrorCause, Context)
Expand Down Expand Up @@ -219,20 +219,25 @@ depluralize_resource_name(Name) ->
Bin -> Bin
end.

-spec handle_successful_load(cb_context:context(), kz_json:object()) -> cb_context:context().
handle_successful_load(Context, JObj) ->
handle_successful_load(Context, JObj, kz_doc:is_soft_deleted(JObj)).
-spec handle_successful_load(cb_context:context(), kz_json:object(), kz_term:proplist()) -> cb_context:context().
handle_successful_load(Context, JObj, Options) ->
handle_successful_load(Context, JObj, kz_doc:is_soft_deleted(JObj), props:get_is_true('deleted', Options, 'false')).

-spec handle_successful_load(cb_context:context(), kz_json:object(), boolean()) -> cb_context:context().
handle_successful_load(Context, JObj, 'true') ->
-spec handle_successful_load(cb_context:context(), kz_json:object(), boolean(), boolean()) -> cb_context:context().
handle_successful_load(Context, JObj, 'true', 'true') ->
lager:debug("loaded a soft-deleted doc ~s(~s) from ~s with explicitly set return soft-deleted option"
,[kz_doc:id(JObj), kz_doc:revision(JObj), cb_context:account_db(Context)]
),
cb_context:store(handle_datamgr_success(JObj, Context), 'db_doc', JObj);
handle_successful_load(Context, JObj, 'true', 'false') ->
lager:debug("doc ~s(~s) is soft-deleted, returning bad_identifier"
,[kz_doc:id(JObj), kz_doc:revision(JObj)]
),
cb_context:add_system_error('bad_identifier'
,kz_json:from_list([{<<"cause">>, kz_doc:id(JObj)}])
,Context
);
handle_successful_load(Context, JObj, 'false') ->
handle_successful_load(Context, JObj, 'false', _) ->
lager:debug("loaded doc ~s(~s) from ~s"
,[kz_doc:id(JObj), kz_doc:revision(JObj), cb_context:account_db(Context)]
),
Expand Down Expand Up @@ -616,10 +621,17 @@ update(Context, DocId, Updates) ->
-spec update(cb_context:context(), kz_json:key(), kz_json:flat_proplist(), kz_json:flat_proplist()) ->
cb_context:context().
update(Context, DocId, Updates, Creates) ->
UpdateOptions = [{'update', Updates}
,{'create', Creates}
,{'ensure_saved', 'true'}
],
update(Context, DocId, Updates, Creates, []).

-spec update(cb_context:context(), kz_json:key(), kz_json:flat_proplist(), kz_json:flat_proplist(), kz_term:proplist()) ->
cb_context:context().
update(Context, DocId, Updates, Creates, Options) ->
UpdateOptions =
props:set_values([{'update', Updates}
,{'create', Creates}
,{'ensure_saved', 'true'}
], Options
),
case kz_datamgr:update_doc(cb_context:account_db(Context), DocId, UpdateOptions) of
{'error', Error} ->
handle_datamgr_errors(Error, DocId, Context);
Expand Down Expand Up @@ -1252,11 +1264,13 @@ extract_included_docs(Context, JObjs) ->
lists:foldl(fun extract_included_docs_fold/2, {[], Context}, JObjs).

extract_included_docs_fold(JObj, {Docs, Context}) ->
case kz_json:get_ne_value(<<"doc">>, JObj) of
'undefined' ->
Doc = kz_json:get_ne_value(<<"doc">>, JObj),
%% open_docs could return `null' if the doc is hard deleted
case kz_json:is_json_object(Doc) of
'false' ->
Reason = kz_json:get_ne_value(<<"error">>, JObj),
ID = kz_json:get_ne_value(<<"key">>, JObj),
{Docs, handle_datamgr_errors(kz_term:to_atom(Reason, 'true'), ID, Context)};
Doc ->
'true' ->
{[Doc|Docs], Context}
end.
77 changes: 66 additions & 11 deletions applications/crossbar/src/modules/cb_rates.erl
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,42 @@ post(Context) ->

-spec post(cb_context:context(), path_token()) -> cb_context:context().
post(Context, _RateId) ->
crossbar_doc:save(Context).
try_save_conflict(Context).

-spec patch(cb_context:context(), path_token()) -> cb_context:context().
patch(Context, _RateId) ->
crossbar_doc:save(Context).
try_save_conflict(Context).

-spec put(cb_context:context()) -> cb_context:context().
put(Context) ->
crossbar_doc:save(Context).
try_save_conflict(Context).

-spec try_save_conflict(cb_context:context()) -> cb_context:context().
try_save_conflict(C) ->
%% load_merge below will open the doc even if soft-deleted.
%% Since we want to allow re-save soft-deleted remove the key
%% from doc in case load_merge opened a soft-deleted doc.
Doc = kz_json:delete_key(<<"pvt_deleted">>, cb_context:doc(C)),
Context = crossbar_doc:save(cb_context:set_doc(C, Doc)),
case cb_context:resp_status(Context) of
'success' -> Context;
_ ->
try_save_conflict(Context
,cb_context:resp_error_code(Context)
,cb_context:resp_error_msg(Context)
)
end.

-spec try_save_conflict(cb_context:context(), integer(), kz_term:api_ne_binary()) -> cb_context:context().
try_save_conflict(Context, 409, <<"datastore_conflict">>) ->
%% if a rate was deleted allow to create it again on conflict
Doc = kz_json:delete_key(<<"pvt_deleted">>, kz_doc:delete_revision(cb_context:doc(Context))),
DocId = kz_doc:id(Doc, kz_binary:rand_hex(16)),

lager:debug("saving rates ~s resulted in conflict, trying to update", [DocId]),
crossbar_doc:update(Context, DocId, kz_json:to_proplist(kz_json:flatten(Doc)), [], [{'deleted', 'true'}]);
try_save_conflict(Context, _, _) ->
Context.

-spec delete(cb_context:context(), path_token()) -> cb_context:context().
delete(Context, _RateId) ->
Expand Down Expand Up @@ -262,7 +289,8 @@ on_successful_validation('undefined', Context) ->
),
cb_context:set_doc(Context, Doc);
on_successful_validation(Id, Context) ->
crossbar_doc:load_merge(Id, Context, ?TYPE_CHECK_OPTION(<<"rate">>)).
%% if a rate was deleted allow to create it again on conflict
crossbar_doc:load_merge(Id, Context, [{'deleted', 'true'} | ?TYPE_CHECK_OPTION(<<"rate">>)]).

-spec ensure_routes_set(kz_json:object()) -> kz_json:object().
ensure_routes_set(Rate) ->
Expand Down Expand Up @@ -332,7 +360,7 @@ upload_csv(Context) ->
Now = kz_time:now(),
{'ok', {Count, Rates}} = process_upload_file(Context),
lager:debug("trying to save ~b rates (took ~b ms to process)", [Count, kz_time:elapsed_ms(Now)]),
_ = crossbar_doc:save(cb_context:set_doc(Context, Rates), [{'publish_doc', 'false'}]),
save_rates(Context, Rates),
lager:debug("it took ~b ms to process and save ~b rates", [kz_time:elapsed_ms(Now), Count]).

-spec process_upload_file(cb_context:context()) ->
Expand Down Expand Up @@ -390,7 +418,7 @@ process_row(Context, Row, Count, JObjs, BulkInsert) ->
andalso (Count rem BulkInsert) =:= 0 of
'false' -> JObjs;
'true' ->
_Pid = save_processed_rates(cb_context:set_doc(Context, JObjs), Count),
_Pid = save_processed_rates(Context, JObjs),
[]
end,
process_row(Row, {Count, J}).
Expand Down Expand Up @@ -527,16 +555,43 @@ get_row_direction([_|_]) ->
strip_quotes(Bin) ->
binary:replace(Bin, [<<"\"">>, <<"\'">>], <<>>, ['global']).

-spec save_processed_rates(cb_context:context(), integer()) -> pid().
save_processed_rates(Context, Count) ->
-spec save_processed_rates(cb_context:context(), kz_json:objects()) -> pid().
save_processed_rates(Context, JObjs) ->
kz_util:spawn(
fun() ->
Now = kz_time:now(),
_ = cb_context:put_reqid(Context),
_ = crossbar_doc:save(Context, [{'publish_doc', 'false'}]),
lager:debug("saved up to ~b docs (took ~b ms)", [Count, kz_time:elapsed_ms(Now)])
Now = kz_time:now(),
save_rates(Context, JObjs),
lager:debug("saved up to ~b docs (took ~b ms)", [length(JObjs), kz_time:elapsed_ms(Now)])
end).

-spec save_rates(cb_context:context(), kz_json:objects()) -> 'ok'.
save_rates(Context, Rates) ->
JObjs = maybe_merge_docs(Context, Rates),
_ = crossbar_doc:save(cb_context:set_doc(Context, JObjs), [{'publish_doc', 'false'}]),
'ok'.

maybe_merge_docs(Context, Rates) ->
Ids = [kz_doc:id(JObj) || JObj <- Rates],
case kz_datamgr:open_docs(cb_context:account_db(Context), Ids) of
{'ok', Result} ->
%% if a rate was deleted allow to re-save it again
DbJObjs = maps:from_list(
[{kz_doc:id(JObj), kz_json:delete_key(<<"pvt_deleted">>, kz_json:get_value(<<"doc">>, JObj))}
|| JObj <- Result,
kz_json:is_json_object(kz_json:get_value(<<"doc">>, JObj))
]
),
[kz_json:merge(maps:get(kz_doc:id(JObj), DbJObjs, kz_json:new()), JObj)
|| JObj <- Rates
];
{'error', _Reason} ->
lager:debug("failed with error ~p to fetch and merge rates from ~s, hoping for the best on saving"
,[_Reason, cb_context:account_db(Context)]
),
Rates
end.

-spec rate_for_number(kz_term:ne_binary(), cb_context:context()) -> cb_context:context().
rate_for_number(Phonenumber, Context) ->
Request = props:filter_undefined(
Expand Down
Loading

0 comments on commit ed9bc87

Please sign in to comment.