Fix docstrings for gensim.models.hdpmodel, gensim.models.lda_worker & gensim.models.lda_dispatcher (#1667) #1912

Merged
merged 17 commits on Apr 2, 2018
Changes from 4 commits
282 changes: 223 additions & 59 deletions gensim/models/lda_dispatcher.py
100755 → 100644
@@ -4,13 +4,68 @@
# Copyright (C) 2010 Radim Rehurek <[email protected]>
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html

"""
USAGE: %(program)s SIZE_OF_JOBS_QUEUE
""":class:`~gensim.models.lda_dispatcher.Dispatcher` process which orchestrates
distributed :class:`~gensim.models.ldamodel.LdaModel` computations.
Run this script only once, on the master node in your cluster.

Notes
-----
The dispatcher expects to find worker scripts already running. Make sure
you run as many workers as you like on your machines **before** launching
the dispatcher.

Warnings
--------
Requires `Pyro4 <https://pythonhosted.org/Pyro4/>`_ to be installed.
The distributed version works only on a local network.


How to use distributed :class:`~gensim.models.ldamodel.LdaModel`
----------------------------------------------------------------


#. Install needed dependencies (Pyro4) ::

pip install gensim[distributed]

#. Set up serialization (on each machine) ::

export PYRO_SERIALIZERS_ACCEPTED=pickle
export PYRO_SERIALIZER=pickle

#. Run nameserver ::

python -m Pyro4.naming -n 0.0.0.0 &

#. Run workers (on each machine) ::

python -m gensim.models.lda_worker &

#. Run dispatcher ::

python -m gensim.models.lda_dispatcher &

#. Run :class:`~gensim.models.ldamodel.LdaModel` in distributed mode ::

>>> from gensim.test.utils import common_corpus, common_dictionary
>>> from gensim.models import LdaModel
>>>
    >>> model = LdaModel(common_corpus, id2word=common_dictionary, distributed=True)

#. You can then infer topic distributions on new, unseen documents with ::

    >>> doc_lda = model[doc_bow]

   and update (train) the model with new documents via ::

    >>> model.update(other_corpus)
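
#. Optionally, tune the dispatcher through its command line flags, for example a
   larger job queue (``--maxsize`` is defined by this script's own argument
   parser, described under "Command line arguments" below; the value 100 is
   only an illustration) ::

    python -m gensim.models.lda_dispatcher --maxsize 100 &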


Command line arguments
----------------------

.. program-output:: python -m gensim.models.lda_dispatcher --help
:ellipsis: 0, -5

"""


@@ -36,9 +91,10 @@


# How many jobs (=chunks of N documents) to keep "pre-fetched" in a queue?
# A small number is usually enough, unless iteration over the corpus is very
# slow (slower than the actual computation of LDA), in which case you can
# override this value from the command line, e.g.
# "python -m gensim.models.lda_dispatcher --maxsize 100".
MAX_JOBS_QUEUE = 10

# timeout for the Queue object put/get blocking methods.
@@ -50,78 +106,160 @@


class Dispatcher(object):
"""
Dispatcher object that communicates and coordinates individual workers.

"""Dispatcher object that communicates and coordinates individual workers.

Attributes
----------
callback : :class:`~Pyro4.core.Proxy`
    A proxy to the dispatcher object itself; intercepts method calls and
    dispatches them to the remote object.
jobs : :class:`~Queue.Queue`
    A FIFO queue of pending jobs.
lock_update : :class:`~threading.Lock`
    A lock guarding updates of the job counters; once a thread has acquired
    it, subsequent attempts to acquire it block until it is released.
workers : dict of (int, :class:`~Pyro4.core.Proxy`)
    Proxies of all registered workers, located during
    :meth:`~gensim.models.lda_dispatcher.Dispatcher.initialize` and used for
    subsequent RMI calls.

Warnings
--------
There should never be more than one dispatcher running at any one time.
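
Examples
--------
A rough sketch of how a dispatcher process is typically brought up; this is
essentially what ``main()`` below does. The empty ``ns_conf`` is assumed here
to mean "use the default local nameserver configuration", and the module-level
constants from this file are assumed to be in scope::

    from gensim import utils

    utils.pyro_daemon(LDA_DISPATCHER_PREFIX,
                      Dispatcher(maxsize=MAX_JOBS_QUEUE, ns_conf={}),
                      ns_conf={})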

"""

def __init__(self, maxsize=MAX_JOBS_QUEUE, ns_conf=None):
"""
Note that the constructor does not fully initialize the dispatcher;
use the `initialize()` function to populate it with workers etc.
"""Partly initializes the dispatcher.

A full initialization (including initialization of the workers)
requires a call to :meth:`~gensim.models.lda_dispatcher.Dispatcher.initialize`.

Parameters
----------
maxsize : int, optional
Maximum number of jobs to be kept pre-fetched in the queue.
ns_conf : dict of (str, object), optional
    Configuration for the Pyro nameserver used by the dispatcher's daemon.
    The nameserver keeps track of objects on your network by logical object
    name instead of their exact object id and location.
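
Examples
--------
A hypothetical ``ns_conf``, shown only as an illustration; its keys mirror the
ones assembled in ``main()`` below and the values correspond to a plain local
default setup::

    ns_conf = {"broadcast": True, "host": None, "port": None, "hmac_key": None}
    dispatcher = Dispatcher(maxsize=MAX_JOBS_QUEUE, ns_conf=ns_conf)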

"""
self.maxsize = maxsize
self.callback = None # a pyro proxy to this object (unknown at init time, but will be set later)
self.callback = None
self.ns_conf = ns_conf if ns_conf is not None else {}

@Pyro4.expose
def initialize(self, **model_params):
"""
`model_params` are parameters used to initialize individual workers (gets
handed all the way down to `worker.initialize()`).
"""Fully initializes the dispatcher and all its workers.

Parameters
----------
**model_params
    Keyword parameters used to initialize individual workers, see
    :class:`~gensim.models.ldamodel.LdaModel`.

Raises
------
RuntimeError
    If no workers are found (at least one worker must be running).
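
Examples
--------
An illustrative call; in practice :class:`~gensim.models.ldamodel.LdaModel`
performs this call for you when constructed with ``distributed=True``. Here
``dispatcher`` stands for a Pyro proxy to this object, ``my_dictionary`` is a
placeholder id2word mapping, and the parameter values are only an example::

    dispatcher.initialize(id2word=my_dictionary, num_topics=100, chunksize=2000)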

"""
self.jobs = Queue(maxsize=self.maxsize)
self.lock_update = threading.Lock()
self._jobsdone = 0
self._jobsreceived = 0

# locate all available workers and store their proxies, for subsequent RMI calls
self.workers = {}
with utils.getNS(**self.ns_conf) as ns:
self.callback = Pyro4.Proxy(ns.list(prefix=LDA_DISPATCHER_PREFIX)[LDA_DISPATCHER_PREFIX])
for name, uri in iteritems(ns.list(prefix=LDA_WORKER_PREFIX)):
try:
worker = Pyro4.Proxy(uri)
workerid = len(self.workers)
# make time consuming methods work asynchronously
logger.info("registering worker #%i at %s", workerid, uri)
worker.initialize(workerid, dispatcher=self.callback, **model_params)
logger.info("registering worker #%i at %s", workerid,
uri)
worker.initialize(workerid, dispatcher=self.callback,
**model_params)
self.workers[workerid] = worker
except Pyro4.errors.PyroError:
logger.warning("unresponsive worker at %s, deleting it from the name server", uri)
logger.warning("unresponsive worker at %s,deleting it"
" from the name server", uri)
ns.remove(name)

if not self.workers:
raise RuntimeError('no workers found; run some lda_worker scripts on your machines first!')

@Pyro4.expose
def getworkers(self):
"""
Return pyro URIs of all registered workers.
"""Return pyro URIs of all registered workers.

Returns
-------
list of URIs
The pyro URIs for each worker.

"""
return [worker._pyroUri for worker in itervalues(self.workers)]

@Pyro4.expose
def getjob(self, worker_id):
"""Atomically pops a job from the queue.

Parameters
----------
worker_id : int
The worker that requested the job.

Returns
-------
iterable of iterable of (int, float)
    A chunk of the corpus in BoW format, i.e. the job to process.

"""
logger.info("worker #%i requesting a new job", worker_id)
job = self.jobs.get(block=True, timeout=1)
logger.info("worker #%i got a new job (%i left)", worker_id, self.jobs.qsize())
logger.info("worker #%i got a new job (%i left)", worker_id,
self.jobs.qsize())
return job

@Pyro4.expose
def putjob(self, job):
"""Atomically add a job to the queue.

Parameters
----------
job : iterable of iterable of (int, float)
    A chunk of the corpus in BoW format, i.e. the job to be added to the queue.
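
Examples
--------
A job is just a small chunk of BoW documents. The tiny, made-up chunk below is
shown purely to illustrate the expected shape (``dispatcher`` stands for a
Pyro proxy to this object)::

    job = [[(0, 1.0), (3, 2.0)], [(1, 1.0), (2, 1.0)]]  # two documents
    dispatcher.putjob(job)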

"""
self._jobsreceived += 1
self.jobs.put(job, block=True, timeout=HUGE_TIMEOUT)
logger.info("added a new job (len(queue)=%i items)", self.jobs.qsize())
logger.info("added a new job (len(queue)=%i items)",
self.jobs.qsize())

@Pyro4.expose
def getstate(self):
"""
Merge states from across all workers and return the result.

Returns
-------
:class:`~gensim.models.ldamodel.LdaState`
    The merged state, aggregated from all workers.

"""
logger.info("end of input, assigning all remaining jobs")
logger.debug("jobs done: %s, jobs received: %s", self._jobsdone, self._jobsreceived)
logger.debug("jobs done: %s, jobs received: %s",
self._jobsdone, self._jobsreceived)
i = 0
count = 10
while self._jobsdone < self._jobsreceived:
@@ -144,8 +282,14 @@ def getstate(self):

@Pyro4.expose
def reset(self, state):
"""
Initialize all workers for a new EM iterations.
"""Reinitializes all workers for a new EM iteration.

Parameters
----------
state : :class:`~gensim.models.ldamodel.LdaState`
    The state to distribute to all workers as the starting point for the
    new EM iteration.

"""
for workerid, worker in iteritems(self.workers):
logger.info("resetting worker %s", workerid)
@@ -158,54 +302,72 @@ def reset(self, state):
@Pyro4.oneway
@utils.synchronous('lock_update')
def jobdone(self, workerid):
"""
A worker has finished its job. Log this event and then asynchronously
transfer control back to the worker.
"""Workers use callback to notify when their job is done.

The job done event is logged and then control is asynchronously
transfered back to the worker(who can then request another job).
In this way, control flow basically oscillates between
:meth:`gensim.models.lda_dispatcher.Dispatcher.jobdone` and
:meth:`gensim.models.lda_worker.Worker.requestjob`.

Parameters
----------
workerid : int
The ID of the worker that finished the job (used for logging).

"""
self._jobsdone += 1
logger.info("worker #%s finished job #%i", workerid, self._jobsdone)
self.workers[workerid].requestjob() # tell the worker to ask for another job, asynchronously (one-way)

def jobsdone(self):
"""Wrap self._jobsdone, needed for remote access through Pyro proxies"""
"""Wrap :attr:`~gensim.models.lda_dispatcher.Dispatcher._jobsdone`,
needed for remote access through proxies.

Returns
-------
int
Number of jobs already completed.

"""
return self._jobsdone

@Pyro4.oneway
def exit(self):
"""
Terminate all registered workers and then the dispatcher.
"""
"""Terminate all registered workers and then the dispatcher."""
for workerid, worker in iteritems(self.workers):
logger.info("terminating worker %s", workerid)
worker.exit()
logger.info("terminating dispatcher")
os._exit(0) # exit the whole process (not just this thread ala sys.exit())
# endclass Dispatcher


def main():
"""Set up argument parser,logger and launches pyro daemon."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--maxsize",
help="How many jobs (=chunks of N documents) to keep 'pre-fetched' in a queue (default: %(default)s)",
type=int, default=MAX_JOBS_QUEUE
)
parser.add_argument("--host", help="Nameserver hostname (default: %(default)s)", default=None)
parser.add_argument("--port", help="Nameserver port (default: %(default)s)", default=None, type=int)
parser.add_argument("--no-broadcast", help="Disable broadcast (default: %(default)s)",
action='store_const', default=True, const=False)
parser.add_argument("--hmac", help="Nameserver hmac key (default: %(default)s)", default=None)
parser.add_argument(
'-v', '--verbose',
help='Verbose flag',
action='store_const', dest="loglevel", const=logging.INFO, default=logging.WARNING
)
parser.add_argument("--maxsize", help="How many jobs (=chunks of N "
"documents) to keep 'pre-fetched' in a queue "
"(default: %(default)s)", type=int,
default=MAX_JOBS_QUEUE)
parser.add_argument("--host", help="Nameserver hostname (default: "
"%(default)s)", default=None)
parser.add_argument("--port", help="Nameserver port (default: "
"%(default)s)", default=None, type=int)
parser.add_argument("--no-broadcast", help="Disable broadcast (default"
": %(default)s)", action='store_const', default=True,
const=False)
parser.add_argument("--hmac", help="Nameserver hmac key (default: "
"%(default)s)", default=None)
parser.add_argument('-v', '--verbose', help='Verbose flag',
action='store_const', dest="loglevel",
const=logging.INFO, default=logging.WARNING)
args = parser.parse_args()

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=args.loglevel)
logger.info("running %s", " ".join(sys.argv))

ns_conf = {
@@ -214,7 +376,9 @@ def main():
"port": args.port,
"hmac_key": args.hmac
}
utils.pyro_daemon(LDA_DISPATCHER_PREFIX, Dispatcher(maxsize=args.maxsize, ns_conf=ns_conf), ns_conf=ns_conf)
logger.info("finished running %s", " ".join(sys.argv))

