Upload the model with push_to_hub in examples #297

Merged · 17 commits · Jul 7, 2021
9 changes: 4 additions & 5 deletions benchmarks/benchmark_averaging.py
@@ -1,14 +1,13 @@
import argparse
import math
import time
import threading
import argparse
import time

import torch

import hivemind
from hivemind.utils import LOCALHOST, increase_file_limit, get_logger
from hivemind.proto import runtime_pb2

from hivemind.utils import LOCALHOST, get_logger, increase_file_limit

logger = get_logger(__name__)

@@ -50,7 +49,7 @@ def run_averager(index):
initial_peers=[f"{LOCALHOST}:{dht_root.port}"],
start=True)
initial_bits = bin(index % num_groups)[2:].rjust(nbits, '0')
averager = hivemind.DecentralizedAverager(
averager = hivemind.averaging.DecentralizedAverager(
peer_tensors[i], dht, prefix='my_tensor', initial_group_bits=initial_bits, listen_on=f"{LOCALHOST}:*",
compression_type=runtime_pb2.CompressionType.FLOAT16, target_group_size=target_group_size,
averaging_expiration=averaging_expiration, request_timeout=request_timeout, start=True)
16 changes: 9 additions & 7 deletions benchmarks/benchmark_dht.py
@@ -5,8 +5,8 @@
from tqdm import trange

import hivemind
import hivemind.server.expert_uid
from hivemind.utils.threading import increase_file_limit
from hivemind.moe.server import declare_experts, get_experts
from hivemind.utils.limits import increase_file_limit

logger = hivemind.get_logger(__name__)

@@ -43,16 +43,17 @@ def benchmark_dht(num_peers: int, initial_peers: int, num_experts: int, expert_b
for start in trange(0, num_experts, expert_batch_size):
store_start = time.perf_counter()
endpoints.append(random_endpoint())
store_ok = hivemind.declare_experts(store_peer, expert_uids[start: start + expert_batch_size], endpoints[-1],
expiration=expiration)
store_ok = declare_experts(store_peer, expert_uids[start: start + expert_batch_size], endpoints[-1],
expiration=expiration)
successes = store_ok.values()
total_store_time += time.perf_counter() - store_start

total_stores += len(successes)
successful_stores += sum(successes)
time.sleep(wait_after_request)

logger.info(f"Store success rate: {successful_stores / total_stores * 100:.1f}% ({successful_stores} / {total_stores})")
logger.info(
f"Store success rate: {successful_stores / total_stores * 100:.1f}% ({successful_stores} / {total_stores})")
logger.info(f"Mean store time: {total_store_time / total_stores:.5}, Total: {total_store_time:.5}")
time.sleep(wait_before_read)

@@ -63,7 +64,7 @@ def benchmark_dht(num_peers: int, initial_peers: int, num_experts: int, expert_b

for start in trange(0, len(expert_uids), expert_batch_size):
get_start = time.perf_counter()
get_result = hivemind.get_experts(get_peer, expert_uids[start: start + expert_batch_size])
get_result = get_experts(get_peer, expert_uids[start: start + expert_batch_size])
total_get_time += time.perf_counter() - get_start

for i, expert in enumerate(get_result):
@@ -74,7 +75,8 @@ def benchmark_dht(num_peers: int, initial_peers: int, num_experts: int, expert_b
if time.perf_counter() - benchmark_started > expiration:
logger.warning("keys expired midway during get requests. If that isn't desired, increase expiration_time param")

logger.info(f"Get success rate: {successful_gets / len(expert_uids) * 100:.1f} ({successful_gets} / {len(expert_uids)})")
logger.info(
f"Get success rate: {successful_gets / len(expert_uids) * 100:.1f} ({successful_gets} / {len(expert_uids)})")
logger.info(f"Mean get time: {total_get_time / len(expert_uids):.5f}, Total: {total_get_time:.5f}")

alive_peers = [peer.is_alive() for peer in peers]
21 changes: 10 additions & 11 deletions benchmarks/benchmark_throughput.py
@@ -8,11 +8,10 @@

import hivemind
from hivemind import find_open_port
from hivemind.server import layers
from hivemind.utils.threading import increase_file_limit
from hivemind.moe.server import layers
from hivemind.utils.limits import increase_file_limit
from hivemind.utils.logging import get_logger


logger = get_logger(__name__)


@@ -88,8 +87,8 @@ def benchmark_throughput(num_experts=16, num_handlers=None, num_clients=128, num
max_batch_size=max_batch_size,
)
timestamps['created_experts'] = time.perf_counter()
server = hivemind.Server(None, experts, listen_on=f"{hivemind.LOCALHOST}:{port}",
num_connection_handlers=num_handlers, device=device)
server = hivemind.moe.Server(None, experts, listen_on=f"{hivemind.LOCALHOST}:{port}",
num_connection_handlers=num_handlers, device=device)
server.start()
server.ready.wait()
timestamps['server_ready'] = time.perf_counter()
@@ -116,18 +115,18 @@ def benchmark_throughput(num_experts=16, num_handlers=None, num_clients=128, num
total_examples = batch_size * num_clients * num_batches_per_client

logger.info("Benchmark finished, status:" + ["Success", "Failure"][benchmarking_failed.is_set()])
logger.info(f"Server parameters: num_experts={num_experts}, num_handlers={num_handlers}, max_batch_size={max_batch_size},"
f" expert_cls={expert_cls}, hid_dim={hid_dim}, device={device}")
logger.info(f"Server parameters: num_experts={num_experts}, num_handlers={num_handlers}, "
f"max_batch_size={max_batch_size}, expert_cls={expert_cls}, hid_dim={hid_dim}, device={device}")
logger.info(f"Client parameters: num_clients={num_clients}, num_batches_per_client={num_batches_per_client}, "
f"batch_size={batch_size}, backprop={backprop}")
f"batch_size={batch_size}, backprop={backprop}")

logger.info("Results: ")
logger.info(f"\tServer startup took {time_between('began_launching_server', 'server_ready') :.3f} s. "
f"({time_between('began_launching_server', 'created_experts') :.3f} s. experts + "
f"{time_between('created_experts', 'server_ready') :.3f} s. networking)")
f"({time_between('began_launching_server', 'created_experts') :.3f} s. experts + "
f"{time_between('created_experts', 'server_ready') :.3f} s. networking)")
logger.info(f"\tProcessed {total_examples} examples in {time_between('server_ready', 'clients_finished') :.3f}")
logger.info(f"\tThroughput for {'forward + backward' if backprop else 'forward'} passes: "
f"{total_examples / time_between('server_ready', 'clients_finished') :.3f} samples / s.")
f"{total_examples / time_between('server_ready', 'clients_finished') :.3f} samples / s.")
logger.info(f"\tBenchmarking took {time_between('started', 'server_shutdown_finished') :.3f} s.")
if benchmarking_failed.is_set():
logger.info("Note: benchmark code failed, timing/memory results only indicate time till failure!")
15 changes: 15 additions & 0 deletions docs/modules/averaging.rst
@@ -0,0 +1,15 @@
**hivemind.averaging**
======================

.. automodule:: hivemind.averaging

.. currentmodule:: hivemind.averaging
.. raw:: html

This module lets you average tensors in a decentralized manner.
<br><br>

.. autoclass:: DecentralizedAverager
:members:
:member-order: bysource
:exclude-members: get_tensors, get_tensors_async, update_tensors, rpc_join_group, rpc_aggregate_part, register_allreduce_group
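
A minimal sketch of how the class documented above is used after the move to `hivemind.averaging`. The constructor call mirrors `benchmarks/benchmark_averaging.py` from this PR; the group size and the blocking behaviour of `step()` are assumptions, not verified API guarantees.

```python
import torch
import hivemind

# A standalone DHT node used for matchmaking; real deployments would pass initial_peers.
dht = hivemind.DHT(start=True)

# Local tensors that will be averaged in-place with whichever peers get matched.
local_tensors = [torch.randn(16), torch.zeros(3, 3)]

averager = hivemind.averaging.DecentralizedAverager(
    local_tensors, dht,
    prefix='my_tensor',      # peers sharing this prefix form averaging groups
    target_group_size=4,     # assumed: how many peers to match per averaging round
    start=True)

averager.step()  # assumed to block until one all-reduce round with matched peers finishes
```
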
15 changes: 5 additions & 10 deletions docs/modules/client.rst
@@ -1,9 +1,9 @@
**hivemind.client**
====================
**hivemind.moe.client**
=======================

.. automodule:: hivemind.client
.. automodule:: hivemind.moe.client

.. currentmodule:: hivemind.client
.. currentmodule:: hivemind.moe.client

.. raw:: html

@@ -20,9 +20,4 @@

.. autoclass:: RemoteSwitchMixtureOfExperts
:members:
:member-order: bysource

.. autoclass:: DecentralizedAverager
:members:
:member-order: bysource
:exclude-members: get_tensors, get_tensors_async, update_tensors, rpc_join_group, rpc_aggregate_part, register_allreduce_group
:member-order: bysource
6 changes: 4 additions & 2 deletions docs/modules/index.rst
@@ -5,6 +5,8 @@
.. toctree::
:maxdepth: 2

optim
averaging
dht
client
server
dht
server
18 changes: 18 additions & 0 deletions docs/modules/optim.rst
@@ -0,0 +1,18 @@
**hivemind.optim**
==================

.. automodule:: hivemind.optim
.. currentmodule:: hivemind.optim

.. raw:: html

This module contains decentralized optimizers that wrap regular PyTorch optimizers to collaboratively train a shared model. Depending on the exact type, the optimizer may average model parameters with peers, exchange gradients, or follow a more complicated distributed training strategy.
<br><br>

.. autoclass:: CollaborativeOptimizer
:members: step
:member-order: bysource

.. autoclass:: CollaborativeAdaptiveOptimizer
:members:
:member-order: bysource
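
To make the new `hivemind.optim` page concrete, here is a hedged sketch of wrapping a regular PyTorch optimizer with `CollaborativeOptimizer`. The keyword names follow the ALBERT example elsewhere in this repository; treat `target_batch_size`, `batch_size_per_step`, and the `verbose`/`start` flags as assumptions about the signature rather than documented defaults.

```python
import torch
import hivemind

model = torch.nn.Linear(512, 2)
base_opt = torch.optim.Adam(model.parameters(), lr=1e-3)

# Join (or start) a collaboration; pass initial_peers=[...] to connect to an existing run.
dht = hivemind.DHT(start=True)

opt = hivemind.optim.CollaborativeOptimizer(
    opt=base_opt, dht=dht, prefix='my_experiment',
    target_batch_size=4096,   # assumed: global batch accumulated across all peers per update
    batch_size_per_step=32,   # assumed: samples this peer contributes per local step()
    verbose=True, start=True)

# The training loop is unchanged: step() accumulates progress locally and only applies
# a global update once the collaboration has processed ~target_batch_size samples.
loss = model(torch.randn(32, 512)).pow(2).mean()
loss.backward()
opt.step()
```
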
16 changes: 9 additions & 7 deletions docs/modules/server.rst
@@ -1,24 +1,24 @@
**hivemind.server**
**hivemind.moe.server**
========================================

A hivemind server hosts one or several experts and processes incoming requests to those experts. It periodically
re-publishes these experts to the DHT via a dedicated **hivemind.dht.DHT** peer that runs in the background.
The experts can be accessed directly as **hivemind.client.RemoteExpert("addr:port", "expert.uid.here")**
or as a part of **hivemind.client.RemoteMixtureOfExperts** that finds the most suitable experts across the DHT.
The experts can be accessed directly as **hivemind.moe.client.RemoteExpert("addr:port", "expert.uid.here")**
or as a part of **hivemind.moe.client.RemoteMixtureOfExperts** that finds the most suitable experts across the DHT.

The hivemind.server module is organized as follows:
The hivemind.moe.server module is organized as follows:

- Server_ is the main class that publishes experts, accepts incoming requests, and passes them to Runtime_ for compute.
- Runtime_ balances the device (GPU) usage between several ExpertBackend_ instances that each service one expert.
- ExpertBackend_ is a wrapper for `torch.nn.Module <https://pytorch.org/docs/stable/generated/torch.nn.Module.html>`_ \
that can be accessed by remote clients. It has two TaskPool_ -s for forward and backward requests.
that can be accessed by remote clients. It has two TaskPool_ s for forward and backward requests.
- TaskPool_ stores incoming requests for a batch-parallel computation (e.g. forward pass), groups them into batches \
and offers those batches to Runtime_ for processing.


.. automodule:: hivemind.server
.. automodule:: hivemind.moe.server

.. currentmodule:: hivemind.server
.. currentmodule:: hivemind.moe.server

.. _Server:
.. autoclass:: Server
@@ -35,6 +35,8 @@ The hivemind.server module is organized as follows:
:members: forward, backward, apply_gradients, get_info, get_pools
:member-order: bysource

.. currentmodule:: hivemind.moe.server.task_pool

.. _TaskPool:
.. autoclass:: TaskPool
:members: submit_task, iterate_minibatches, load_batch_to_runtime, send_outputs_from_runtime, get_task_size, empty
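
For readers new to the renamed module, here is a small sketch of bringing up a standalone `hivemind.moe.Server`, modeled on the call in `benchmarks/benchmark_throughput.py` above. The `make_expert_backends()` helper is hypothetical (constructing `ExpertBackend` objects is elided in this diff), and the keyword names are assumptions taken from that benchmark.

```python
import hivemind

# Hypothetical helper: returns a dict mapping expert uid -> hivemind.moe.server.ExpertBackend.
# Building the backends themselves is not shown in the diff above, so it is left abstract here.
experts = make_expert_backends()

# The first positional argument is the DHT; None means the server runs standalone
# without publishing its experts (this mirrors benchmarks/benchmark_throughput.py).
server = hivemind.moe.Server(
    None, experts,
    listen_on=f"{hivemind.LOCALHOST}:1337",
    num_connection_handlers=1, device='cpu')

server.start()       # spawns connection handlers and the Runtime in the background
server.ready.wait()  # block until the server is accepting requests

# Clients can now reach each expert by uid via hivemind.moe.client.RemoteExpert
# or hivemind.moe.client.RemoteMixtureOfExperts (see the module overview above).

server.shutdown()
```
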
2 changes: 1 addition & 1 deletion docs/user/benchmarks.md
@@ -6,7 +6,7 @@ hivemind.
### Server throughput

You can use [this benchmark](https://github.com/learning-at-home/hivemind/blob/master/benchmarks/benchmark_throughput.py) to
check the performance impact of your changes to hivemind.client and server. The benchmark will start one server without
check the performance impact of your changes to hivemind.moe. The benchmark will start one server without
DHT with several experts, and then spawn trainer processes that load the server with requests. The two main statistics
in this benchmark are samples/s and startup time.

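
If you prefer to drive the throughput benchmark from Python rather than the command line, a minimal sketch follows. Only the parameters visible in the function signature above are set; everything else stays at its defaults, and the `sys.path` tweak assumes you run from the repository root.

```python
import sys

sys.path.append("benchmarks")  # assumes the current working directory is the repository root

from benchmark_throughput import benchmark_throughput

if __name__ == "__main__":
    # Defaults shown in the signature above: num_experts=16, num_handlers=None, num_clients=128.
    benchmark_throughput(num_experts=16, num_clients=128)
```
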
2 changes: 1 addition & 1 deletion docs/user/quickstart.md
@@ -24,7 +24,7 @@ You can also install it in the editable mode with `pip install -e .`.

## Host a server

`hivemind.Server` hosts one or several experts (PyTorch modules) for remote access. These experts are responsible for
`hivemind.moe.Server` hosts one or several experts (PyTorch modules) for remote access. These experts are responsible for
most of the model parameters and computation. The server can be started using either Python or
[a shell script](https://github.com/learning-at-home/hivemind/blob/master/hivemind/hivemind_cli/run_server.py). We'll use the shell
for now. To host a server with default experts, run this in your shell:
12 changes: 6 additions & 6 deletions examples/albert/README.md
@@ -12,12 +12,12 @@ This tutorial will walk you through the steps to set up collaborative training w
## Running an experiment
- Run the first DHT peer to welcome trainers and record training statistics (e.g. loss, performance):
- In this example, we use [wandb.ai](https://wandb.ai/site) to plot training metrics; If you're unfamiliar with Weights & Biases, here's a [quickstart tutorial](https://docs.wandb.ai/quickstart).
- Run `python run_first_peer.py --dht_listen_on '[::]:*' --experiment_prefix NAME_YOUR_EXPERIMENT --wandb_project WANDB_PROJECT_HERE`
- Run `python run_training_monitor.py --dht_listen_on '[::]:*' --experiment_prefix NAME_YOUR_EXPERIMENT --wandb_project WANDB_PROJECT_HERE`
- `NAME_YOUR_EXPERIMENT` must be a unique name of this training run, e.g. `my-first-albert`. It cannot contain `.` due to naming conventions.
- `WANDB_PROJECT_HERE` is a name of wandb project used to track training metrics. Multiple experiments can have the same project name.
- This peer will run a DHT node on a certain IP/port (`Running DHT root at ...`). You will need this address for the next steps
```
+ python run_first_peer.py --dht_listen_on '[::]:*' --experiment_prefix my-albert-v1 --wandb_project Demo-run
+ python run_training_monitor.py --dht_listen_on '[::]:*' --experiment_prefix my-albert-v1 --wandb_project Demo-run
[2021/06/17 16:26:35.931][WARN][root.<module>:140] No address specified. Attempting to infer address from DNS.
[2021/06/17 16:26:36.083][INFO][root.<module>:149] Running DHT root at 193.106.95.184:38319
wandb: Currently logged in as: XXX (use `wandb login --relogin` to force relogin)
@@ -40,8 +40,8 @@ wandb: Run `wandb offline` to turn off syncing.
- if necessary, specify paths: `--dataset_path ./path/to/unpacked/data --tokenizer ./path/to/tokenizer/config` (see [default paths](https://github.com/learning-at-home/hivemind/blob/collaborative_albert_example/examples/albert/run_trainer.py#L63-L69) for reference)
- run:
```shell
HIVEMIND_THREADS=64 python run_trainer.py \
--experiment_prefix SAME_AS_IN_RUN_FIRST_PEER --initial_peers ONE_OR_MORE_PEERS --seed 42 \
python run_trainer.py \
--experiment_prefix SAME_AS_IN_RUN_TRAINING_MONITOR --initial_peers ONE_OR_MORE_PEERS --seed 42 \
--logging_first_step --logging_steps 100 --output_dir ./outputs --overwrite_output_dir --logging_dir ./logs
```
Here, `ONE_OR_MORE_PEERS` stands for either your coordinator endpoint (e.g. `123.123.123.123:1337`), an endpoint of any pre-existing trainer or multiple endpoints for stability. See tips & tricks section below for more information on setting up collaborative training.
@@ -71,7 +71,7 @@ Finally, we provide best practices for running collaborative experiments of diff
### Hosting the data
For small experiments (3-16 peers, <1GB data), you can use a free-tier file hosting service that has a convenient way to [download with curl/wget](https://superuser.com/questions/470664/how-to-download-dropbox-files-using-wget-command). However, these services are not meant for high load and could ban you for generating too much traffic. If you want to scale up, you could either use an S3-like storage from [any](https://aws.amazon.com/s3/) [cloud](https://cloud.google.com/storage) [provider](https://cloud.google.com/storage) or host the data [yourself](https://gist.github.com/willurd/5720255). Large data files (>5GB) will take a long time to download; we recommend splitting them into chunks and implementing a custom dataloader that can load chunks on the fly. Finally, the most _comme il faut_ solution to sharing large datasets is to use [academic torrents](https://academictorrents.com/).

### run_first_peer.py
### run_training_monitor.py
This peer exists solely to welcome other peers onto the DHT and track learning progress. It requires neither a GPU nor high bandwidth; the only prerequisite is that the coordinator should have high uptime. If no high-uptime server is available, one can also run multiple coordinators on different servers and list all of them as `--initial_peers`. The system will stay up as long as at least one coordinator is available. For short- to mid-term experiments, you can host the coordinator on a [free-tier VM](https://www.quora.com/Are-there-any-free-online-virtual-machines).

### Tuning for hardware/network
Expand All @@ -88,7 +88,7 @@ Here's an example of a full trainer script for Google Colab:
!pip install transformers datasets sentencepiece torch_optimizer==0.1.0
!git clone https://github.com/learning-at-home/hivemind && cd hivemind && pip install -e .
!curl -L YOUR_HOSTED_DATA | tar xzf - # example: https://hivemind-data.s3.us-east-2.amazonaws.com/wikitext103.tar.gz
!ulimit -n 4096 && HIVEMIND_THREADS=256 python ./hivemind/examples/albert/run_trainer.py \
!ulimit -n 4096 && python ./hivemind/examples/albert/run_trainer.py \
--client_mode --initial_peers ONE_OR_MORE_PEERS --averaging_expiration 10 \
--batch_size_lead 300 --per_device_train_batch_size 4 --gradient_accumulation_steps 1 \
--logging_first_step --logging_steps 100 --output_dir ./outputs --overwrite_output_dir --logging_dir ./logs \
2 changes: 1 addition & 1 deletion examples/albert/arguments.py
@@ -131,7 +131,7 @@ class AlbertTrainingArguments(TrainingArguments):
gradient_accumulation_steps: int = 2
seq_length: int = 512

max_steps: int = 1_000_000 # Albert is actually ready after 125000 steps
max_steps: int = 125_000 # please note: this affects both number of steps and learning rate schedule
learning_rate: float = 0.00176
warmup_steps: int = 5000
adam_epsilon: float = 1e-6
2 changes: 1 addition & 1 deletion examples/albert/requirements.txt
@@ -1,4 +1,4 @@
transformers>=4.5.1
transformers>=4.6.0
datasets>=1.5.0
torch_optimizer>=0.1.0
wandb>=0.10.26