Skip to content

Commit

Permalink
Issues/67 (#113)
Browse files Browse the repository at this point in the history
What:
- Introduce a configurable timeout when trying to connect to SMSC
- Try and detect when the connection to SMSC is disconnected and attempt to re-connect & re-bind
- bugfix; `asyncio.streams.StreamWriter.drain` should not be called concurrently by multiple coroutines[1]
- when shutting down, `naz` now tries to make sure that write buffers are properly flushed[2][3]

Why:
- Make `naz` more failure tolerant
- Fixes: #67
- Fixes: #114
- Fixes: #116
- Fixes: #117
- Fixes: #120

References:
1. https://bugs.python.org/issue29930
2. https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/
3. aio-libs/aiohttp#1369
  • Loading branch information
komuw authored May 31, 2019
1 parent 5ebeffe commit 5f34f83
Show file tree
Hide file tree
Showing 19 changed files with 638 additions and 279 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ script:
- naz-cli --version && naz-cli --help
- naz-cli --config examples/example_config.json --dry-run
- coverage erase
- export CI_ENVIRONMENT=Yes && coverage run --omit="*tests*,*cli/test_*,*examples/*,*.virtualenvs/*,*virtualenv/*,*.venv/*,*__init__*" -m unittest discover -v -s . && bash <(curl -s https://codecov.io/bash)
- coverage report --show-missing --fail-under=83
- export CI_ENVIRONMENT=Yes && coverage run --omit="*tests*,*examples/*,*.virtualenvs/*,*virtualenv/*,*.venv/*,*__init__*" -m unittest discover -v -s . && bash <(curl -s https://codecov.io/bash)
- coverage report --show-missing --fail-under=84
- |
git remote set-branches --add origin master # https://github.com/travis-ci/travis-ci/issues/6069
git fetch
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ most recent version is listed first.
- cleanly handle termination signals like `SIGTERM`: https://github.com/komuw/naz/pull/106
- validate `naz.Client` arguments: https://github.com/komuw/naz/pull/108
- remove ability to bring your own eventloop: https://github.com/komuw/naz/pull/111
- make `naz` more fault tolerant: https://github.com/komuw/naz/pull/113
- `naz` now has a configurable timeout when trying to connect to SMSC
- `naz` will now be able to detect when the connection to SMSC is disconnected and will attempt to re-connect & re-bind
- bugfix; `asyncio.streams.StreamWriter.drain` should not be called concurrently by multiple coroutines
- when shutting down, `naz` now tries to make sure that write buffers are properly flushed.

## **version:** v0.6.0-beta.1
- Bug fix: https://github.com/komuw/naz/pull/98
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ test:
@export PYTHONASYNCIODEBUG='2'
@printf "\n removing pyc files::\n" && find . -name '*.pyc' -delete;find . -name '__pycache__' -delete | xargs echo
@printf "\n coverage erase::\n" && coverage erase
@printf "\n coverage run::\n" && coverage run --omit="*tests*,*cli/test_*,*examples/*,*.virtualenvs/*,*virtualenv/*,*.venv/*,*__init__*" -m unittest discover -v -s .
@printf "\n coverage report::\n" && coverage report --show-missing --fail-under=83
@printf "\n coverage report html::\n" && coverage html --fail-under=83 --title=naz_coverage
@printf "\n coverage run::\n" && coverage run --omit="*tests*,*examples/*,*.virtualenvs/*,*virtualenv/*,*.venv/*,*__init__*" -m unittest discover -v -s .
@printf "\n coverage report::\n" && coverage report --show-missing --fail-under=84
@printf "\n coverage report html::\n" && coverage html --fail-under=84 --title=naz_coverage
@printf "\n run flake8::\n" && flake8 .
@printf "\n run pylint::\n" && pylint --enable=E --disable=W,R,C examples/ naz/ tests/ cli/ documentation/
@printf "\n run bandit::\n" && bandit -r --exclude .venv -ll .
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ loop.run_until_complete(cli.tranceiver_bind())

try:
# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(cli.send_forever(), cli.receive_data(), cli.enquire_link())
tasks = asyncio.gather(cli.dequeue_messages(), cli.receive_data(), cli.enquire_link())
loop.run_until_complete(tasks)
loop.run_forever()
except Exception as e:
Expand Down Expand Up @@ -138,7 +138,7 @@ run:
{'event': 'naz.SimpleHook.request', 'stage': 'start', 'smpp_command': 'bind_transceiver', 'log_id': None, 'environment': 'production', 'release': 'canary', 'smsc_host': '127.0.0.1', 'system_id': 'smppclient1', 'client_id': '2VU55VT86KHWXTW7X'}
{'event': 'naz.Client.send_data', 'stage': 'end', 'smpp_command': 'bind_transceiver', 'log_id': None, 'msg': 'hello', 'environment': 'production', 'release': 'canary', 'smsc_host': '127.0.0.1', 'system_id': 'smppclient1', 'client_id': '2VU55VT86KHWXTW7X'}
{'event': 'naz.Client.tranceiver_bind', 'stage': 'end', 'environment': 'production', 'release': 'canary', 'smsc_host': '127.0.0.1', 'system_id': 'smppclient1', 'client_id': '2VU55VT86KHWXTW7X'}
{'event': 'naz.Client.send_forever', 'stage': 'start', 'environment': 'production', 'release': 'canary', 'smsc_host': '127.0.0.1', 'system_id': 'smppclient1', 'client_id': '2VU55VT86KHWXTW7X'}
{'event': 'naz.Client.dequeue_messages', 'stage': 'start', 'environment': 'production', 'release': 'canary', 'smsc_host': '127.0.0.1', 'system_id': 'smppclient1', 'client_id': '2VU55VT86KHWXTW7X'}
```
**NB:**
Expand Down Expand Up @@ -331,7 +331,7 @@ loop.run_until_complete(cli.tranceiver_bind())

try:
# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(cli.send_forever(), cli.receive_data(), cli.enquire_link())
tasks = asyncio.gather(cli.dequeue_messages(), cli.receive_data(), cli.enquire_link())
loop.run_until_complete(tasks)
loop.run_forever()
except Exception as e:
Expand Down Expand Up @@ -398,7 +398,7 @@ reader, writer = loop.run_until_complete(cli.connect())
loop.run_until_complete(cli.tranceiver_bind())
try:
# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(cli.send_forever(), cli.receive_data(), cli.enquire_link())
tasks = asyncio.gather(cli.dequeue_messages(), cli.receive_data(), cli.enquire_link())
loop.run_until_complete(tasks)
loop.run_forever()
except Exception as e:
Expand Down
38 changes: 24 additions & 14 deletions cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,27 +212,37 @@ def main():
return

# call naz api
cli = naz.Client(**kwargs)
# connect to the SMSC host
_, _ = loop.run_until_complete(cli.connect())
# bind to SMSC as a tranceiver
loop.run_until_complete(cli.tranceiver_bind())

# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(
cli.send_forever(TESTING=dry_run),
cli.receive_data(TESTING=dry_run),
cli.enquire_link(TESTING=dry_run),
sig._signal_handling(logger=logger, client=cli, loop=loop),
loop=loop,
client = naz.Client(**kwargs)
loop.run_until_complete(
async_main(client=client, logger=logger, loop=loop, dry_run=dry_run)
)
loop.run_until_complete(tasks)
except Exception as e:
logger.log(logging.ERROR, {"event": "naz.cli.main", "stage": "end", "error": str(e)})
sys.exit(77)
finally:
logger.log(logging.INFO, {"event": "naz.cli.main", "stage": "end"})


async def async_main(
client: naz.Client,
logger: naz.logger.SimpleLogger,
loop: asyncio.events.AbstractEventLoop,
dry_run: bool,
):
# connect & bind to the SMSC host
await client.connect()
await client.tranceiver_bind()

# send any queued messages to SMSC, read any data from SMSC and continually check the state of the SMSC
tasks = asyncio.gather(
client.dequeue_messages(TESTING=dry_run),
client.receive_data(TESTING=dry_run),
client.enquire_link(TESTING=dry_run),
sig._signal_handling(logger=logger, client=client, loop=loop),
loop=loop,
)
await tasks


if __name__ == "__main__":
main()
7 changes: 4 additions & 3 deletions documentation/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ loglevel | the level at which to log | INFO
log_metadata | metadata that will be included in all log statements | {"smsc_host": smsc_host, "system_id": system_id}
codec_class | python class instance to be used to encode/decode messages | naz.nazcodec.SimpleNazCodec
codec_errors_level | same meaning as the `errors` argument to pythons' `encode` method as [defined here](https://docs.python.org/3/library/codecs.html#codecs.encode) | strict
enquire_link_interval | time in seconds to wait before sending an `enquire_link` request to SMSC to check on its status | 90
enquire_link_interval | time in seconds to wait before sending an `enquire_link` request to SMSC to check on its status | 55.0
rateLimiter | python class instance implementing rate limitation | naz.ratelimiter.SimpleRateLimiter
hook | python class instance implemeting functionality/hooks to be called by `naz` just before sending request to SMSC and just after getting response from SMSC | naz.hooks.SimpleHook
throttle_handler | python class instance implementing functionality of what todo when naz starts getting throttled responses from SMSC | naz.throttle.SimpleThrottleHandler
correlation_handler | A python class instance that naz uses to store relations between SMPP sequence numbers and user applications' log_id's and/or hook_metadata. | naz.correlater.SimpleCorrelater
drain_duration | duration in seconds that `naz` will wait for after receiving a termination signal. | 8.00
drain_duration | duration in seconds that `naz` will wait for after receiving a termination signal. | 8.00
connect_timeout | duration that `naz` will try to connect to SMSC before timing out | 30.00

`SMSC`: Short Message Service Centre, ie the server
`ESME`: External Short Message Entity, ie the client
Expand All @@ -84,7 +85,7 @@ drain_duration | duration in seconds that `naz` will wait for after receiving a
"release": "canary"
},
"codec_errors_level": "ignore",
"enquire_link_interval": 30,
"enquire_link_interval": 30.0,
"rateLimiter": "dotted.path.to.CustomRateLimiter"
}
```
Expand Down
4 changes: 2 additions & 2 deletions documentation/sphinx-docs/introduction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ naz is in active development and it's API may change in backward incompatible wa
try:
# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(cli.send_forever(), cli.receive_data(), cli.enquire_link())
tasks = asyncio.gather(cli.dequeue_messages(), cli.receive_data(), cli.enquire_link())
loop.run_until_complete(tasks)
loop.run_forever()
except Exception as e:
Expand Down Expand Up @@ -373,7 +373,7 @@ An example of using that queue;
try:
# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(cli.send_forever(), cli.receive_data(), cli.enquire_link())
tasks = asyncio.gather(cli.dequeue_messages(), cli.receive_data(), cli.enquire_link())
loop.run_until_complete(tasks)
loop.run_forever()
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion examples/example_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
"release": "canary"
},
"codec_errors_level": "ignore",
"enquire_link_interval": 70,
"enquire_link_interval": 70.00,
"rateLimiter": "examples.example_klasses.ExampleRateLimiter"
}
2 changes: 1 addition & 1 deletion examples/in_mem_queue_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

try:
# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(cli.send_forever(), cli.receive_data(), cli.enquire_link())
tasks = asyncio.gather(cli.dequeue_messages(), cli.receive_data(), cli.enquire_link())
loop.run_until_complete(tasks)
loop.run_forever()
except Exception as e:
Expand Down
4 changes: 2 additions & 2 deletions examples/rabbitmq_queue_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def blocking_dequeue(self):
system_id="smppclient1",
password=os.getenv("password", "password"),
outboundqueue=outboundqueue,
enquire_link_interval=17,
enquire_link_interval=17.00,
)

item_to_enqueue = {
Expand All @@ -148,7 +148,7 @@ def blocking_dequeue(self):

try:
# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(cli.send_forever(), cli.receive_data(), cli.enquire_link())
tasks = asyncio.gather(cli.dequeue_messages(), cli.receive_data(), cli.enquire_link())
loop.run_until_complete(tasks)
loop.run_forever()
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion examples/redis_queue_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def blocking_dequeue(self):

try:
# read any data from SMSC, send any queued messages to SMSC and continually check the state of the SMSC
tasks = asyncio.gather(cli.send_forever(), cli.receive_data(), cli.enquire_link())
tasks = asyncio.gather(cli.dequeue_messages(), cli.receive_data(), cli.enquire_link())
loop.run_until_complete(tasks)
loop.run_forever()
except Exception as e:
Expand Down
Loading

0 comments on commit 5f34f83

Please sign in to comment.