Skip to content

Commit

Permalink
Added enhanced Array_Merge for name_alternatives column; added args f…
Browse files Browse the repository at this point in the history
…or startInputID to endInputID
  • Loading branch information
Sieboldianus committed Jun 6, 2018
1 parent 5d40b08 commit e3cb761
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
6 changes: 5 additions & 1 deletion classes/submitData.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,15 @@ def submitLbsnCountry(self, record):
ON CONFLICT (origin_id,country_guid)
DO UPDATE SET
name = COALESCE(EXCLUDED.name, "country".name),
name_alternatives = COALESCE(mergeArrays(EXCLUDED.name_alternatives, "country".name_alternatives),ARRAY[]::text[]),
name_alternatives = COALESCE((SELECT array_remove(altNamesNewArray,"country".name) from mergeArrays(EXCLUDED.name_alternatives, "country".name_alternatives) AS altNamesNewArray), ARRAY[]::text[]),
geom_center = COALESCE(EXCLUDED.geom_center, "country".geom_center),
geom_area = COALESCE(EXCLUDED.geom_area, "country".geom_area),
url = COALESCE(EXCLUDED.url, "country".url);
'''
# Array merge of alternatives:
# Arrays cannot be null, therefore COALESCE([use the merged array if not null], [otherwise create an empty array])
# We don't want the English name to appear in the alternatives, therefore: array_remove(altNamesNewArray, "country".name)
# Finally, merge new entries with existing ones: mergeArrays([new],[old]) uses the custom mergeArrays function (see function definitions)
self.dbCursor.execute(insert_sql,(iCountry_OriginID,iCountry_Guid,iCountry_name,iCountry_name_alternatives,iCountry_geom_center,iCountry_geom_area,iCountry_url))
self.country_already_inserted.add(iCountry_Guid)

Expand Down
23 changes: 13 additions & 10 deletions transferData.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def main():
parser.add_argument('-t', "--transferlimit", default=0)
parser.add_argument('-tR', "--transferReactions", default=0)
parser.add_argument('-tG', "--transferNotGeotagged", default=0)
parser.add_argument('-rS', "--startWithDBRowNumber", default=0)
parser.add_argument('-rE', "--endWithDBRowNumber", default=None)
parser.add_argument('-d', "--debugMode", default="INFO") #needs to be implemented
args = parser.parse_args()
logging.basicConfig(handlers=[logging.FileHandler('test.log', 'w', 'utf-8')],
Expand Down Expand Up @@ -80,20 +82,21 @@ def main():
True # ReadOnly Mode
)
processedRecords = 0
lastDBRowNumber = 0 # Start Value, Modify to continue from last processing
firstDBRowNumber = 0
firstDBRowNumber = 0
continueWithDBRowNumber = args.startWithDBRowNumber # Start Value, Modify to continue from last processing
endWithDBRowNumber = args.endWithDBRowNumber # End Value, Modify to continue from last processing
conn_input, cursor_input = inputConnection.connect()
finished = False
lbsnRecords = lbsnRecordDicts()
while not finished:
records, returnedRecord_count = fetchJsonData_from_LBSN(cursor_input, lastDBRowNumber)
records, returnedRecord_count = fetchJsonData_from_LBSN(cursor_input, continueWithDBRowNumber)
if not firstDBRowNumber:
firstDBRowNumber= records[0][0]
firstDBRowNumber = records[0][0]
if returnedRecord_count == 0:
finished = True
break
else:
lbsnRecords, processedRecords, lastDBRowNumber, finished = loopInputRecords(records, origin, processedRecords, transferlimit, finished, lbsnRecords)
lbsnRecords, processedRecords, continueWithDBRowNumber, finished = loopInputRecords(records, origin, processedRecords, transferlimit, finished, lbsnRecords, endWithDBRowNumber)
print(f'{processedRecords} Processed. Count per type: {lbsnRecords.getTypeCounts()}records.', end='\r')
# update console
sys.stdout.flush()
Expand All @@ -103,7 +106,7 @@ def main():
outputDB.submitLbsnRecordDicts(lbsnRecords)
outputDB.commitChanges()
cursor_output.close()
log.info(f'\n\nProcessed {processedRecords} records. From DBRowNumber {firstDBRowNumber} to {lastDBRowNumber}.')
log.info(f'\n\nProcessed {processedRecords} records. From DBRowNumber {firstDBRowNumber} to {continueWithDBRowNumber}.')
#print('10 Random samples for each type:\n')
#for key,keyHash in lbsnRecords.KeyHashes.items():
# print(f'{key}: {", ".join(val for i, val in enumerate(random.sample(keyHash, min(10,len(keyHash)))))}')
Expand All @@ -117,18 +120,18 @@ def main():
print('Done.')


def loopInputRecords(jsonRecords, origin, processedRecords, transferlimit, finished, lbsnRecords, endWithDBRowNumber):
    """Parse one batch of raw DB rows into the lbsnRecords accumulator.

    Args:
        jsonRecords: sequence of DB rows; row[0] is the DB row number,
            row[2] the decoded JSON payload dict.
        origin: origin identifier, passed through to the Twitter field mapping.
        processedRecords: running count of records processed so far.
        transferlimit: stop (set finished) once this many records were processed.
        finished: current finished flag; returned True when a stop condition hit.
        lbsnRecords: lbsnRecordDicts accumulator, re-assigned per parsed record.
        endWithDBRowNumber: optional DB row number to stop at (inclusive).
            None (or 0/"") disables the bound. May arrive as a str, since
            argparse delivers it without type=int — normalized here.

    Returns:
        (lbsnRecords, processedRecords, continueWithDBRowNumber, finished)
    """
    # Defensive: an empty batch would otherwise leave continueWithDBRowNumber
    # unbound at the return. Report finished so the caller's while-loop ends.
    if not jsonRecords:
        return lbsnRecords, processedRecords, 0, True
    # Normalize the optional bound once, outside the loop: argparse hands it
    # over as a string, and comparing int >= str raises TypeError in Python 3.
    if endWithDBRowNumber:
        endWithDBRowNumber = int(endWithDBRowNumber)
    else:
        endWithDBRowNumber = None
    for record in jsonRecords:
        processedRecords += 1
        continueWithDBRowNumber = record[0]
        singleJSONRecordDict = record[2]
        if singleJSONRecordDict.get('limit'):
            # Skip Rate Limiting Notice - it carries no LBSN content
            continue
        lbsnRecords = fieldMappingTwitter.parseJsonRecord(singleJSONRecordDict, origin, lbsnRecords)
        if processedRecords >= transferlimit or (endWithDBRowNumber is not None and continueWithDBRowNumber >= endWithDBRowNumber):
            # NOTE(review): flag only, no break — the rest of the batch is
            # still processed, matching the original behavior.
            finished = True
    return lbsnRecords, processedRecords, continueWithDBRowNumber, finished

def fetchJsonData_from_LBSN(cursor, startID = 0):
query_sql = '''
Expand Down

0 comments on commit e3cb761

Please sign in to comment.