threadsafe job counts; configurable batch_size #581

Merged: 3 commits, Jan 15, 2016
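What this PR changes for callers: `batch_words` moves from a `train()` keyword to a `Word2Vec` constructor argument (default 10000, i.e. `MAX_WORDS_IN_BATCH`), so batching behaviour is fixed once per model. A minimal sketch of setting it after this change; the toy corpus and parameter values are placeholders, not taken from the PR:

    from gensim.models import word2vec

    # Toy stand-in corpus; any iterable of token lists works.
    sentences = [
        ['human', 'interface', 'computer'],
        ['graph', 'minors', 'survey'],
    ]

    # batch_words is now a constructor argument rather than a train() keyword.
    model = word2vec.Word2Vec(sentences, size=100, min_count=1, workers=2,
                              batch_words=1000)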
2 changes: 2 additions & 0 deletions CHANGELOG.txt
@@ -11,6 +11,8 @@ Changes
* Word2vec allows non-strict unicode error handling (ignore or replace) (Gordon Mohr, #466)
* On-demand loading of the `pattern` library in utils.lemmatize (Jan Zikes, #461)
- `utils.HAS_PATTERN` flag moved to `utils.has_pattern()`
* Threadsafe Word2Vec/Doc2Vec finish-check to avoid hang/unending Word2Vec/Doc2Vec training (Gordon Mohr, #571)
* Tuned `TestWord2VecModel.test_cbow_hs()` against random failures (Gordon Mohr, #531)
* Forwards compatibility for NumPy > 1.10 (Matti Lyra, #494, #513)
- LdaModel and LdaMulticore produce a large number of DeprecationWarnings from
.inference() because the term ids in each chunk returned from utils.grouper
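The "configurable batch_size" half of the change comes down to the `job_producer` loop in word2vec.py below: sentences are packed into a job until adding the next one would push the job past `self.batch_words`, at which point the job is queued and a fresh batch is started. A stripped-down sketch of that packing logic (names mirror the diff; learning-rate updates and the word-count helper are simplified):

    from queue import Queue  # Python 3 name; the diff itself targets the Py2-compatible Queue

    def produce_jobs(sentences, batch_words, job_queue):
        # Pack sentences into jobs of at most `batch_words` words each,
        # mirroring job_producer() in this PR (alpha handling omitted).
        job_batch, batch_size, job_no = [], 0, 0
        for sentence in sentences:
            sentence_length = len(sentence)
            if batch_size + sentence_length <= batch_words:
                job_batch.append(sentence)      # fits => extend the current job
                batch_size += sentence_length
            else:
                job_queue.put(job_batch)        # full => submit and start a new job
                job_no += 1
                job_batch, batch_size = [sentence], sentence_length
        if job_batch:                           # flush the final partial job
            job_queue.put(job_batch)
            job_no += 1
        return job_no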
45 changes: 25 additions & 20 deletions gensim/models/word2vec.py
Expand Up @@ -344,7 +344,7 @@ def __init__(
self, sentences=None, size=100, alpha=0.025, window=5, min_count=5,
max_vocab_size=None, sample=0, seed=1, workers=1, min_alpha=0.0001,
sg=1, hs=1, negative=0, cbow_mean=0, hashfxn=hash, iter=1, null_word=0,
trim_rule=None, sorted_vocab=1):
trim_rule=None, sorted_vocab=1, batch_words=MAX_WORDS_IN_BATCH):
"""
Initialize the model from an iterable of `sentences`. Each sentence is a
list of words (unicode strings) that will be used for training.
@@ -403,6 +403,10 @@ def __init__(
`sorted_vocab` = if 1 (default), sort the vocabulary by descending frequency before
assigning word indexes.

`batch_words` = target size (in words) for batches of examples passed to worker threads (and
thus cython routines). Default is 10000. (Larger batches can be passed if individual
texts are longer, but the cython code may truncate.)

"""
self.vocab = {} # mapping from a word (string) to a Vocab object
self.index2word = [] # map from a word's matrix index (int) to word (string)
@@ -430,6 +434,7 @@ def __init__(
self.train_count = 0
self.total_train_time = 0
self.sorted_vocab = sorted_vocab
self.batch_words = batch_words

if sentences is not None:
if isinstance(sentences, GeneratorType):
@@ -668,7 +673,7 @@ def _raw_word_count(self, job):
"""Return the number of words in a given job."""
return sum(len(sentence) for sentence in job)

def train(self, sentences, total_words=None, word_count=0, batch_words=None,
def train(self, sentences, total_words=None, word_count=0,
total_examples=None, queue_factor=2, report_delay=1.0):
"""
Update the model's neural weights from a sequence of sentences (can be a once-only generator stream).
Expand All @@ -689,7 +694,6 @@ def train(self, sentences, total_words=None, word_count=0, batch_words=None,
self.neg_labels = zeros(self.negative + 1)
self.neg_labels[0] = 1.

batch_words = min(batch_words or MAX_WORDS_IN_BATCH, MAX_WORDS_IN_BATCH)
logger.info(
"training model with %i workers on %i vocabulary and %i features, "
"using sg=%s hs=%s sample=%s negative=%s",
@@ -708,7 +712,7 @@ def train(self, sentences, total_words=None, word_count=0,
else:
raise ValueError("you must provide either total_words or total_examples, to enable alpha and progress calculations")

self.jobs_finished, self.job_no, self.jobs_left = False, 0, 0
job_tally = 0

if self.iter > 1:
sentences = utils.RepeatCorpusNTimes(sentences, self.iter)
@@ -727,7 +731,6 @@ def worker_loop():
break # no more jobs => quit this worker
sentences, alpha = job
tally, raw_tally = self._do_train_job(sentences, alpha, (work, neu1))
self.jobs_left -= 1
progress_queue.put((len(sentences), tally, raw_tally)) # report back progress
jobs_processed += 1
logger.debug("worker exiting, processed %i jobs", jobs_processed)
@@ -737,22 +740,22 @@ def job_producer():
job_batch, batch_size = [], 0
pushed_words, pushed_examples = 0, 0
next_alpha = self.alpha
job_no = 0

for sent_idx, sentence in enumerate(sentences):
sentence_length = self._raw_word_count([sentence])

# can we fit this sentence into the existing job batch?
if batch_size + sentence_length <= batch_words:
if batch_size + sentence_length <= self.batch_words:
# yes => add it to the current job
job_batch.append(sentence)
batch_size += sentence_length
else:
# no => submit the existing job
logger.debug(
"queueing job #%i (%i words, %i sentences) at alpha %.05f",
self.job_no, batch_size, len(job_batch), next_alpha)
self.job_no += 1
self.jobs_left += 1
job_no, batch_size, len(job_batch), next_alpha)
job_no += 1
job_queue.put((job_batch, next_alpha))

# update the learning rate for the next job
@@ -775,15 +778,11 @@ def job_producer():
if job_batch:
logger.debug(
"queueing job #%i (%i words, %i sentences) at alpha %.05f",
self.job_no, batch_size, len(job_batch), next_alpha)
self.job_no += 1
self.jobs_left += 1
job_no, batch_size, len(job_batch), next_alpha)
job_no += 1
job_queue.put((job_batch, next_alpha))

self.jobs_finished = True
logger.info("reached end of input; waiting to finish %i outstanding jobs", self.jobs_left)

if self.job_no == 0 and self.train_count == 0:
if job_no == 0 and self.train_count == 0:
logger.warning(
"train() called with an empty iterator (if not intended, "
"be sure to provide a corpus that offers restartable "
@@ -792,14 +791,15 @@

# give the workers heads up that they can finish -- no more work!
for _ in xrange(self.workers):
job_queue.put(None) # no need to increase job_no
logger.debug("job loop exiting, total %i jobs", self.job_no)
job_queue.put(None)
logger.debug("job loop exiting, total %i jobs", job_no)

# buffer ahead only a limited number of jobs.. this is the reason we can't simply use ThreadPool :(
job_queue = Queue(maxsize=queue_factor * self.workers)
progress_queue = Queue(maxsize=(queue_factor + 1) * self.workers)

workers = [threading.Thread(target=worker_loop) for _ in xrange(self.workers)]
unfinished_worker_count = len(workers)
workers.append(threading.Thread(target=job_producer))

for thread in workers:
@@ -809,11 +809,14 @@
example_count, trained_word_count, raw_word_count = 0, 0, word_count
start, next_report = default_timer(), 1.0

while not self.jobs_finished or self.jobs_left > 0:
while unfinished_worker_count > 0:
report = progress_queue.get() # blocks if workers too slow
if report is None:
if report is None: # a thread reporting that it finished
unfinished_worker_count -= 1
logger.info("worker thread finished; awaiting finish of %i more threads", unfinished_worker_count)
continue
examples, trained_words, raw_words = report
job_tally += 1

# update progress stats
example_count += examples
@@ -842,6 +845,8 @@
logger.info(
"training on %i raw words (%i effective words) took %.1fs, %.0f effective words/s",
raw_word_count, trained_word_count, elapsed, trained_word_count / elapsed if elapsed else 0.0)
if job_tally < 10 * self.workers:
logger.warn("under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay")

# check that the input corpus hasn't changed during iteration
if total_examples and total_examples != example_count:
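The "threadsafe job counts" half replaces the shared `self.jobs_finished` / `self.jobs_left` / `self.job_no` counters, which were mutated from both the producer and the worker threads without a lock, with state owned by a single thread each: `job_no` is local to the producer, `job_tally` is only touched by the consumer loop, and completion is detected by counting the `None` sentinels that workers report on the progress queue as they exit (implied by the `report is None` branch above). A self-contained sketch of that pattern, using Python 3 names rather than the `xrange`-era ones in the diff:

    import threading
    from queue import Queue

    def run_jobs(jobs, num_workers, do_job):
        job_queue, progress_queue = Queue(), Queue()

        def worker_loop():
            while True:
                job = job_queue.get()
                if job is None:
                    progress_queue.put(None)     # sentinel: this worker is finished
                    break
                progress_queue.put(do_job(job))  # do_job must return a non-None report

        workers = [threading.Thread(target=worker_loop) for _ in range(num_workers)]
        for thread in workers:
            thread.daemon = True
            thread.start()

        for job in jobs:
            job_queue.put(job)
        for _ in range(num_workers):
            job_queue.put(None)                  # one shutdown signal per worker

        # Only this (consumer) thread reads or writes the counters below.
        unfinished_worker_count, job_tally = num_workers, 0
        while unfinished_worker_count > 0:
            report = progress_queue.get()
            if report is None:
                unfinished_worker_count -= 1
                continue
            job_tally += 1                       # aggregate progress reports here
        return job_tally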
5 changes: 2 additions & 3 deletions gensim/test/test_word2vec.py
@@ -45,7 +45,6 @@ def __iter__(self):
['graph', 'minors', 'survey']
]


def testfile():
# temporary data will be stored to this file
return os.path.join(tempfile.gettempdir(), 'gensim_word2vec.tst')
@@ -253,8 +252,8 @@ def test_sg_neg(self):

def test_cbow_hs(self):
"""Test CBOW w/ hierarchical softmax"""
model = word2vec.Word2Vec(sg=0, cbow_mean=1, alpha=0.05, window=5, hs=1, negative=0,
min_count=5, iter=10, workers=2)
model = word2vec.Word2Vec(sg=0, cbow_mean=1, alpha=0.05, window=8, hs=1, negative=0,
min_count=5, iter=10, workers=2, batch_words=1000)
self.model_sanity(model)

def test_cbow_neg(self):
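The test tweak connects back to the new warning in train(): with the default `batch_words` of 10000 and a small corpus, an epoch can fit into only a couple of jobs, so the learning rate decays in a few coarse steps; lowering `batch_words` to 1000 yields many more jobs and a smoother alpha schedule. A back-of-the-envelope illustration, with an invented corpus size rather than the test fixture's real word count:

    # Rough job counts per epoch for each setting; 60,000 raw words per epoch
    # is a made-up figure used only for illustration.
    raw_words_per_epoch = 60000
    for batch_words in (10000, 1000):
        jobs = max(1, raw_words_per_epoch // batch_words)
        print("batch_words=%5d -> ~%2d jobs/epoch (alpha stepped once per job)" % (batch_words, jobs))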