From e3cb76167965ee0e4ab42676f8a5fd270289b08a Mon Sep 17 00:00:00 2001
From: AD
Date: Wed, 6 Jun 2018 08:18:45 +0200
Subject: [PATCH] Added enhanced Array_Merge for name_alternatives column;
 added args for startInputID to endInputID

---
 classes/submitData.py |  6 +++++-
 transferData.py       | 23 +++++++++++++----------
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/classes/submitData.py b/classes/submitData.py
index 19c5a69..b3cef29 100644
--- a/classes/submitData.py
+++ b/classes/submitData.py
@@ -101,11 +101,15 @@ def submitLbsnCountry(self, record):
                         ON CONFLICT (origin_id,country_guid)
                         DO UPDATE SET
                             name = COALESCE(EXCLUDED.name, "country".name),
-                            name_alternatives = COALESCE(mergeArrays(EXCLUDED.name_alternatives, "country".name_alternatives),ARRAY[]::text[]),
+                            name_alternatives = COALESCE((SELECT array_remove(altNamesNewArray,"country".name) from mergeArrays(EXCLUDED.name_alternatives, "country".name_alternatives) AS altNamesNewArray), ARRAY[]::text[]),
                             geom_center = COALESCE(EXCLUDED.geom_center, "country".geom_center),
                             geom_area = COALESCE(EXCLUDED.geom_area, "country".geom_area),
                             url = COALESCE(EXCLUDED.url, "country".url);
                         '''
 
+        # Array merge of name alternatives:
+        # The column must not end up null, therefore COALESCE([merged array if not null], [otherwise create empty array])
+        # We don't want the English name to appear in the alternatives, therefore: array_remove(altNamesNewArray,"country".name)
+        # Finally, new entries are merged with existing ones: mergeArrays([new],[old]) uses the custom mergeArrays function (see function definitions)
         self.dbCursor.execute(insert_sql,(iCountry_OriginID,iCountry_Guid,iCountry_name,iCountry_name_alternatives,iCountry_geom_center,iCountry_geom_area,iCountry_url))
         self.country_already_inserted.add(iCountry_Guid)
diff --git a/transferData.py b/transferData.py
index 512acf5..258519a 100644
--- a/transferData.py
+++ b/transferData.py
@@ -50,6 +50,8 @@ def main():
     parser.add_argument('-t', "--transferlimit", default=0)
     parser.add_argument('-tR', "--transferReactions", default=0)
     parser.add_argument('-tG', "--transferNotGeotagged", default=0)
+    parser.add_argument('-rS', "--startWithDBRowNumber", type=int, default=0)
+    parser.add_argument('-rE', "--endWithDBRowNumber", type=int, default=None)
     parser.add_argument('-d', "--debugMode", default="INFO") #needs to be implemented
     args = parser.parse_args()
     logging.basicConfig(handlers=[logging.FileHandler('test.log', 'w', 'utf-8')],
@@ -80,20 +82,21 @@ def main():
                               True # ReadOnly Mode
                               )
     processedRecords = 0
-    lastDBRowNumber = 0 # Start Value, Modify to continue from last processing
-    firstDBRowNumber = 0
+    firstDBRowNumber = 0
+    continueWithDBRowNumber = args.startWithDBRowNumber # Start value, modify to continue from last processing
+    endWithDBRowNumber = args.endWithDBRowNumber # End value, stop processing at this DB row number
     conn_input, cursor_input = inputConnection.connect()
     finished = False
     lbsnRecords = lbsnRecordDicts()
     while not finished:
-        records, returnedRecord_count = fetchJsonData_from_LBSN(cursor_input, lastDBRowNumber)
+        records, returnedRecord_count = fetchJsonData_from_LBSN(cursor_input, continueWithDBRowNumber)
         if not firstDBRowNumber:
-            firstDBRowNumber= records[0][0]
+            firstDBRowNumber = records[0][0]
         if returnedRecord_count == 0:
             finished = True
             break
         else:
-            lbsnRecords, processedRecords, lastDBRowNumber, finished = loopInputRecords(records, origin, processedRecords, transferlimit, finished, lbsnRecords)
+            lbsnRecords, processedRecords, continueWithDBRowNumber, finished = loopInputRecords(records, origin, processedRecords, transferlimit, finished, lbsnRecords, endWithDBRowNumber)
         print(f'{processedRecords} Processed. Count per type: {lbsnRecords.getTypeCounts()}records.', end='\r') # update console
         sys.stdout.flush()
 
@@ -103,7 +106,7 @@ def main():
     outputDB.submitLbsnRecordDicts(lbsnRecords)
     outputDB.commitChanges()
     cursor_output.close()
-    log.info(f'\n\nProcessed {processedRecords} records. From DBRowNumber {firstDBRowNumber} to {lastDBRowNumber}.')
+    log.info(f'\n\nProcessed {processedRecords} records. From DBRowNumber {firstDBRowNumber} to {continueWithDBRowNumber}.')
     #print('10 Random samples for each type:\n')
     #for key,keyHash in lbsnRecords.KeyHashes.items():
     #    print(f'{key}: {", ".join(val for i, val in enumerate(random.sample(keyHash, min(10,len(keyHash)))))}')
@@ -117,18 +120,18 @@ def main():
     print('Done.')
 
-def loopInputRecords(jsonRecords, origin, processedRecords, transferlimit, finished, lbsnRecords):
+def loopInputRecords(jsonRecords, origin, processedRecords, transferlimit, finished, lbsnRecords, endWithDBRowNumber):
     for record in jsonRecords:
         processedRecords += 1
-        lastDBRowNumber = record[0]
+        continueWithDBRowNumber = record[0]
         singleJSONRecordDict = record[2]
         if singleJSONRecordDict.get('limit'):
             # Skip Rate Limiting Notice
             continue
         lbsnRecords = fieldMappingTwitter.parseJsonRecord(singleJSONRecordDict, origin, lbsnRecords)
-        if processedRecords >= transferlimit:
+        if processedRecords >= transferlimit or (endWithDBRowNumber and continueWithDBRowNumber >= endWithDBRowNumber):
             finished = True
-    return lbsnRecords, processedRecords, lastDBRowNumber, finished
+    return lbsnRecords, processedRecords, continueWithDBRowNumber, finished
 
 
 def fetchJsonData_from_LBSN(cursor, startID = 0):
     query_sql = '''
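
Note on mergeArrays: the custom mergeArrays function referenced in the DO UPDATE
clause is not included in this patch (see the project's function definitions). As a
minimal sketch only, assuming the function is meant to concatenate two text arrays,
drop duplicates, and tolerate NULL input, it could look like the following; the
project's actual definition may differ:

    -- Hypothetical sketch, not the project's actual definition:
    CREATE OR REPLACE FUNCTION mergeArrays(arr1 text[], arr2 text[])
        RETURNS text[] AS
    $$
        -- Treat NULL input as empty, concatenate both arrays, then deduplicate
        -- (element order is not preserved by DISTINCT).
        SELECT ARRAY(
            SELECT DISTINCT elem
            FROM unnest(COALESCE(arr1, ARRAY[]::text[]) || COALESCE(arr2, ARRAY[]::text[])) AS elem
        );
    $$ LANGUAGE sql IMMUTABLE;

With a definition along these lines, the upsert merges new and stored
name_alternatives and strips the primary name via array_remove; the outer COALESCE
in the DO UPDATE clause still guards against a NULL result.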
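Note on the new row-range arguments: -rS/--startWithDBRowNumber and
-rE/--endWithDBRowNumber bound the slice of input rows that is processed. A
hypothetical invocation (other arguments omitted) could look like:

    python transferData.py --transferlimit 100000 -rS 500000 -rE 600000

Processing then starts at DB row number 500000 and finishes once a record's row
number reaches 600000, or once the transfer limit is hit, whichever comes first.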