From c2a69c20efe91bf9f42e6921e896ba3d7e9c1ba7 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 26 Mar 2019 15:59:55 +0100 Subject: [PATCH] Import the project as it is right now, with the squashed history --- .gitignore | 23 ++ DEVELOPMENT.md | 144 ++++++++ LICENSE => LICENSE.md | 0 README.md | 111 ++++++ examples/01-minimal/README.md | 58 +++ examples/01-minimal/example.py | 7 + examples/02-children/README.md | 38 ++ examples/02-children/example.py | 35 ++ examples/03-exceptions/README.md | 36 ++ examples/03-exceptions/example.py | 18 + examples/04-events/README.md | 38 ++ examples/04-events/example.py | 19 + examples/05-handlers/README.md | 56 +++ examples/05-handlers/example.py | 31 ++ examples/06-peering/README.md | 65 ++++ examples/06-peering/example.py | 6 + examples/07-subhandlers/README.md | 69 ++++ examples/07-subhandlers/example.py | 11 + examples/99-all-at-once/README.md | 4 + examples/99-all-at-once/example.py | 55 +++ examples/README.md | 13 + examples/crd.yaml | 35 ++ examples/obj.yaml | 13 + examples/requirements.txt | 3 + kopf/__init__.py | 59 +++ kopf/__main__.py | 10 + kopf/cli.py | 89 +++++ kopf/config.py | 76 ++++ kopf/events.py | 82 +++++ kopf/on.py | 164 +++++++++ kopf/reactor/__init__.py | 9 + kopf/reactor/handling.py | 562 +++++++++++++++++++++++++++++ kopf/reactor/lifecycles.py | 56 +++ kopf/reactor/loading.py | 34 ++ kopf/reactor/peering.py | 229 ++++++++++++ kopf/reactor/queueing.py | 248 +++++++++++++ kopf/reactor/registry.py | 160 ++++++++ kopf/structs/__init__.py | 12 + kopf/structs/diffs.py | 55 +++ kopf/structs/finalizers.py | 31 ++ kopf/structs/hierarchies.py | 114 ++++++ kopf/structs/lastseen.py | 74 ++++ kopf/structs/progress.py | 151 ++++++++ peering.yaml | 22 ++ setup.py | 14 + 45 files changed, 3139 insertions(+) create mode 100644 .gitignore create mode 100644 DEVELOPMENT.md rename LICENSE => LICENSE.md (100%) create mode 100644 examples/01-minimal/README.md create mode 100644 examples/01-minimal/example.py create mode 100644 examples/02-children/README.md create mode 100644 examples/02-children/example.py create mode 100644 examples/03-exceptions/README.md create mode 100644 examples/03-exceptions/example.py create mode 100644 examples/04-events/README.md create mode 100644 examples/04-events/example.py create mode 100644 examples/05-handlers/README.md create mode 100644 examples/05-handlers/example.py create mode 100644 examples/06-peering/README.md create mode 100644 examples/06-peering/example.py create mode 100644 examples/07-subhandlers/README.md create mode 100644 examples/07-subhandlers/example.py create mode 100644 examples/99-all-at-once/README.md create mode 100644 examples/99-all-at-once/example.py create mode 100644 examples/README.md create mode 100644 examples/crd.yaml create mode 100644 examples/obj.yaml create mode 100644 examples/requirements.txt create mode 100644 kopf/__main__.py create mode 100644 kopf/cli.py create mode 100644 kopf/config.py create mode 100644 kopf/events.py create mode 100644 kopf/on.py create mode 100644 kopf/reactor/__init__.py create mode 100644 kopf/reactor/handling.py create mode 100644 kopf/reactor/lifecycles.py create mode 100644 kopf/reactor/loading.py create mode 100644 kopf/reactor/peering.py create mode 100644 kopf/reactor/queueing.py create mode 100644 kopf/reactor/registry.py create mode 100644 kopf/structs/__init__.py create mode 100644 kopf/structs/diffs.py create mode 100644 kopf/structs/finalizers.py create mode 100644 kopf/structs/hierarchies.py create mode 100644 
kopf/structs/lastseen.py create mode 100644 kopf/structs/progress.py create mode 100644 peering.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..de5491ad --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] + +# Distribution / packaging +build/ +dist/ +.eggs/ +*.egg-info/ +*.egg + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +.pytest_cache +nosetests.xml +coverage.xml +junit.xml +*.cover +.hypothesis/ diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md new file mode 100644 index 00000000..f6387ecf --- /dev/null +++ b/DEVELOPMENT.md @@ -0,0 +1,144 @@ +# Bootstrap the development environment + +## Minikube cluster + +To develop the framework and the operators in an isolated Kubernetes cluster, +use [minikube](https://github.com/kubernetes/minikube): + +MacOS: + +```bash +brew install docker-machine-driver-hyperkit +sudo chown root:wheel /usr/local/opt/docker-machine-driver-hyperkit/bin/docker-machine-driver-hyperkit +sudo chmod u+s /usr/local/opt/docker-machine-driver-hyperkit/bin/docker-machine-driver-hyperkit + +brew cask install minikube +minikube config set vm-driver hyperkit +``` + +Start the minikube cluster: + +```bash +minikube start +minikube dashboard +``` + +It will automatically create and activate the kubectl context named `minikube`. +If not, or if you have multiple clusters, activate it explicitly: + +```bash +kubectl config get-contexts +kubectl config current-context +kubectl config use-context minikube +``` + + +## Cluster setup + +Apply the framework's peering resource definition (for neighbourhood awareness): + +```bash +kubectl apply -f peering.yaml +``` + +Apply the custom resource definitions of your application +(here, we use an example application and resource): + +```bash +kubectl apply -f examples/crd.yaml +``` + + +## Runtime setup + +Install the operator to your virtualenv in the editable mode +(and all its dependencies): + +```bash +pip install -e . +kopf --help +``` + +Run the operator in the background console/terminal tab: + +```bash +kopf run examples/01-minimal/example.py --verbose +``` + +Create and delete a sample object (just an example here). +Observe how the operator reacts and prints the logs, +and how the handling progress is reported on the object's events. + +```bash +kubectl apply -f examples/obj.yaml +kubectl describe -f examples/obj.yaml +kubectl delete -f examples/obj.yaml +``` + +## PyCharm & IDEs + +If you use PyCharm, create a Run/Debug Configuration as follows: + +* Mode: `module name` +* Module name: `kopf` +* Arguments: `run examples/01-minimal/example.py --verbose` +* Python Interpreter: anything with Python>=3.7 + +Stop the console operator, and start the IDE debug session. +Put a breakpoint in the used operator script on the first line of the function. +Repeat the object creation, and ensure the IDE stops at the breakpoint. + +Congratulations! You are ready to develop and debug your own operator. + + +## Real cluster + +**WARNING:** Running the operator against a real cluster can influence +the real applications in the ways not expected by other team members. +The dev-mode operator's logs will not be visible in the central loggging, +as there are not sent there. Use the real clusters only when you have +the strong reasons to do so, such as the system resource requests +(CPU, RAM, PVC), which are not achievable in the minikube's VMs. 
+ +**WARNING:** Running multiple operators for the same cluster without isolation +can cause infinite loops, conflicting changes, and duplicated side effects +(such as the children object creation, e.g. jobs, pods, etc). +It is your responsibility to design the deployment in such a way that +the operators do not collide. The framework helps by providing the `--peering` +and `--namespace` CLI options, but does not prevent the mis-configurations. + +To run against the real cluster, use the dev-mode of the framework. +This will set the operator's priority to 666 (just a high number), +and will freeze all other running operators (the default priority is 0) +for the runtime, so that they do not collide with each other: + +```bash +kopf run examples/01-minimal/example.py --verbose --dev +``` + +Alternatively, explicitly freeze/resume all other operators, +and it will freeze them even if your operator is not running +(e.g., for 2 hours): + +```bash +kopf freeze --lifetime $((2*60*60)) +kopf resume +``` + + +## Cleanup + +To cleanup the cluster from the framework-related objects: + +```bash +kubectl delete -f peering.yaml +kubectl delete -f examples/obj.yaml +kubectl delete -f examples/crd.yaml +``` + +For the minikube cleanup (to release the CPU/RAM/disk resources): + +```bash +minikube stop +minikube delete +``` diff --git a/LICENSE b/LICENSE.md similarity index 100% rename from LICENSE rename to LICENSE.md diff --git a/README.md b/README.md index e69de29b..f9b40335 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,111 @@ +# Kubernetes Operator Pythonic Framework (Kopf) + +**Kopf** -- "Kubernetes Operator Pythonic Framework" -- a framework and a library +to make Kubernetes operators development easier, just in few lines of Python code. + +The main goal is to bring the Domain-Driven Design to the infrastructure level, +with Kubernetes being an orchestrator/database of the domain objects (custom resources), +and the operators containing the domain logic (with no or minimal infrastructure logic). + + +## Features: + +* A full-featured operator in just 2 files: `Dockerfile` + a Python module. +* Implicit object's status updates, as returned from the Python functions. +* Multiple creation/update/deletion handlers to track the object handling process. +* Update handlers for the selected fields with automatic value diffs. +* Dynamically generated sub-handlers using the same handling tracking feature. +* Retries of the handlers in case of failures or exceptions. +* Easy object hierarchy building with the labels/naming propagation. +* Built-in _events_ for the objects to reflect their state (as seen in `kubectl describe`). +* Automatic logging/reporting of the handling process (as logs + _events_). +* Handling of multiple CRDs in one process. +* The development instance temporarily suppresses the deployed ones. + +Not yet, todos: + +* ~~Multiple operators are automatically synced to avoid double-processing.~~ +* ~~CLI commands to investigate the status of the handled objects in the cluster.~~ + + +## Examples + +See `./examples/` for the examples of the typical use-cases. + +The minimalistic operator can look like this: + +```python +import kopf + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn(spec, meta, status, **kwargs): + print(f"And here we are! Creating: {spec}") +``` + +The keyword arguments available to the handlers: + +* `body` for the whole body of the handled objects. +* `spec` as an alias for `body['spec']`. +* `meta` as an alias for `body['metadata']`. 
+* `status` as an alias for `body['status']`.
+* `patch` is a dict with the object changes to be applied after the handler.
+* `retry` (`int`) is the sequential number of the retry of this handler.
+* `started` (`datetime.datetime`) is the start time of the handler, in case of retries & errors.
+* `runtime` (`datetime.timedelta`) is the duration of the handler run, in case of retries & errors.
+* `diff` is a list of changes of the object (only for the update events).
+* `old` is the old state of the object or a field (only for the update events).
+* `new` is the new state of the object or a field (only for the update events).
+* `logger` is a per-object logger, with the messages prefixed with the object's namespace/name.
+* `event` is the raw event as received from the Kubernetes API.
+* `cause` is the processed cause of the handler as detected by the framework (create/update/delete).
+
+`**kwargs` (or `**_` to stop lint warnings) is required for the forward
+compatibility: the framework can add new keyword arguments in the future,
+and the existing handlers should accept them.
+
+
+## Usage
+
+### In-cluster
+
+We assume that when the operator is executed in the cluster, it must be packaged
+into a docker image with a CI/CD tool of your preference.
+
+```dockerfile
+FROM python:3.7
+ADD . /src
+RUN pip install kopf
+CMD kopf run handlers.py
+```
+
+Where `handlers.py` is your Python script with the handlers
+(see `examples/*/example.py` for the examples).
+
+See `kopf run --help` for other ways of attaching the handlers.
+
+
+## Contributing
+
+Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details
+on our process for submitting pull requests to us, and please ensure
+you follow the [CODE_OF_CONDUCT.md](CODE_OF_CONDUCT.md).
+
+To install the environment for the local development,
+read [DEVELOPMENT.md](DEVELOPMENT.md).
+
+
+## Versioning
+
+We use [SemVer](http://semver.org/) for versioning. For the versions available,
+see the [tags on this repository](https://github.com/zalando-incubator/kopf/tags).
+
+
+## License
+
+This project is licensed under the MIT License - see the [LICENSE.md](LICENSE.md) file for details.
+
+
+## Acknowledgments
+
+* Thanks to [@side8](https://github.com/side8) and their [k8s-operator](https://github.com/side8/k8s-operator)
+  for the inspiration.
diff --git a/examples/01-minimal/README.md b/examples/01-minimal/README.md
new file mode 100644
index 00000000..cc332977
--- /dev/null
+++ b/examples/01-minimal/README.md
@@ -0,0 +1,58 @@
+# Kopf minimal example
+
+The minimum codebase needed to make a runnable Kubernetes operator.
+
+Start the operator:
+
+```bash
+kopf run example.py --verbose
+```
+
+It does nothing useful, just notices the object creation,
+and prints the message to stdout -- it can be seen in the operator's output.
+
+In addition, the object's status is updated, as can be seen here:
+
+```bash
+$ kubectl apply -f ../obj.yaml
+$ kubectl get kopfexamples
+NAME             DURATION   CHILDREN   MESSAGE
+kopf-example-1   1m                    hello world
+```
+
+```bash
+$ kubectl describe KopfExample kopf-example-1
+Name:         kopf-example-1
+Namespace:    default
+Labels:       somelabel=somevalue
+...
+Status:
+  Message:  hello world
+Events:
+  Type    Reason    Age  From  Message
+  ----    ------    ---- ----  -------
+  Normal  Finished  42s  kopf  All handlers succeeded.
+  Normal  Success   43s  kopf  Handler create_fn succeeded.
+```
+
+```bash
+$ kubectl get KopfExample kopf-example-1 -o yaml
+apiVersion: zalando.org/v1
+kind: KopfExample
+metadata:
+  ...
+spec: + duration: 1m + field: value + items: + - item1 + - item2 +status: + message: hello world +``` + +Cleanup in the end: + +```bash +$ kubectl delete -f ../obj.yaml +``` diff --git a/examples/01-minimal/example.py b/examples/01-minimal/example.py new file mode 100644 index 00000000..041ac406 --- /dev/null +++ b/examples/01-minimal/example.py @@ -0,0 +1,7 @@ +import kopf + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn(spec, **kwargs): + print(f"And here we are! Creating: {spec}") + return {'message': 'hello world'} # will be the new status diff --git a/examples/02-children/README.md b/examples/02-children/README.md new file mode 100644 index 00000000..3079ae8e --- /dev/null +++ b/examples/02-children/README.md @@ -0,0 +1,38 @@ +# Kopf example with children + +This example creates a `Pod` for every created `KopfExample` object, +and attaches it as a child of that example object. The latter means that +when the parent object is deleted, the child pod is also terminated. + +Start the operator: + +```bash +kopf run example.py --verbose +``` + +The child pod's id is stored as the parent's status field, +so that it can be seen on the object listing (see also `crd.yaml`): + +```bash +$ kubectl apply -f ../obj.yaml +$ kubectl get kopfexamples +NAME FIELD CHILDREN +kopf-example-1 value [aed7f7ac-2971-11e9-b4d3-061441377794] + +$ kubectl get pod -l somelabel=somevalue +NAME READY STATUS RESTARTS AGE +kopf-example-1-jvlfs 1/1 Running 0 26s +``` + +```bash +$ kubectl delete -f ../obj.yaml +$ kubectl get pod -l somelabel=somevalue +NAME READY STATUS RESTARTS AGE +kopf-example-1-jvlfs 1/1 Terminating 0 52s +``` + +Cleanup in the end: + +```bash +$ kubectl delete -f ../obj.yaml +``` diff --git a/examples/02-children/example.py b/examples/02-children/example.py new file mode 100644 index 00000000..1ffab344 --- /dev/null +++ b/examples/02-children/example.py @@ -0,0 +1,35 @@ +import kopf +import kubernetes.client +import yaml + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn(body, spec, **kwargs): + + # Render the pod yaml with some spec fields used in the template. + doc = yaml.load(f""" + apiVersion: v1 + kind: Pod + spec: + containers: + - name: the-only-one + image: busybox + command: ["sh", "-x", "-c"] + args: + - | + echo "FIELD=$FIELD" + sleep {spec.get('duration', 0)} + env: + - name: FIELD + value: {spec.get('field', 'default-value')} + """) + + # Make it our child: assign the namespace, name, labels, owner references, etc. + kopf.adopt(doc, owner=body) + + # Actually create an object by requesting the Kubernetes API. + api = kubernetes.client.CoreV1Api() + pod = api.create_namespaced_pod(namespace=doc['metadata']['namespace'], body=doc) + + # Update the parent's status. + return {'children': [pod.metadata.uid]} diff --git a/examples/03-exceptions/README.md b/examples/03-exceptions/README.md new file mode 100644 index 00000000..b84a9dc0 --- /dev/null +++ b/examples/03-exceptions/README.md @@ -0,0 +1,36 @@ +# Kopf example with exceptions in the handler + +This example raises the exceptions in the handler, +so that it is retried few time until succeeded. + +Start the operator: + +```bash +kopf run example.py --verbose +``` + +Observe how the exceptions are repored in the operator's log (stderr), +and also briefly reported as the events on the processed object: + +```bash +$ kubectl apply -f ../obj.yaml +$ kubectl describe kopfexample kopf-example-1 +Name: kopf-example-1 +Namespace: default +Labels: somelabel=somevalue +... 
+Status: +Events: + Type Reason Age From Message + ---- ------ ---- ---- ------- + Error Exception 9s kopf Handler create_fn failed.: First failure. + Error MyException 6s kopf Handler create_fn failed.: Second failure. + Normal Success 4s kopf Handler create_fn succeeded. + Normal Finished 4s kopf All handlers succeeded. +``` + +Cleanup in the end: + +```bash +$ kubectl delete -f ../obj.yaml +``` diff --git a/examples/03-exceptions/example.py b/examples/03-exceptions/example.py new file mode 100644 index 00000000..abc0ae1c --- /dev/null +++ b/examples/03-exceptions/example.py @@ -0,0 +1,18 @@ +import time + +import kopf + + +class MyException(Exception): + pass + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn(retry, **kwargs): + time.sleep(2) # for different timestamps of the events + if not retry: + raise Exception("First failure.") + elif retry == 1: + raise MyException("Second failure.") + else: + pass diff --git a/examples/04-events/README.md b/examples/04-events/README.md new file mode 100644 index 00000000..085ec69a --- /dev/null +++ b/examples/04-events/README.md @@ -0,0 +1,38 @@ +# Kopf example with the event reporting + +The framework reports some basic events on the handling progress. +But the developers can report their own events conveniently. + +Start the operator: + +```bash +kopf run example.py --verbose +``` + +The events are shown on the object's description +(and are usually garbage-collected after few minutes). + +```bash +$ kubectl apply -f ../obj.yaml +$ kubectl describe kopfexample kopf-example-1 +... +Events: + Type Reason Age From Message + ---- ------ ---- ---- ------- + Normal SomeReason 5s kopf Some message + Normal Success 5s kopf Handler create_fn succeeded. + SomeType SomeReason 6s kopf Some message + Normal Finished 5s kopf All handlers succeeded. + Error SomeReason 5s kopf Some exception: Exception text. + Warning SomeReason 5s kopf Some message + +``` + +Note that the events are shown out of any order -- this is a behaviour of the CLI tool or of the API. +It has nothing to do with the framework: the framework reports the timestamps properly. + +Cleanup in the end: + +```bash +$ kubectl delete -f ../obj.yaml +``` diff --git a/examples/04-events/example.py b/examples/04-events/example.py new file mode 100644 index 00000000..f36c33e2 --- /dev/null +++ b/examples/04-events/example.py @@ -0,0 +1,19 @@ +""" +Send the custom events for the handled or other objects. +""" +import kopf + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn(body, **kwargs): + + # The all-purpose function for the vent creation. + kopf.event(body, type="SomeType", reason="SomeReason", message="Some message") + + # The shortcuts for the conventional events and common cases. + kopf.info(body, reason="SomeReason", message="Some message") + kopf.warn(body, reason="SomeReason", message="Some message") + try: + raise RuntimeError("Exception text.") + except: + kopf.exception(body, reason="SomeReason", message="Some exception:") diff --git a/examples/05-handlers/README.md b/examples/05-handlers/README.md new file mode 100644 index 00000000..14ed780d --- /dev/null +++ b/examples/05-handlers/README.md @@ -0,0 +1,56 @@ +# Kopf example with multiple handlers + +Multiple handlers can be registered for the same event. +They are executed in the order of registration. + +Beside the stardard create-update-delete events, a per-field diff can be registered. +It is called only in case of the specified field changes, +with `old` & `new` set to that field's values. 
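+
+For reference, this is the per-field handler from this directory's `example.py`
+(a shortened excerpt; the other create/update/delete handlers are omitted here):
+
+```python
+import kopf
+
+@kopf.on.field('zalando.org', 'v1', 'kopfexamples', field='spec.field')
+def field_fn(old, new, **kwargs):
+    print(f'FIELD CHANGED: {old} -> {new}')
+```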
+ +Start the operator (we skip the verbose mode here, for clarity): + +```bash +kopf run example.py +``` + +Trigger the object creation and monitor the stderr of the operator: + +```bash +$ kubectl apply -f ../obj.yaml +``` + +``` +CREATED 1st +[2019-02-05 20:33:50,336] kopf.handling [INFO ] [default/kopf-example-1] Handler create_fn_1 succeeded. +CREATED 2nd +[2019-02-05 20:33:50,557] kopf.handling [INFO ] [default/kopf-example-1] Handler create_fn_2 succeeded. +[2019-02-05 20:33:50,781] kopf.handling [INFO ] [default/kopf-example-1] All handlers succeeded. +``` + +Now, trigger the object change: + +```bash +$ kubectl patch -f ../obj.yaml --type merge -p '{"spec": {"field": "newvalue", "newfield": 100}}' +``` + +``` +UPDATED +[2019-02-05 20:34:06,358] kopf.handling [INFO ] [default/kopf-example-1] Handler update_fn succeeded. +FIELD CHANGED: value -> newvalue +[2019-02-05 20:34:06,682] kopf.handling [INFO ] [default/kopf-example-1] Handler field_fn/spec.field succeeded. +[2019-02-05 20:34:06,903] kopf.handling [INFO ] [default/kopf-example-1] All handlers succeeded. +``` + +Finally, delete the object: + +```bash +$ kubectl delete -f ../obj.yaml +``` + +``` +DELETED 1st +[2019-02-05 20:34:42,496] kopf.handling [INFO ] [default/kopf-example-1] Handler delete_fn_1 succeeded. +DELETED 2nd +[2019-02-05 20:34:42,715] kopf.handling [INFO ] [default/kopf-example-1] Handler delete_fn_2 succeeded. +[2019-02-05 20:34:42,934] kopf.handling [INFO ] [default/kopf-example-1] All handlers succeeded. +``` diff --git a/examples/05-handlers/example.py b/examples/05-handlers/example.py new file mode 100644 index 00000000..a3f87933 --- /dev/null +++ b/examples/05-handlers/example.py @@ -0,0 +1,31 @@ +import kopf + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn_1(**kwargs): + print('CREATED 1st') + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn_2(**kwargs): + print('CREATED 2nd') + + +@kopf.on.update('zalando.org', 'v1', 'kopfexamples') +def update_fn(old, new, diff, **kwargs): + print('UPDATED') + + +@kopf.on.delete('zalando.org', 'v1', 'kopfexamples') +def delete_fn_1(**kwargs): + print('DELETED 1st') + + +@kopf.on.delete('zalando.org', 'v1', 'kopfexamples') +def delete_fn_2(**kwargs): + print('DELETED 2nd') + + +@kopf.on.field('zalando.org', 'v1', 'kopfexamples', field='spec.field') +def field_fn(old, new, **kwargs): + print(f'FIELD CHANGED: {old} -> {new}') diff --git a/examples/06-peering/README.md b/examples/06-peering/README.md new file mode 100644 index 00000000..7edf3d2a --- /dev/null +++ b/examples/06-peering/README.md @@ -0,0 +1,65 @@ +# Kopf example with multiple processes and development mode + +When multiple operators start for the same cluster (in the cluster or outside), +they become aware about each other, and exchange the basic information about +their liveliness and the priorities, and cooperate to avoid the undesired +side-effects (e.g., duplicated children creation, infinite cross-changes). + +The main use-case for this is the development mode: when a developer starts +an operator on their workstation, all the deployed operators should freeze +and stop processing of the objects, until the developer's operator exits. 
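+
+The conflict resolution is priority-based: the `--dev` option used below is just a shortcut
+for `--priority 666`. The priority, the peering object, and the namespace can also be set
+explicitly -- a sketch based on the CLI flags defined in `kopf/cli.py` of this patch
+(the peering name used here is hypothetical):
+
+```bash
+kopf run example.py --verbose --priority 100
+kopf run example.py --verbose --peering example --namespace default
+```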
+ +In shell A, start an operator: + +```bash +kopf run example.py --verbose +``` + +In shell B, start another operator: + +```bash +kopf run example.py --verbose +``` + +Notice how both A & B complain about the same-priority sibling operator: + +``` +[2019-02-05 20:42:39,052] kopf.peering [WARNING ] Possibly conflicting operators with the same priority: [Peer(089e5a18a71d4660b07ae37acc776250, priority=0, lastseen=2019-02-05 19:42:38.932613, lifetime=0:01:00)]. +``` + +``` +[2019-02-05 20:42:39,223] kopf.peering [WARNING ] Possibly conflicting operators with the same priority: [Peer(590581cbceff403e90a3e874379c4daf, priority=0, lastseen=2019-02-05 19:42:23.241150, lifetime=0:01:00)]. +``` + +Now, stop the operator B wtih Ctrl+C (twice), and start it with `--dev` option +(equivalent to `--priority 666`): + +```bash +kopf run example.py --verbose --dev +``` + +Observe how the operator A freezes and lets +operator B to take control over the objects. + +``` +[2019-02-05 20:43:40,360] kopf.peering [INFO ] Freezing operations in favour of [Peer(54e7054f28d948c4985db79410c9ef4a, priority=666, lastseen=2019-02-05 19:43:40.166561, lifetime=0:01:00)]. +``` + +Stop the operator B again with Ctrl+C (twice). +The operator A resumes its operations: + +``` +[2019-02-05 20:44:54,311] kopf.peering [INFO ] Resuming operations after the freeze. +``` + +The same can be achieved with the explicit CLI commands: + +```bash +kopf freeze --lifetime 60 --priority 100 +kopf resume +``` + +``` +[2019-02-05 20:45:34,354] kopf.peering [INFO ] Freezing operations in favour of [Peer(manual, priority=100, lastseen=2019-02-05 19:45:34.226070, lifetime=0:01:00)]. +[2019-02-05 20:45:49,427] kopf.peering [INFO ] Resuming operations after the freeze. +``` diff --git a/examples/06-peering/example.py b/examples/06-peering/example.py new file mode 100644 index 00000000..b079df81 --- /dev/null +++ b/examples/06-peering/example.py @@ -0,0 +1,6 @@ +import kopf + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn(**kwargs): + pass diff --git a/examples/07-subhandlers/README.md b/examples/07-subhandlers/README.md new file mode 100644 index 00000000..2b695937 --- /dev/null +++ b/examples/07-subhandlers/README.md @@ -0,0 +1,69 @@ +# Kopf example with dynamic sub-handlers + +It is convenient to re-use the framework's capabilities to track +the handler execution, to skip the finished or failed handlers, +and to retry to recoverable errors -- without the reimplemenation +of the same logic inside of the handlers. + +In some cases, however, the required handlers can be identified +only at the handling time, mostly when they are based on the spec, +or on some external environment (databases, remote APIs, other objects). + +For this case, the sub-handlers can be useful. The sub-handlers "extend" +the main handler, inside of which they are defined, but delegate +the progress tracking to the framework. + +In all aspects, the sub-handler are the same as other handlers: +the same function signatures, the same execution environment, +the same error handling, etc. + +Start the operator: + +```bash +kopf run example.py --verbose +``` + +Trigger the object creation and monitor the stderr of the operator: + +```bash +$ kubectl apply -f ../obj.yaml +``` + +Observe how the sub-handlers are nested within the parent handler, +and use the `spec.items` to dynamically decide how many and which +sub-handlers must be executed. + +``` +[2019-02-19 16:05:56,432] kopf.reactor.handlin [DEBUG ] [default/kopf-example-1] First appearance: ... 
+[2019-02-19 16:05:56,432] kopf.reactor.handlin [DEBUG ] [default/kopf-example-1] Adding the finalizer, thus preventing the actual deletion. + +[2019-02-19 16:05:56,645] kopf.reactor.handlin [DEBUG ] [default/kopf-example-1] Creation event: ... +[2019-02-19 16:05:56,650] kopf.reactor.handlin [DEBUG ] [default/kopf-example-1] Invoking handler create_fn. +[2019-02-19 16:05:56,654] kopf.reactor.handlin [DEBUG ] [default/kopf-example-1] Invoking handler create_fn/item1. + +=== Handling creation for item1. === + +[2019-02-19 16:05:56,656] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Handler create_fn/item1 succeeded. +[2019-02-19 16:05:56,982] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Handler create_fn has unfinished sub-handlers. Will retry soon. + +[2019-02-19 16:05:57,200] kopf.reactor.handlin [DEBUG ] [default/kopf-example-1] Creation event: ... +[2019-02-19 16:05:57,201] kopf.reactor.handlin [DEBUG ] [default/kopf-example-1] Invoking handler create_fn. +[2019-02-19 16:05:57,203] kopf.reactor.handlin [DEBUG ] [default/kopf-example-1] Invoking handler create_fn/item2. + +=== Handling creation for item2. === + +[2019-02-19 16:05:57,208] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Handler create_fn/item2 succeeded. +[2019-02-19 16:05:57,419] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Handler create_fn succeeded. + +[2019-02-19 16:05:57,634] kopf.reactor.handlin [INFO ] [default/kopf-example-1] All handlers succeeded for creation. +``` + +Try creating the object with more items in it to see more sub-handlers +executed (note: do not change it, but re-create it, as only the creation handler +is implemented in this example; or implement the update handler yourselves). + +Cleanup in the end: + +```bash +$ kubectl delete -f ../obj.yaml +``` diff --git a/examples/07-subhandlers/example.py b/examples/07-subhandlers/example.py new file mode 100644 index 00000000..33143726 --- /dev/null +++ b/examples/07-subhandlers/example.py @@ -0,0 +1,11 @@ +import kopf + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_fn(spec, **kwargs): + + for item in spec.get('items', []): + + @kopf.on.this(id=item) + async def create_item_fn(item=item, **kwargs): + print(f"=== Handling creation for {item}. ===") diff --git a/examples/99-all-at-once/README.md b/examples/99-all-at-once/README.md new file mode 100644 index 00000000..fb82d414 --- /dev/null +++ b/examples/99-all-at-once/README.md @@ -0,0 +1,4 @@ +# Kopf example with all the features at once + +This operator contains all the features of the framework at once. +It is used mostly for the development and debugging. diff --git a/examples/99-all-at-once/example.py b/examples/99-all-at-once/example.py new file mode 100644 index 00000000..fda910d0 --- /dev/null +++ b/examples/99-all-at-once/example.py @@ -0,0 +1,55 @@ +""" +Kubernetes operator example: all the features at once (for debugging & testing). 
+""" + +import pprint +import time + +import kopf + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_1(body, meta, spec, status, **kwargs): + children = _create_children(owner=body) + + kopf.info(body, reason='AnyReason') + kopf.event(body, type='Warning', reason='SomeReason', message="Cannot do something") + kopf.event(children, type='Normal', reason='SomeReason', message="Created as part of the job1step") + + return {'job1-status': 100} + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def create_2(body, meta, spec, status, retry=None, **kwargs): + wait_for_something() # specific for job2, e.g. an external API poller + + if not retry: + # will be retried by the framework, even if it has been restarted + raise Exception("Whoops!") + + return {'job2-status': 100} + + +@kopf.on.update('zalando.org', 'v1', 'kopfexamples') +def update(body, meta, spec, status, old, new, diff, **kwargs): + print('Handling the diff') + pprint.pprint(list(diff)) + + +@kopf.on.field('zalando.org', 'v1', 'kopfexamples', field='spec.lst') +def update_lst(body, meta, spec, status, old, new, **kwargs): + print(f'Handling the FIELD = {old} -> {new}') + + +@kopf.on.delete('zalando.org', 'v1', 'kopfexamples') +def delete(body, meta, spec, status, **kwargs): + pass + + +def _create_children(owner): + return [] + + +def wait_for_something(): + # Note: intentionally blocking from the asyncio point of view. + time.sleep(1) diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 00000000..fc23c314 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,13 @@ +# Kopf examples + +For the examples to work, a sample CRD (Custom Resource Definition) should be created: + +```bash +kubectl apply -f crd.yaml +``` + +Also, some libraries are needed for some operators and handlers: + +```bash +pip install -r requirements.txt +``` diff --git a/examples/crd.yaml b/examples/crd.yaml new file mode 100644 index 00000000..64c1678e --- /dev/null +++ b/examples/crd.yaml @@ -0,0 +1,35 @@ +# A demo CRD for the Kopf example operators. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: kopfexamples.zalando.org +spec: + scope: Namespaced + group: zalando.org + versions: + - name: v1 + served: true + storage: true + names: + kind: KopfExample + plural: kopfexamples + singular: kopfexample + shortNames: + - kopfexes + - kopfex + additionalPrinterColumns: + - name: Duration + type: string + priority: 0 + JSONPath: .spec.duration + description: For how long the pod should sleep. + - name: Children + type: string + priority: 0 + JSONPath: .status.children + description: The children pods created. + - name: Message + type: string + priority: 0 + JSONPath: .status.message + description: As returned from the handler (sometimes). diff --git a/examples/obj.yaml b/examples/obj.yaml new file mode 100644 index 00000000..5391b559 --- /dev/null +++ b/examples/obj.yaml @@ -0,0 +1,13 @@ +# A demo custom resource for the Kopf example operators. 
+apiVersion: zalando.org/v1 +kind: KopfExample +metadata: + name: kopf-example-1 + labels: + somelabel: somevalue +spec: + duration: 1m + field: value + items: + - item1 + - item2 diff --git a/examples/requirements.txt b/examples/requirements.txt new file mode 100644 index 00000000..e7d4c85d --- /dev/null +++ b/examples/requirements.txt @@ -0,0 +1,3 @@ +kopf +kubernetes +pyyaml diff --git a/kopf/__init__.py b/kopf/__init__.py index e69de29b..6931b74b 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -0,0 +1,59 @@ +""" +The main Kopf module for all the exported functions & classes. +""" + +from kopf import ( + on, # as a separate name on the public namespace +) +from kopf.config import ( + login, + configure, +) +from kopf.events import ( + event, + info, + warn, + exception, +) +from kopf.on import ( + register, +) +from kopf.reactor import ( + lifecycles, # as a separate name on the public namespace +) +from kopf.reactor.handling import ( + HandlerRetryError, + HandlerFatalError, + HandlerTimeoutError, + execute, +) +from kopf.reactor.lifecycles import ( + get_default_lifecycle, + set_default_lifecycle, +) +from kopf.reactor.queueing import ( + run, + create_tasks, +) +from kopf.structs.hierarchies import ( + adopt, + label, + build_object_reference, + build_owner_reference, + append_owner_reference, + remove_owner_reference, +) + +__all__ = [ + 'on', 'lifecycles', 'register', 'execute', + 'login', 'configure', + 'event', 'info', 'warn', 'exception', + 'run', 'create_tasks', + 'adopt', 'label', + 'get_default_lifecycle', 'set_default_lifecycle', + 'build_object_reference', 'build_owner_reference', + 'append_owner_reference', 'remove_owner_reference', + 'HandlerRetryError', + 'HandlerFatalError', + 'HandlerTimeoutError', +] diff --git a/kopf/__main__.py b/kopf/__main__.py new file mode 100644 index 00000000..42430095 --- /dev/null +++ b/kopf/__main__.py @@ -0,0 +1,10 @@ +""" +CLI entry point, when used as a module: `python -m kopf`. + +Useful for debugging in the IDEs (use the start-mode "Module", module "kopf"). 
+""" + +from kopf.cli import main + +if __name__ == '__main__': + main() diff --git a/kopf/cli.py b/kopf/cli.py new file mode 100644 index 00000000..8f2c4b4e --- /dev/null +++ b/kopf/cli.py @@ -0,0 +1,89 @@ +import functools + +import click + +from kopf.config import login, configure +from kopf.reactor.loading import preload +from kopf.reactor.peering import Peer, PEERING_DEFAULT_NAME, detect_own_id +from kopf.reactor.queueing import run as real_run + + +def logging_options(fn): + """ A decorator to configure logging in all command in the same way.""" + @click.option('-v', '--verbose', is_flag=True) + @click.option('-d', '--debug', is_flag=True) + @click.option('-q', '--quiet', is_flag=True) + @functools.wraps(fn) # to preserve other opts/args + def wrapper(verbose, quiet, debug, *args, **kwargs): + configure(debug=debug, verbose=verbose, quiet=quiet) + return fn(*args, **kwargs) + return wrapper + + +@click.group(context_settings=dict( + auto_envvar_prefix='KOPF', +)) +def main(): + pass + + +@main.command() +@logging_options +@click.option('-n', '--namespace', default=None) +@click.option('--standalone', is_flag=True, default=False) +@click.option('--dev', 'priority', flag_value=666) +@click.option('-P', '--peering', type=str, default=PEERING_DEFAULT_NAME) +@click.option('-p', '--priority', type=int, default=0) +@click.option('-m', '--module', 'modules', multiple=True) +@click.argument('paths', nargs=-1) +def run(paths, modules, peering, priority, standalone, namespace): + """ Start an operator process and handle all the requests. """ + login() + preload( + paths=paths, + modules=modules, + ) + return real_run( + standalone=standalone, + namespace=namespace, + priority=priority, + peering=peering, + ) + + +@main.command() +@logging_options +@click.option('-n', '--namespace', default=None) +@click.option('-i', '--id', type=str, default=None) +@click.option('--dev', 'priority', flag_value=666) +@click.option('-P', '--peering', type=str, default=PEERING_DEFAULT_NAME) +@click.option('-p', '--priority', type=int, default=100) +@click.option('-t', '--lifetime', type=int, required=True) +@click.option('-m', '--message', type=str) +def freeze(id, message, lifetime, namespace, peering, priority): + """ Freeze the resource handling in the cluster. """ + login() + ourserlves = Peer( + id=id or detect_own_id(), + peering=peering, + namespace=namespace, + priority=priority, + lifetime=lifetime, + ) + ourserlves.keepalive() + + +@main.command() +@logging_options +@click.option('-n', '--namespace', default=None) +@click.option('-i', '--id', type=str, default=None) +@click.option('-P', '--peering', type=str, default=PEERING_DEFAULT_NAME) +def resume(id, namespace, peering): + """ Resume the resource handling in the cluster. """ + login() + ourselves = Peer( + id=id or detect_own_id(), + peering=peering, + namespace=namespace, + ) + ourselves.disappear() diff --git a/kopf/config.py b/kopf/config.py new file mode 100644 index 00000000..ef734899 --- /dev/null +++ b/kopf/config.py @@ -0,0 +1,76 @@ + +import asyncio +import logging + +import click +import kubernetes +import kubernetes.client.rest +import urllib3.exceptions + +logger = logging.getLogger(__name__) +format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' + + +class LoginError(click.ClickException): + """ Raised when the operator cannot login to the API. """ + + +def login(): + """ + Login the the Kubernetes cluster, locally or remotely. + """ + + # Configure the default client credentials for all possible environments. 
+    try:
+        kubernetes.config.load_incluster_config()  # cluster env vars
+        logger.debug("configured in cluster with service account")
+    except kubernetes.config.ConfigException:
+        kubernetes.config.load_kube_config()  # developer's config files
+        logger.debug("configured via kubeconfig file")
+
+    # Make a sample API call to ensure the login is successful,
+    # and convert some of the known exceptions to the CLI hints.
+    try:
+        api = kubernetes.client.CoreApi()
+        api.get_api_versions()
+    except urllib3.exceptions.HTTPError as e:
+        raise LoginError("Cannot connect to the Kubernetes API. "
+                         "Please configure the cluster access.")
+    except kubernetes.client.rest.ApiException as e:
+        if e.status == 401:
+            raise LoginError("Cannot authenticate to the Kubernetes API. "
+                             "Please login or configure the tokens.")
+        else:
+            raise
+
+
+def configure(debug=None, verbose=None, quiet=None):
+    log_level = 'DEBUG' if debug or verbose else 'WARNING' if quiet else 'INFO'
+
+    logger = logging.getLogger()
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter(format)
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    logger.setLevel(log_level)
+
+    # Configure the Kubernetes client defaults according to our settings.
+    config = kubernetes.client.configuration.Configuration()
+    config.logger_format = format
+    config.logger_file = None  # once again after the constructor to re-apply the formatter
+    config.debug = debug
+    kubernetes.client.configuration.Configuration.set_default(config)
+
+    # Kubernetes client is as buggy as hell: it adds its own stream handlers even in non-debug mode,
+    # does not respect the formatting, and dumps too much of the low-level info.
+    if not debug:
+        logger = logging.getLogger("urllib3")
+        del logger.handlers[1:]  # everything except the default NullHandler
+
+    # Prevent the low-level logging unless in the debug verbosity mode. Keep only the operator's messages.
+    if not debug:
+        logging.getLogger('asyncio').propagate = False
+        logging.getLogger('kubernetes').propagate = False
+
+    loop = asyncio.get_event_loop()
+    loop.set_debug(debug)
diff --git a/kopf/events.py b/kopf/events.py
new file mode 100644
index 00000000..fcb35090
--- /dev/null
+++ b/kopf/events.py
@@ -0,0 +1,82 @@
+"""
+All the functions to write the Kubernetes events on the Kubernetes objects.
+
+They are used internally in the handling routine to show the progress,
+and can be used directly from the handlers to add arbitrary custom events.
+
+The events look like this:
+
+    kubectl describe -f myres.yaml
+    ...
+    TODO
+
+"""
+
+import sys
+
+import datetime
+import kubernetes
+
+from kopf.structs.hierarchies import build_object_reference
+
+
+# TODO: rename it to kopf.log()? kopf.events.log()? kopf.events.warn()?
+def event(obj, *, type, reason, message=''):
+    """
+    Issue an event for the object.
+    """
+    if isinstance(obj, (list, tuple)):
+        for item in obj:
+            event(item, type=type, reason=reason, message=message)
+        return
+
+    now = datetime.datetime.utcnow()
+    namespace = obj['metadata']['namespace']
+
+    meta = kubernetes.client.V1ObjectMeta(
+        namespace=namespace,
+        generate_name='kopf-event-',
+    )
+    body = kubernetes.client.V1beta1Event(
+        metadata=meta,
+
+        action='Action?',
+        type=type,
+        reason=reason,
+        note=message,
+        # message=message,
+
+        reporting_controller='kopf',
+        reporting_instance='dev',
+        deprecated_source=kubernetes.client.V1EventSource(component='kopf'),  # used in the "From" column in `kubectl describe`.
+ + regarding=build_object_reference(obj), + # related=build_object_reference(obj), + + event_time=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' + deprecated_first_timestamp=now.isoformat() + 'Z', # used in the "Age" column in `kubectl describe`. + ) + + api = kubernetes.client.EventsV1beta1Api() + api.create_namespaced_event( + namespace=namespace, + body=body, + ) + + +# Shortcuts for the only two officially documented event types as of now. +# However, any arbitrary strings can be used as an event type to the base function. +def info(obj, *, reason, message=''): + return event(obj, reason=reason, message=message, type='Normal') + + +def warn(obj, *, reason, message=''): + return event(obj, reason=reason, message=message, type='Warning') + + +def exception(obj, *, reason='', message='', exc=None): + if exc is None: + _, exc, _ = sys.exc_info() + reason = reason if reason else type(exc).__name__ + message = f'{message} {exc}' if message else f'{exc}' + return event(obj, reason=reason, message=message, type='Error') diff --git a/kopf/on.py b/kopf/on.py new file mode 100644 index 00000000..4432ad44 --- /dev/null +++ b/kopf/on.py @@ -0,0 +1,164 @@ +""" +The decorators for the event handlers. Usually used as:: + + import kopf + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples') + def creation_handler(**kwargs): + pass + +""" + +# TODO: add cluster=True support (different API methods) + +from typing import Optional, Union, Tuple, List + +from kopf.reactor.handling import subregistry_var +from kopf.reactor.registry import CREATE, UPDATE, DELETE, FIELD +from kopf.reactor.registry import GlobalRegistry, SimpleRegistry, get_default_registry + + +def create( + group: str, version: str, plural: str, + *, + id: Optional[str] = None, + timeout: Optional[float] = None, + registry: Optional[GlobalRegistry] = None): + """ `@kopf.on.create()` handler for the object creation. """ + registry = registry if registry is not None else get_default_registry() + def decorator(fn): + registry.register( + group=group, version=version, plural=plural, + event=CREATE, id=id, timeout=timeout, + fn=fn) + return fn + return decorator + + +def update( + group: str, version: str, plural: str, + *, + id: Optional[str] = None, + timeout: Optional[float] = None, + registry: Optional[GlobalRegistry] = None): + """ `@kopf.on.update()` handler for the object update or change. """ + registry = registry if registry is not None else get_default_registry() + def decorator(fn): + registry.register( + group=group, version=version, plural=plural, + event=UPDATE, id=id, timeout=timeout, + fn=fn) + return fn + return decorator + + +def delete( + group: str, version: str, plural: str, + *, + id: Optional[str] = None, + timeout: Optional[float] = None, + registry: Optional[GlobalRegistry] = None): + """ `@kopf.on.delete()` handler for the object deletion. """ + registry = registry if registry is not None else get_default_registry() + def decorator(fn): + registry.register( + group=group, version=version, plural=plural, + event=DELETE, id=id, timeout=timeout, + fn=fn) + return fn + return decorator + + +def field( + group: str, version: str, plural: str, + field: Union[str, List[str], Tuple[str, ...]], + *, + id: Optional[str] = None, + timeout: Optional[float] = None, + registry: Optional[GlobalRegistry] = None): + """ `@kopf.on.field()` handler for the individual field changes. 
""" + registry = registry if registry is not None else get_default_registry() + def decorator(fn): + registry.register( + group=group, version=version, plural=plural, + event=FIELD, field=field, id=id, timeout=timeout, + fn=fn) + return fn + return decorator + + +# TODO: find a better name: `@kopf.on.this` is confusing and does not fully +# TODO: match with the `@kopf.on.{cause}` pattern, where cause is create/update/delete. +def this( + *, + id: Optional[str] = None, + timeout: Optional[float] = None, + registry: Optional[SimpleRegistry] = None): + """ + `@kopf.on.this()` decorator for the dynamically generated sub-handlers. + + Can be used only inside of the handler function. + It is efficiently a syntax sugar to look like all other handlers: + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples') + def create(*, spec, **kwargs): + + for task in spec.get('tasks', []): + + @kopf.on.this(id=f'task_{task}') + def create_task(*, spec, task=task, **kwargs): + + pass + + In this example, having spec.tasks set to `[abc, def]`, this will create + the following handlers: `create`, `create/task_abc`, `create/task_def`. + + The parent handler is not considered as finished if there are unfinished + sub-handlers left. Since the sub-handlers will be executed in the regular + reactor and lifecycle, with multiple low-level events (one per iteration), + the parent handler will also be executed multiple times, and is expected + to produce the same (or at least predictable) set of sub-handlers. + In addition, keep its logic idempotent (not failing on the repeated calls). + + Note: `task=task` is needed to freeze the closure variable, so that every + create function will have its own value, not the latest in the for-cycle. + """ + registry = registry if registry is not None else subregistry_var.get() + def decorator(fn): + registry.register(id=id, fn=fn, timeout=timeout) + return fn + return decorator + + +def register( + fn, + *, + id: Optional[str] = None, + timeout: Optional[float] = None, + registry: Optional[SimpleRegistry] = None, +): + """ + Register a function as a sub-handler of the currently executed handler. + + Example:: + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples') + def create_it(spec, **kwargs): + for task in spec.get('tasks', []): + + def create_single_task(task=task, **_): + pass + + kopf.register(id=task, fn=create_single_task) + + This is efficiently an equivalent for:: + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples') + def create_it(spec, **kwargs): + for task in spec.get('tasks', []): + + @kopf.on.this(id=task) + def create_single_task(task=task, **_): + pass + """ + return this(id=id, timeout=timeout, registry=registry)(fn) diff --git a/kopf/reactor/__init__.py b/kopf/reactor/__init__.py new file mode 100644 index 00000000..48a160e8 --- /dev/null +++ b/kopf/reactor/__init__.py @@ -0,0 +1,9 @@ +""" +The reactor groups all modules to watch & process the low- & high-level events. + +The low-level events are the kubernetes watch streams, received on every +object change, including the metainfo, status, etc. + +The high-level events are the actually identified changes in the objects, +such as their creation, deletion, update both in general and per-field. +""" diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py new file mode 100644 index 00000000..5c173784 --- /dev/null +++ b/kopf/reactor/handling.py @@ -0,0 +1,562 @@ +""" +Conversion of the low-level events to the high-level causes, and handling them. 
+ +These functions are invoked from the queueing module `kopf.reactor.queueing`, +which are the actual event loop of the operator process. + +The conversion of the low-level events to the high-level causes is done by +checking the object's state and comparing it to the preserved last-seen state. + +The framework itself makes the necessary changes to the object, -- such as the +finalizers attachment, last-seen state updates, and handler status tracking, -- +thus provoking the low-level watch-events and additional queueing calls. +But these internal changes are filtered out from the cause detection +and therefore do not trigger the user-defined handlers. +""" + +import asyncio +import collections +import concurrent.futures +import contextvars +import datetime +import functools +import logging +from contextvars import ContextVar +from typing import NamedTuple, Optional, Any, MutableMapping, Text, Callable, Iterable + +import kubernetes + +from kopf import events +from kopf.reactor.registry import ( + CREATE, UPDATE, DELETE, + Handler, + Resource, + BaseRegistry, + SimpleRegistry, +) +from kopf.structs.diffs import ( + Diff, + resolve, +) +from kopf.structs.finalizers import ( + is_deleted, + has_finalizers, + append_finalizers, + remove_finalizers, +) +from kopf.structs.lastseen import ( + has_state, + get_state_diffs, + is_state_changed, + refresh_last_seen_state, +) +from kopf.structs.progress import ( + is_started, + is_sleeping, + is_awakened, + is_finished, + get_retry_count, + get_start_time, + get_awake_time, + set_start_time, + set_retry_time, + store_failure, + store_success, + purge_progress, +) + + +WAITING_KEEPALIVE_INTERVAL = 10 * 60 +""" How often to wake up from the long sleep, to show the liveliness. """ + +DEFAULT_RETRY_DELAY = 1 * 60 +""" The default delay duration for the regular exception in retry-mode. """ + + +class ObjectLogger(logging.LoggerAdapter): + """ An utility to prefix the per-object log messages. """ + def process(self, msg, kwargs): + return f"[{self.extra['namespace']}/{self.extra['name']}] {msg}", kwargs + + +class Cause(NamedTuple): + """ + The cause is what has caused the whole reaction as a chain of handlers. + + Unlike the low-level Kubernetes watch-events, the cause is aware + of the actual field changes, including the multi-handlers changes. + """ + logger: ObjectLogger + resource: Resource + event: Text + body: MutableMapping + patch: MutableMapping + diff: Optional[Diff] = None + old: Optional[Any] = None + new: Optional[Any] = None + + +class HandlerFatalError(Exception): + """ A fatal handler error, the reties are useless. """ + + +class HandlerRetryError(Exception): + """ A potentially recoverable error, should be retried. """ + def __init__(self, *args, delay=DEFAULT_RETRY_DELAY, **kwargs): + super().__init__(*args, **kwargs) + self.delay = delay + + +class HandlerTimeoutError(HandlerFatalError): + """ An error for the handler's timeout (if set). """ + + +class HandlerChildrenRetry(HandlerRetryError): + """ An internal pseudo-error to retry for the next sub-handlers attempt. """ + + +# The executor for the sync-handlers (i.e. regular functions). +# TODO: make the limits if sync-handlers configurable? +executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) +# executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) + +# The task-local context; propagated down the stack instead of multiple kwargs. +# Used in `@kopf.on.this` and `kopf.execute()` to add/get the sub-handlers. 
+sublifecycle_var: ContextVar[Callable] = ContextVar('sublifecycle_var') +subregistry_var: ContextVar[SimpleRegistry] = ContextVar('subregistry_var') +subexecuted_var: ContextVar[bool] = ContextVar('subexecuted_var') +handler_var: ContextVar[Handler] = ContextVar('handler_var') +cause_var: ContextVar[Cause] = ContextVar('cause_var') + + +async def custom_object_handler( + lifecycle: Callable, + registry: BaseRegistry, + resource: Resource, + event: dict, + freeze: asyncio.Event, +) -> None: + """ + Handle a single custom object low-level watch-event. + + Convert the low-level events, as provided by the watching/queueing tasks, + to the high-level causes, and then call the cause-handling logic. + + All the internally provoked changes are intercepted, do not create causes, + and therefore do not call the handling logic. + """ + etyp = event['type'] # e.g. ADDED, MODIFIED, DELETED. + body = event['object'] + + # Each object has its own prefixed logger, to distinguish parallel handling. + logger = ObjectLogger(logging.getLogger(__name__), extra=dict( + namespace=body.get('metadata', {}).get('namespace', 'default'), + name=body.get('metadata', {}).get('name', body.get('metadata', {}).get('uid', None)), + )) + + # Object patch accumulator. Populated by the methods. Applied in the end of the handler. + patch = {} + delay = None + + # If the global freeze is set for the processing (i.e. other operator overrides), do nothing. + if freeze.is_set(): + logger.debug("Ignoring the events due to freeze.") + + # The object was really deleted from the cluster. But we do not care anymore. + elif etyp == 'DELETED': + logger.debug("Deleted, really deleted, and we are notified.") + + # The finalizer has been just removed. We are fully done. + elif is_deleted(body) and not has_finalizers(body): + logger.debug("Deletion event, but we are done with it, but we do not care.") + + elif is_deleted(body): + logger.debug("Deletion event: %r", body) + cause = Cause(resource=resource, event=DELETE, body=body, patch=patch, logger=logger) + try: + await execute(lifecycle=lifecycle, registry=registry, cause=cause) + except HandlerChildrenRetry as e: + # on the top-level, no patches -- it is pre-patched. + delay = e.delay + else: + logger.info(f"All handlers succeeded for deletion.") + events.info(cause.body, reason='Success', message=f"All handlers succeeded for deletion.") + logger.debug("Removing the finalizer, thus allowing the actual deletion.") + remove_finalizers(body=body, patch=patch) + + # For a fresh new object, first block it from accidental deletions without our permission. + # The actual handler will be called on the next call. + elif not has_finalizers(body): + logger.debug("First appearance: %r", body) + logger.debug("Adding the finalizer, thus preventing the actual deletion.") + append_finalizers(body=body, patch=patch) + + # For the object seen for the first time (i.e. just-created), call the creation handlers, + # then mark the state as if it was seen when the creation has finished. + elif not has_state(body): + logger.debug("Creation event: %r", body) + cause = Cause(resource=resource, event=CREATE, body=body, patch=patch, logger=logger) + try: + await execute(lifecycle=lifecycle, registry=registry, cause=cause) + except HandlerChildrenRetry as e: + # on the top-level, no patches -- it is pre-patched. 
+ delay = e.delay + else: + logger.info(f"All handlers succeeded for creation.") + events.info(cause.body, reason='Success', message=f"All handlers succeeded for creation.") + purge_progress(body=body, patch=patch) + refresh_last_seen_state(body=body, patch=patch) + + # The previous step triggers one more patch operation without actual change. Ignore it. + # Either the last-seen state or the status field has changed. + elif not is_state_changed(body): + pass + + # And what is left, is the update operation on one of the useful fields of the existing object. + else: + old, new, diff = get_state_diffs(body) + logger.debug("Update event: %r", diff) + cause = Cause(resource=resource, event=UPDATE, body=body, patch=patch, logger=logger, + old=old, new=new, diff=diff) + try: + await execute(lifecycle=lifecycle, registry=registry, cause=cause) + except HandlerChildrenRetry as e: + # on the top-level, no patches -- it is pre-patched. + delay = e.delay + else: + logger.info(f"All handlers succeeded for update.") + events.info(cause.body, reason='Success', message=f"All handlers succeeded for update.") + purge_progress(body=body, patch=patch) + refresh_last_seen_state(body=body, patch=patch) + + # Provoke a dummy change to trigger the reactor after sleep. + # TODO: reimplement via the handler delayed statuses properly. + if delay and not patch: + patch.setdefault('kopf', {})['dummy'] = datetime.datetime.utcnow().isoformat() + + # Whatever was done, apply the accumulated changes to the object. + # But only once, to reduce the number of API calls and the generated irrelevant events. + if patch: + logger.debug("Patching with: %r", patch) + api = kubernetes.client.CustomObjectsApi() + api.patch_namespaced_custom_object( + group=resource.group, + version=resource.version, + plural=resource.plural, + namespace=body['metadata']['namespace'], + name=body['metadata']['name'], + body=patch, + ) + + # Sleep strictly after patching, never before -- to keep the status proper. + if delay: + logger.info(f"Sleeping for {delay} seconds for the delayed handlers.") + await asyncio.sleep(delay) + + +async def execute( + *, + fns: Optional[Iterable[Callable]] = None, + handlers: Optional[Iterable[Handler]] = None, + registry: Optional[BaseRegistry] = None, + lifecycle: Callable = None, + cause: Cause = None, +) -> None: + """ + Execute the handlers in an isolated lifecycle. + + This function is just a public wrapper for `_execute()` with multiple + ways to specify the handlers: either as the raw functions, or as the + pre-created handlers, or as a registry (as used in the object handling). + + If no explicit functions or handlers or registry are passed, + the sub-handlers of the current handler are assumed, as accumulated + in the per-handler registry with `@kopf.on.this`. + + If the call to this method for the sub-handlers is not done explicitly + in the handler, it is done implicitly after the handler is exited. + One way or another, it is executed for the sub-handlers. + """ + + # Restore the current context as set in the handler execution cycle. + lifecycle = lifecycle if lifecycle is not None else sublifecycle_var.get() + handler = handler_var.get(None) + cause = cause if cause is not None else cause_var.get() + + # Validate the inputs; the function signatures cannot put these kind of restrictions, so we do. + if len([v for v in [fns, handlers, registry] if v is not None]) > 1: + raise TypeError("Only one of the fns, handlers, registry can be passed. 
Got more.") + + elif fns is not None and isinstance(fns, collections.Mapping): + registry = SimpleRegistry(prefix=handler.id if handler else None) + for id, fn in fns.items(): + registry.register(fn=fn, id=id) + + elif fns is not None and isinstance(fns, collections.Iterable): + registry = SimpleRegistry(prefix=handler.id if handler else None) + for fn in fns: + registry.register(fn=fn) + + elif fns is not None: + raise ValueError(f"fns must be a mapping or an iterable, got {fns.__class__}.") + + elif handlers is not None: + registry = SimpleRegistry(prefix=handler.id if handler else None) + for handler in handlers: + registry.append(handler=handler) + + # Use the registry as is; assume that the caller knows what they do. + elif registry is not None: + pass + + # Prevent double implicit execution. + elif subexecuted_var.get(): + return + + # If no explicit args were passed, implicitly use the accumulated handlers from `@kopf.on.this`. + else: + subexecuted_var.set(True) + registry = subregistry_var.get() + + # Execute the real handlers (all or few or one of them, as per the lifecycle). + # Raises `HandlerChildrenRetry` if the execute should be continued on the next iteration. + await _execute( + lifecycle=lifecycle, + registry=registry, + cause=cause, + ) + + +async def _execute( + lifecycle: Callable, + registry: BaseRegistry, + cause: Cause, + retry_on_errors: bool = True, +) -> None: + """ + Call the next handler(s) from the chain of the handlers. + + Keep the record on the progression of the handlers in the object's status, + and use it on the next invocation to determined which handler(s) to call. + + This routine is used both for the global handlers (via global registry), + and for the sub-handlers (via a simple registry of the current handler). + + Raises `HandlerChildrenRetry` if there are children handlers to be executed + on the next call, and implicitly provokes such a call by making the changes + to the status fields (on the handler progression and number of retries). + + Exits normally if all handlers for this cause are fully done. + """ + logger = cause.logger + + # Filter and select the handlers to be executed right now, on this event reaction cycle. + handlers = registry.get_handlers(cause=cause) + handlers_done = [handler for handler in handlers if is_finished(body=cause.body, handler=handler)] + handlers_wait = [handler for handler in handlers if is_sleeping(body=cause.body, handler=handler)] + handlers_todo = [handler for handler in handlers if is_awakened(body=cause.body, handler=handler)] + handlers_plan = [handler for handler in await _call_fn(lifecycle, handlers_todo, cause=cause)] + handlers_left = [handler for handler in handlers_todo if handler.id not in {handler.id for handler in handlers_plan}] + + # Set the timestamps -- even if not executed on this event, but just got registered. + for handler in handlers: + if not is_started(body=cause.body, handler=handler): + set_start_time(body=cause.body, patch=cause.patch, handler=handler) + + # Execute all planned (selected) handlers in one event reaction cycle, even if there are few. + for handler in handlers_plan: + + # Restore the handler's progress status. It can be useful in the handlers. + retry = get_retry_count(body=cause.body, handler=handler) + started = get_start_time(body=cause.body, handler=handler, patch=cause.patch) + runtime = datetime.datetime.utcnow() - started + + # The exceptions are handled locally and are not re-raised, to keep the operator running. 
+ try: + logger.debug(f"Invoking handler {handler.id!r}.") + + if handler.timeout is not None and runtime.total_seconds() > handler.timeout: + raise HandlerTimeoutError(f"Handler {handler.id!r} has timed out after {runtime}.") + + result = await _call_handler( + handler, + cause=cause, + retry=retry, + started=started, + runtime=runtime, + lifecycle=lifecycle, # just a default for the sub-handlers, not used directly. + ) + + # Unfinished children cause the regular retry, but with less logging and event reporting. + except HandlerChildrenRetry as e: + logger.info(f"Handler {handler.id!r} has unfinished sub-handlers. Will retry soon.") + set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay) + handlers_left.append(handler) + + # Definitely retriable error, no matter what is the error-reaction mode. + except HandlerRetryError as e: + logger.exception(f"Handler {handler.id!r} failed with an retry exception. Will retry.") + events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay) + handlers_left.append(handler) + + # Definitely fatal error, no matter what is the error-reaction mode. + except HandlerFatalError as e: + logger.exception(f"Handler {handler.id!r} failed with an fatal exception. Will stop.") + events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) + # TODO: report the handling failure somehow (beside logs/events). persistent status? + + # Regular errors behave as either retriable or fatal depending on the error-reaction mode. + except Exception as e: + if retry_on_errors: + logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") + events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.") + set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY) + handlers_left.append(handler) + else: + logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") + events.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.") + store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) + # TODO: report the handling failure somehow (beside logs/events). persistent status? + + # No errors means the handler should be excluded from future runs in this reaction cycle. + else: + logger.info(f"Handler {handler.id!r} succeeded.") + events.info(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.") + store_success(body=cause.body, patch=cause.patch, handler=handler, result=result) + + # Provoke the retry of the handling cycle if there were any unfinished handlers, + # either because they were not selected by the lifecycle, or failed and need a retry. + if handlers_left: + raise HandlerChildrenRetry(delay=None) + + # If there are delayed handlers, block this object's cycle; but do keep-alives every few mins. + # Other (non-delayed) handlers will continue as normlally, due to raise few lines above. + # Other objects will continue as normally in their own handling asyncio tasks. + if handlers_wait: + times = [get_awake_time(body=cause.body, handler=handler) for handler in handlers_wait] + until = min(times) # the soonest awake datetime. 
+ delay = (until - datetime.datetime.utcnow()).total_seconds() + delay = max(0, min(WAITING_KEEPALIVE_INTERVAL, delay)) + raise HandlerChildrenRetry(delay=delay) + + +async def _call_handler( + handler: Handler, + *args, + cause: Cause, + lifecycle: Callable, + **kwargs): + """ + Invoke one handler only, according to the calling conventions. + + Specifically, calculate the handler-specific fields (e.g. field diffs). + + Ensure the global context for this asyncio task is set to the handler and + its cause -- for proper population of the sub-handlers via the decorators + (see `@kopf.on.this`). + """ + + # For the field-handlers, the old/new/diff values must match the field, not the whole object. + old = cause.old if handler.field is None else resolve(cause.old, handler.field) + new = cause.new if handler.field is None else resolve(cause.new, handler.field) + diff = cause.diff # TODO: pass the relevant diff-records for the field, not global. + cause = cause._replace(old=old, new=new, diff=diff) + + # Store the context of the current resource-object-event-handler, to be used in `@kopf.on.this`, + # and maybe other places, and consumed in the recursive `execute()` calls for the children. + # This replaces the multiple kwargs passing through the whole call stack (easy to forget). + sublifecycle_token = sublifecycle_var.set(lifecycle) + subregistry_token = subregistry_var.set(SimpleRegistry(prefix=handler.id)) + subexecuted_token = subexecuted_var.set(False) + handler_token = handler_var.set(handler) + cause_token = cause_var.set(cause) + + # And call it. If the sub-handlers are not called explicitly, run them implicitly + # as if it was done inside of the handler (i.e. under try-finally block). + try: + result = await _call_fn( + handler.fn, + *args, + cause=cause, + **kwargs, + ) + + if not subexecuted_var.get(): + await execute() + + return result + + finally: + # Reset the context to the parent's context, or to nothing (if already in a root handler). + sublifecycle_var.reset(sublifecycle_token) + subregistry_var.reset(subregistry_token) + subexecuted_var.reset(subexecuted_token) + handler_var.reset(handler_token) + cause_var.reset(cause_token) + + +async def _call_fn( + fn: Callable, + *args, + cause: Cause, + **kwargs): + """ + Invoke a single function, but safely for the main asyncio process. + + Used both for the handler functions and for the lifecycle callbacks. + + A full set of the arguments is provided, expanding the cause to some easily + usable aliases. The function is expected to accept ``**kwargs`` for the args + that it does not use -- for forward compatibility with the new features. + + The synchronous methods are executed in the executor (threads or processes), + thus making it non-blocking for the main event loop of the operator. + See: https://pymotw.com/3/asyncio/executors.html + """ + + # Add aliases for the kwargs, directly linked to the body, or to the assumed defaults. 
+ kwargs.update( + cause=cause, + event=cause.event, + body=cause.body, + diff=cause.diff, + old=cause.old, + new=cause.new, + patch=cause.patch, + logger=cause.logger, + spec=cause.body.setdefault('spec', {}), + meta=cause.body.setdefault('metadata', {}), + status=cause.body.setdefault('status', {}), + ) + + if _is_async_fn(fn): + result = await fn(*args, **kwargs) + else: + + # Not that we want to use functools, but for executor kwargs, it is officially recommended: + # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor + real_fn = functools.partial(fn, *args, **kwargs) + + # Copy the asyncio context from the current thread to the handler's thread. + # It can be copied 2+ times if there are sub-sub-handlers (rare case). + context = contextvars.copy_context() + real_fn = functools.partial(context.run, real_fn) + + loop = asyncio.get_event_loop() + task = loop.run_in_executor(executor, real_fn) + await asyncio.wait([task]) + result = task.result() # re-raises + return result + + +def _is_async_fn(fn): + if fn is None: + return None + elif isinstance(fn, functools.partial): + return _is_async_fn(fn.func) + elif hasattr(fn, '__wrapped__'): # @functools.wraps() + return _is_async_fn(fn.__wrapped__) + else: + return asyncio.iscoroutinefunction(fn) diff --git a/kopf/reactor/lifecycles.py b/kopf/reactor/lifecycles.py new file mode 100644 index 00000000..ffbb7474 --- /dev/null +++ b/kopf/reactor/lifecycles.py @@ -0,0 +1,56 @@ +""" +A few simple lifecycles for the handlers. + +New lifecycles can be implemented the same way: accept `handlers` +in the order they are registered (except those already succeeded), +and return the list of handlers in the order and amount to be executed. + +The default behaviour of the framework is the most simplistic: +execute in the order they are registered, one by one. +""" + +import logging +import random + +logger = logging.getLogger(__name__) + + +def all_at_once(handlers, **kwargs): + """ Execute all handlers at once, in one event reaction cycle, if possible. """ + return handlers + + +def one_by_one(handlers, **kwargs): + """ Execute handlers one at a time, in the order they were registered. """ + return handlers[:1] + + +def randomized(handlers, **kwargs): + """ Execute one handler at a time, in a random order. """ + return [random.choice(handlers)] if handlers else [] + + +def shuffled(handlers, **kwargs): + """ Execute all handlers at once, but in a random order. """ + return random.sample(handlers, k=len(handlers)) + + +def asap(handlers, *, body, **kwargs): + """ Execute one handler at a time, skip on failure, try the next one, retry after the full cycle. """ + retries = body.get('status', {}).get('kopf', {}).get('retries', {}) + retryfn = lambda handler: retries.get(handler.id, 0) + return sorted(handlers, key=retryfn)[:1] + + +_default_lifecycle = None + + +def get_default_lifecycle(): + return _default_lifecycle if _default_lifecycle is not None else asap + + +def set_default_lifecycle(lifecycle): + global _default_lifecycle + if _default_lifecycle is not None: + logger.warning(f"The default lifecycle is already set to {_default_lifecycle}, overriding it to {lifecycle}.") + _default_lifecycle = lifecycle diff --git a/kopf/reactor/loading.py b/kopf/reactor/loading.py new file mode 100644 index 00000000..03f4d0df --- /dev/null +++ b/kopf/reactor/loading.py @@ -0,0 +1,34 @@ +""" +Module- and file-loading to trigger the handlers to be registered.
+ +Since the framework is based on the decorators to register the handlers, +the files/modules with these handlers should be loaded first, +thus executing the decorators. + +The files/modules to be loaded are usually specified on the command-line. +Currently, two loading modes are supported, both equivalent to the Python CLI: + +* Plain files (`kopf run file.py`). +* Importable modules (`kopf run -m pkg.mod`). + +Multiple files/modules can be specified. They will be loaded in the given order. +""" + +import importlib +import importlib.util +import os.path + + +def preload(paths, modules): + """ + Ensure the handlers are registered by loading/importing the files/modules. + """ + + for path in paths: + name, _ = os.path.splitext(os.path.basename(path)) + spec = importlib.util.spec_from_file_location(name, path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + for name in modules: + importlib.import_module(name) diff --git a/kopf/reactor/peering.py b/kopf/reactor/peering.py new file mode 100644 index 00000000..1caf6edb --- /dev/null +++ b/kopf/reactor/peering.py @@ -0,0 +1,229 @@ +""" +Peer monitoring: knowing which other operators are running, and exchanging the basic signals with them. + +The main use-case is to suppress all deployed operators when a developer starts a dev-/debug-mode +operator for the same cluster on their workstation -- to avoid any double-processing. + +See also: `kopf freeze` & `kopf resume` CLI commands for the same purpose. + +WARNING: There are **NO** per-object locks between the operators, so only one operator +should be functional for the cluster, i.e. only the one with the highest priority running. +If the operator sees violations of this constraint, it will print warnings +pointing to the other same-priority operators, but will continue to function. + +The "signals" exchanged are only the keep-alive notifications that the operator is alive, +and the detection of other operators' hard termination (by timeout rather than by a clear exit). + +The peer monitoring covers both the operators running in the cluster, +and the dev-mode operators running on the dev workstations. + +For this, a special CRD ``kind: KopfPeering`` (cluster-scoped) should be registered +in the cluster, and its status is used by all the operators to sync their keep-alive info. + +The namespace-bound operators (e.g. `--namespace=`) report their individual +namespaces as part of the payload, can see all other cluster-wide and namespaced +operators (even from different namespaces), and behave accordingly. + +The CRD is not applied automatically, so you have to deploy it yourself explicitly. +To disable the peer monitoring, use the `--standalone` CLI option. +""" + +import asyncio +import datetime +import getpass +import logging +import os +import random +import socket +from typing import Optional, Mapping, Iterable + +import iso8601 +import kubernetes + +from kopf.reactor.registry import Resource + +logger = logging.getLogger(__name__) + +# The CRD info on the special sync-object. +PEERING_CRD_RESOURCE = Resource('zalando.org', 'v1', 'kopfpeerings') +PEERING_DEFAULT_NAME = 'default' + + +# The class used to represent a peer in the parsed peers list (for convenience). +# The extra fields are for easier calculation of when and whether the peer is dead at the moment.
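+# An illustrative sketch (hypothetical values), not authoritative: one operator's
+# record in the KopfPeering status, in the shape produced by Peer.as_dict() below
+# and parsed back by the Peer class:
+#
+#     status:
+#       dev-user@dev-host/2019-03-26T15:59:55/ab12cd:
+#         namespace: null          # null for cluster-wide operators
+#         priority: 100            # higher-priority peers freeze the lower-priority ones
+#         lastseen: "2019-03-26T15:59:55.000000"   # naive UTC ISO 8601 timestamp
+#         lifetime: 60.0           # seconds; the peer is presumed dead afterwards
+#
+# Dead records (lastseen + lifetime in the past) are nulled out by whichever
+# operator notices them first (see apply_peers() and the autoclean flag below).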
+class Peer: + + def __init__(self, + id: str, *, + peering: str, + priority: int = 0, + lastseen: Optional[str] = None, + lifetime: int = 60, + namespace: Optional[str] = None, + **kwargs): # for the forward-compatibility with the new fields + super().__init__() + self.id = id + self.peering = peering + self.namespace = namespace + self.priority = (priority) + self.lifetime = (lifetime if isinstance(lifetime, datetime.timedelta) else + datetime.timedelta(seconds=int(lifetime))) + self.lastseen = (lastseen if isinstance(lastseen, datetime.datetime) else + iso8601.parse_date(lastseen) if lastseen is not None else + datetime.datetime.utcnow()) + self.lastseen = self.lastseen.replace(tzinfo=None) # only the naive utc -- for comparison + self.deadline = self.lastseen + self.lifetime + self.is_dead = self.deadline <= datetime.datetime.utcnow() + + def __repr__(self): + return f"{self.__class__.__name__}({self.id}, namespace={self.namespace}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})" + + def as_dict(self): + # Only the non-calculated and non-identifying fields. + return { + 'namespace': self.namespace, + 'priority': self.priority, + 'lastseen': self.lastseen.isoformat(), + 'lifetime': self.lifetime.total_seconds(), + } + + def touch(self, *, lifetime: Optional[int] = None): + self.lastseen = datetime.datetime.utcnow() + self.lifetime = (self.lifetime if lifetime is None else + lifetime if isinstance(lifetime, datetime.timedelta) else + datetime.timedelta(seconds=int(lifetime))) + self.deadline = self.lastseen + self.lifetime + self.is_dead = self.deadline <= datetime.datetime.utcnow() + + def keepalive(self): + """ + Add a peer to the peers, and update its alive status. + """ + self.touch() + apply_peers([self], peering=self.peering) + + def disappear(self): + """ + Remove a peer from the peers (gracefully). + """ + self.touch(lifetime=0) + apply_peers([self], peering=self.peering) + + +def apply_peers( + peers: Iterable[Peer], + peering: str, +): + """ + Apply the changes in the peers to the sync-object. + + The dead peers are removed, the new or alive peers are stored. + Note: this does NOT change their `lastseen` field, so do it explicitly with ``touch()``. + """ + api = kubernetes.client.CustomObjectsApi() + api.patch_cluster_custom_object( + group=PEERING_CRD_RESOURCE.group, + version=PEERING_CRD_RESOURCE.version, + plural=PEERING_CRD_RESOURCE.plural, + name=peering, + body={'status': {peer.id: None if peer.is_dead else peer.as_dict() for peer in peers}}, + ) + + +async def peers_handler( + *, + event: Mapping, + freeze: asyncio.Event, + ourselves: Peer, + autoclean: bool = True, +): + """ + Handle a single update of the peers by us or by other operators. + + When an operator with a higher priority appears, switch to the freeze-mode. + The these operators disappear or become presumably dead, resume the event handling. + + The `freeze` object is passed both to the peers handler to set/clear it, + and to all the resource handlers to check its value when the events arrive + (see `create_tasks` and `run` functions). + """ + + # Silently ignore the peering objects which are not ours to worry. + body = event['object'] + name = body.get('metadata', {}).get('name', None) + if name != ourselves.peering: + return + + # Find if we are still the highest priority operator. 
+ pairs = body.get('status', {}).items() + peers = [Peer(id=opid, peering=name, **opinfo) for opid, opinfo in pairs] + dead_peers = [peer for peer in peers if peer.is_dead] + prio_peers = [peer for peer in peers if not peer.is_dead and peer.priority > ourselves.priority] + same_peers = [peer for peer in peers if not peer.is_dead and peer.priority == ourselves.priority and peer.id != ourselves.id] + + if autoclean and dead_peers: + apply_peers(dead_peers, peering=ourselves.peering) # NB: sync and blocking, but this is fine. + + if prio_peers: + if not freeze.is_set(): + logger.info(f"Freezing operations in favour of {prio_peers}.") + freeze.set() + else: + if same_peers: + logger.warning(f"Possibly conflicting operators with the same priority: {same_peers}.") + if freeze.is_set(): + logger.info(f"Resuming operations after the freeze.") + freeze.clear() + + +async def peers_keepalive( + *, + ourselves: Peer, +): + """ + An ever-running coroutine to regularly send our own keep-alive status to the peers. + """ + try: + while True: + logger.debug(f"Peering keep-alive update for {ourselves.id} (priority {ourselves.priority})") + ourselves.keepalive() + + # How often do we update. Keep it limited to avoid k8s api flooding. + # Should be slightly less than the lifetime, enough for a patch request to finish. + await asyncio.sleep(max(1, ourselves.lifetime.total_seconds()-10)) + finally: + try: + ourselves.disappear() + except: + pass + + +def detect_own_id() -> str: + """ + Detect or generate the id for ourselves, i.e. the currently executing operator. + + It is constructed so that it is easy to detect in which pod it is running + (if in the cluster), or who runs the operator (if not in the cluster, + i.e. in the dev-mode), and how long ago it was started. + + The pod id can be specified by:: + + env: + - name: POD_ID + valueFrom: + fieldRef: + fieldPath: metadata.name + + Used in the `kopf.reactor.queueing` when the reactor starts, + but is kept here, close to the rest of the peering logic. + """ + + pod = os.environ.get('POD_ID', None) + if pod is not None: + return pod + + user = getpass.getuser() + host = socket.getfqdn() + now = datetime.datetime.utcnow().isoformat() + rnd = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=6)) + return f'{user}@{host}/{now}/{rnd}' diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py new file mode 100644 index 00000000..67a8d936 --- /dev/null +++ b/kopf/reactor/queueing.py @@ -0,0 +1,248 @@ +""" +Kubernetes watching/streaming and the per-object queueing system. + +The framework can handle multiple resources at once. +Every custom resource type is "watched" (as in `kubectl get --watch`) +in a separate asyncio task in a never-ending loop. + +The events for this resource type (of all its objects) are then pushed +to the per-object queues, which are created and destroyed dynamically. +The per-object queues are created on demand. + +Every object is identified by its uid, and is handled sequentially: +i.e. the low-level events are processed in the order of their arrival. +Other objects are handled in parallel in their own sequential tasks. + +To prevent memory leaks over the long run, the queues and the workers +of each object are destroyed if no new events arrive for some time. +The destruction delay (usually a few seconds, maybe minutes) is needed +to prevent overly frequent queue/worker destruction and re-creation +in case the events are for any reason delayed by Kubernetes.
+ +The conversion of the low-level watch-events to the high-level causes +is done in the `kopf.reactor.handling` routines. +""" + +import asyncio +import functools +import logging +from typing import Optional, Callable, Tuple, Union, MutableMapping, NewType + +import aiojobs +import kubernetes.watch + +from kopf.reactor.handling import custom_object_handler +from kopf.reactor.lifecycles import get_default_lifecycle +from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id +from kopf.reactor.peering import PEERING_CRD_RESOURCE, PEERING_DEFAULT_NAME +from kopf.reactor.registry import get_default_registry, BaseRegistry, Resource + +logger = logging.getLogger(__name__) + +ObjectUid = NewType('ObjectUid', str) +ObjectRef = Tuple[Resource, ObjectUid] +Queues = MutableMapping[ObjectRef, asyncio.Queue] + + +# TODO: add the label_selector support for the dev-mode? +async def watcher( + namespace: Union[None, str], + resource: Resource, + handler: Callable, +): + """ + The watcher watches for the resource events via the API, and spawns the handlers for every object. + + All resources and objects are processed in parallel, but one single object is handled sequentially + (otherwise, concurrent handling of multiple events of the same object could cause data damage). + + The watcher is as non-blocking and async as possible. It neither calls any external routines, + nor makes the API calls via the sync libraries directly. + + The watcher is generally a never-ending task (unless an error happens or it is cancelled). + The workers, on the other hand, are limited approximately to the life-time of an object's event. + """ + + # If not wrapped, causes TypeError: 'async for' requires an object with __aiter__ method, got generator + loop = asyncio.get_event_loop() + async def _async_wrapper(src): + while True: + yield await loop.run_in_executor(None, next, src) + + # All per-object workers are handled as fire-and-forget jobs via the scheduler, + # and communicated via the per-object event queues. + scheduler = await aiojobs.create_scheduler(limit=10) + queues = {} + try: + + # Make a Kubernetes call to watch for the events via the API. + w = kubernetes.watch.Watch() + api = kubernetes.client.CustomObjectsApi() + stream = w.stream(api.list_cluster_custom_object, resource.group, resource.version, resource.plural) + async for event in _async_wrapper(stream): + key = (resource, event['object']['metadata']['uid']) + + # Filter out all unrelated events as soon as possible (before queues), and silently. + # TODO: Reimplement via api.list_namespaced_custom_object, and API-level filtering. + ns = event['object'].get('metadata', {}).get('namespace', None) + if namespace is not None and ns is not None and ns != namespace: + continue + + # Either use the existing object's queue, or create a new one together with the per-object job. + # "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done. + try: + await queues[key].put(event) + except KeyError: + queues[key] = asyncio.Queue() + await queues[key].put(event) + await scheduler.spawn(worker(handler=handler, queues=queues, key=key)) + + finally: + # Forcibly terminate all the fire-and-forget per-object jobs, if they are still running. + await scheduler.close() + + +async def worker( + handler: Callable, + queues: Queues, + key: ObjectRef, +): + """ + The per-object workers consume the object's events and invoke the handler. + + The handler is expected to be an async coroutine, always the one from the framework.
+ In fact, it is either a peering handler, which monitors the peer operators, + or a generic resource handler, which internally calls the registered synchronous handlers. + + The per-object worker is a time-limited task, which ends as soon as all the object's events + have been handled. The watcher will spawn a new job when and if the new events arrive. + + To prevent the queue/job deletion and re-creation to happen too often, the jobs wait some + reasonable, but small enough time (few seconds) before actually finishing -- + in case the new events are there, but the API or the watcher task lags a bit. + """ + queue = queues[key] + try: + while True: + + # Try ASAP, but give it few seconds for the new events to arrive, maybe. + # If the queue is empty for some time, then indeed finish the object's worker. + try: + event = await asyncio.wait_for(queue.get(), timeout=5.0) + except asyncio.TimeoutError: + break + + # Try the handler. In case of errors, show the error, but continue the queue processing. + try: + await handler(event=event) + except Exception as e: + # TODO: handler is a functools.partial. make the prints a bit nicer by removing it. + logger.exception(f"{handler} failed with an exception. Ignoring the event.") + # raise + + finally: + # Whether an exception or a break or a success, notify the caller, and garbage-collect our queue. + # The queue must not be left in the queue-cache without a corresponding job handling this queue. + try: + del queues[key] + except KeyError: + pass + + +def create_tasks( + lifecycle: Optional[Callable] = None, + registry: Optional[BaseRegistry] = None, + standalone: bool = False, + priority: int = 0, + peering: str = PEERING_DEFAULT_NAME, + namespace: Optional[str] = None, +): + """ + Create all the tasks needed to run the operator, but do not spawn/start them. + The tasks are properly inter-connected depending on the runtime specification. + They can be injected into any event loop as needed. + """ + + # The freezer and the registry are scoped to this whole task-set, to sync them all. + lifecycle = lifecycle if lifecycle is not None else get_default_lifecycle() + registry = registry if registry is not None else get_default_registry() + freeze = asyncio.Event() + tasks = [] + + # Monitor the peers, unless explicitly disabled. + if not standalone: + ourselves = Peer( + id=detect_own_id(), + priority=priority, + peering=peering, + namespace=namespace, + ) + tasks.extend([ + asyncio.Task(peers_keepalive(ourselves=ourselves)), + asyncio.Task(watcher(namespace=None, # peering is cluster-object + resource=PEERING_CRD_RESOURCE, + handler=functools.partial(peers_handler, + ourselves=ourselves, + freeze=freeze))), # freeze is set/cleared + ]) + + # Resource event handling, only once for every known resource (de-duplicated). + for resource in registry.resources: + tasks.extend([ + asyncio.Task(watcher(namespace=namespace, + resource=resource, + handler=functools.partial(custom_object_handler, + lifecycle=lifecycle, + registry=registry, + resource=resource, + freeze=freeze))), # freeze is only checked + ]) + + return tasks + + +def run( + lifecycle: Optional[Callable] = None, + registry: Optional[BaseRegistry] = None, + standalone: bool = False, + priority: int = 0, + loop: Optional[asyncio.BaseEventLoop] = None, + peering: str = PEERING_DEFAULT_NAME, + namespace: Optional[str] = None, +): + """ + Serve the events for all the registered resources and handlers. 
+ + This process typically never ends, unless an unhandled error happens + in one of the consumers/producers. + """ + loop = loop if loop is not None else asyncio.get_event_loop() + tasks = create_tasks( + lifecycle=lifecycle, + registry=registry, + standalone=standalone, + namespace=namespace, + priority=priority, + peering=peering, + ) + + # Run the presumably infinite tasks until one of them fails (they never exit normally). + done, pending = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) + + # Allow the remaining tasks to handle the cancellation before re-raising (e.g. via try-finally). + # The errors in the cancellation stage will be ignored anyway (never re-raised below). + for task in pending: + task.cancel() + cancelled, pending = loop.run_until_complete(asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)) + assert not pending # must be empty by now, the tasks are either done or cancelled. + + # Check the results of the non-cancelled tasks, and re-raise of there were any exceptions. + # The cancelled tasks are not re-raised, as it is a normal flow for the "first-completed" run. + # TODO: raise all of the cancelled+done, if there were 2+ failed ones. + for task in cancelled: + try: + task.result() # can raise the regular (non-cancellation) exceptions. + except asyncio.CancelledError: + pass + for task in done: + task.result() diff --git a/kopf/reactor/registry.py b/kopf/reactor/registry.py new file mode 100644 index 00000000..e3e0edfd --- /dev/null +++ b/kopf/reactor/registry.py @@ -0,0 +1,160 @@ +""" +A registry of the handlers, attached to the resources or events. + +The global registry is populated by the `kopf.on` decorators, and is used +to register the resources being watched and handled, and to attach +the handlers to the specific causes (create/update/delete/field-change). + +The simple registry is part of the global registry (for each individual +resource), and also used for the sub-handlers within a top-level handler. + +Both are used in the `kopf.reactor.handling` to retrieve the list +of the handlers to be executed on each reaction cycle. +""" +import abc +import collections +import functools +from types import FunctionType, MethodType + +# The constants for the event types, to prevent the direct string usage and typos. +# They are not exposed by the framework, but are used internally. See also: `kopf.on`. +CREATE = 'create' +UPDATE = 'update' +DELETE = 'delete' +FIELD = 'field' + + +# An immutable reference to a custom resource definition. +Resource = collections.namedtuple('Resource', 'group version plural') + +# A registered handler (function + event meta info). +Handler = collections.namedtuple('Handler', 'fn id event field timeout') + + +class BaseRegistry(metaclass=abc.ABCMeta): + """ + A registry stores the handlers and provides them to the reactor. + """ + + def get_handlers(self, cause): + return list(self.iter_handlers(cause=cause)) + + @abc.abstractmethod + def iter_handlers(self, cause): + pass + + +class SimpleRegistry(BaseRegistry): + """ + A simple registry is just a list of handlers, no grouping. + """ + + def __init__(self, prefix=None): + super().__init__() + self.prefix = prefix + self._handlers = [] # [Handler, ...] 
+ + def append(self, handler): + self._handlers.append(handler) + + def register(self, fn, id=None, event=None, field=None, timeout=None): + + if field is None: + field = None # for the non-field events + elif isinstance(field, str): + field = tuple(field.split('.')) + elif isinstance(field, (list, tuple)): + field = tuple(field) + else: + raise ValueError(f"Field must be either a str, or a list/tuple. Got {field!r}") + + id = id if id is not None else get_callable_id(fn) + id = id if field is None else f'{id}/{".".join(field)}' + id = id if self.prefix is None else f'{self.prefix}/{id}' + handler = Handler(id=id, fn=fn, event=event, field=field, timeout=timeout) + + self.append(handler) + return fn # to be usable as a decorator too. + + def iter_handlers(self, cause): + fields = {field for _, field, _, _ in cause.diff or []} + for handler in self._handlers: + if handler.event == FIELD: + if any(field[:len(handler.field)] == handler.field for field in fields): + yield handler + elif handler.event is None or handler.event == cause.event: + yield handler + + +def get_callable_id(c): + """ Get a reasonably good id of any commonly used callable. """ + if c is None: + return None + elif isinstance(c, functools.partial): + return get_callable_id(c.func) + elif hasattr(c, '__wrapped__'): # @functools.wraps() + return get_callable_id(c.__wrapped__) + elif isinstance(c, FunctionType) and c.__name__ == '<lambda>': + # The best we can do to keep the id stable across the process restarts, + # assuming at least no code changes. The code changes are not detectable. + line = c.__code__.co_firstlineno + path = c.__code__.co_filename + return f'lambda:{path}:{line}' + elif isinstance(c, (FunctionType, MethodType)): + return getattr(c, '__qualname__', getattr(c, '__name__', repr(c))) + else: + raise ValueError(f"Cannot get id of {c!r}.") + + +class GlobalRegistry(BaseRegistry): + """ + A global registry is used for the handling of multiple resources. + It is usually populated by the `@kopf.on...` decorators. + """ + + def __init__(self): + super().__init__() + self._handlers = {} # {Resource: SimpleRegistry[Handler, ...]} + + def register(self, group, version, plural, event, fn, id=None, field=None, timeout=None): + """ + Register an additional handler function for the specific resource and specific event. + """ + resource = Resource(group, version, plural) + registry = self._handlers.setdefault(resource, SimpleRegistry()) + registry.register(event=event, field=field, fn=fn, id=id, timeout=timeout) + return fn # to be usable as a decorator too. + + @property + def resources(self): + """ All known resources in the registry. """ + return frozenset(self._handlers) + + def iter_handlers(self, cause): + """ + Iterate all handlers for this event and for the special FIELD event, in the order they were registered (even if mixed). + For the FIELD event, also filter only the handlers where the field matches one of the actually changed fields. + """ + resource_registry = self._handlers.get(cause.resource, None) + if resource_registry is not None: + yield from resource_registry.iter_handlers(cause=cause) + + +_default_registry = GlobalRegistry() + + +def get_default_registry() -> GlobalRegistry: + """ + Get the default registry to be used by the decorators and the reactor + unless the explicit registry is provided to them. + """ + return _default_registry + + +def set_default_registry(registry: GlobalRegistry): + """ + Set the default registry to be used by the decorators and the reactor + unless the explicit registry is provided to them.
+ """ + global _default_registry + _default_registry = registry diff --git a/kopf/structs/__init__.py b/kopf/structs/__init__.py new file mode 100644 index 00000000..717ffb1e --- /dev/null +++ b/kopf/structs/__init__.py @@ -0,0 +1,12 @@ +""" +All the functions to manipulate the resource fields, state changes, etc. + +Grouped by the type of the fields and the purpose of the manipulation. + +Used in the handling routines to check if there were significant changes at all +(i.e. not our own internal and system changes, like the uids, links, etc), +and to get the exact per-field diffs for the specific handler functions. + +All the functions are purely data-manipulative and computational. +No external calls or any i/o activities are done here. +""" diff --git a/kopf/structs/diffs.py b/kopf/structs/diffs.py new file mode 100644 index 00000000..d44492b9 --- /dev/null +++ b/kopf/structs/diffs.py @@ -0,0 +1,55 @@ +""" +All the functions to calculate the diffs of the dicts. +""" + +import collections + +from typing import Any, Tuple, NewType, Generator, Sequence, Mapping + +DiffOp = NewType('DiffOp', str) +DiffPath = Tuple[str, ...] +DiffItem = Tuple[DiffOp, DiffPath, Any, Any] +Diff = Sequence[DiffItem] + + +def resolve(d: Mapping, path: DiffPath): + result = d + for key in path: + result = result[key] + return result + + +def diff(a: Any, b: Any, path: DiffPath = ()) -> Generator[DiffItem, None, None]: + """ + Calculate the diff between two dicts. + + Yields the tuple of form `(op, path, old, new)`, + where `op` is either `"add"`/`"change"`/`"remove"`, + `path` is a tuple with the field names (empty tuple means root), + and the `old` & `new` values (`None` for addition/removal). + + List values are treated as a whole, and not recursed into. + Therefore, an addition/removal of a list item is considered + as a change of the whole value. + + If the deep diff for lists/sets is needed, see the libraries: + + * https://dictdiffer.readthedocs.io/en/latest/ + * https://github.com/seperman/deepdiff + * https://python-json-patch.readthedocs.io/en/latest/tutorial.html + """ + if type(a) != type(b): + yield ('change', path, a, b) + elif a == b: + pass # to exclude the case as soon as possible + elif isinstance(a, collections.Mapping): + a_keys = frozenset(a.keys()) + b_keys = frozenset(b.keys()) + for key in b_keys - a_keys: + yield ('add', path+(key,), None, b[key]) + for key in a_keys - b_keys: + yield ('remove', path+(key,), a[key], None) + for key in a_keys & b_keys: + yield from diff(a[key], b[key], path=path+(key,)) + else: + yield ('change', path, a, b) diff --git a/kopf/structs/finalizers.py b/kopf/structs/finalizers.py new file mode 100644 index 00000000..438017e3 --- /dev/null +++ b/kopf/structs/finalizers.py @@ -0,0 +1,31 @@ +""" +All the functions to manipulate the object finalization and deletion. + +Finalizers are used to block the actual deletion until the finalizers +are removed, meaning that the operator has done all its duties +to "release" the object (e.g. cleanups; delete-handlers in our case). +""" + +# A string marker to be put on the list of the finalizers to block +# the object from being deleted without the permission of the framework. 
+FINALIZER = 'KopfFinalizerMarker' + + +def is_deleted(body): + return body.get('metadata', {}).get('deletionTimestamp', None) is not None + + +def has_finalizers(body): + return 'finalizers' in body['metadata'] and FINALIZER in body['metadata']['finalizers'] + + +def append_finalizers(*, body, patch): + finalizers = body.get('metadata', {}).get('finalizers', []) + patch.setdefault('metadata', {}).setdefault('finalizers', list(finalizers)) + patch['metadata']['finalizers'].append(FINALIZER) + + +def remove_finalizers(*, body, patch): + finalizers = body.get('metadata', {}).get('finalizers', []) + patch.setdefault('metadata', {}).setdefault('finalizers', list(finalizers)) + patch['metadata']['finalizers'].remove(FINALIZER) diff --git a/kopf/structs/hierarchies.py b/kopf/structs/hierarchies.py new file mode 100644 index 00000000..e1a5dc40 --- /dev/null +++ b/kopf/structs/hierarchies.py @@ -0,0 +1,114 @@ +""" +All the functions to properly build the object hierarchies. +""" + + +def build_object_reference(body): + """ + Construct an object reference for the events. + """ + return dict( + apiVersion=body['apiVersion'], + kind=body['kind'], + name=body['metadata']['name'], + uid=body['metadata']['uid'], + namespace=body['metadata']['namespace'], + ) + + +def build_owner_reference(body): + """ + Construct an owner reference object for the parent-children relationships. + + The structure needed to link the children objects to the current object as a parent. + See https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/ + """ + return dict( + controller=True, + blockOwnerDeletion=True, + apiVersion=body['apiVersion'], + kind=body['kind'], + name=body['metadata']['name'], + uid=body['metadata']['uid'], + ) + + +def append_owner_reference(objs, owner): + """ + Append an owner reference to the resource(s), if it is not yet there. + + Note: the owned objects are usually not the one being processed, + so the whole body can be modified, no patches are needed. + """ + if not isinstance(objs, (list, tuple)): + objs = [objs] + + owner = build_owner_reference(owner) + for obj in objs: + refs = obj.setdefault('metadata', {}).setdefault('ownerReferences', []) + matching = [ref for ref in refs if ref['uid'] == owner['uid']] + if not matching: + refs.append(owner) + + +def remove_owner_reference(objs, owner): + """ + Remove an owner reference to the resource(s), if it is there. + + Note: the owned objects are usually not the one being processed, + so the whole body can be modified, no patches are needed. + """ + if not isinstance(objs, (list, tuple)): + objs = [objs] + + owner = build_owner_reference(owner) + for obj in objs: + refs = obj.setdefault('metadata', {}).setdefault('ownerReferences', []) + matching = [ref for ref in refs if ref['uid'] == owner['uid']] + for ref in matching: + refs.remove(ref) + + +# TODO: make it also recursively if there are any .metadata.labels inside (e.g. job/pod templates). +def label(objs, labels, force=False): + """ + Apply the labels to the object(s). + """ + if not isinstance(objs, (list, tuple)): + objs = [objs] + + for obj in objs: + obj_labels = obj.setdefault('metadata', {}).setdefault('labels', {}) + for key, val in labels.items(): + if force: + obj_labels[key] = val + else: + obj_labels.setdefault(key, val) + + +def adopt(objs, owner): + """ + The children should be in the same namespace, named after their parent, and owned by it. + """ + if not isinstance(objs, (list, tuple)): + objs = [objs] + + # Mark the children as owned by the parent. 
+ append_owner_reference(objs, owner=owner) + + # The children objects are usually in the same namespace as the parent, unless explicitly overridden. + ns = owner.get('metadata', {}).get('namespace', None) + if ns is not None: + for obj in objs: + obj.setdefault('metadata', {}).setdefault('namespace', ns) + + # Name the children prefixed with their parent's name, unless they already have a name or a prefix. + # "GenerateName" is the Kubernetes feature, we do not implement it ourselves. + name = owner.get('metadata', {}).get('name', None) + if name is not None: + for obj in objs: + if obj.get('metadata', {}).get('name', None) is None: + obj.setdefault('metadata', {}).setdefault('generateName', f'{name}-') + + # The children also bear the labels of the parent object, for easier selection. + label(objs, labels=owner.get('metadata', {}).get('labels', {})) diff --git a/kopf/structs/lastseen.py b/kopf/structs/lastseen.py new file mode 100644 index 00000000..59488e40 --- /dev/null +++ b/kopf/structs/lastseen.py @@ -0,0 +1,74 @@ +""" +All the functions to keep track of the last seen state. + +The "state" is a snapshot of meaningful fields, which must be tracked +to identify the actual changes on the object (or absence of such). + +Used in the handling routines to check if there were significant changes at all +(i.e. not the internal and system changes, like the uids, links, etc), +and to get the exact per-field diffs for the specific handler functions. + +Conceptually similar to how `kubectl apply` stores the applied state +on any object, and then uses that for the patch calculation: +https://kubernetes.io/docs/concepts/overview/object-management-kubectl/declarative-config/ +""" + +import copy +import json + +from kopf.structs.diffs import diff + +LAST_SEEN_ANNOTATION = 'kopf.zalando.org/last-handled-configuration' +""" The annotation name for the last stored state of the resource. """ + + +def get_state(body): + """ + Extract only the relevant fields for the state comparisons. + """ + body = copy.deepcopy(body) + if LAST_SEEN_ANNOTATION in body.get('metadata', {}).get('annotations', {}): + del body['metadata']['annotations'][LAST_SEEN_ANNOTATION] + if 'kubectl.kubernetes.io/last-applied-configuration' in body.get('metadata', {}).get('annotations', {}): + del body['metadata']['annotations']['kubectl.kubernetes.io/last-applied-configuration'] + if 'finalizers' in body.get('metadata', {}): + del body['metadata']['finalizers'] + if 'creationTimestamp' in body.get('metadata', {}): + del body['metadata']['creationTimestamp'] + if 'selfLink' in body.get('metadata', {}): + del body['metadata']['selfLink'] + if 'uid' in body.get('metadata', {}): + del body['metadata']['uid'] + if 'resourceVersion' in body.get('metadata', {}): + del body['metadata']['resourceVersion'] + if 'status' in body: + del body['status'] + return body + + +def has_state(body): + return LAST_SEEN_ANNOTATION in body['metadata'].get('annotations', {}) + + +def is_state_changed(body): + # TODO: make it more efficient, so that the dicts are not rebuilt locally every time. 
+ old = retreive_state(body) + new = get_state(body) + return old != new + + +def get_state_diffs(body): + old = retreive_state(body) + new = get_state(body) + return old, new, list(diff(old, new)) + + +def retreive_state(body): + state_str = body['metadata'].get('annotations', {}).get(LAST_SEEN_ANNOTATION, None) + state_obj = json.loads(state_str) if state_str is not None else None + return state_obj + + +def refresh_last_seen_state(*, body, patch): + state = get_state(body) + patch.setdefault('metadata', {}).setdefault('annotations', {})[LAST_SEEN_ANNOTATION] = json.dumps(state) diff --git a/kopf/structs/progress.py b/kopf/structs/progress.py new file mode 100644 index 00000000..f9d53591 --- /dev/null +++ b/kopf/structs/progress.py @@ -0,0 +1,151 @@ +""" +The routines to manipulate the handler progression over the event cycle. + +Used to track which handlers are finished, which are not yet, +and how many retries were there. + +There could be more than one low-level k8s watch-events per one actual +high-level kopf-event (a cause). The handlers are called at different times, +and the overall handling routine should persist the handler status somewhere. + +The structure is this:: + + metainfo: ... + spec: ... + status: ... + kopf: + progress: + handler1: + started: 2018-12-31T23:59:59,999999 + stopped: 2018-01-01T12:34:56,789000 + success: true + handler2: + started: 2018-12-31T23:59:59,999999 + stopped: 2018-01-01T12:34:56,789000 + failure: true + message: "Error message." + handler3: + started: 2018-12-31T23:59:59,999999 + retries: 30 + handler3/sub1: + started: 2018-12-31T23:59:59,999999 + delayed: 2018-01-01T12:34:56,789000 + retries: 10 + message: "Not ready yet." + handler3/sub2: + started: 2018-12-31T23:59:59,999999 + +* `status.kopf.success` are the handlers that succeeded (no re-execution). +* `status.kopf.failure` are the handlers that failed completely (no retries). +* `status.kopf.delayed` are the timestamps, until which these handlers sleep. +* `status.kopf.retries` are number of retries for succeeded, failed, + and for the progressing handlers. + +When the full event cycle is executed (possibly including multiple re-runs), +the whole `status.kopf` section is purged. The life-long persistence of status +is not intended: otherwise, multiple distinct causes will clutter the status +and collide with the each other (especially critical for multiple updates). 
+""" + +import datetime + + +def is_started(*, body, handler): + progress = body.get('status', {}).get('kopf', {}).get('progress', {}) + return handler.id in progress + + +def is_sleeping(*, body, handler): + ts = get_awake_time(body=body, handler=handler) + finished = is_finished(body=body, handler=handler) + return not finished and ts is not None and ts > datetime.datetime.utcnow() + + +def is_awakened(*, body, handler): + finished = is_finished(body=body, handler=handler) + sleeping = is_sleeping(body=body, handler=handler) + return not finished and not sleeping + + +def is_finished(*, body, handler): + progress = body.get('status', {}).get('kopf', {}).get('progress', {}) + success = progress.get(handler.id, {}).get('success', None) + failure = progress.get(handler.id, {}).get('failure', None) + return success or failure + + +def get_start_time(*, body, patch, handler): + progress = patch.get('status', {}).get('kopf', {}).get('progress', {}) + new_value = progress.get(handler.id, {}).get('started', None) + progress = body.get('status', {}).get('kopf', {}).get('progress', {}) + old_value = progress.get(handler.id, {}).get('started', None) + value = new_value or old_value + return None if value is None else datetime.datetime.fromisoformat(value) + + +def get_awake_time(*, body, handler): + progress = body.get('status', {}).get('kopf', {}).get('progress', {}) + value = progress.get(handler.id, {}).get('delayed', None) + return None if value is None else datetime.datetime.fromisoformat(value) + + +def get_retry_count(*, body, handler): + progress = body.get('status', {}).get('kopf', {}).get('progress', {}) + return progress.get(handler.id, {}).get('retries', 0) + + +def set_start_time(*, body, patch, handler): + progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) + progress.setdefault(handler.id, {}).update({ + 'started': datetime.datetime.utcnow().isoformat(), + }) + + +def set_awake_time(*, body, patch, handler, delay=None): + if delay is not None: + ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay) + ts = ts.isoformat() + else: + ts = None + progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) + progress.setdefault(handler.id, {}).update({ + 'delayed': ts, + }) + + +def set_retry_time(*, body, patch, handler, delay=None): + retry = get_retry_count(body=body, handler=handler) + progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) + progress.setdefault(handler.id, {}).update({ + 'retries': retry + 1, + }) + set_awake_time(body=body, patch=patch, handler=handler, delay=delay) + + +def store_failure(*, body, patch, handler, exc): + retry = get_retry_count(body=body, handler=handler) + progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) + progress.setdefault(handler.id, {}).update({ + 'stopped': datetime.datetime.utcnow().isoformat(), + 'failure': True, + 'retries': retry + 1, + 'message': f'{exc}', + }) + + +def store_success(*, body, patch, handler, result=None): + retry = get_retry_count(body=body, handler=handler) + progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) + progress.setdefault(handler.id, {}).update({ + 'stopped': datetime.datetime.utcnow().isoformat(), + 'success': True, + 'retries': retry + 1, + 'message': None, + }) + if result is not None: + # TODO: merge recursively (patch-merge), do not overwrite the keys if they are present. 
+ patch.setdefault('status', {}).setdefault(handler.id, {}).update(result) + + +def purge_progress(*, body, patch): + patch.setdefault('status', {}).setdefault('kopf', {})['progress'] = None diff --git a/peering.yaml b/peering.yaml new file mode 100644 index 00000000..ed8cf7b0 --- /dev/null +++ b/peering.yaml @@ -0,0 +1,22 @@ +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: kopfpeerings.zalando.org +spec: + group: zalando.org + versions: + - name: v1 + served: true + storage: true + names: + kind: KopfPeering + plural: kopfpeerings + singular: kopfpeering + scope: Cluster + +--- +apiVersion: zalando.org/v1 +kind: KopfPeering +metadata: + name: default diff --git a/setup.py b/setup.py index d83be3b3..08e953b3 100644 --- a/setup.py +++ b/setup.py @@ -3,13 +3,27 @@ setup( name='kopf', use_scm_version=True, + url='https://pypi.org/project/kopf/', + + author='Sergey Vasilyev', + author_email='sergey.vasilyev@zalando.de', packages=find_packages(), include_package_data=True, + entry_points={ + 'console_scripts': [ + 'kopf = kopf.cli:main', + ], + }, + python_requires='>=3.7', setup_requires=[ 'setuptools_scm', ], install_requires=[ + 'click', + 'iso8601', + 'aiojobs', + 'kubernetes', ], )
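For orientation, here is a minimal operator sketch that ties the pieces above together: the `@kopf.on.create` decorator (assumed here to mirror the group/version/plural signature of `GlobalRegistry.register()`) populates the default registry, `kopf run` loads the file as described in `kopf.reactor.loading`, the keyword arguments come from the aliases injected by `_call_fn()`, and the returned dict is merged into the object's status by `store_success()`. The resource coordinates and field names are illustrative assumptions, not taken from this patch.

```python
# minimal_operator.py -- an illustrative sketch with hypothetical resource coordinates.
import kopf


@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn(spec, meta, logger, **kwargs):
    # Runs once per created object; on exceptions it is retried as per _execute().
    logger.info(f"Handling the creation of {meta.get('name')!r}")
    return {'message': 'created'}  # ends up under status.<handler-id> via store_success()
```

It would be run as `kopf run minimal_operator.py`, using the `kopf` console script declared in `setup.py` above.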