Skip to content

Commit

Permalink
Merge pull request #185 from nolar/184-embedded-signals
Browse files Browse the repository at this point in the history
Add an example for embedded operator (and stop/ready flags)
  • Loading branch information
nolar authored Sep 13, 2019
2 parents 4ad77da + a38d87c commit 73ef163
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 9 deletions.
4 changes: 2 additions & 2 deletions docs/embedding.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ the main application in the main thread.
loop.run_until_complete(kopf.operator())
def main():
thread = threading.Thread(target_fn=kopf_thread)
thread = threading.Thread(target=kopf_thread)
thread.start()
# ...
thread.join()
Expand Down Expand Up @@ -96,7 +96,7 @@ in :mod:`contextvars` containers with values isolated per-loop and per-task.
))
def main():
thread = threading.Thread(target_fn=kopf_thread)
thread = threading.Thread(target=kopf_thread)
thread.start()
# ...
thread.join()
Expand Down
49 changes: 49 additions & 0 deletions examples/12-embedded/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Kopf example for embedded operator

Kopf operators can be embedded into arbitrary applications, such as UI;
or they can be orchestrated explicitly by the developers instead of `kopf run`.

In this example, we start the operator in a side thread, while simulating
an application activity in the main thread. In this case, the "application"
just creates and deletes the example objects, but it can be any activity.

Start the operator:

```bash
python example.py
```

Let it run for 6 seconds (mostly due to sleeps: 3 times by 1+1 second).
Here is what it will print (shortened; the actual output is more verbose):

```
Starting the main app.
[DEBUG ] Pykube is configured via kubeconfig file.
[DEBUG ] Client is configured via kubeconfig file.
[WARNING ] Default peering object not found, falling back to the standalone mode.
[WARNING ] OS signals are ignored: running not in the main thread.
Do the main app activity here. Step 1/3.
[DEBUG ] [default/kopf-example-0] Creation event: ...
[DEBUG ] [default/kopf-example-0] Deletion event: ...
Do the main app activity here. Step 2/3.
[DEBUG ] [default/kopf-example-1] Creation event: ...
[DEBUG ] [default/kopf-example-1] Deletion event: ...
Do the main app activity here. Step 3/3.
[DEBUG ] [default/kopf-example-2] Creation event: ...
[DEBUG ] [default/kopf-example-2] Deletion event: ...
Exiting the main app.
[INFO ] Stop-flag is set to True. Operator is stopping.
[DEBUG ] Root task 'poster of events' is cancelled.
[DEBUG ] Root task 'watcher of kopfexamples.zalando.org' is cancelled.
[DEBUG ] Root tasks are stopped: finished normally; tasks left: set()
[DEBUG ] Hung tasks stopping is skipped: no tasks given.
```
103 changes: 103 additions & 0 deletions examples/12-embedded/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import asyncio
import threading
import time

import kopf
import kubernetes.client.rest


@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn(**_):
pass


@kopf.on.delete('zalando.org', 'v1', 'kopfexamples')
def delete_fn(**_):
pass


def kopf_thread(
ready_flag: threading.Event,
stop_flag: threading.Event,
):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

kopf.configure(verbose=True) # log formatting
kopf.login() # tokens & certs

loop.run_until_complete(kopf.operator(
ready_flag=ready_flag,
stop_flag=stop_flag,
))


def main(steps=3):
kopf.login()

# Start the operator and let it initialise.
print(f"Starting the main app.")
ready_flag = threading.Event()
stop_flag = threading.Event()
thread = threading.Thread(target=kopf_thread, kwargs=dict(
stop_flag=stop_flag,
ready_flag=ready_flag,
))
thread.start()
ready_flag.wait()

# The operator is active: run the app's activity.
for step in range(steps):
print(f"Do the main app activity here. Step {step+1}/{steps}.")
_create_object(step)
time.sleep(1.0)
_delete_object(step)
time.sleep(1.0)

# Ask the operator to terminate gracefully (can take a couple of seconds).
print(f"Exiting the main app.")
stop_flag.set()
thread.join()


def _create_object(step):
try:
api = kubernetes.client.CustomObjectsApi()
api.create_namespaced_custom_object(
group='zalando.org',
version='v1',
plural='kopfexamples',
namespace='default',
body=dict(
apiVersion='zalando.org/v1',
kind='KopfExample',
metadata=dict(name=f'kopf-example-{step}'),
),
)
except kubernetes.client.rest.ApiException as e:
if e.status in [409]:
pass
else:
raise


def _delete_object(step):
try:
api = kubernetes.client.CustomObjectsApi()
api.delete_namespaced_custom_object(
group='zalando.org',
version='v1',
plural='kopfexamples',
namespace='default',
name=f'kopf-example-{step}',
body={},
)
except kubernetes.client.rest.ApiException as e:
if e.status in [404]:
pass
else:
raise


if __name__ == '__main__':
main()
8 changes: 8 additions & 0 deletions examples/12-embedded/test_nothing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""
Embeddable operators require very customised application-specific testing.
Kopf cannot help here beyond its regular `kopf.testing.KopfRunner` helper,
which is an equivalent of `kopf run` command.
This file exists to disable the implicit e2e tests
(they skip if explicit e2e tests exist in the example directory).
"""
87 changes: 80 additions & 7 deletions kopf/reactor/running.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import concurrent.futures
import functools
import logging
import signal
import threading
import warnings
from typing import Optional, Callable, Collection
from typing import Optional, Callable, Collection, Union

from kopf.engines import peering
from kopf.engines import posting
Expand All @@ -13,6 +14,8 @@
from kopf.reactor import queueing
from kopf.reactor import registries

Flag = Union[asyncio.Future, asyncio.Event, concurrent.futures.Future, threading.Event]

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -51,6 +54,8 @@ async def operator(
priority: int = 0,
peering_name: Optional[str] = None,
namespace: Optional[str] = None,
stop_flag: Optional[Flag] = None,
ready_flag: Optional[Flag] = None,
):
"""
Run the whole operator asynchronously.
Expand All @@ -68,6 +73,8 @@ async def operator(
namespace=namespace,
priority=priority,
peering_name=peering_name,
stop_flag=stop_flag,
ready_flag=ready_flag,
)
await run_tasks(operator_tasks, ignored=existing_tasks)

Expand All @@ -79,6 +86,8 @@ async def spawn_tasks(
priority: int = 0,
peering_name: Optional[str] = None,
namespace: Optional[str] = None,
stop_flag: Optional[Flag] = None,
ready_flag: Optional[Flag] = None,
) -> Collection[asyncio.Task]:
"""
Spawn all the tasks needed to run the operator.
Expand All @@ -92,13 +101,17 @@ async def spawn_tasks(
registry = registry if registry is not None else registries.get_default_registry()
event_queue = asyncio.Queue(loop=loop)
freeze_flag = asyncio.Event(loop=loop)
should_stop = asyncio.Future(loop=loop)
signal_flag = asyncio.Future(loop=loop)
tasks = []

# A top-level task for external stopping by setting a stop-flag. Once set,
# this task will exit, and thus all other top-level tasks will be cancelled.
tasks.extend([
loop.create_task(_stop_flag_checker(should_stop)),
loop.create_task(_stop_flag_checker(
signal_flag=signal_flag,
ready_flag=ready_flag,
stop_flag=stop_flag,
)),
])

# K8s-event posting. Events are queued in-memory and posted in the background.
Expand Down Expand Up @@ -141,8 +154,8 @@ async def spawn_tasks(

# On Ctrl+C or pod termination, cancel all tasks gracefully.
if threading.current_thread() is threading.main_thread():
loop.add_signal_handler(signal.SIGINT, should_stop.set_result, signal.SIGINT)
loop.add_signal_handler(signal.SIGTERM, should_stop.set_result, signal.SIGTERM)
loop.add_signal_handler(signal.SIGINT, signal_flag.set_result, signal.SIGINT)
loop.add_signal_handler(signal.SIGTERM, signal_flag.set_result, signal.SIGTERM)
else:
logger.warning("OS signals are ignored: running not in the main thread.")

Expand Down Expand Up @@ -265,9 +278,21 @@ async def _root_task_checker(name, coro):
logger.warning(f"Root task {name!r} is finished unexpectedly.")


async def _stop_flag_checker(should_stop):
async def _stop_flag_checker(
signal_flag: asyncio.Future,
ready_flag: Optional[Flag],
stop_flag: Optional[Flag],
):
# TODO: collect the readiness of all root tasks instead, and set this one only when fully ready.
# Notify the caller that we are ready to be executed.
await _raise_flag(ready_flag)

# Wait until one of the stoppers is set/raised.
try:
result = await should_stop
flags = [signal_flag] + ([] if stop_flag is None else [_wait_flag(stop_flag)])
done, pending = await asyncio.wait(flags, return_when=asyncio.FIRST_COMPLETED)
future = done.pop()
result = await future
except asyncio.CancelledError:
pass # operator is stopping for any other reason
else:
Expand All @@ -290,3 +315,51 @@ def create_tasks(loop: asyncio.AbstractEventLoop, *arg, **kwargs):
"use kopf.spawn_tasks() or kopf.operator().",
DeprecationWarning)
return loop.run_until_complete(spawn_tasks(*arg, **kwargs))


async def _wait_flag(
flag: Optional[Flag],
):
"""
Wait for a flag to be raised.
Non-asyncio primitives are generally not our worry,
but we support them for convenience.
"""
if flag is None:
pass
elif isinstance(flag, asyncio.Future):
return await flag
elif isinstance(flag, asyncio.Event):
return await flag.wait()
elif isinstance(flag, concurrent.futures.Future):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, flag.result)
elif isinstance(flag, threading.Event):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, flag.wait)
else:
raise TypeError(f"Unsupported type of a flag: {flag!r}")


async def _raise_flag(
flag: Optional[Flag],
):
"""
Raise a flag.
Non-asyncio primitives are generally not our worry,
but we support them for convenience.
"""
if flag is None:
pass
elif isinstance(flag, asyncio.Future):
flag.set_result(None)
elif isinstance(flag, asyncio.Event):
flag.set()
elif isinstance(flag, concurrent.futures.Future):
flag.set_result(None)
elif isinstance(flag, threading.Event):
flag.set()
else:
raise TypeError(f"Unsupported type of a flag: {flag!r}")

0 comments on commit 73ef163

Please sign in to comment.