diff --git a/README.md b/README.md
index 97286a4..90397ef 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,7 @@
 ## LBSN Data Structure Concept: Twitter-json_protobuf_mapping
 
-This tool will read JSON strings from a Postgres Database or local folder* and map the Twitter Endpoints to our common [LBSN Data Structure](https://gitlab.vgiscience.de/lbsn/concept) in ProtoBuf.
-
-^* Not currently implemented
+This tool will read JSON strings from a Postgres Database or local folder and map the Twitter Endpoints to our common [LBSN Data Structure](https://gitlab.vgiscience.de/lbsn/concept) in ProtoBuf.
+Output can be either a Postgres Database or local CSV.
 
 ### Quick Start
 
diff --git a/classes/helperFunctions.py b/classes/helperFunctions.py
index 46a5221..edf67e6 100644
--- a/classes/helperFunctions.py
+++ b/classes/helperFunctions.py
@@ -4,7 +4,8 @@
 import re
 import csv
 import emoji
-import numpy as np
+#from numpy import amin as np_min
+#from numpy import amax as np_max
 from lbsnstructure.Structure_pb2 import *
 from lbsnstructure.external.timestamp_pb2 import Timestamp
 import datetime
@@ -39,10 +40,10 @@ def getRectangleBounds(points):
     for point in points:
         lngs.append(point[0])
         lats.append(point[1])
-    limYMin = np.min(lats)
-    limYMax = np.max(lats)
-    limXMin = np.min(lngs)
-    limXMax = np.max(lngs)
+    limYMin = min(lats)
+    limYMax = max(lats)
+    limXMin = min(lngs)
+    limXMax = max(lngs)
     return limYMin,limYMax,limXMin,limXMax
 
 def createNewLBSNRecord_with_id(record,id,origin):
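Note on the helperFunctions.py change above: dropping `import numpy` in favor of the builtins means the frozen build can exclude numpy entirely (see `excludes_mod` in setup.py further down); for plain Python lists of coordinates, `min()`/`max()` return the same bounds. A minimal sketch of the resulting helper, with hypothetical sample coordinates:

```python
# Bounding box from (lng, lat) points using builtin min()/max()
# instead of np.min()/np.max() -- identical results for plain lists.
def getRectangleBounds(points):
    lngs = []
    lats = []
    for point in points:
        lngs.append(point[0])
        lats.append(point[1])
    limYMin = min(lats)
    limYMax = max(lats)
    limXMin = min(lngs)
    limXMax = max(lngs)
    return limYMin, limYMax, limXMin, limXMax

# Hypothetical example: a bounding box of two (lng, lat) corners
print(getRectangleBounds([(13.58, 50.97), (13.88, 51.13)]))
# -> (50.97, 51.13, 13.58, 13.88)
```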
diff --git a/classes/submitData.py b/classes/submitData.py
index 873931a..1ba8e90 100644
--- a/classes/submitData.py
+++ b/classes/submitData.py
@@ -42,6 +42,7 @@ def __init__(self, dbCursor = None,
         self.count_affected = 0
         self.commit_volume = commit_volume
         self.store_volume = 500000
+        self.storeCSVPart = 0
         self.count_glob = 0
         self.null_island_count = 0
         self.disableReactionPostReferencing = disableReactionPostReferencing
@@ -83,7 +84,7 @@ def __init__(self, dbCursor = None,
             '_user_friends_user': 'origin_id, user_guid, friend_guid'
             }
         if self.storeCSV:
-            self.OutputPathFile = f'{os.getcwd()}\\Output\\'
+            self.OutputPathFile = f'{os.getcwd()}\\02_Output\\'
             if not os.path.exists(self.OutputPathFile):
                 os.makedirs(self.OutputPathFile)
 
@@ -572,7 +573,7 @@ def sortCleanProtoRepeatedField(self, record):
 
     def storeAppendCSV(self, typeName, pgCopyFormat = False):
        records = self.batchedRecords[typeName]
-        filePath = f'{self.OutputPathFile}{typeName}_{self.countRound:03d}.csv'
+        filePath = f'{self.OutputPathFile}{typeName}-{self.countRound:03d}.csv'
         with open(filePath, 'a', encoding='utf8') as f:
             #csvOutput = csv.writer(f, delimiter=',', lineterminator='\n', quotechar='"', quoting=csv.QUOTE_MINIMAL)
             for record in records:
@@ -596,18 +597,19 @@ def serializeEncodeRecord(self, record):
     def cleanCSVBatches(self):
         # function that merges all output streams at end
         x=0
+        self.storeCSVPart += 1
+        print('Cleaning and merging output files..')
         for typeName in self.batchedRecords:
             x+= 1
-            filelist = glob(f'{self.OutputPathFile}{typeName}{self.countRound:03d}_*.csv')
+            filelist = glob(f'{self.OutputPathFile}{typeName}-*.csv')
             if filelist:
-                print(f'Cleaning & merging output files..{x}/{len(self.batchedRecords)}', end='\r')
-                sys.stdout.flush()
                 self.sortFiles(filelist,typeName)
                 if len(filelist) > 1:
+                    print(f'Cleaning & merging output files..{x}/{len(self.batchedRecords)}', end='\r')
                     self.mergeFiles(filelist,typeName)
                 else:
                     # no need to merge files if only one round
-                    new_filename = filelist[0].replace('_001','')
+                    new_filename = filelist[0].replace('_001','001Proto')
                     if os.path.isfile(new_filename):
                         os.remove(new_filename)
                     os.rename(filelist[0], new_filename)
@@ -633,7 +635,7 @@ def sortFiles(self, filelist, typeName):
     def mergeFiles(self, filelist, typeName):
         with ExitStack() as stack:
             files = [stack.enter_context(open(fname, encoding='utf8')) for fname in filelist]
-            with open(f'{self.OutputPathFile}{typeName}{self.countRound}.csv','w', encoding='utf8') as mergedFile:
+            with open(f'{self.OutputPathFile}{typeName}_Part{self.countRound:03d}Proto.csv','w', encoding='utf8') as mergedFile:
                 mergedFile.writelines(heapqMerge(*files))
         for file in filelist:
             os.remove(file)
@@ -735,7 +737,7 @@ def cleanMergedFile(mergedFile, cleanedMergedFile):
                 csvOutput.writerow(formattedValueList)
             print(f'{typeName} Duplicates Merged: {dupsremoved} ')
         # main
-        mergedFilename = f'{self.OutputPathFile}{typeName}{self.countRound:03d}.csv'
+        mergedFilename = f'{self.OutputPathFile}{typeName}{self.countRound:03d}Proto.csv'
         cleanedMergedFilename = f'{self.OutputPathFile}{typeName}{self.countRound:03d}_cleaned.csv'
         cleanedMergedFilename_CSV = f'{self.OutputPathFile}{typeName}{self.countRound:03d}pgCSV.csv'
         mergedFile = open(mergedFilename,'r', encoding='utf8')
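Note on `mergeFiles` above: `heapqMerge` is presumably `heapq.merge` (the old setup.py listed `heapq` and `contextlib` among its requirements), which lazily k-way-merges the already-sorted per-round CSV parts without loading them into memory, while `ExitStack` keeps the variable number of input files open. A self-contained sketch of the same pattern; the file names are hypothetical:

```python
# K-way merge of pre-sorted CSV part files, as in mergeFiles():
# heapq.merge yields lines lazily from all inputs in sorted order;
# ExitStack manages an arbitrary number of open file handles.
from contextlib import ExitStack
from heapq import merge as heapqMerge

def merge_sorted_parts(part_files, merged_path):
    with ExitStack() as stack:
        files = [stack.enter_context(open(fname, encoding='utf8'))
                 for fname in part_files]
        with open(merged_path, 'w', encoding='utf8') as merged_file:
            # inputs must already be sorted line-by-line (see sortFiles)
            merged_file.writelines(heapqMerge(*files))

# Hypothetical part files produced by storeAppendCSV:
# merge_sorted_parts(['post-001.csv', 'post-002.csv'], 'post_Part001Proto.csv')
```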
+#build_exe_options = {"packages": ["os"], "excludes": []} +#includes_mod = ['google.protobuf', +# 'psycopg2', +# 'ppygis3', +# 'shapely', +# 'emoji', +# 'numpy', +# 'shutil', +# 'heapq', +# ] + +## CXFreeze build +excludes_mod = ['tkinter', + 'matplotlib', + 'IPython', + 'ipykernel', + 'jedi', + 'jinja2', + 'jupyter_client', + 'multiprocessing', + 'scipy', + 'numpy'] +packages_mod = [ + 'psycopg2', + 'ppygis3', + 'shapely', + 'emoji', + 'shutil', + 'heapq' + ] +include_folders_files = [('tests/00_TransferAll_Default.sh', '00_TransferAll_Default.sh') + ] +build_exe_options = {'include_files': include_folders_files, "packages": packages_mod, "excludes": excludes_mod} +base = None +executables = [ + Executable('transferData.py', base=base) +] setup( name = "lbsnDataTransfer", version = "0.1.4", description = "lbsn data structure format & transfer tool", author='Alexander Dunkel', url='https://gitlab.vgiscience.de/lbsn/lbsn-twitter-json-mapping', license='GNU GPLv3 or any higher', - packages=['classes', - 'config', - 'lbsnstructure'], - install_requires=[ - 'protobuf', - 'psycopg2', - 'ppygis3', - 'shapely', - 'emoji', - 'numpy', - 'shutil', - 'heapq', - 'contextlib' - ]) \ No newline at end of file + options = {'build_exe': build_exe_options }, + executables = executables) \ No newline at end of file diff --git a/transferData.py b/transferData.py index 753787a..07c2f1b 100644 --- a/transferData.py +++ b/transferData.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = "Alexander Dunkel" @@ -42,7 +43,7 @@ def main(): config.parseArgs() sys.stdout.flush() # set logger - logging.basicConfig(handlers=[logging.FileHandler('test.log', 'w', 'utf-8')], + logging.basicConfig(handlers=[logging.FileHandler('log.log', 'w', 'utf-8')], format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%H:%M:%S', level=logging.DEBUG)