post_extractor_pipeline.py — forked from majalcmaj/ZTNBD-ZAD
48 lines (38 loc) · 1.51 KB
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import sys
import re
import json
from post_extractor.modules.posts import *
from pyspark.ml import Pipeline
# --- Command-line setup: the single required argument is the data root. ---
dataFolder = sys.argv[1]
if not dataFolder.endswith("/"):
    dataFolder += "/"
print("Using {} as data root".format(dataFolder))

# Build a local Spark context and a SQL session for this pipeline run.
sconf = SparkConf().setMaster('local[*]').setAppName('PipelineFlow')
sc = SparkContext.getOrCreate(sconf)
sess = SparkSession(sc)

# mapping.csv rows are "<path>|<key>"; expose them as a two-column DataFrame.
mappings = sc.textFile(dataFolder + 'mapping.csv') \
    .map(lambda line: line.split('|')) \
    .toDF(['path', 'key'])
def extractKey(entry):
    """Map a (file_path, file_content) pair to a (key, parsed_json) pair.

    The key is the file's base name with a trailing ".json" extension
    stripped, so it matches the 'key' column loaded from mapping.csv.
    """
    fpath, content = entry
    fname = fpath.split('/')[-1]
    # Raw string: the original '\.json$' literal contains an invalid escape
    # sequence (DeprecationWarning in Python 3, an error in future versions).
    key = re.sub(r'\.json$', '', fname)
    # NOTE(review): JSON parsing arguably belongs in the transformer stage.
    return (key, json.loads(content))
# Load each statement file as (key, parsed_json); key joins against mapping.csv.
statements = sc.wholeTextFiles(dataFolder + 'statement/') \
    .map(extractKey) \
    .toDF(['key', 'content'])

# Attach each statement to its mapping entry via the shared key column.
df = mappings.join(statements, 'key')

# Pipeline stages: raw content -> posts -> translated posts -> sentences.
poster = PostTransformer().setInputCol('content').setOutputCol('posts')
translator = TranslateTransformer().setInputCol('posts').setOutputCol('translated')
sentencer = SentenceTransformer().setInputCol('translated').setOutputCol('sentences')
pipeline = Pipeline(stages=[poster, translator, sentencer])
out = pipeline.fit(df).transform(df)

# Fetch the first row ONCE: the previous code called .first() four times,
# launching four separate Spark jobs, and Spark does not guarantee the same
# row is returned by each call after a join.
row = out.select('sentences', 'translated').first()
a = row.sentences[0]
b = row.sentences[1]
c = row.sentences[2]
d = row.translated[0]
print('{}\n\n{}\n\n{}\n\n{}'.format(a, b, c, d))