google cloud datastore put_multi couldn't insert data #3115
Comments
@daiyueweng the library isn't thread safe 😞. It uses httplib2 (though it's in the process of being removed, see #1998). If you run this in a single process, does it work?
@daiyueweng The Datastore Admin console will be eventually consistent, so you may not see your inserted entities right away. As @daspecster mentioned, there are some issues with thread-safety, so sharing a `Client` across processes can cause problems.
Hi. I changed the code to use a single process and set the batch size to 500 (the maximum). It took 74 secs to insert 6946 entities. One other thing, I use …. If I use …, I get an error.
@daiyueweng That error seems to indicate a bug in your calling code.
@dhermes I updated with the calling code; I got some errors with:

```python
def multiprocess_insert(val_dicts):
    client = datastore.Client()
    # val_dicts is a list of dictionaries for updating the entities
    entities = [Entity(client.key(self.kind)) for i in range(len(val_dicts))]
    for entity, update_dict in zip(entities, val_dicts):
        entity.update(update_dict)
    cores_to_use = 3  # how many cpu cores are available for dividing the workload
    # a datastore client is passed in as the argument
    inserter = FastInsertGCDatastore(client)  # FastInsertGCDatastore class code is in my OP
    # entities is a list of datastore entities to be inserted
    # the number of entities is 6000 here
    input_size = len(entities)
    slice_size = int(input_size / cores_to_use)
    entity_blocks = []
    iterator = iter(entities)
    for i in range(cores_to_use):
        entity_blocks.append([])
        for j in range(slice_size):
            entity_blocks[i].append(iterator.__next__())
    for block in entity_blocks:
        p = multiprocessing.Process(target=inserter.execute, args=(block,))
        p.start()
```

Error: …
@daiyueweng Your code still does not create a new `Client` in each process.
@dhermes That was my mistake; I didn't put the correct code snippet. I have updated my last post. I am still getting those errors, even though I passed in the `client`.
@daiyueweng Your snippet still uses

```python
entities = [Entity(self.client.key(self.kind)) for i in range(len(val_dicts))]
```

That means the code won't run.
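A sketch of the fix this implies, assuming `kind` is passed in (or otherwise in scope) instead of being read off `self`:

```python
# hypothetical corrected line: no `self` in a module-level function;
# use the locally created `client` and an explicit `kind`
entities = [Entity(client.key(kind)) for _ in range(len(val_dicts))]
```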
@daiyueweng I'm saying the `Client` needs to be created inside each spawned process.
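For illustration, a minimal sketch of that pattern; the helper names `insert_block` and `run_workers` are hypothetical, not from this thread:

```python
import multiprocessing

from google.cloud import datastore


def insert_block(entities):
    # the Client is constructed inside the child process, so nothing
    # connection-related is shared with the parent or with siblings
    client = datastore.Client()
    # assumes each block is within the 500-entity commit limit mentioned above
    client.put_multi(entities)


def run_workers(entity_blocks):
    processes = [
        multiprocessing.Process(target=insert_block, args=(block,))
        for block in entity_blocks
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```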
@dhermes I changed the code so that each spawned process creates its own `Client` inside `execute`:

```python
class FastInsertGCDatastore:
    """
    batch insert entities into gc datastore
    """

    def execute(self, entities):
        client = datastore.Client()
        number_of_entities = len(entities)
        batch_size = 50
        batch_documents = [0] * batch_size
        rowct = 0  # entity count as index for accessing rows
        for index in range(number_of_entities):
            try:
                batch_documents[index % batch_size] = entities[rowct]
                rowct += 1
                if (index + 1) % batch_size == 0:
                    # self.client.put_multi(batch_documents)
                    client.put_multi(batch_documents)
                index += 1  # after the loop, index equals the number of entities processed
            except Exception as e:
                print('Unexpected error for index ', index, ' message reads', str(e))
                raise e
        # insert any that missed the boat
        if not index % batch_size == 0:
            # self.client.put_multi(batch_documents[:index % batch_size])
            client.put_multi(batch_documents[:index % batch_size])
```

Tested the entire piece of code with inserting over 1 million entities, and got:

```
Process Process-1:
Traceback (most recent call last):
File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 253, in _grpc_catch_rendezvous
yield
File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 356, in commit
return self._stub.Commit(request_pb)
File "/home/daiyue/.local/lib/python3.5/site-packages/grpc/_channel.py", line 481, in __call__
return _end_unary_response_blocking(state, False, deadline)
File "/home/daiyue/.local/lib/python3.5/site-packages/grpc/_channel.py", line 432, in _end_unary_response_blocking
raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, {"created":"@1489058568.826120836","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1489058568.217166049","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]})>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/home/daiyue/PycharmProjects/lumar_ingestion/ingestion_db/fast_insert_gc_datastore.py", line 63, in execute
raise e
File "/home/daiyue/PycharmProjects/lumar_ingestion/ingestion_db/fast_insert_gc_datastore.py", line 59, in execute
client.put_multi(batch_documents)
File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/client.py", line 362, in put_multi
current.commit()
File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/batch.py", line 265, in commit
self._commit()
File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/batch.py", line 242, in _commit
self.project, self._commit_request, self._id)
File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 627, in commit
response = self._datastore_api.commit(project, request)
File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 356, in commit
return self._stub.Commit(request_pb)
File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__
self.gen.throw(type, value, traceback)
File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 260, in _grpc_catch_rendezvous
raise error_class(exc.details())
google.cloud.exceptions.ServiceUnavailable: 503 {"created":"@1489058568.826120836","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1489058568.217166049","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]}
```
```
F
self = <lumar_ingestion.ingestion_workflow_modules.insert_into_database_test.TestInsertIntoDatabase object at 0x7f72c174cb00>
def test_sap_data_gc_datastore(self):
# read db config file
with open('linux_mint_config_datastore.json') as db_config_file:
db_config = json.load(db_config_file)
# create the input data packages from standard sap test rows
data_set = create_test_data_objects(erp='sap', data_folder='copaco', stage_to_test='insert_into_database',
use_cache=False, db_config=db_config)
# init google datastore client
gc_datastore_client = datastore.Client()
# insert every table into gc datastore using InsertIntoDatabase module and db_handler
for tab in data_set:
tab.time.start_time = time.time()
insert_into_db = InsertIntoDatabase(
db_handler=DatabaseHandler(database=gc_datastore_client, table=tab['table_name']))
> tab_output = insert_into_db.execute(tab)
ingestion_workflow_modules/insert_into_database_test.py:92:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
ingestion_workflow_modules/insert_into_database.py:72: in execute
if data.table_name in self.db_handler.get_tables():
ingestion_db/db_handler.py:45: in get_tables
return self.db_handler.get_tables()
ingestion_db/back_end_interfaces/google_db_handler.py:60: in get_tables
kinds = [entity.key.id_or_name for entity in query.fetch()
ingestion_db/back_end_interfaces/google_db_handler.py:60: in <listcomp>
kinds = [entity.key.id_or_name for entity in query.fetch()
../../.local/lib/python3.5/site-packages/google/cloud/iterator.py:210: in _items_iter
for page in self._page_iter(increment=False):
../../.local/lib/python3.5/site-packages/google/cloud/iterator.py:239: in _page_iter
page = self._next_page()
../../.local/lib/python3.5/site-packages/google/cloud/datastore/query.py:499: in _next_page
transaction_id=transaction and transaction.id,
../../.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py:573: in run_query
response = self._datastore_api.run_query(project, request)
../../.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py:321: in run_query
return self._stub.RunQuery(request_pb)
/usr/lib/python3.5/contextlib.py:77: in __exit__
self.gen.throw(type, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@contextlib.contextmanager
def _grpc_catch_rendezvous():
"""Re-map gRPC exceptions that happen in context.
.. _code.proto: https://github.com/googleapis/googleapis/blob/\
master/google/rpc/code.proto
Remaps gRPC exceptions to the classes defined in
:mod:`~google.cloud.exceptions` (according to the description
in `code.proto`_).
"""
try:
yield
except exceptions.GrpcRendezvous as exc:
error_code = exc.code()
error_class = _GRPC_ERROR_MAPPING.get(error_code)
if error_class is None:
raise
else:
> raise error_class(exc.details())
E google.cloud.exceptions.ServiceUnavailable: 503 {"created":"@1489058833.599921395","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1489058833.376186909","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]}
../../.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py:260: ServiceUnavailable
```

The code seemed to partially work, as I can see from the `Cloud Datastore Admin` console.
The failures may just be from the emulator not being able to handle a certain level of throughput; that is outside our purview. From here, it seems like a retry strategy (on errors) would help you. We are planning to add a retry feature, but as of right now you'd need to implement your own.
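A minimal sketch of such a hand-rolled retry, assuming the 503s are transient; the function name `put_multi_with_retry` and its parameters are illustrative, not part of the library:

```python
import time

from google.cloud.exceptions import ServiceUnavailable


def put_multi_with_retry(client, entities, max_attempts=5, base_delay=1.0):
    """Retry put_multi with exponential backoff on transient 503s."""
    for attempt in range(max_attempts):
        try:
            client.put_multi(entities)
            return
        except ServiceUnavailable:
            # give up after the last attempt
            if attempt == max_attempts - 1:
                raise
            # back off exponentially before retrying
            time.sleep(base_delay * (2 ** attempt))
```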
Hi, I am trying to insert 6000 rows/`entities` into `google cloud datastore`. I am also using the `datastore emulator` as the local server on `Linux Mint 18.1`.

In the code, I created an insert function that inserts entities in batches using `put_multi`, with the batch size set to 50. I use python `multiprocessing` to spawn processes that execute the function. A slice function is also used to divide the workload based on how many CPU cores are used, e.g. if there are 3 cores, the workload (6000 `entities`) is divided into 3 parts of 2000 `entities` each, and each part is inserted by a spawned process that executes the insert function.

After the insertion is done, I checked the `Cloud Datastore Admin` console, but couldn't find the `kinds` that had been inserted. I am wondering what the issue is here and how to solve it.

I am using `Python 3.5.2`. The code snippet is as follows (the `FastInsertGCDatastore` class and the `multiprocess_insert` calling code are quoted in the comments above).
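For reference, a minimal sketch of the slicing step described above; the helper name `split_into_blocks` is illustrative, not the OP's actual code:

```python
def split_into_blocks(entities, cores_to_use):
    """Divide the workload into one block per core,
    e.g. 6000 entities across 3 cores -> 3 blocks of 2000."""
    slice_size = len(entities) // cores_to_use
    blocks = [
        entities[i * slice_size:(i + 1) * slice_size]
        for i in range(cores_to_use)
    ]
    # any remainder goes onto the last block
    blocks[-1].extend(entities[cores_to_use * slice_size:])
    return blocks
```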