import argparse

import pyspark
from pyspark.sql import SparkSession

# Spark configuration -- adjust the memory settings to your own machine.
conf = (
    pyspark.SparkConf()
    .setMaster("local[*]")
    .setAll(
        [
            ("spark.executor.memory", "80g"),
            ("spark.driver.memory", "5g"),
            ("spark.driver.maxResultSize", "35g"),
        ]
    )
)

# Characters stripped from every line before counting word pairs.
PUNCTUATION = "\n,.!?:;()[]'"

# Delimiter between the two words of an aligned pair. This is assumed to be
# awesome-align's "<sep>" token; adjust it if your alignment file uses a
# different separator.
PAIR_SEPARATOR = "<sep>"


def changePunctuation(line):
    """Remove newlines and punctuation so only bare words remain."""
    return line.translate(str.maketrans("", "", PUNCTUATION))


def filterEmpty(pair):
    """Keep only pairs where both the source and the target word are non-empty."""
    return pair[0] != "" and pair[1] != ""


def make_pretreatment(rdds):
    """Count occurrences of each (source, target) aligned word pair."""
    return (
        rdds
        .map(changePunctuation)
        .flatMap(lambda line: line.split(" "))
        .map(lambda pair: pair.split(PAIR_SEPARATOR))
        .filter(lambda pair: len(pair) == 2)
        .filter(filterEmpty)
        .map(lambda pair: ((pair[0], pair[1]), 1))
        .reduceByKey(lambda a, b: a + b)
    )


def make_pretreatment2(rdds):
    """Count occurrences of each (target, source) aligned word pair (reverse direction)."""
    return (
        rdds
        .map(changePunctuation)
        .flatMap(lambda line: line.split(" "))
        .map(lambda pair: pair.split(PAIR_SEPARATOR))
        .filter(lambda pair: len(pair) == 2)
        .filter(filterEmpty)
        .map(lambda pair: ((pair[1], pair[0]), 1))
        .reduceByKey(lambda a, b: a + b)
    )


def make_dic(pretreatment, output: str, filepath: str, sc):
    """Build a probabilistic dictionary from the alignment file and write it to `output`."""
    rdds = sc.textFile(filepath)
    rdds = pretreatment(rdds)

    # Group the counted pairs by source word: [(word, [(translation, count), ...]), ...]
    count_number_appearance = (
        rdds
        .map(lambda item: (item[0][0], (item[0][1], item[1])))
        .groupByKey()
        .sortByKey()
        .collect()
    )

    # Disabled: optionally seed the dictionary with identity entries for the
    # punctuation marks stripped above.
    # for mark in [',', '.', '!', '?', ':', ';', '(', ')', '[', ']', "'"]:
    #     count_number_appearance.append((mark, [(mark, 1)]))

    with open(output, "w") as writer:
        for word, values in count_number_appearance:
            sorted_values = list(values)
            sorted_values.sort(key=lambda item: item[1], reverse=True)
            # Drop translations seen less than a tenth as often as the most frequent one.
            threshold = int(sorted_values[0][1] / 10)
            sorted_values = [i for i in sorted_values if i[1] > threshold]
            total_occurrence = sum(i[1] for i in sorted_values)
            for translation, occurrence in sorted_values:
                writer.write(f"{translation} {word} {occurrence / total_occurrence:.6f}\n")


def main(args):
    # Create the session and get its context.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext

    make_dic(make_pretreatment, "lex.e2f", args.alignment_file, sc)
    make_dic(make_pretreatment2, "lex.f2e", args.alignment_file, sc)


if __name__ == "__main__":
    print("Here is my super project! 🚀")
    parser = argparse.ArgumentParser(description="Create probabilistic dictionaries")
    parser.add_argument("alignment_file", type=str, help="The alignment file produced by awesome-align")
    args = parser.parse_args()
    main(args)
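
# Example usage (the script and input file names below are illustrative, not
# fixed by the project):
#     python make_dictionaries.py alignments.txt
# This writes the two probabilistic dictionaries "lex.e2f" and "lex.f2e" to the
# current working directory, one "translation word probability" entry per line.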