Added Geocoding Option for Text Location Strings
Sieboldianus committed Jun 20, 2018
1 parent 573fb9a commit ae10cb8
Showing 5 changed files with 69 additions and 52 deletions.
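In short: free-text location strings from Twitter user profiles can now be geocoded against a user-supplied CSV file (structure: lat, lng, name, with commas inside names escaped as ';'), enabled via the new -gL/--geocodeLocations option added below. A hypothetical input file and invocation (the file name geocodes.csv is only an example):

51.0504,13.7373,Dresden; Germany
53.5511,9.9937,Hamburg; Germany

python transferData.py --geocodeLocations geocodes.csv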
9 changes: 7 additions & 2 deletions classes/fieldMapping.py
@@ -12,7 +12,7 @@
from google.protobuf import text_format

class fieldMappingTwitter():
    def __init__(self, disableReactionPostReferencing = False):
    def __init__(self, disableReactionPostReferencing = False, geocodes = False):
        # We're dealing with Twitter in this class, let's create the OriginID globally;
        # this OriginID is required for all CompositeKeys
        origin = lbsnOrigin()
@@ -21,7 +21,8 @@ def __init__(self, disableReactionPostReferencing = False):
        self.lbsnRecords = lbsnRecordDicts() # this is where all the data will be stored
        self.log = logging.getLogger('__main__') #logging.getLogger()
        self.disableReactionPostReferencing = disableReactionPostReferencing
        self.geocodes = geocodes

    def parseJsonRecord(self, jsonStringDict):
        # decide if main object is post or user json
        if 'screen_name' in jsonStringDict:
@@ -133,6 +134,10 @@ def extractUser(self,jsonStringDict):
        userLocation = user.get('location')
        if userLocation:
            userRecord.user_location = userLocation
            if self.geocodes and userRecord.user_location in self.geocodes:
                l_lat = self.geocodes[userRecord.user_location][0]
                l_lng = self.geocodes[userRecord.user_location][1]
                userRecord.user_location_geom = "POINT(%s %s)" % (l_lng,l_lat)
        #userGeoLocation = user.get('profile_location') # todo!
        userRecord.liked_count = user.get('favourites_count')
        userRecord.active_since.CopyFrom(helperFunctions.parseJSONDateStringToProtoBuf(user.get('created_at')))
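For context: the lookup added above turns a user's free-text profile location into a WKT geometry when the string is found in the geocode dictionary. Note the argument order: WKT points are written as x y, i.e. lng before lat. A standalone sketch of the same logic (function name hypothetical, assuming the dictionary maps strings to (lat, lng) tuples):

def geocode_to_wkt(geocodes, location_text):
    # geocodes: dict mapping location strings to (lat, lng) tuples
    coords = geocodes.get(location_text)
    if coords is None:
        return None
    lat, lng = coords
    return "POINT(%s %s)" % (lng, lat) # WKT order is x y = lng lat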
23 changes: 12 additions & 11 deletions classes/helperFunctions.py
@@ -2,6 +2,7 @@

from datetime import timezone
import re
import csv
import emoji
import numpy as np
from lbsnstructure.Structure_pb2 import *
@@ -311,14 +312,14 @@ def AddRecordToDict(self,record):
        dict = self.dictSelector(record)
        self.MergeExistingRecords(record,dict)

#class geocodeLocation():
#    def __init__(self,file):
#        self.geocodeDict = load_geocodelist(file)
#
#    def load_geocodelist(self,file):
#        with open(file, newline='', encoding='utf8') as f: #read each unsorted file and sort lines based on datetime (as string)
#            next(f) #Skip Headerrow
#            logfile_list = csv.reader(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
#            for logfile_entry in logfile_list:
#                logfile_size_dict[logfile_entry[0].replace('\\','/')] = (float(logfile_entry[1]),logfile_entry[2])
#                #print(repr(logfile_entry[0].replace('\\','/')))
class geocodeLocations():
    def __init__(self):
        self.geocodeDict = dict()

    def load_geocodelist(self, file):
        with open(file, newline='', encoding='utf8') as f: # read CSV with one geocode entry per line: lat, lng, name
            #next(f) #Skip Headerrow
            locationfile_list = csv.reader(f, delimiter=',', quotechar=None, quoting=csv.QUOTE_NONE)
            for location_geocode in locationfile_list:
                # commas inside names are escaped as ';' in the CSV; store (lat, lng) as floats
                self.geocodeDict[location_geocode[2].replace(';',',')] = (float(location_geocode[0]), float(location_geocode[1]))
        print(f'Loaded {len(self.geocodeDict)} geocodes.')
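A minimal usage sketch, assuming a file geocodes.csv with one lat,lng,name row per location string (commas inside names escaped as ';'):

locations = geocodeLocations()
locations.load_geocodelist('geocodes.csv')
# a row '51.0504,13.7373,Dresden; Germany' becomes:
print(locations.geocodeDict.get('Dresden, Germany')) # (51.0504, 13.7373)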
24 changes: 12 additions & 12 deletions classes/submitData.py
@@ -369,18 +369,18 @@ def submitBatch(self,insert_sql):
        try:
            self.dbCursor.execute(insert_sql)
        except psycopg2.IntegrityError as e:
            if '(post_language)' in e.diag.message_detail or '(user_language)' in e.diag.message_detail:
                # If language does not exist, we'll trust Twitter and add this to our language list
                missingLanguage = e.diag.message_detail.partition("language)=(")[2].partition(") is not present")[0]
                print(f'TransactionIntegrityError, inserting language "{missingLanguage}" first..')
                #self.dbConnection.rollback()
                self.dbCursor.execute("ROLLBACK TO SAVEPOINT submit_recordBatch")
                insert_language_sql = '''
                    INSERT INTO "language" (language_short,language_name,language_name_de)
                    VALUES (%s,NULL,NULL);
                    '''
                self.dbCursor.execute(insert_language_sql,(missingLanguage,))
                #self.prepareLbsnRecord(record,type_name)
        except ValueError as e:
            self.log.warning(f'{e}')
            input("Press Enter to continue... (entry will be skipped)")
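The hunk above relies on a SAVEPOINT named submit_recordBatch set earlier in submitBatch (not shown in this diff); after the missing language is inserted, the batch is presumably re-submitted. A hedged sketch of the full psycopg2 pattern (function name and the explicit retry are assumptions, not code from this commit):

import psycopg2

def submit_with_language_retry(cursor, insert_sql):
    cursor.execute("SAVEPOINT submit_recordBatch")
    try:
        cursor.execute(insert_sql)
    except psycopg2.IntegrityError as e:
        detail = e.diag.message_detail or ''
        if '(post_language)' in detail or '(user_language)' in detail:
            # extract the missing language code from the FK violation detail
            missing = detail.partition("language)=(")[2].partition(") is not present")[0]
            cursor.execute("ROLLBACK TO SAVEPOINT submit_recordBatch")
            cursor.execute('INSERT INTO "language" (language_short,language_name,language_name_de) VALUES (%s,NULL,NULL);', (missing,))
            cursor.execute(insert_sql) # retry the original batch
        else:
            raise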
6 changes: 5 additions & 1 deletion config/config.py
@@ -27,7 +27,8 @@ def __init__(self):
        self.startWithDBRowNumber = 0
        self.endWithDBRowNumber = None
        self.debugMode = 'INFO' # needs to be implemented
        self.geocodeLocations = False # provide path to CSV file with location geocodes (CSV structure: lat, lng, name)

    def parseArgs(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('-lI', "--LocalInput", default=self.LocalInput)
@@ -51,6 +52,7 @@ def parseArgs(self):
        parser.add_argument('-rS', "--startWithDBRowNumber", default=self.startWithDBRowNumber)
        parser.add_argument('-rE', "--endWithDBRowNumber", default=self.endWithDBRowNumber)
        parser.add_argument('-d', "--debugMode", default=self.debugMode)
        parser.add_argument('-gL', "--geocodeLocations", default=self.geocodeLocations)

        args = parser.parse_args()
        if args.LocalInput and int(args.LocalInput) == 1:
@@ -71,6 +73,8 @@ def parseArgs(self):
        self.dbPassword_Input = args.dbPassword_Input
        self.dbServeradressInput = args.dbServeradressInput
        self.dbNameInput = args.dbNameInput
        if args.geocodeLocations:
            # resolve relative path against the current working directory (platform-independent)
            self.geocodeLocations = os.path.join(os.getcwd(), args.geocodeLocations)
        self.dbUser_Output = args.dbUser_Output
        self.dbPassword_Output = args.dbPassword_Output
        self.dbServeradressOutput = args.dbServeradressOutput
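Since the path is resolved against the current working directory, a more defensive variant (a sketch, not part of this commit) could fail fast when the file does not exist:

        if args.geocodeLocations:
            geocode_path = os.path.join(os.getcwd(), args.geocodeLocations)
            if not os.path.isfile(geocode_path):
                raise FileNotFoundError(f'Geocode CSV not found: {geocode_path}')
            self.geocodeLocations = geocode_path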
59 changes: 33 additions & 26 deletions transferData.py
@@ -9,6 +9,7 @@
from classes.dbConnection import dbConnection
from classes.helperFunctions import helperFunctions
from classes.helperFunctions import lbsnRecordDicts as lbsnRecordDicts
from classes.helperFunctions import geocodeLocations as geocodeLocations
from classes.fieldMapping import fieldMappingTwitter as fieldMappingTwitter
from classes.submitData import lbsnDB as lbsnDB
from config.config import baseconfig as baseconfig
@@ -82,21 +83,27 @@ def main():
    processedTotal = 0
    startNumber = 0
    continueNumber = config.startWithDBRowNumber # Start Value, Modify to continue from last processing
    endNumber = config.endWithDBRowNumber # End Value, Modify to continue from last processing

    # Optional Geocoding
    locationsGeocodeDict = False
    if config.geocodeLocations:
        locationsGeocodeDict = geocodeLocations()
        locationsGeocodeDict.load_geocodelist(config.geocodeLocations)

    finished = False
    twitterRecords = fieldMappingTwitter(config.disableReactionPostReferencing)
    twitterRecords = fieldMappingTwitter(config.disableReactionPostReferencing,
                                         locationsGeocodeDict.geocodeDict if locationsGeocodeDict else False) # pass False when geocoding is disabled

    # Manually add entries that need submission prior to parsing data
    # Example: A group that applies to all entries
    deutscherBundestagGroup = helperFunctions.createNewLBSNRecord_with_id(lbsnUserGroup(),"MdB (Bundestag)",twitterRecords.origin)
    DBG_owner = helperFunctions.createNewLBSNRecord_with_id(lbsnUser(),"243586130",twitterRecords.origin)
    DBG_owner.user_name = 'wahl_beobachter'
    DBG_owner.user_fullname = 'Martin Fuchs'
    deutscherBundestagGroup.user_owner_pkey.CopyFrom(DBG_owner.pkey)
    deutscherBundestagGroup.usergroup_description = 'Alle twitternden Abgeordneten aus dem Deutschen Bundestag #bundestag'
    twitterRecords.lbsnRecords.AddRecordToDict(DBG_owner)
    twitterRecords.lbsnRecords.AddRecordToDict(deutscherBundestagGroup)
    #deutscherBundestagGroup = helperFunctions.createNewLBSNRecord_with_id(lbsnUserGroup(),"MdB (Bundestag)",twitterRecords.origin)
    #DBG_owner = helperFunctions.createNewLBSNRecord_with_id(lbsnUser(),"243586130",twitterRecords.origin)
    #DBG_owner.user_name = 'wahl_beobachter'
    #DBG_owner.user_fullname = 'Martin Fuchs'
    #deutscherBundestagGroup.user_owner_pkey.CopyFrom(DBG_owner.pkey)
    #deutscherBundestagGroup.usergroup_description = 'Alle twitternden Abgeordneten aus dem Deutschen Bundestag #bundestag'
    #twitterRecords.lbsnRecords.AddRecordToDict(DBG_owner)
    #twitterRecords.lbsnRecords.AddRecordToDict(deutscherBundestagGroup)

    # loop input DB until transferlimit reached or no more rows are returned
    while not finished:
@@ -192,21 +199,21 @@ def fetchJsonData_from_LBSN(cursor, startID = 0, transferlimit = None, numberOfR
def fetchJsonData_from_File(loc_filelist, startFileID = 0, isStackedJson = False):
    x = 0
    records = []
    for locFile in loc_filelist:
        if x == startFileID:
            with open(locFile, 'r', encoding="utf-8", errors='replace') as file:
                # Stacked JSON is a simple file with many concatenated jsons, e.g. {json1}{json2} etc.
                if isStackedJson:
                    try:
                        for obj in helperFunctions.decode_stacked(file.read()):
                            records.append(obj)
                            #print(f'Object: {obj[-1]}')
                    except json.decoder.JSONDecodeError:
                        pass
                else:
                    # normal json nesting, e.g. {{record1},{record2}}
                    records = json.loads(file.read())
        x += 1
    if startFileID > len(loc_filelist)-1:
        return None
    locFile = loc_filelist[startFileID]
    with open(locFile, 'r', encoding="utf-8", errors='replace') as file:
        # Stacked JSON is a simple file with many concatenated jsons, e.g. {json1}{json2} etc.
        if isStackedJson:
            try:
                for obj in helperFunctions.decode_stacked(file.read()):
                    records.append(obj)
                    #print(f'Object: {obj[-1]}')
            except json.decoder.JSONDecodeError:
                pass
        else:
            # normal json nesting, e.g. {{record1},{record2}}
            records = json.loads(file.read())
    if records:
        return records
    else:
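helperFunctions.decode_stacked is referenced but not part of this diff; a minimal sketch of such a generator, assuming it follows the common json.JSONDecoder.raw_decode pattern:

import json
import re

NOT_WHITESPACE = re.compile(r'\S')

def decode_stacked(document, pos=0):
    # yield successive objects from concatenated JSON, e.g. '{json1}{json2}'
    decoder = json.JSONDecoder()
    while True:
        match = NOT_WHITESPACE.search(document, pos)
        if not match:
            return
        obj, pos = decoder.raw_decode(document, match.start())
        yield obj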
