Speed up parallel execution by switching to a producer-consumer design #204

Merged

Changes from all commits (27 commits)
5f9abf6
modify _analyze_parallel to speedup
r-terada Oct 28, 2021
d980239
enable to use parallel when input is from pipe
r-terada Oct 29, 2021
d861c10
send 'terminate' message from load_process instead of using load_end …
r-terada Nov 7, 2021
c3c09e0
put analysis error to out queue
r-terada Nov 12, 2021
329a835
refactor analyzer.py to output str directly
hiroshi-matsuda-rit Nov 14, 2021
94e2a72
debug: nlp.pipe() needs rstrip and does
hiroshi-matsuda-rit Nov 14, 2021
4fabc2f
debug: multi processing target function not serializable
hiroshi-matsuda-rit Nov 14, 2021
9667ce5
rename to parallel_level
hiroshi-matsuda-rit Nov 14, 2021
f5a0f1b
remove imports
hiroshi-matsuda-rit Nov 14, 2021
f6fd7cf
use sudachipy directly in mecab mode
hiroshi-matsuda-rit Nov 14, 2021
7e5b41c
only moving _analyzer_parallel()
hiroshi-matsuda-rit Nov 14, 2021
61efead
refactor multi processing with avoid event
hiroshi-matsuda-rit Nov 14, 2021
b6080ef
Merge pull request #206 from megagonlabs/feature/optimize_parallel_pr…
r-terada Nov 16, 2021
1fec7b0
upgrade dependencies
hiroshi-matsuda-rit Nov 17, 2021
8e187d2
fix tests
r-terada Nov 17, 2021
a2303c7
Merge branch 'feature/optimize_parallel_process_and_queue' of https:/…
hiroshi-matsuda-rit Nov 18, 2021
907679d
use morph features and token.norm_
hiroshi-matsuda-rit Nov 18, 2021
669ee7b
debug cabocha inflection format
hiroshi-matsuda-rit Nov 18, 2021
c1f5323
add use_normalized_form argument
hiroshi-matsuda-rit Nov 19, 2021
a0ef56a
remove download_model()
hiroshi-matsuda-rit Nov 20, 2021
e36c657
split_mode=None, add test_analyze_batch()
hiroshi-matsuda-rit Nov 20, 2021
2ae766e
Revert "remove download_model()"
hiroshi-matsuda-rit Nov 20, 2021
7b16604
remove --use-use_normalized_form option from ginzame (use True always)
r-terada Nov 20, 2021
183863c
remove normalized_form output test
r-terada Nov 20, 2021
23e671d
Merge pull request #209 from megagonlabs/feature/use_morph_features_o…
r-terada Nov 20, 2021
fc62470
add test_do_not_use_normalized_form()
hiroshi-matsuda-rit Nov 20, 2021
f42aae2
Merge branch 'develop' into feature/optimize_parallel_process_and_queue
hiroshi-matsuda-rit Nov 20, 2021
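
For context, the producer-consumer layout these commits converge on can be sketched as follows. This is a minimal illustration of the pattern only, not ginza's actual code; the names load_worker, analysis_worker, and END are hypothetical. A loader process feeds an input queue, several analysis processes consume from it, and, per the commits above, termination flows through the queues as explicit 'terminate' messages rather than an event, while analysis errors are forwarded on the output queue.

import multiprocessing as mp
import sys

END = "terminate"  # sentinel message; the loader sends one per consumer

def load_worker(in_queue, n_workers):
    # Producer: enqueue numbered input lines, then one sentinel per consumer.
    for numbered_line in enumerate(sys.stdin):
        in_queue.put(numbered_line)
    for _ in range(n_workers):
        in_queue.put(END)

def analysis_worker(in_queue, out_queue):
    # Consumer: analyze until the sentinel arrives; forward errors to the
    # output queue instead of dying silently.
    while True:
        msg = in_queue.get()
        if msg == END:
            out_queue.put(END)
            break
        index, line = msg
        try:
            out_queue.put((index, line.upper()))  # stand-in for real analysis
        except Exception as e:
            out_queue.put((index, "error: %s" % e))

if __name__ == "__main__":
    n_workers = 2
    in_q = mp.Queue(maxsize=100)  # bounded queue gives backpressure
    out_q = mp.Queue()
    loader = mp.Process(target=load_worker, args=(in_q, n_workers))
    workers = [mp.Process(target=analysis_worker, args=(in_q, out_q))
               for _ in range(n_workers)]
    loader.start()
    for w in workers:
        w.start()
    finished = 0
    while finished < n_workers:
        msg = out_q.get()
        if msg == END:
            finished += 1
        else:
            print(msg)  # real code would reorder results by index; omitted here
    loader.join()
    for w in workers:
        w.join()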
17 changes: 12 additions & 5 deletions ginza/__init__.py
@@ -220,14 +220,21 @@ def ent_label_ontonotes(token: Token) -> str:
# token field getters for Doc.user_data

def reading_form(token: Token, use_orth_if_none=True) -> str:
reading = token.doc.user_data["reading_forms"][token.i]
if not reading and use_orth_if_none:
reading = token.orth_
return reading
reading = token.morph.get("Reading")
if reading:
return reading[0]
elif use_orth_if_none:
return token.orth_
else:
return None


def inflection(token: Token) -> str:
return token.doc.user_data["inflections"][token.i]
inf = token.morph.get("Inflection")
if inf:
return inf[0].replace(";", ",")
else:
return ""


# bunsetu related field getters for Doc.user_data
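With reading and inflection now carried on token.morph instead of doc.user_data, both getters work on any token from an analyzed Doc. A minimal usage sketch, assuming the ja_ginza model is installed:

import spacy
from ginza import inflection, reading_form

nlp = spacy.load("ja_ginza")
doc = nlp("銀座でランチをご一緒しましょう。")
for token in doc:
    # reading_form falls back to token.orth_ when no Reading feature is set;
    # inflection returns "" for uninflected tokens.
    print(token.orth_, reading_form(token), inflection(token))
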
210 changes: 130 additions & 80 deletions ginza/analyzer.py
@@ -1,17 +1,39 @@
# coding: utf8
import json
import sys
from typing import Iterable, Iterator, Optional, Tuple
from typing import Iterable, Optional

import spacy
from spacy.tokens import Span
from spacy.tokens import Doc, Span
from spacy.language import Language
from spacy.lang.ja import Japanese, JapaneseTokenizer

from . import set_split_mode, inflection, reading_form, ent_label_ene, ent_label_ontonotes, bunsetu_bi_label, bunsetu_position_type
from .bunsetu_recognizer import bunsetu_available, bunsetu_head_list, bunsetu_phrase_span


def try_sudachi_import(split_mode: str):
"""SudachiPy is required for Japanese support, so check for it.
If it's not available, blow up and explain how to fix it.
split_mode should be one of these values: "A", "B", "C", None->"A"."""
try:
from sudachipy import dictionary, tokenizer

split_mode = {
None: tokenizer.Tokenizer.SplitMode.A,
"A": tokenizer.Tokenizer.SplitMode.A,
"B": tokenizer.Tokenizer.SplitMode.B,
"C": tokenizer.Tokenizer.SplitMode.C,
}[split_mode]
tok = dictionary.Dictionary().create(mode=split_mode)
return tok
except ImportError:
raise ImportError(
"Japanese support requires SudachiPy and SudachiDict-core "
"(https://github.com/WorksApplications/SudachiPy). "
"Install with `pip install sudachipy sudachidict_core` or "
"install spaCy with `pip install spacy[ja]`."
) from None


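Standalone, the tokenizer returned by try_sudachi_import behaves like this sketch (the calls mirror the function above; assumes sudachipy and sudachidict_core are installed):

from sudachipy import dictionary, tokenizer

# Equivalent to try_sudachi_import("C"): build a tokenizer in split mode C.
tok = dictionary.Dictionary().create(mode=tokenizer.Tokenizer.SplitMode.C)
for m in tok.tokenize("銀座でランチをご一緒しましょう。"):
    print(m.surface(), m.part_of_speech(), m.normalized_form())
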
class Analyzer:
def __init__(
self,
@@ -22,6 +44,7 @@ def __init__(
output_format: str,
require_gpu: bool,
disable_sentencizer: bool,
use_normalized_form: bool,
) -> None:
self.model_path = model_path
self.ensure_model = ensure_model
@@ -30,6 +53,7 @@ def __init__(
self.output_format = output_format
self.require_gpu = require_gpu
self.disable_sentencizer = disable_sentencizer
self.use_normalized_form = use_normalized_form
self.nlp: Optional[Language] = None

def set_nlp(self) -> None:
@@ -40,7 +64,7 @@ def set_nlp(self) -> None:
spacy.require_gpu()

if self.output_format in ["2", "mecab"]:
nlp = JapaneseTokenizer(nlp=Japanese(), split_mode=self.split_mode).tokenizer
nlp = try_sudachi_import(self.split_mode)
else:
# Work-around for pickle error. Need to share model data.
if self.model_path:
@@ -68,81 +92,105 @@ def set_nlp(self) -> None:

self.nlp = nlp

def analyze_lines_mp(self, lines: Iterable[str]) -> Tuple[Iterable[Iterable[str]]]:
def analyze_batch(self, lines: Iterable[str]) -> str:
self.set_nlp()
return tuple(list(map(list, self.analyze_line(line))) for line in lines) # to avoid generator serialization inside of results of analyze_line

def analyze_line(self, line: str) -> Iterable[Iterable[str]]:
return analyze(self.nlp, self.hash_comment, self.output_format, line)


def analyze(
nlp: Language, hash_comment: str, output_format: str, line: str
) -> Iterable[Iterable[str]]:
line = line.rstrip("\n")
if line.startswith("#"):
if hash_comment == "print":
return ((line,),)
elif hash_comment == "skip":
return ((),)
if line == "":
return (("",),)
if self.output_format in ["2", "mecab"]:
return "".join(self.analyze_line(line) for line in lines)

if self.hash_comment == "print":
batch = list(self.nlp.pipe(line.rstrip("\n") for line in lines if not line.startswith("#")))
docs = []
index = 0
for line in lines:
if line.startswith("#"):
docs.append(line)
else:
docs.append(batch[index])
index += 1
else:
lines = [line.rstrip("\n") for line in lines if self.hash_comment != "skip" or not line.startswith("#")]
docs = self.nlp.pipe(lines)

if self.output_format in ["3", "json"]:
sep = ",\n"
else:
sep = ""
return sep.join(format_doc(doc, self.output_format, self.use_normalized_form) if isinstance(doc, Doc) else doc for doc in docs)

def analyze_line(self, input_line: str) -> str:
line = input_line.rstrip("\n")
if line.startswith("#"):
if self.hash_comment == "print":
return input_line
elif self.hash_comment == "skip":
return ""
if line == "":
return "\n"
if self.output_format in ["2", "mecab"]:
doc = self.nlp.tokenize(line)
else:
doc = self.nlp(line)
return format_doc(doc, self.output_format, self.use_normalized_form)


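Both entry points now return plain strings, which is what makes the multiprocessing handoff cheap. A hedged usage sketch of this Analyzer as it appears in the diff (argument values are illustrative, and the default ja_ginza model is assumed installed; analyze_batch calls set_nlp() itself):

from ginza.analyzer import Analyzer

analyzer = Analyzer(
    model_path=None,
    ensure_model=None,
    split_mode=None,
    hash_comment="print",
    output_format="conllu",
    require_gpu=False,
    disable_sentencizer=False,
    use_normalized_form=True,
)
# With hash_comment="print", comment lines pass through verbatim;
# all other lines go through nlp.pipe() in a single batch.
print(analyzer.analyze_batch([
    "# input from a corpus\n",
    "銀座でランチをご一緒しましょう。\n",
]))
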
def format_doc(
doc: Doc, output_format: str, use_normalized_form: bool,
) -> str:
if output_format in ["0", "conllu"]:
doc = nlp(line)
return [analyze_conllu(sent) for sent in doc.sents]
return "".join(format_conllu(sent, use_normalized_form) for sent in doc.sents)
elif output_format in ["1", "cabocha"]:
doc = nlp(line)
return [analyze_cabocha(sent) for sent in doc.sents]
return "".join(format_cabocha(sent, use_normalized_form) for sent in doc.sents)
elif output_format in ["2", "mecab"]:
doc = nlp.tokenize(line)
return [analyze_mecab(doc)]
return "".join(format_mecab(doc, use_normalized_form))
elif output_format in ["3", "json"]:
doc = nlp(line)
return [analyze_json(sent) for sent in doc.sents]
return ",\n".join(format_json(sent) for sent in doc.sents)
else:
raise Exception(output_format + " is not supported")


def analyze_json(sent: Span) -> Iterator[str]:
tokens = []
for token in sent:
t = {
"id": token.i - sent.start + 1,
"orth": token.orth_,
"tag": token.tag_,
"pos": token.pos_,
"lemma": token.lemma_,
"head": token.head.i - token.i,
"dep": token.dep_,
"ner": "{}-{}".format(token.ent_iob_, token.ent_type_) if token.ent_type_ else token.ent_iob_,
}
if token.whitespace_:
t["whitespace"] = token.whitespace_
tokens.append(" " + json.dumps(t, ensure_ascii=False))
tokens = ",\n".join(tokens)

yield """ {{
def format_json(sent: Span) -> str:
token_lines = ",\n".join(
f""" {{"id":{
token.i - sent.start + 1
},"orth":"{
token.orth_
}","tag":"{
token.tag_
}","pos":"{
token.pos_
}","lemma":"{
token.lemma_
}","norm":"{
token.norm_
}","head":{
token.head.i - token.i
},"dep":"{
token.dep_
}","ner":"{
token.ent_iob_
}{
"-" + token.ent_type_ if token.ent_type_ else ""
}"{
',"whitespacce":"' + token.whitespace_ + '"' if token.whitespace_ else ""
}}}""" for token in sent
)
return f""" {{
"paragraphs": [
{{
"raw": "{}",
"raw": "{sent.text}",
"sentences": [
{{
"tokens": [
{}
{token_lines}
]
}}
]
}}
]
}}""".format(
sent.text,
tokens,
)
}}"""


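For reference, each sentence serialized above takes this shape (keys mirror the f-string; values are placeholders, and the "whitespace" key appears only for tokens with trailing whitespace):

{
  "paragraphs": [
    {
      "raw": "...",
      "sentences": [
        {
          "tokens": [
            {"id": 1, "orth": "...", "tag": "...", "pos": "...", "lemma": "...", "norm": "...", "head": 0, "dep": "...", "ner": "O"}
          ]
        }
      ]
    }
  ]
}
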
def analyze_conllu(sent: Span, print_origin=True) -> Iterator[str]:
if print_origin:
yield "# text = {}".format(sent.text)
def format_conllu(sent: Span, use_normalized_form, print_origin=True) -> str:
np_labels = [""] * len(sent)
use_bunsetu = bunsetu_available(sent)
if use_bunsetu:
@@ -152,12 +200,14 @@ def analyze_conllu(sent: Span, print_origin=True) -> Iterator[str]:
if phrase.label_ == "NP":
for idx in range(phrase.start - sent.start, phrase.end - sent.start):
np_labels[idx] = "NP_B" if idx == phrase.start else "NP_I"
for token, np_label in zip(sent, np_labels):
yield conllu_token_line(sent, token, np_label, use_bunsetu)
yield ""
token_lines = "".join(conllu_token_line(sent, token, np_label, use_bunsetu, use_normalized_form) for token, np_label in zip(sent, np_labels))
if print_origin:
return f"# text = {sent.text}\n{token_lines}\n"
else:
return f"{token_lines}\n"


def conllu_token_line(sent, token, np_label, use_bunsetu) -> str:
def conllu_token_line(sent, token, np_label, use_bunsetu, use_normalized_form) -> str:
bunsetu_bi = bunsetu_bi_label(token) if use_bunsetu else None
position_type = bunsetu_position_type(token) if use_bunsetu else None
inf = inflection(token)
@@ -184,7 +234,7 @@ def conllu_token_line(sent, token, np_label, use_bunsetu) -> str:
[
str(token.i - sent.start + 1),
token.orth_,
token.lemma_,
token.norm_ if use_normalized_form else token.lemma_,
token.pos_,
token.tag_.replace(",*", "").replace(",", "-"),
"NumType=Card" if token.pos_ == "NUM" else "_",
@@ -193,10 +243,10 @@ def conllu_token_line(sent, token, np_label, use_bunsetu) -> str:
"_",
misc if misc else "_",
]
)
) + "\n"


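The use_normalized_form flag decides whether the LEMMA column carries token.norm_ (SudachiPy's normalized form) or token.lemma_ (the dictionary form). A quick way to inspect the difference, assuming ja_ginza is installed (the actual surface/normalized pairs depend on the dictionary):

import spacy

nlp = spacy.load("ja_ginza")
for token in nlp("お問い合わせはこちらへどうぞ。"):
    # norm_ holds the normalized form; lemma_ holds the dictionary form
    print(token.orth_, token.lemma_, token.norm_)
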
def analyze_cabocha(sent: Span) -> Iterable[str]:
def format_cabocha(sent: Span, use_normalized_form) -> str:
bunsetu_index_list = {}
bunsetu_index = -1
for token in sent:
@@ -208,10 +258,9 @@ def analyze_cabocha(sent: Span) -> Iterable[str]:
for token in sent:
if bunsetu_bi_label(token) == "B":
lines.append(cabocha_bunsetu_line(sent, bunsetu_index_list, token))
lines.append(cabocha_token_line(token))
lines.append("EOS")
lines.append("")
return lines
lines.append(cabocha_token_line(token, use_normalized_form))
lines.append("EOS\n\n")
return "".join(lines)


def cabocha_bunsetu_line(sent: Span, bunsetu_index_list, token) -> str:
@@ -237,7 +286,7 @@ def cabocha_bunsetu_line(sent: Span, bunsetu_index_list, token) -> str:
bunsetu_head_index = 0
if bunsetu_dep_index is None:
bunsetu_dep_index = -1
return "* {} {}{} {}/{} 0.000000".format(
return "* {} {}{} {}/{} 0.000000\n".format(
bunsetu_index_list[token.i],
bunsetu_dep_index,
dep_type,
Expand All @@ -246,30 +295,31 @@ def cabocha_bunsetu_line(sent: Span, bunsetu_index_list, token) -> str:
)


def cabocha_token_line(token) -> str:
def cabocha_token_line(token, use_normalized_form) -> str:
part_of_speech = token.tag_.replace("-", ",")
part_of_speech += ",*" * (3 - part_of_speech.count(",")) + "," + inflection(token)
inf = inflection(token)
part_of_speech += ",*" * (3 - part_of_speech.count(",")) + "," + (inf if inf else "*,*")
reading = reading_form(token)
return "{}\t{},{},{},{}\t{}".format(
return "{}\t{},{},{},{}\t{}\n".format(
token.orth_,
part_of_speech,
token.lemma_,
token.norm_ if use_normalized_form else token.lemma_,
reading if reading else token.orth_,
"*",
"O" if token.ent_iob_ == "O" else "{}-{}".format(token.ent_iob_, token.ent_type_),
)


def analyze_mecab(sudachipy_tokens) -> Iterable[str]:
return tuple(mecab_token_line(t) for t in sudachipy_tokens) + ("EOS", "")
def format_mecab(sudachipy_tokens, use_normalized_form) -> str:
return "".join(mecab_token_line(t, use_normalized_form) for t in sudachipy_tokens) + "EOS\n\n"


def mecab_token_line(token) -> str:
def mecab_token_line(token, use_normalized_form) -> str:
reading = token.reading_form()
return "{}\t{},{},{},{}".format(
return "{}\t{},{},{},{}\n".format(
token.surface(),
",".join(token.part_of_speech()),
token.normalized_form(),
token.normalized_form() if use_normalized_form else token.dictionary_form(),
reading if reading else token.surface(),
"*",
)
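
Because format_mecab operates on raw SudachiPy morphemes, mecab-style output no longer needs a spaCy pipeline at all. A minimal sketch, assuming this module is importable as ginza.analyzer and sudachidict_core is installed:

from sudachipy import dictionary, tokenizer
from ginza.analyzer import format_mecab

tok = dictionary.Dictionary().create(mode=tokenizer.Tokenizer.SplitMode.A)
morphemes = tok.tokenize("すもももももももものうち")
# Prints one line per token, terminated by EOS and a blank line.
print(format_mecab(morphemes, use_normalized_form=True), end="")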