diff --git a/docs/p/api/at.latticetools.response_matrix.rst b/docs/p/api/at.latticetools.response_matrix.rst new file mode 100644 index 000000000..b60b5cb70 --- /dev/null +++ b/docs/p/api/at.latticetools.response_matrix.rst @@ -0,0 +1,15 @@ +at.latticetools.response\_matrix +================================ + +.. automodule:: at.latticetools.response_matrix + :inherited-members: + + + .. rubric:: Classes + + .. autosummary:: + + ResponseMatrix + OrbitResponseMatrix + TrajectoryResponseMatrix + \ No newline at end of file diff --git a/docs/p/index.rst b/docs/p/index.rst index 714b6ee7c..099afe969 100644 --- a/docs/p/index.rst +++ b/docs/p/index.rst @@ -36,7 +36,8 @@ Sub-packages howto/multiprocessing howto/CavityControl howto/Collective - Working with MAD-X files + Work with MAD-X files + Use response matrices .. autosummary:: :toctree: api diff --git a/docs/p/notebooks/observables.ipynb b/docs/p/notebooks/observables.ipynb index 668d8cef2..33e0c26a1 100644 --- a/docs/p/notebooks/observables.ipynb +++ b/docs/p/notebooks/observables.ipynb @@ -936,7 +936,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.20" + "version": "3.11.11" } }, "nbformat": 4, diff --git a/docs/p/notebooks/response_matrices.ipynb b/docs/p/notebooks/response_matrices.ipynb new file mode 100644 index 000000000..570c0f514 --- /dev/null +++ b/docs/p/notebooks/response_matrices.ipynb @@ -0,0 +1,932 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "addfb8d6-8b83-45b7-b3be-650fa0e5bed3", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "# Response matrices" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "ce595103-f8d4-4425-9c16-c4f766cd11c6", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "import at\n", + "import numpy as np\n", + "import math\n", + "from pathlib import Path\n", + "from importlib.resources import files, as_file\n", + "from timeit import timeit\n", + "from at.future import VariableList, RefptsVariable" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "f21dfcdb-1fc4-4ab2-a34f-48d1eb467c23", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "with as_file(files(\"machine_data\") / \"hmba.mat\") as path:\n", + " hmba_lattice = at.load_lattice(path)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "f4b3676d-a5d3-4f16-9fd4-20788606b6b6", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "for sx in hmba_lattice.select(at.Sextupole):\n", + " sx.KickAngle=[0,0]\n", + "hmba_lattice.enable_6d()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "620def05-b2fe-4cce-8566-9bd10d2018af", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "ring = hmba_lattice.repeat(8)" + ] + }, + { + "cell_type": "markdown", + "id": "64d73aed-8e8b-4be7-9489-0e0d4c70fd95", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "A {py:class}`.ResponseMatrix` object defines a general-purpose response matrix, based\n", + "on a {py:class}`.VariableList` of attributes which will be independently varied, and an\n", + "{py:class}`.ObservableList` of attributes which will be recorded for each\n", + "variable step.\n", + "\n", + "{py:class}`.ResponseMatrix` objects can be combined with the \"+\" operator to define\n", + "combined responses. This concatenates the variables and the observables.\n", + "\n", + "The module also defines two commonly used response matrices:\n", + "{py:class}`.OrbitResponseMatrix` for circular machines and\n", + "{py:class}`.TrajectoryResponseMatrix` for beam lines. Other matrices can be easily\n", + "defined by providing the desired Observables and Variables to the\n", + "{py:class}`.ResponseMatrix` base class." + ] + }, + { + "cell_type": "markdown", + "id": "08ae06fa-dea3-4542-acbc-4e59a14c66a7", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "## General purpose response matrix\n", + "\n", + "Let's take the horizontal displacements of all quadrupoles as variables:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "68a54c6d-be1d-41a9-90f8-f496b22d076c", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "variables = VariableList(RefptsVariable(ik, \"dx\", name=f\"dx_{ik}\", delta=.0001)\n", + " for ik in ring.get_uint32_index(at.Quadrupole))" + ] + }, + { + "cell_type": "markdown", + "id": "2f70b1b0-bfc0-4e01-8ada-d7d76b77f406", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "Variable names are set to _dx\\_nnnn_ where _nnnn_ is the index of the quadrupole in the ring.\n", + "\n", + "Let's take the horizontal positions at all beam position monitors as observables:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "8490caa3-14c1-4c7f-ba1e-f4e64078b6a5", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "observables = at.ObservableList([at.OrbitObservable(at.Monitor, axis='x')])" + ] + }, + { + "cell_type": "markdown", + "id": "fee7a274-979c-4503-90b1-b69d248bc317", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "We have a single Observable named _orbit[x]_ by default, with multiple values.\n", + "\n", + "### Instantiation" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "2d64d87b-ba18-4c22-921f-f6765f74080d", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "resp_dx = at.ResponseMatrix(ring, variables, observables)" + ] + }, + { + "cell_type": "markdown", + "id": "9f283b27-6898-4c00-9ac5-ec9ba48bd9f5", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "At that point, the response matrix is empty.\n", + "\n", + "### Matrix Building\n", + "\n", + "A general purpose response matrix may be filled by several methods:\n", + "\n", + "1. Direct assignment of an array to the {py:attr}`~.ResponseMatrix.response` property.\n", + " The shape of the array is checked,\n", + "2. {py:meth}`~.ResponseMatrix.load` loads data from a file containing previously\n", + " saved values or experimentally measured values,\n", + "3. {py:meth}`~.ResponseMatrix.build_tracking` computes the matrix using tracking,\n", + "4. For some specialized response matrices\n", + " {py:meth}`~OrbitResponseMatrix.build_analytical` is available.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "23154d5d-bdf8-43ed-8b4a-cfc616d7e659", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[ 16.94897864, -7.67022307, 2.50968594, ..., 5.3125638 ,\n", + " -5.57239476, 14.39501938],\n", + " [-10.6549627 , 3.59085167, -6.21666755, ..., -2.46632948,\n", + " 8.42841189, -18.50171186],\n", + " [-10.99744814, 3.91741643, -5.60080281, ..., -2.73604877,\n", + " 7.61448343, -17.01886021],\n", + " ...,\n", + " [-17.0182359 , 7.61358522, -2.73824047, ..., -5.60018031,\n", + " 3.92050176, -11.00305834],\n", + " [-18.50166601, 8.42772786, -2.46888914, ..., -6.21619686,\n", + " 3.59444354, -10.66160249],\n", + " [ 14.38971545, -5.56943704, 5.31320321, ..., 2.50757509,\n", + " -7.6711941 , 16.94982252]], shape=(80, 128))" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "resp_dx.build_tracking(use_mp=True)" + ] + }, + { + "cell_type": "markdown", + "id": "6ac09735-b15f-49a4-b44e-4051889e0bae", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "### Matrix normalisation\n", + "\n", + "To be correctly inverted, the response matrix must be correctly normalised: the norms\n", + "of its columns must be of the same order of magnitude, and similarly for the rows.\n", + "\n", + "Normalisation is done by adjusting the weights {math}`w_v` for the variables {math}`\\mathbf{V}`\n", + "and {math}`w_o` for the observables {math}`\\mathbf{O}`.\n", + "With {math}`\\mathbf{R}` the response matrix:\n", + "\n", + ":::{math}\n", + "\n", + " \\mathbf{O} = \\mathbf{R} . \\mathbf{V}\n", + ":::\n", + "\n", + "The weighted response matrix {math}`\\mathbf{R}_w` is:\n", + "\n", + ":::{math}\n", + "\n", + " \\frac{\\mathbf{O}}{w_o} = \\mathbf{R}_w . \\frac{\\mathbf{V}}{w_v}\n", + ":::\n", + "The {math}`\\mathbf{R}_w` is dimensionless and should be normalised. This can be checked\n", + "using:\n", + "\n", + "* {py:meth}`~.ResponseMatrix.check_norm` which prints the ratio of the maximum / minimum\n", + " norms for variables and observables. These should be less than 10.\n", + "* {py:meth}`~.ResponseMatrix.plot_norm`\n", + "\n", + "Both natural and weighted response matrices can be retrieved with the\n", + "{py:meth}`~.ResponseMatrix.response` and {py:meth}`~.ResponseMatrix.weighted_response`\n", + "properties." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "f74e856f-0c6f-4a89-9493-255c1cc3f03f", + "metadata": { + "editable": true, + "jp-MarkdownHeadingCollapsed": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "max/min Observables: 2.8352796928877786\n", + "max/min Variables: 4.768846272299542\n" + ] + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "resp_dx.plot_norm()" + ] + }, + { + "cell_type": "markdown", + "id": "c348df35-e03e-478e-8d03-8b7656015b45", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "### Matrix pseudo-inversion\n", + "\n", + "The {py:meth}`~.ResponseMatrix.solve` method computes the singular values of the\n", + "weighted response matrix and its pseudo-inverse." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "64e02b6c-2b89-4f5a-b7e1-b9e4e8af0a8b", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "resp_dx.solve()" + ] + }, + { + "cell_type": "markdown", + "id": "45546624-1884-4792-a9c1-d144a62488c9", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "We can plot the singular values:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "bee36f70-ccab-4f5e-a746-c4d86cdbb3fc", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "resp_dx.plot_singular_values()" + ] + }, + { + "cell_type": "markdown", + "id": "5ceff349-0253-40c8-ae08-4a03aa935913", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "After solving, correction is available, for instance with\n", + "\n", + "* {py:meth}`~.ResponseMatrix.correction_matrix` which returns the correction matrix (pseudo-inverse of\n", + " the response matrix),\n", + "* {py:meth}`~.ResponseMatrix.get_correction` which returns a correction vector when given observed values,\n", + "* {py:meth}`~.ResponseMatrix.correct` which computes and optionally applies a correction\n", + " for the provided {py:class}`.Lattice`.\n", + "\n", + "### Exclusion of variables and observables\n", + "\n", + "Variables may be added to a set of excluded values, and similarly for observables.\n", + "Excluding an item does not change the response matrix. The values are excluded from the\n", + "pseudo-inversion of the response, possibly reducing the number of singular values.\n", + "After inversion, the correction matrix is expanded to its original size by inserting\n", + "zero lines and columns at the location of excluded items. This way:\n", + "\n", + "- error and correction vectors keep the same size independently of excluded values,\n", + "- excluded error values are ignored,\n", + "- excluded corrections are set to zero.\n", + "\n", + "#### Exclusion of variables\n", + "\n", + "Excluded variables are selected by their name or their index in the variable list:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "317e2a0f-0e81-479e-89fb-71c49fe03d70", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "resp_dx.exclude_vars(0, \"dx_9\", \"dx_47\", -1)" + ] + }, + { + "cell_type": "markdown", + "id": "d2a5eddb-56ec-4009-a7e8-d748214ee90c", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "Where *-1* refers to the last variable.\n", + "\n", + "#### Exclusion of observables\n", + "\n", + "Observables are selected by their name or their index in the observable list. In addition, for\n", + "{py:class}`.ElementObservable` observables, we need to specify a _refpts_ to identify which item\n", + "in the array will be excluded.\n", + "\n", + "Let's exclude all Monitors with name _BPM\\_07_:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "4223c1bf-0108-4068-b938-4d9313c48684", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "resp_dx.exclude_obs(obsid=\"orbit[x]\", refpts=\"BPM_07\")" + ] + }, + { + "cell_type": "markdown", + "id": "d09b2106-ee9b-4171-8c4c-36e855fb4958", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "Or by using the observable index:" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "6ab3809a-86e3-486f-a688-544c875c9550", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [ + "remove-output" + ] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/var/folders/9d/tcctx2j125xd3nzkr5bp1zq4000103/T/ipykernel_63123/1399543609.py:1: AtWarning: No new excluded value\n", + " resp_dx.exclude_obs(obsid=0, refpts=\"BPM_07\")\n" + ] + } + ], + "source": [ + "resp_dx.exclude_obs(obsid=0, refpts=\"BPM_07\")" + ] + }, + { + "cell_type": "markdown", + "id": "9402e021-90b2-442a-9d7e-a1c02a1ec235", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "Or even, since there is a single observable and *obsid* defaults to *0*:" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "7acb0cf6-118b-4157-8bc1-6e464ffdaa71", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [ + "remove-output" + ] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/var/folders/9d/tcctx2j125xd3nzkr5bp1zq4000103/T/ipykernel_63123/3319419291.py:1: AtWarning: No new excluded value\n", + " resp_dx.exclude_obs(refpts=\"BPM_07\")\n" + ] + } + ], + "source": [ + "resp_dx.exclude_obs(refpts=\"BPM_07\")" + ] + }, + { + "cell_type": "markdown", + "id": "2e806ea3-3edc-4265-9a6f-558720f0befb", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "After excluding items, the pseudo-inverse is discarded so one must recompute it again:" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "208f3e79-7ab5-4647-95bd-bb4d8787bea0", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "resp_dx.solve()\n", + "resp_dx.plot_singular_values()" + ] + }, + { + "cell_type": "markdown", + "id": "47f6baaa-1d3d-45c4-829d-0a129d849c5b", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "There are now only 72 singular values instead of 80 (number of active monitors).\n", + "\n", + "The excluded items can be retrieved with the {py:attr}`~.ResponseMatrix.excluded_obs` and\n", + "{py:attr}`~.ResponseMatrix.excluded_vars` properties:" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "1cb0b249-8cca-4409-9277-217ba1442a78", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'orbit[x]': array([ 79, 200, 321, 442, 563, 684, 805, 926], dtype=uint32)}\n", + "['dx_5', 'dx_9', 'dx_47', 'dx_964']\n" + ] + } + ], + "source": [ + "print(resp_dx.excluded_obs)\n", + "print(resp_dx.excluded_vars)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "29c9fde1-cc6d-4f62-8dba-cff7645a473f", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "The exclusion masks can be reset using {py:meth}`~.ResponseMatrix.reset_vars` and\n", + "{py:meth}`~.ResponseMatrix.reset_obs`.\n", + "\n", + "## Orbit response matrix\n", + "\n", + "An {py:class}`.OrbitResponseMatrix` defines its observables as instances of\n", + "{py:class}`.OrbitObservable` and its variables as _KickAngle_ attributes of elements.\n", + "\n", + "### Instantiation\n", + "\n", + "By default, the observables are all the {py:class}`.Monitor` elements, and the\n", + "variables are all the elements having a *KickAngle* attribute. This is equivalent to:\n", + "```python\n", + "resp_v = at.OrbitResponseMatrix(ring, \"v\", bpmrefs = at.Monitor,\n", + " steerrefs = at.checkattr(\"KickAngle\"))\n", + "```\n", + "The variable elements must have the *KickAngle* attribute used for correction.\n", + "It's available for all magnets, though not present by default except in\n", + "{py:class}`.Corrector` magnets. For other magnets, the attribute should be\n", + "explicitly created.\n", + "\n", + "There are options in {py:class}`.OrbitResponseMatrix` to include the RF frequency in the\n", + "variable list, and the sum of correction angles in the list of observables:" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "8e703ee8-0aba-452c-808d-0621ef4e5f1e", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(81, 49)\n" + ] + } + ], + "source": [ + "resp_h = at.OrbitResponseMatrix(ring, \"h\", cavrefs=at.RFCavity, steersum=True)\n", + "print(resp_h.shape)" + ] + }, + { + "cell_type": "markdown", + "id": "ca0dca48-4b0d-4aa4-96c7-b5ed24280ec2", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "### Matrix building\n", + "\n", + "{py:class}`.OrbitResponseMatrix` has a {py:meth}`~.OrbitResponseMatrix.build_analytical` build method,\n", + "using formulas from [^Franchi].\n", + "\n", + "[^Franchi]: A. Franchi, S.M. Liuzzo, Z. Marti, _\"Analytic formulas for the rapid evaluation of the orbit\n", + "response matrix and chromatic functions from lattice parameters in circular accelerators\"_,\n", + "arXiv:1711.06589 [physics.acc-ph]" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "65a21d36-6d0d-46f0-ba72-5092fec3ab17", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[-8.54870374e+00, -1.81104969e+01, -1.21497454e+01, ...,\n", + " -2.85830326e+01, -1.60772960e+01, -5.75838151e-08],\n", + " [ 1.85290756e+01, 3.16574224e+01, 1.82799745e+01, ...,\n", + " 1.90656557e+01, 8.85865108e+00, -2.68128309e-06],\n", + " [ 1.68213604e+01, 2.87634523e+01, 1.65301788e+01, ...,\n", + " 1.94726063e+01, 9.44097149e+00, -2.44436213e-06],\n", + " ...,\n", + " [ 8.84897619e+00, 1.90656922e+01, 1.29411619e+01, ...,\n", + " 3.16578009e+01, 1.85390265e+01, -2.68138514e-06],\n", + " [-1.60775574e+01, -2.85833742e+01, -1.65903209e+01, ...,\n", + " -1.81113028e+01, -8.54900471e+00, -5.76356367e-08],\n", + " [ 1.00000000e+00, 1.00000000e+00, 1.00000000e+00, ...,\n", + " 1.00000000e+00, 1.00000000e+00, 0.00000000e+00]],\n", + " shape=(81, 49))" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "resp_h.build_analytical()" + ] + }, + { + "cell_type": "markdown", + "id": "538b8409-99f1-4f9c-9f00-861a0bdb3ac4", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "### Matrix normalisation\n", + "\n", + "This is critical when including the RF frequency response which is not commensurate\n", + "with steerer responses. Similarly for rows, the sum of steerers is not commensurate with\n", + "monitor readings.\n", + "\n", + "By default, the normalisation is done automatically by adjusting the RF frequency step\n", + "and the weight of the steerer sum based on an approximate analytical response matrix.\n", + "Explicitly specifying the *cavdelta* and *stsumweight* prevents this automatic normalisation.\n", + "\n", + "After building the response matrix, and before solving, normalisation may be applied\n", + "with the {py:meth}`~.OrbitResponseMatrix.normalise` method. The default normalisation\n", + "gives a higher priority to RF response and steerer sum." + ] + }, + { + "cell_type": "markdown", + "id": "eb624b1d-1624-4196-909e-35c055f3aa34", + "metadata": { + "editable": true, + "raw_mimetype": "", + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "## Trajectory response matrix\n", + "\n", + "A {py:class}`.TrajectoryResponseMatrix` defines its observables as instances of\n", + "{py:class}`.TrajectoryObservable` and its variables as _KickAngle_ attributes of elements.\n", + "\n", + "### Instantiation\n", + "\n", + "By default, the observables are all the {py:class}`.Monitor` elements, and the\n", + "variables are all the elements having a *KickAngle* attribute. This is equivalent to:\n", + "```python\n", + "resp_v = at.TrajectoryResponseMatrix(lattice, \"v\", bpmrefs = at.Monitor,\n", + " steerrefs = at.checkattr(\"KickAngle\"))\n", + "```\n", + "The variable elements must have the *KickAngle* attribute used for correction.\n", + "It's available for all magnets, though not present by default except in\n", + "{py:class}`.Corrector` magnets. For other magnets, the attribute should be\n", + "explicitly created." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "8c7de11a-d8e5-4fd9-9784-25a9244a0305", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(80, 48)\n" + ] + } + ], + "source": [ + "resp_h = at.TrajectoryResponseMatrix(ring, \"h\")\n", + "print(resp_h.shape)" + ] + }, + { + "cell_type": "markdown", + "id": "896f42fa-6c67-44b4-8e54-3ff39589949a", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "## References" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + }, + "toc": { + "base_numbering": 2 + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pyat/at/lattice/axisdef.py b/pyat/at/lattice/axisdef.py index 6f51608cf..5fa4222be 100644 --- a/pyat/at/lattice/axisdef.py +++ b/pyat/at/lattice/axisdef.py @@ -1,23 +1,23 @@ """Helper functions for axis and plane descriptions""" from __future__ import annotations -from typing import Optional, Union -# For sys.version_info.minor < 9: +# Necessary for type aliases in python <= 3.8 : from typing import Tuple +from typing import Union AxisCode = Union[str, int, slice, None, type(Ellipsis)] AxisDef = Union[AxisCode, Tuple[AxisCode, AxisCode]] -_axis_def = dict( - x=dict(index=0, label="x", unit=" [m]"), - px=dict(index=1, label=r"$p_x$", unit=" [rad]"), - y=dict(index=2, label="y", unit=" [m]"), - py=dict(index=3, label=r"$p_y$", unit=" [rad]"), - dp=dict(index=4, label=r"$\delta$", unit=""), - ct=dict(index=5, label=r"$\beta c \tau$", unit=" [m]"), -) -for xk, xv in [it for it in _axis_def.items()]: +_axis_def = { + "x": {"index": 0, "label": "x", "unit": " [m]"}, + "px": {"index": 1, "label": r"$p_x$", "unit": " [rad]"}, + "y": {"index": 2, "label": "y", "unit": " [m]"}, + "py": {"index": 3, "label": r"$p_y$", "unit": " [rad]"}, + "dp": {"index": 4, "label": r"$\delta$", "unit": ""}, + "ct": {"index": 5, "label": r"$\beta c \tau$", "unit": " [m]"}, +} +for xk, xv in list(_axis_def.items()): xv["code"] = xk _axis_def[xv["index"]] = xv _axis_def[xk.upper()] = xv @@ -26,15 +26,15 @@ _axis_def["yp"] = _axis_def["py"] # For backward compatibility _axis_def["s"] = _axis_def["ct"] _axis_def["S"] = _axis_def["ct"] -_axis_def[None] = dict(index=None, label="", unit="", code=":") -_axis_def[Ellipsis] = dict(index=Ellipsis, label="", unit="", code="...") - -_plane_def = dict( - x=dict(index=0, label="x", unit=" [m]"), - y=dict(index=1, label="y", unit=" [m]"), - z=dict(index=2, label="z", unit=""), -) -for xk, xv in [it for it in _plane_def.items()]: +_axis_def[None] = {"index": None, "label": "", "unit": "", "code": ":"} +_axis_def[Ellipsis] = {"index": Ellipsis, "label": "", "unit": "", "code": "..."} + +_plane_def = { + "x": {"index": 0, "label": "x", "unit": " [m]"}, + "y": {"index": 1, "label": "y", "unit": " [m]"}, + "z": {"index": 2, "label": "z", "unit": ""}, +} +for xk, xv in list(_plane_def.items()): xv["code"] = xk _plane_def[xv["index"]] = xv _plane_def[xk.upper()] = xv @@ -42,25 +42,27 @@ _plane_def["v"] = _plane_def["y"] _plane_def["H"] = _plane_def["x"] _plane_def["V"] = _plane_def["y"] -_plane_def[None] = dict(index=None, label="", unit="", code=":") -_plane_def[Ellipsis] = dict(index=Ellipsis, label="", unit="", code="...") +_plane_def[None] = {"index": None, "label": "", "unit": "", "code": ":"} +_plane_def[Ellipsis] = {"index": Ellipsis, "label": "", "unit": "", "code": "..."} -def _descr(dd: dict, arg: AxisDef, key: Optional[str] = None): - if isinstance(arg, tuple): - return tuple(_descr(dd, a, key=key) for a in arg) - else: - try: - descr = dd[arg] - except (TypeError, KeyError): - descr = dict(index=arg, code=arg, label="", unit="") - if key is None: - return descr +def _descr(dd: dict, *args: AxisDef, key: str | None = None): + for arg in args: + if isinstance(arg, tuple): + for a in arg: + yield from _descr(dd, a, key=key) else: - return descr[key] + if isinstance(arg, slice): + descr = {"index": arg, "code": arg, "label": "", "unit": ""} + else: + descr = dd[arg] + if key is None: + yield descr + else: + yield descr[key] -def axis_(axis: AxisDef, key: Optional[str] = None): +def axis_(*axis: AxisDef, key: str | None = None): r"""Return axis descriptions Parameters: @@ -100,28 +102,32 @@ def axis_(axis: AxisDef, key: Optional[str] = None): Examples: - >>> axis_(('x','dp'), key='index') + >>> axis_("x", "dp", key="index") (0, 4) returns the indices in the standard coordinate vector - >>> dplabel = axis_('dp', key='label') + >>> dplabel = axis_("dp", key="label") >>> print(dplabel) $\delta$ returns the coordinate label for plot annotation - >>> axis_((0,'dp')) + >>> axis_(0, "dp") ({'plane': 0, 'label': 'x', 'unit': ' [m]', 'code': 'x'}, {'plane': 4, 'label': '$\\delta$', 'unit': '', 'code': 'dp'}) returns the entire description directories """ - return _descr(_axis_def, axis, key=key) + ret = tuple(_descr(_axis_def, *axis, key=key)) + if len(ret) > 1: + return ret + else: + return ret[0] -def plane_(plane: AxisDef, key: Optional[str] = None): +def plane_(*plane: AxisDef, key: str | None = None): r"""Return plane descriptions Parameters: @@ -154,16 +160,20 @@ def plane_(plane: AxisDef, key: Optional[str] = None): Examples: - >>> plane_('v', key='index') + >>> plane_("v", key="index") 1 returns the indices in the standard coordinate vector - >>> plane_(('x','y')) - ({'plane': 0, 'label': 'h', 'unit': ' [m]', 'code': 'h'}, - {'plane': 1, 'label': 'v', 'unit': ' [m]', 'code': 'v'}) + >>> plane_("x", "y") + ({'plane': 0, 'label': 'x', 'unit': ' [m]', 'code': 'h'}, + {'plane': 1, 'label': 'y', 'unit': ' [m]', 'code': 'v'}) returns the entire description directories """ - return _descr(_plane_def, plane, key=key) + ret = tuple(_descr(_plane_def, *plane, key=key)) + if len(ret) > 1: + return ret + else: + return ret[0] diff --git a/pyat/at/lattice/utils.py b/pyat/at/lattice/utils.py index ff87986a8..179cd97b2 100644 --- a/pyat/at/lattice/utils.py +++ b/pyat/at/lattice/utils.py @@ -30,47 +30,63 @@ is :py:obj:`True` for selected elements. """ -import numpy + +from __future__ import annotations + import functools import re -from typing import Callable, Optional, Sequence, Iterator -from typing import Union, Tuple, List, Type from enum import Enum +from fnmatch import fnmatch from itertools import compress from operator import attrgetter -from fnmatch import fnmatch +from typing import Optional, Union +# Necessary for type aliases in python <= 3.8 : +# from collections.abc import Callable, Sequence, Iterator +from typing import Callable, Sequence, Iterator, Type + +import numpy +import numpy.typing as npt + from .elements import Element, Dipole _GEOMETRY_EPSIL = 1.0e-3 -ElementFilter = Callable[[Element], bool] -BoolRefpts = numpy.ndarray -Uint32Refpts = numpy.ndarray - - -__all__ = ['All', 'End', 'AtError', 'AtWarning', 'axis_descr', - 'check_radiation', 'check_6d', - 'set_radiation', 'set_6d', - 'make_copy', 'uint32_refpts', 'bool_refpts', - 'get_uint32_index', 'get_bool_index', - 'checkattr', 'checktype', 'checkname', - 'get_elements', 'get_s_pos', - 'refpts_count', 'refpts_iterator', - 'set_shift', 'set_tilt', 'set_rotation', - 'tilt_elem', 'shift_elem', 'rotate_elem', - 'get_value_refpts', 'set_value_refpts', 'Refpts', - 'get_geometry', 'setval', 'getval'] - -_axis_def = dict( - x=dict(index=0, label="x", unit=" [m]"), - xp=dict(index=1, label="x'", unit=" [rad]"), - y=dict(index=2, label="y", unit=" [m]"), - yp=dict(index=3, label="y'", unit=" [rad]"), - dp=dict(index=4, label=r"$\delta$", unit=""), - ct=dict(index=5, label=r"$\beta c \tau$", unit=" [m]"), -) -for vvv in [vv for vv in _axis_def.values()]: - _axis_def[vvv['index']] = vvv +__all__ = [ + "All", + "End", + "AtError", + "AtWarning", + "BoolRefpts", + "Uint32Refpts", + "check_radiation", + "check_6d", + "set_radiation", + "set_6d", + "make_copy", + "uint32_refpts", + "bool_refpts", + "get_uint32_index", + "get_bool_index", + "checkattr", + "checktype", + "checkname", + "get_elements", + "get_s_pos", + "refpts_count", + "refpts_iterator", + "set_shift", + "set_tilt", + "set_rotation", + "tilt_elem", + "shift_elem", + "rotate_elem", + "get_value_refpts", + "set_value_refpts", + "Refpts", + "get_geometry", + "setval", + "getval", +] class AtError(Exception): @@ -83,14 +99,17 @@ class AtWarning(UserWarning): _typ1 = "None, All, End, int, bool" -_typ2 = "None, All, End, int, bool, str, Type[Element], ElementFilter" +_typ2 = "None, All, End, int, bool, str, type[Element], ElementFilter" class RefptsCode(Enum): - All = 'All' - End = 'End' + All = "All" + End = "End" +ElementFilter = Callable[[Element], bool] +BoolRefpts = npt.NDArray[bool] +Uint32Refpts = npt.NDArray[numpy.uint32] RefIndex = Union[None, int, Sequence[int], bool, Sequence[bool], RefptsCode] Refpts = Union[Type[Element], Element, ElementFilter, str, RefIndex] @@ -105,19 +124,44 @@ class RefptsCode(Enum): End = RefptsCode.End +def _chkattr(attrname: str, el): + return hasattr(el, attrname) + + +def _chkattrval(attrname: str, attrvalue, el): + try: + v = getattr(el, attrname) + except AttributeError: + return False + else: + return v == attrvalue + + +def _chkpattern(pattern: str, el): + return fnmatch(el.FamName, pattern) + + +def _chkregex(pattern: str, el): + rgx = re.compile(pattern) + return rgx.fullmatch(el.FamName) + + +def _chktype(eltype: type, el): + return isinstance(el, eltype) + + def _type_error(refpts, types): if isinstance(refpts, numpy.ndarray): tp = refpts.dtype.type else: tp = type(refpts) - return TypeError( - "Invalid refpts type {0}. Allowed types: {1}".format(tp, types)) + return TypeError(f"Invalid refpts type {tp}. Allowed types: {types}") # setval and getval return pickleable functions: no inner, nested function # are allowed. So nested functions are replaced be module-level callable # class instances -class _AttrItemGetter(object): +class _AttrItemGetter: __slots__ = ["attrname", "index"] def __init__(self, attrname: str, index: int): @@ -133,7 +177,7 @@ def getval(attrname: str, index: Optional[int] = None) -> Callable: attribute *attrname* of its operand. Examples: - After ``f = getval('Length')``, ``f(elem)`` returns ``elem.Length`` - - After ``f = getval('PolynomB, index=1)``, ``f(elem)`` returns + - After ``f = getval('PolynomB', index=1)``, ``f(elem)`` returns ``elem.PolynomB[1]`` """ @@ -143,7 +187,7 @@ def getval(attrname: str, index: Optional[int] = None) -> Callable: return _AttrItemGetter(attrname, index) -class _AttrSetter(object): +class _AttrSetter: __slots__ = ["attrname"] def __init__(self, attrname: str): @@ -153,7 +197,7 @@ def __call__(self, elem, value): setattr(elem, self.attrname, value) -class _AttrItemSetter(object): +class _AttrItemSetter: __slots__ = ["attrname", "index"] def __init__(self, attrname: str, index: int): @@ -170,7 +214,7 @@ def setval(attrname: str, index: Optional[int] = None) -> Callable: - After ``f = setval('Length')``, ``f(elem, value)`` is equivalent to ``elem.Length = value`` - - After ``f = setval('PolynomB, index=1)``, ``f(elem, value)`` is + - After ``f = setval('PolynomB', index=1)``, ``f(elem, value)`` is equivalent to ``elem.PolynomB[1] = value`` """ @@ -180,56 +224,6 @@ def setval(attrname: str, index: Optional[int] = None) -> Callable: return _AttrItemSetter(attrname, index) -# noinspection PyIncorrectDocstring -def axis_descr(*args, key=None) -> Tuple: - r"""axis_descr(axis [ ,axis], key=None) - - Return a tuple containing for each input argument the requested information - - Parameters: - axis (Union[int, str]): either an index in 0:6 or a string in - ['x', 'xp', 'y', 'yp', 'dp', 'ct'] - key: key in the coordinate description - dictionary, selecting the desired information. One of : - - 'index' - index in the standard AT coordinate vector - 'label' - label for plot annotation - 'unit' - coordinate unit - :py:obj:`None` - entire description dictionary - - Returns: - descr (Tuple): requested information for each input argument. - - Examples: - - >>> axis_descr('x','dp', key='index') - (0, 4) - - returns the indices in the standard coordinate vector - - >>> dplabel, = axis_descr('dp', key='label') - >>> print(dplabel) - $\delta$ - - returns the coordinate label for plot annotation - - >>> axis_descr('x','dp') - ({'index': 0, 'label': 'x', 'unit': ' [m]'}, - {'index': 4, 'label': '$\\delta$', 'unit': ''}) - - returns the entire description directories - - """ - if key is None: - return tuple(_axis_def[k] for k in args) - else: - return tuple(_axis_def[k][key] for k in args) - - def check_radiation(rad: bool) -> Callable: r"""Deprecated decorator for optics functions (see :py:func:`check_6d`). @@ -271,15 +265,17 @@ def check_6d(is_6d: bool) -> Callable: See Also: :py:func:`set_6d` """ + def radiation_decorator(func): @functools.wraps(func) def wrapper(ring, *args, **kwargs): - ringrad = getattr(ring, 'is_6d', is_6d) + ringrad = getattr(ring, "is_6d", is_6d) if ringrad != is_6d: - raise AtError('{0} needs "ring.is_6d" {1}'.format( - func.__name__, is_6d)) + raise AtError(f'{func.__name__} needs "ring.is_6d" {is_6d}') return func(ring, *args, **kwargs) + return wrapper + return radiation_decorator @@ -311,21 +307,26 @@ def set_6d(is_6d: bool) -> Callable: See Also: :py:func:`check_6d`, :py:meth:`.Lattice.enable_6d`, :py:meth:`.Lattice.disable_6d` - """ + """ if is_6d: + def setrad_decorator(func): @functools.wraps(func) def wrapper(ring, *args, **kwargs): rg = ring if ring.is_6d else ring.enable_6d(copy=True) return func(rg, *args, **kwargs) + return wrapper else: + def setrad_decorator(func): @functools.wraps(func) def wrapper(ring, *args, **kwargs): rg = ring.disable_6d(copy=True) if ring.is_6d else ring return func(rg, *args, **kwargs) + return wrapper + return setrad_decorator @@ -346,6 +347,7 @@ def make_copy(copy: bool) -> Callable: :pycode:`ring` """ if copy: + def copy_decorator(func): @functools.wraps(func) def wrapper(ring, refpts, *args, **kwargs): @@ -353,20 +355,22 @@ def wrapper(ring, refpts, *args, **kwargs): ring = ring.replace(refpts) except AttributeError: check = get_bool_index(ring, refpts) - ring = [el.deepcopy() if ok else el - for el, ok in zip(ring, check)] + ring = [el.deepcopy() if ok else el for el, ok in zip(ring, check)] func(ring, refpts, *args, **kwargs) return ring + return wrapper else: + def copy_decorator(func): return func + return copy_decorator -def uint32_refpts(refpts: RefIndex, n_elements: int, - endpoint: bool = True, - types: str = _typ1) -> Uint32Refpts: +def uint32_refpts( + refpts: RefIndex, n_elements: int, endpoint: bool = True, types: str = _typ1 +) -> Uint32Refpts: r"""Return a :py:obj:`~numpy.uint32` array of element indices selecting ring elements. @@ -390,7 +394,7 @@ def uint32_refpts(refpts: RefIndex, n_elements: int, """ refs = numpy.ravel(refpts) if refpts is RefptsCode.All: - stop = n_elements+1 if endpoint else n_elements + stop = n_elements + 1 if endpoint else n_elements return numpy.arange(stop, dtype=numpy.uint32) elif refpts is RefptsCode.End: if not endpoint: @@ -401,23 +405,22 @@ def uint32_refpts(refpts: RefIndex, n_elements: int, elif numpy.issubdtype(refs.dtype, numpy.bool_): return numpy.flatnonzero(refs).astype(numpy.uint32) elif numpy.issubdtype(refs.dtype, numpy.integer): - # Handle negative indices if endpoint: - refs = numpy.array([i if (i == n_elements) else i % n_elements - for i in refs], dtype=numpy.uint32) + refs = numpy.array( + [i if (i == n_elements) else i % n_elements for i in refs], + dtype=numpy.uint32, + ) else: - refs = numpy.array([i % n_elements - for i in refs], dtype=numpy.uint32) + refs = numpy.array([i % n_elements for i in refs], dtype=numpy.uint32) # Check ascending if refs.size > 1: prev = refs[0] for nxt in refs[1:]: if nxt < prev: - raise IndexError('Index out of range or not in ascending' - ' order') + raise IndexError("Index out of range or not in ascending order") elif nxt == prev: - raise IndexError('Duplicated index') + raise IndexError("Duplicated index") prev = nxt return refs @@ -426,9 +429,9 @@ def uint32_refpts(refpts: RefIndex, n_elements: int, # noinspection PyIncorrectDocstring -def get_uint32_index(ring: Sequence[Element], refpts: Refpts, - endpoint: bool = True, - regex: bool = False) -> Uint32Refpts: +def get_uint32_index( + ring: Sequence[Element], refpts: Refpts, endpoint: bool = True, regex: bool = False +) -> Uint32Refpts: # noinspection PyUnresolvedReferences, PyShadowingNames r"""Returns an integer array of element indices, selecting ring elements. @@ -456,7 +459,7 @@ def get_uint32_index(ring: Sequence[Element], refpts: Refpts, numpy array([:pycode:`len(ring)+1`]) - >>> get_uint32_index(ring, at.checkattr('Frequency')) + >>> get_uint32_index(ring, at.checkattr("Frequency")) array([0], dtype=uint32) numpy array of indices of all elements having a 'Frequency' @@ -473,13 +476,14 @@ def get_uint32_index(ring: Sequence[Element], refpts: Refpts, else: return uint32_refpts(refpts, len(ring), endpoint=endpoint, types=_typ2) - return numpy.fromiter((i for i, el in enumerate(ring) if checkfun(el)), - dtype=numpy.uint32) + return numpy.fromiter( + (i for i, el in enumerate(ring) if checkfun(el)), dtype=numpy.uint32 + ) -def bool_refpts(refpts: RefIndex, n_elements: int, - endpoint: bool = True, - types: str = _typ1) -> BoolRefpts: +def bool_refpts( + refpts: RefIndex, n_elements: int, endpoint: bool = True, types: str = _typ1 +) -> BoolRefpts: r"""Returns a :py:class:`bool` array of element indices, selecting ring elements. @@ -502,7 +506,7 @@ def bool_refpts(refpts: RefIndex, n_elements: int, :py:class:`.Element`\ s in a lattice. """ refs = numpy.ravel(refpts) - stop = n_elements+1 if endpoint else n_elements + stop = n_elements + 1 if endpoint else n_elements if refpts is RefptsCode.All: return numpy.ones(stop, dtype=bool) elif refpts is RefptsCode.End: @@ -528,8 +532,9 @@ def bool_refpts(refpts: RefIndex, n_elements: int, # noinspection PyIncorrectDocstring -def get_bool_index(ring: Sequence[Element], refpts: Refpts, - endpoint: bool = True, regex: bool = False) -> BoolRefpts: +def get_bool_index( + ring: Sequence[Element], refpts: Refpts, endpoint: bool = True, regex: bool = False +) -> BoolRefpts: # noinspection PyUnresolvedReferences, PyShadowingNames r"""Returns a bool array of element indices, selecting ring elements. @@ -557,7 +562,7 @@ def get_bool_index(ring: Sequence[Element], refpts: Refpts, Returns a numpy array of booleans where all elements whose *FamName* matches "Q[FD]*" are :py:obj:`True` - >>> refpts = get_bool_index(ring, at.checkattr('K', 0.0)) + >>> refpts = get_bool_index(ring, at.checkattr("K", 0.0)) Returns a numpy array of booleans where all elements whose *K* attribute is 0.0 are :py:obj:`True` @@ -583,8 +588,7 @@ def get_bool_index(ring: Sequence[Element], refpts: Refpts, return boolrefs -def checkattr(attrname: str, attrvalue: Optional = None) \ - -> ElementFilter: +def checkattr(attrname: str, attrvalue: Optional = None) -> ElementFilter: # noinspection PyUnresolvedReferences r"""Checks the presence or the value of an attribute @@ -605,27 +609,23 @@ def checkattr(attrname: str, attrvalue: Optional = None) \ Examples: - >>> cavs = filter(checkattr('Frequency'), ring) + >>> cavs = filter(checkattr("Frequency"), ring) Returns an iterator over all elements in *ring* that have a :pycode:`Frequency` attribute - >>> elts = filter(checkattr('K', 0.0), ring) + >>> elts = filter(checkattr("K", 0.0), ring) Returns an iterator over all elements in ring that have a :pycode:`K` attribute equal to 0.0 """ - def testf(el): - try: - v = getattr(el, attrname) - return (attrvalue is None) or (v == attrvalue) - except AttributeError: - return False - - return testf + if attrvalue is None: + return functools.partial(_chkattr, attrname) + else: + return functools.partial(_chkattrval, attrname, attrvalue) -def checktype(eltype: Union[type, Tuple[type, ...]]) -> ElementFilter: +def checktype(eltype: Union[type, tuple[type, ...]]) -> ElementFilter: # noinspection PyUnresolvedReferences r"""Checks the type of an element @@ -646,7 +646,7 @@ def checktype(eltype: Union[type, Tuple[type, ...]]) -> ElementFilter: Returns an iterator over all quadrupoles in ring """ - return lambda el: isinstance(el, eltype) + return functools.partial(_chktype, eltype) def checkname(pattern: str, regex: bool = False) -> ElementFilter: @@ -669,19 +669,19 @@ def checkname(pattern: str, regex: bool = False) -> ElementFilter: Examples: - >>> qps = filter(checkname('QF*'), ring) + >>> qps = filter(checkname("QF*"), ring) Returns an iterator over all with name starting with ``QF``. """ if regex: - rgx = re.compile(pattern) - return lambda el: rgx.fullmatch(el.FamName) + return functools.partial(_chkregex, pattern) else: - return lambda el: fnmatch(el.FamName, pattern) + return functools.partial(_chkpattern, pattern) -def refpts_iterator(ring: Sequence[Element], refpts: Refpts, - regex: bool = False) -> Iterator[Element]: +def refpts_iterator( + ring: Sequence[Element], refpts: Refpts, regex: bool = False +) -> Iterator[Element]: r"""Return an iterator over selected elements in a lattice Parameters: @@ -722,9 +722,9 @@ def refpts_iterator(ring: Sequence[Element], refpts: Refpts, # noinspection PyUnusedLocal,PyIncorrectDocstring -def refpts_count(refpts: RefIndex, n_elements: int, - endpoint: bool = True, - types: str = _typ1) -> int: +def refpts_count( + refpts: RefIndex, n_elements: int, endpoint: bool = True, types: str = _typ1 +) -> int: r"""Returns the number of reference points Parameters: @@ -745,7 +745,7 @@ def refpts_count(refpts: RefIndex, n_elements: int, """ refs = numpy.ravel(refpts) if refpts is RefptsCode.All: - return n_elements+1 if endpoint else n_elements + return n_elements + 1 if endpoint else n_elements elif refpts is RefptsCode.End: if not endpoint: raise IndexError('"End" index out of range') @@ -760,8 +760,9 @@ def refpts_count(refpts: RefIndex, n_elements: int, raise _type_error(refpts, types) -def _refcount(ring: Sequence[Element], refpts: Refpts, - endpoint: bool = True, regex: bool = False) -> int: +def _refcount( + ring: Sequence[Element], refpts: Refpts, endpoint: bool = True, regex: bool = False +) -> int: # noinspection PyUnresolvedReferences, PyShadowingNames r"""Returns the number of reference points @@ -792,7 +793,7 @@ def _refcount(ring: Sequence[Element], refpts: Refpts, 121 Returns *len(ring)* - """ + """ if isinstance(refpts, type): checkfun = checktype(refpts) elif callable(refpts): @@ -808,8 +809,7 @@ def _refcount(ring: Sequence[Element], refpts: Refpts, # noinspection PyUnusedLocal,PyIncorrectDocstring -def get_elements(ring: Sequence[Element], refpts: Refpts, - regex: bool = False) -> list: +def get_elements(ring: Sequence[Element], refpts: Refpts, regex: bool = False) -> list: r"""Returns a list of elements selected by *key*. Deprecated: :pycode:`get_elements(ring, refpts)` is :pycode:`ring[refpts]` @@ -827,9 +827,13 @@ def get_elements(ring: Sequence[Element], refpts: Refpts, return list(refpts_iterator(ring, refpts, regex=regex)) -def get_value_refpts(ring: Sequence[Element], refpts: Refpts, - attrname: str, index: Optional[int] = None, - regex: bool = False): +def get_value_refpts( + ring: Sequence[Element], + refpts: Refpts, + attrname: str, + index: Optional[int] = None, + regex: bool = False, +): r"""Extracts attribute values from selected lattice :py:class:`.Element`\ s. @@ -847,14 +851,21 @@ def get_value_refpts(ring: Sequence[Element], refpts: Refpts, attrvalues: numpy Array of attribute values. """ getf = getval(attrname, index=index) - return numpy.array([getf(elem) for elem in refpts_iterator(ring, refpts, - regex=regex)]) - - -def set_value_refpts(ring: Sequence[Element], refpts: Refpts, - attrname: str, attrvalues, index: Optional[int] = None, - increment: bool = False, - copy: bool = False, regex: bool = False): + return numpy.array( + [getf(elem) for elem in refpts_iterator(ring, refpts, regex=regex)] + ) + + +def set_value_refpts( + ring: Sequence[Element], + refpts: Refpts, + attrname: str, + attrvalues, + index: Optional[int] = None, + increment: bool = False, + copy: bool = False, + regex: bool = False, +): r"""Set the values of an attribute of an array of elements based on their refpts @@ -885,13 +896,11 @@ def set_value_refpts(ring: Sequence[Element], refpts: Refpts, """ setf = setval(attrname, index=index) if increment: - attrvalues += get_value_refpts(ring, refpts, - attrname, index=index, - regex=regex) + attrvalues += get_value_refpts(ring, refpts, attrname, index=index, regex=regex) else: - attrvalues = numpy.broadcast_to(attrvalues, - (_refcount(ring, refpts, - regex=regex),)) + attrvalues = numpy.broadcast_to( + attrvalues, (_refcount(ring, refpts, regex=regex),) + ) # noinspection PyShadowingNames @make_copy(copy) @@ -902,8 +911,9 @@ def apply(ring, refpts, values, regex): return apply(ring, refpts, attrvalues, regex) -def get_s_pos(ring: Sequence[Element], refpts: Refpts = All, - regex: bool = False) -> Sequence[float]: +def get_s_pos( + ring: Sequence[Element], refpts: Refpts = All, regex: bool = False +) -> Sequence[float]: # noinspection PyUnresolvedReferences r"""Returns the locations of selected elements @@ -923,16 +933,21 @@ def get_s_pos(ring: Sequence[Element], refpts: Refpts = All, array([26.37428795]) Position at the end of the last element: length of the lattice - """ + """ # Positions at the end of each element. - s_pos = numpy.cumsum([getattr(el, 'Length', 0.0) for el in ring]) + s_pos = numpy.cumsum([getattr(el, "Length", 0.0) for el in ring]) # Prepend position at the start of the first element. s_pos = numpy.concatenate(([0.0], s_pos)) return s_pos[get_bool_index(ring, refpts, regex=regex)] -def rotate_elem(elem: Element, tilt: float = 0.0, pitch: float = 0.0, - yaw: float = 0.0, relative: bool = False) -> None: +def rotate_elem( + elem: Element, + tilt: float = 0.0, + pitch: float = 0.0, + yaw: float = 0.0, + relative: bool = False, +) -> None: r"""Set the tilt, pitch and yaw angle of an :py:class:`.Element`. The tilt is a rotation around the *s*-axis, the pitch is a rotation around the *x*-axis and the yaw is a rotation around @@ -964,13 +979,14 @@ def rotate_elem(elem: Element, tilt: float = 0.0, pitch: float = 0.0, relative: If :py:obj:`True`, the rotation is added to the previous one """ + # noinspection PyShadowingNames def _get_rm_tv(le, tilt, pitch, yaw): tilt = numpy.around(tilt, decimals=15) pitch = numpy.around(pitch, decimals=15) yaw = numpy.around(yaw, decimals=15) ct, st = numpy.cos(tilt), numpy.sin(tilt) - ap, ay = 0.5*le*numpy.tan(pitch), 0.5*le*numpy.tan(yaw) + ap, ay = 0.5 * le * numpy.tan(pitch), 0.5 * le * numpy.tan(yaw) rr1 = numpy.asfortranarray(numpy.diag([ct, ct, ct, ct, 1.0, 1.0])) rr1[0, 2] = st rr1[1, 3] = st @@ -979,10 +995,10 @@ def _get_rm_tv(le, tilt, pitch, yaw): rr2 = rr1.T t1 = numpy.array([ay, numpy.sin(-yaw), -ap, numpy.sin(pitch), 0, 0]) t2 = numpy.array([ay, numpy.sin(yaw), -ap, numpy.sin(-pitch), 0, 0]) - rt1 = numpy.eye(6, order='F') + rt1 = numpy.eye(6, order="F") rt1[1, 4] = t1[1] rt1[3, 4] = t1[3] - rt2 = numpy.eye(6, order='F') + rt2 = numpy.eye(6, order="F") rt2[1, 4] = t2[1] rt2[3, 4] = t2[3] return rr1 @ rt1, rt2 @ rr2, t1, t2 @@ -992,17 +1008,17 @@ def _get_rm_tv(le, tilt, pitch, yaw): yaw0 = 0.0 t10 = numpy.zeros(6) t20 = numpy.zeros(6) - if hasattr(elem, 'R1') and hasattr(elem, 'R2'): - rr10 = numpy.eye(6, order='F') + if hasattr(elem, "R1") and hasattr(elem, "R2"): + rr10 = numpy.eye(6, order="F") rr10[:4, :4] = elem.R1[:4, :4] rt10 = rr10.T @ elem.R1 tilt0 = numpy.arctan2(rr10[0, 2], rr10[0, 0]) yaw0 = numpy.arcsin(-rt10[1, 4]) pitch0 = numpy.arcsin(rt10[3, 4]) _, _, t10, t20 = _get_rm_tv(elem.Length, tilt0, pitch0, yaw0) - if hasattr(elem, 'T1') and hasattr(elem, 'T2'): - t10 = elem.T1-t10 - t20 = elem.T2-t20 + if hasattr(elem, "T1") and hasattr(elem, "T2"): + t10 = elem.T1 - t10 + t20 = elem.T2 - t20 if relative: tilt += tilt0 pitch += pitch0 @@ -1011,8 +1027,8 @@ def _get_rm_tv(le, tilt, pitch, yaw): r1, r2, t1, t2 = _get_rm_tv(elem.Length, tilt, pitch, yaw) elem.R1 = r1 elem.R2 = r2 - elem.T1 = t1+t10 - elem.T2 = t2+t20 + elem.T1 = t1 + t10 + elem.T2 = t2 + t20 def tilt_elem(elem: Element, rots: float, relative: bool = False) -> None: @@ -1037,8 +1053,9 @@ def tilt_elem(elem: Element, rots: float, relative: bool = False) -> None: rotate_elem(elem, tilt=rots, relative=relative) -def shift_elem(elem: Element, deltax: float = 0.0, deltaz: float = 0.0, - relative: bool = False) -> None: +def shift_elem( + elem: Element, deltax: float = 0.0, deltaz: float = 0.0, relative: bool = False +) -> None: r"""Sets the transverse displacement of an :py:class:`.Element` The translation vectors are stored in the :pycode:`T1` and :pycode:`T2` @@ -1052,7 +1069,7 @@ def shift_elem(elem: Element, deltax: float = 0.0, deltaz: float = 0.0, existing one """ tr = numpy.array([deltax, 0.0, deltaz, 0.0, 0.0, 0.0]) - if relative and hasattr(elem, 'T1') and hasattr(elem, 'T2'): + if relative and hasattr(elem, "T1") and hasattr(elem, "T2"): elem.T1 -= tr elem.T2 += tr else: @@ -1060,8 +1077,9 @@ def shift_elem(elem: Element, deltax: float = 0.0, deltaz: float = 0.0, elem.T2 = tr -def set_rotation(ring: Sequence[Element], tilts=0.0, - pitches=0.0, yaws=0.0, relative=False) -> None: +def set_rotation( + ring: Sequence[Element], tilts=0.0, pitches=0.0, yaws=0.0, relative=False +) -> None: r"""Sets the tilts of a list of elements. Parameters: @@ -1115,12 +1133,13 @@ def set_shift(ring: Sequence[Element], dxs, dzs, relative=False) -> None: shift_elem(el, dx, dy, relative=relative) -def get_geometry(ring: List[Element], - refpts: Refpts = All, - start_coordinates: Tuple[float, float, float] = (0, 0, 0), - centered: bool = False, - regex: bool = False - ): +def get_geometry( + ring: list[Element], + refpts: Refpts = All, + start_coordinates: tuple[float, float, float] = (0, 0, 0), + centered: bool = False, + regex: bool = False, +): # noinspection PyShadowingNames r"""Compute the 2D ring geometry in cartesian coordinates @@ -1148,15 +1167,13 @@ def get_geometry(ring: List[Element], >>> geomdata, radius = get_geometry(ring) """ - geom_dtype = [("x", numpy.float64), - ("y", numpy.float64), - ("angle", numpy.float64)] + geom_dtype = [("x", numpy.float64), ("y", numpy.float64), ("angle", numpy.float64)] boolrefs = get_bool_index(ring, refpts, endpoint=True, regex=regex) nrefs = refpts_count(boolrefs, len(ring)) - geomdata = numpy.recarray((nrefs, ), dtype=geom_dtype) - xx = numpy.zeros(len(ring)+1) - yy = numpy.zeros(len(ring)+1) - angle = numpy.zeros(len(ring)+1) + geomdata = numpy.recarray((nrefs,), dtype=geom_dtype) + xx = numpy.zeros(len(ring) + 1) + yy = numpy.zeros(len(ring) + 1) + angle = numpy.zeros(len(ring) + 1) x0, y0, t0 = start_coordinates x, y = 0.0, 0.0 t = t0 @@ -1168,30 +1185,30 @@ def get_geometry(ring: List[Element], ll = el.Length if isinstance(el, Dipole) and el.BendingAngle != 0: ang = 0.5 * el.BendingAngle - ll *= numpy.sin(ang)/ang + ll *= numpy.sin(ang) / ang else: ang = 0.0 t -= ang x += ll * numpy.cos(t) y += ll * numpy.sin(t) t -= ang - xx[ind+1] = x - yy[ind+1] = y - angle[ind+1] = t + xx[ind + 1] = x + yy[ind + 1] = y + angle[ind + 1] = t dff = (t + _GEOMETRY_EPSIL) % (2.0 * numpy.pi) - _GEOMETRY_EPSIL if abs(dff) < _GEOMETRY_EPSIL: xcenter = numpy.mean(xx) ycenter = numpy.mean(yy) - elif abs(dff-numpy.pi) < _GEOMETRY_EPSIL: - xcenter = 0.5*x - ycenter = 0.5*y + elif abs(dff - numpy.pi) < _GEOMETRY_EPSIL: + xcenter = 0.5 * x + ycenter = 0.5 * y else: - num = numpy.cos(t)*x + numpy.sin(t)*y - den = numpy.sin(t-t0) - xcenter = -num*numpy.sin(t0)/den - ycenter = num*numpy.cos(t0)/den - radius = numpy.sqrt(xcenter*xcenter + ycenter*ycenter) + num = numpy.cos(t) * x + numpy.sin(t) * y + den = numpy.sin(t - t0) + xcenter = -num * numpy.sin(t0) / den + ycenter = num * numpy.cos(t0) / den + radius = numpy.sqrt(xcenter * xcenter + ycenter * ycenter) if centered: xx -= xcenter yy -= ycenter diff --git a/pyat/at/lattice/variables.py b/pyat/at/lattice/variables.py index 5b90e5f5a..08c7a0897 100644 --- a/pyat/at/lattice/variables.py +++ b/pyat/at/lattice/variables.py @@ -94,6 +94,7 @@ def _getfun(self, **kwargs): from typing import Union import numpy as np +import numpy.typing as npt Number = Union[int, float] @@ -403,6 +404,12 @@ class VariableList(list): appending, insertion or concatenation with the "+" operator. """ + def __getitem__(self, index): + if isinstance(index, slice): + return VariableList(super().__getitem__(index)) + else: + return super().__getitem__(index) + def get(self, ring=None, **kwargs) -> Sequence[float]: r"""Get the current values of Variables @@ -453,6 +460,12 @@ def __str__(self) -> str: return self.status() @property - def deltas(self) -> Sequence[Number]: + def deltas(self) -> npt.NDArray[Number]: """delta values of the variables""" return np.array([var.delta for var in self]) + + @deltas.setter + def deltas(self, value: Number | Sequence[Number]) -> None: + deltas = np.broadcast_to(value, len(self)) + for var, delta in zip(self, deltas): + var.delta = delta diff --git a/pyat/at/latticetools/__init__.py b/pyat/at/latticetools/__init__.py index b1b34e2fb..a634efcb2 100644 --- a/pyat/at/latticetools/__init__.py +++ b/pyat/at/latticetools/__init__.py @@ -1,5 +1,6 @@ -"""Defines classes for modifying a lattice and observing its parameters""" +"""Defines classes for modifying a lattice and observing its parameters.""" from .observables import * from .observablelist import * -from .matching import * +# from .matching import * +from .response_matrix import * diff --git a/pyat/at/latticetools/observables.py b/pyat/at/latticetools/observables.py index e0ba3c726..8f3012a3c 100644 --- a/pyat/at/latticetools/observables.py +++ b/pyat/at/latticetools/observables.py @@ -312,6 +312,9 @@ def evaluate(self, *data, initial: bool = False): sent to the evaluation function initial: It :py:obj:`None`, store the result as the initial value + + Returns: + value: The value of the observable. """ for d in data: if isinstance(d, Exception): @@ -339,6 +342,10 @@ def weight(self): """Observable weight.""" return np.broadcast_to(self.w, np.asarray(self._value).shape) + @weight.setter + def weight(self, w): + self.w = w + @property def weighted_value(self): """Weighted value of the Observable, computed as @@ -616,8 +623,8 @@ def __init__( Observe the horizontal closed orbit at monitor locations """ - name = self._set_name(name, "orbit", axis_(axis, "code")) - fun = _ArrayAccess(axis_(axis, "index")) + name = self._set_name(name, "orbit", axis_(axis, key="code")) + fun = _ArrayAccess(axis_(axis, key="index")) needs = {Need.ORBIT} super().__init__(fun, refpts, needs=needs, name=name, **kwargs) @@ -666,8 +673,8 @@ def __init__( Observe the transfer matrix from origin to monitor locations and extract T[0,1] """ - name = self._set_name(name, "matrix", axis_(axis, "code")) - fun = _ArrayAccess(axis_(axis, "index")) + name = self._set_name(name, "matrix", axis_(axis, key="code")) + fun = _ArrayAccess(axis_(axis, key="index")) needs = {Need.MATRIX} super().__init__(fun, refpts, needs=needs, name=name, **kwargs) @@ -700,12 +707,12 @@ def __init__( shape of *value*. """ needs = {Need.GLOBALOPTICS} - name = self._set_name(name, param, plane_(plane, "code")) + name = self._set_name(name, param, plane_(plane, key="code")) if callable(param): fun = param needs.add(Need.CHROMATICITY) else: - fun = _RecordAccess(param, plane_(plane, "index")) + fun = _RecordAccess(param, plane_(plane, key="index")) if param == "chromaticity": needs.add(Need.CHROMATICITY) super().__init__(fun, needs=needs, name=name, **kwargs) @@ -803,11 +810,11 @@ def __init__( ax_ = plane_ needs = {Need.LOCALOPTICS} - name = self._set_name(name, param, ax_(plane, "code")) + name = self._set_name(name, param, ax_(plane, key="code")) if callable(param): fun = param else: - fun = _RecordAccess(param, _all_rows(ax_(plane, "index"))) + fun = _RecordAccess(param, _all_rows(ax_(plane, key="index"))) if param == "mu" or all_points: needs.add(Need.ALL_POINTS) if param in {"W", "Wp", "dalpha", "dbeta", "dmu", "ddispersion", "dR"}: @@ -843,7 +850,9 @@ def __init__( Example: - >>> obs = LatticeObservable(at.Sextupole, "KickAngle", index=0, statfun=np.sum) + >>> obs = LatticeObservable( + ... at.Sextupole, "KickAngle", index=0, statfun=np.sum + ... ) Observe the sum of horizontal kicks in Sextupoles """ @@ -888,8 +897,8 @@ def __init__( The *target*, *weight* and *bounds* inputs must be broadcastable to the shape of *value*. """ - name = self._set_name(name, "trajectory", axis_(axis, "code")) - fun = _ArrayAccess(axis_(axis, "index")) + name = self._set_name(name, "trajectory", axis_(axis, key="code")) + fun = _ArrayAccess(axis_(axis, key="index")) needs = {Need.TRAJECTORY} super().__init__(fun, refpts, needs=needs, name=name, **kwargs) @@ -941,11 +950,11 @@ def __init__( Observe the horizontal emittance """ - name = self._set_name(name, param, plane_(plane, "code")) + name = self._set_name(name, param, plane_(plane, key="code")) if callable(param): fun = param else: - fun = _RecordAccess(param, plane_(plane, "index")) + fun = _RecordAccess(param, plane_(plane, key="index")) needs = {Need.EMITTANCE} super().__init__(fun, needs=needs, name=name, **kwargs) @@ -1008,10 +1017,10 @@ def GlobalOpticsObservable( """ if param == "tune" and use_integer: # noinspection PyProtectedMember - name = ElementObservable._set_name(name, param, plane_(plane, "code")) + name = ElementObservable._set_name(name, param, plane_(plane, key="code")) return LocalOpticsObservable( End, - _Tune(plane_(plane, "index")), + _Tune(plane_(plane, key="index")), name=name, summary=True, all_points=True, diff --git a/pyat/at/latticetools/response_matrix.py b/pyat/at/latticetools/response_matrix.py new file mode 100644 index 000000000..ddfad19b9 --- /dev/null +++ b/pyat/at/latticetools/response_matrix.py @@ -0,0 +1,1195 @@ +# noinspection PyUnresolvedReferences +r"""Definition of :py:class:`.ResponseMatrix` objects. + +A :py:class:`ResponseMatrix` object defines a general-purpose response matrix, based +on a :py:class:`.VariableList` of attributes which will be independently varied, and an +:py:class:`.ObservableList` of attributes which will be recorded for each +variable step. + +:py:class:`ResponseMatrix` objects can be combined with the "+" operator to define +combined responses. This concatenates the variables and the observables. + +This module also defines two commonly used response matrices: +:py:class:`OrbitResponseMatrix` for circular machines and +:py:class:`TrajectoryResponseMatrix` for beam lines. Other matrices can be easily +defined by providing the desired Observables and Variables to the +:py:class:`ResponseMatrix` base class. + +Generic response matrix +----------------------- + +The :py:class:`ResponseMatrix` class defines a general-purpose response matrix, based +on a :py:class:`.VariableList` of quantities which will be independently varied, and an +:py:class:`.ObservableList` of quantities which will be recorded for each step. + +For instance let's take the horizontal displacements of all quadrupoles as variables: + +>>> variables = VariableList( +... RefptsVariable(ik, "dx", name=f"dx_{ik}", delta=0.0001) +... for ik in ring.get_uint32_index(at.Quadrupole) +... ) + +The variables are the horizontal displacement ``dx`` of all quadrupoles. The variable +name is set to *dx_nnnn* where *nnnn* is the index of the quadrupole in the lattice. +The step is set to 0.0001 m. + +Let's take the horizontal positions at all beam position monitors as observables: + +>>> observables = at.ObservableList([at.OrbitObservable(at.Monitor, axis="x")]) + +This is a single observable named *orbit[x]* by default, with multiple values. + +Instantiation +^^^^^^^^^^^^^ + +>>> resp_dx = at.ResponseMatrix(ring, variables, observables) + +At that point, the response matrix is empty. + +Matrix Building +^^^^^^^^^^^^^^^ + +The response matrix may be filled by several means: + +#. Direct assignment of an array to the :py:attr:`~.ResponseMatrix.response` property. + The shape of the array is checked. +#. :py:meth:`~ResponseMatrix.load` loads data from a file containing previously + saved values or experimentally measured values, +#. :py:meth:`~ResponseMatrix.build_tracking` computes the matrix using tracking, +#. For some specialized response matrices a + :py:meth:`~OrbitResponseMatrix.build_analytical` method is available. + +Matrix normalisation +^^^^^^^^^^^^^^^^^^^^ + +To be correctly inverted, the response matrix must be correctly normalised: the norms +of its columns must be of the same order of magnitude, and similarly for the rows. + +Normalisation is done by adjusting the weights :math:`w_v` for the variables +:math:`\mathbf{V}` and :math:`w_o` for the observables :math:`\mathbf{O}`. +With :math:`\mathbf{R}` the response matrix: + +.. math:: + + \mathbf{O} = \mathbf{R} . \mathbf{V} + +The weighted response matrix :math:`\mathbf{R}_w` is: + +.. math:: + + \frac{\mathbf{O}}{w_o} = \mathbf{R}_w . \frac{\mathbf{V}}{w_v} + +The :math:`\mathbf{R}_w` is dimensionless and should be normalised. This can be checked +using: + +* :py:meth:`~ResponseMatrix.check_norm` which prints the ratio of the maximum / minimum + norms for variables and observables. These should be less than 10. +* :py:meth:`~.ResponseMatrix.plot_norm` + +Both natural and weighted response matrices can be retrieved with the +:py:attr:`~ResponseMatrix.response` and :py:attr:`~ResponseMatrix.weighted_response` +properties. + +Matrix pseudo-inversion +^^^^^^^^^^^^^^^^^^^^^^^ + +The :py:meth:`~ResponseMatrix.solve` method computes the singular values of the +weighted response matrix. + +After solving, correction is available, for instance with + +* :py:meth:`~ResponseMatrix.correction_matrix` which returns the correction matrix + (pseudo-inverse of the response matrix), +* :py:meth:`~ResponseMatrix.get_correction` which returns a correction vector when + given error values, +* :py:meth:`~ResponseMatrix.correct` which computes and optionally applies a correction + for the provided :py:class:`.Lattice`. + +Exclusion of variables and observables +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Variables may be added to a set of excluded values, and similarly for observables. +Excluding an item does not change the response matrix. The values are excluded from the +pseudo-inversion of the response, possibly reducing the number of singular values. +After inversion the correction matrix is expanded to its original size by inserting +zero lines and columns at the location of excluded items. This way: + +- error and correction vectors keep the same size independently of excluded values, +- excluded error values are ignored, +- excluded corrections are set to zero. + +Variables can be added to the set of excluded variables using +:py:meth:`~.ResponseMatrix.exclude_vars` and observables using +:py:meth:`~.ResponseMatrix.exclude_obs`. + +After excluding items, the pseudo-inverse is discarded so one must recompute it again +by calling :py:meth:`~ResponseMatrix.solve`. + +The exclusion masks can be reset with :py:meth:`~.ResponseMatrix.reset_vars` and +:py:meth:`~.ResponseMatrix.reset_obs`. +""" + +from __future__ import annotations + +__all__ = [ + "sequence_split", + "ResponseMatrix", + "OrbitResponseMatrix", + "TrajectoryResponseMatrix", +] + +import os +import multiprocessing +import concurrent.futures +import abc +import warnings +from collections.abc import Sequence, Generator, Callable +from typing import Any, ClassVar +from itertools import chain +from functools import partial +import math + +import numpy as np +import numpy.typing as npt + +from .observables import ElementObservable +from .observables import TrajectoryObservable, OrbitObservable, LatticeObservable +from .observables import LocalOpticsObservable, GlobalOpticsObservable +from .observablelist import ObservableList +from ..lattice import AtError, AtWarning, Refpts, Uint32Refpts, All +from ..lattice import AxisDef, plane_, Lattice, Monitor, checkattr +from ..lattice.lattice_variables import RefptsVariable +from ..lattice.variables import VariableList + +FloatArray = npt.NDArray[np.float64] + +_orbit_correctors = checkattr("KickAngle") + +_globring: Lattice | None = None +_globobs: ObservableList | None = None + +warnings.filterwarnings("always", category=AtWarning, module=__name__) + + +def sequence_split(seq: Sequence, nslices: int) -> Generator[Sequence, None, None]: + """Split a sequence into multiple sub-sequences. + + The length of *seq* does not have to be a multiple of *nslices*. + + Args: + seq: sequence to split + nslices: number of sub-sequences + + Returns: + subseqs: Iterator over sub-sequences + """ + + def _split(seqsizes): + beg = 0 + for size in seqsizes: + end = beg + size + yield seq[beg:end] + beg = end + + lna = len(seq) + sz, rem = divmod(lna, nslices) + lsubseqs = [sz] * nslices + for k in range(rem): + lsubseqs[k] += 1 + return _split(lsubseqs) + + +def _resp( + ring: Lattice, observables: ObservableList, variables: VariableList, **kwargs +): + def _resp_one(variable: RefptsVariable): + """Single response""" + variable.step_up(ring=ring) + observables.evaluate(ring, **kwargs) + op = observables.flat_values + variable.step_down(ring=ring) + observables.evaluate(ring, **kwargs) + om = observables.flat_values + variable.reset(ring=ring) + return (op - om) / (2.0 * variable.delta) + + return [_resp_one(v) for v in variables] + + +def _resp_fork(variables: VariableList, **kwargs): + """Response for fork parallel method.""" + return _resp(_globring, _globobs, variables, **kwargs) + + +class _SvdSolver(abc.ABC): + """SVD solver for response matrices.""" + + _shape: tuple[int, int] + _obsmask: npt.NDArray[bool] + _varmask: npt.NDArray[bool] + _response: FloatArray | None = None + _v: FloatArray | None = None + _uh: FloatArray | None = None + #: Singular values of the response matrix + singular_values: FloatArray | None = None + + def __init__(self, nobs: int, nvar: int): + self._shape = (nobs, nvar) + self._obsmask = np.ones(nobs, dtype=bool) + self._varmask = np.ones(nvar, dtype=bool) + + def reset_vars(self): + """Reset the variable exclusion mask: enable all variables""" + self._varmask = np.ones(self.shape[1], dtype=bool) + self._v = None + self._uh = None + self.singular_values = None + + def reset_obs(self): + """Reset the observable exclusion mask: enable all observables""" + self._obsmask = np.ones(self.shape[0], dtype=bool) + self._v = None + self._uh = None + self.singular_values = None + + @property + @abc.abstractmethod + def varweights(self) -> np.ndarray: ... + + @property + @abc.abstractmethod + def obsweights(self) -> np.ndarray: ... + + @property + def shape(self) -> tuple[int, int]: + """Shape of the response matrix.""" + return self._shape + + def solve(self) -> None: + """Compute the singular values of the response matrix.""" + resp = self.weighted_response + selected = np.ix_(self._obsmask, self._varmask) + u, s, vh = np.linalg.svd(resp[selected], full_matrices=False) + self._v = vh.T * (1.0 / s) * self.varweights[self._varmask].reshape(-1, 1) + self._uh = u.T / self.obsweights[self._obsmask] + self.singular_values = s + + def check_norm(self) -> tuple[FloatArray, FloatArray]: + """Display the norm of the rows and columns of the weighted response matrix. + + Adjusting the variables and observable weights to equalize the norms + of rows and columns is important. + + Returns: + obs_norms: Norms of observables (rows) + var_norms: Norms of Variables (columns) + """ + resp = self.weighted_response + obs = np.linalg.norm(resp, axis=1) + var = np.linalg.norm(resp, axis=0) + print(f"max/min Observables: {np.amax(obs) / np.amin(obs)}") + print(f"max/min Variables: {np.amax(var) / np.amin(var)}") + return obs, var + + @property + def response(self) -> FloatArray: + """Response matrix.""" + resp = self._response + if resp is None: + raise AtError("No matrix yet: run build() or load() first") + return resp + + @response.setter + def response(self, response: FloatArray) -> None: + l1, c1 = self._shape + l2, c2 = response.shape + if l1 != l1 or c1 != c2: + raise ValueError( + f"Input matrix has incompatible shape. Expected: {self.shape}" + ) + self._response = response + + @property + def weighted_response(self) -> FloatArray: + """Weighted response matrix.""" + return self.response * (self.varweights / self.obsweights.reshape(-1, 1)) + + def correction_matrix(self, nvals: int | None = None) -> FloatArray: + """Return the correction matrix (pseudo-inverse of the response matrix). + + Args: + nvals: Desired number of singular values. If :py:obj:`None`, use + all singular values + + Returns: + cormat: Correction matrix + """ + if self.singular_values is None: + self.solve() + if nvals is None: + nvals = len(self.singular_values) + cormat = np.zeros(self._shape[::-1]) + selected = np.ix_(self._varmask, self._obsmask) + cormat[selected] = self._v[:, :nvals] @ self._uh[:nvals, :] + return cormat + + def get_correction( + self, observed: FloatArray, nvals: int | None = None + ) -> FloatArray: + """Compute the correction of the given observation. + + Args: + observed: Vector of observed deviations, + nvals: Desired number of singular values. If :py:obj:`None`, use + all singular values + + Returns: + corr: Correction vector + """ + return -self.correction_matrix(nvals=nvals) @ observed + + def save(self, file) -> None: + """Save a response matrix in the NumPy .npy format. + + Args: + file: file-like object, string, or :py:class:`pathlib.Path`: File to + which the data is saved. If file is a file-object, it must be opened in + binary mode. If file is a string or Path, a .npy extension will + be appended to the filename if it does not already have one. + """ + if self._response is None: + raise AtError("No response matrix: run build_tracking() or load() first") + np.save(file, self._response) + + def load(self, file) -> None: + """Load a response matrix saved in the NumPy .npy format. + + Args: + file: file-like object, string, or :py:class:`pathlib.Path`: the file to + read. A file object must always be opened in binary mode. + """ + self.response = np.load(file) + + +class ResponseMatrix(_SvdSolver): + r"""Base class for response matrices. + + It is defined by any arbitrary set of :py:class:`~.variables.VariableBase`\ s and + :py:class:`.Observable`\s + + Addition is defined on :py:class:`ResponseMatrix` objects as the addition + of their :py:class:`~.variables.VariableBase`\ s and :py:class:`.Observable`\s to + produce combined responses. + """ + + ring: Lattice + variables: VariableList #: List of matrix :py:class:`Variable <.VariableBase>`\ s + observables: ObservableList #: List of matrix :py:class:`.Observable`\s + _eval_args: dict[str, Any] = {} + + def __init__( + self, + ring: Lattice, + variables: VariableList, + observables: ObservableList, + ): + r""" + Args: + ring: Design lattice, used to compute the response + variables: List of :py:class:`Variable <.VariableBase>`\ s + observables: List of :py:class:`.Observable`\s + """ + + def limits(obslist): + beg = 0 + for obs in obslist: + end = beg + obs.value.size + yield beg, end + beg = end + + # for efficiency of parallel computation, the variable's refpts must be integer + for var in variables: + var.refpts = ring.get_uint32_index(var.refpts) + self.ring = ring + self.variables = variables + self.observables = observables + variables.get(ring=ring, initial=True) + observables.evaluate(ring=ring, initial=True) + super().__init__(len(observables.flat_values), len(variables)) + self._ob = [self._obsmask[beg:end] for beg, end in limits(self.observables)] + + def __add__(self, other: ResponseMatrix): + if not isinstance(other, ResponseMatrix): + raise TypeError( + f"Cannot add {type(other).__name__} and {type(self).__name__}" + ) + return ResponseMatrix( + self.ring, + VariableList(self.variables + other.variables), + self.observables + other.observables, + ) + + def __str__(self): + no, nv = self.shape + return f"{type(self).__name__}({no} observables, {nv} variables)" + + @property + def varweights(self) -> np.ndarray: + """Variable weights.""" + return self.variables.deltas + + @property + def obsweights(self) -> np.ndarray: + """Observable weights.""" + return self.observables.flat_weights + + def correct( + self, ring: Lattice, nvals: int = None, niter: int = 1, apply: bool = False + ) -> FloatArray: + """Compute and optionally apply the correction. + + Args: + ring: Lattice description. The response matrix observables + will be evaluated for *ring* and the deviation from target will + be corrected + apply: If :py:obj:`True`, apply the correction to *ring* + niter: Number of iterations. For more than one iteration, + *apply* must be :py:obj:`True` + nvals: Desired number of singular values. If :py:obj:`None`, + use all singular values. *nvals* may be a scalar or an iterable with + *niter* values. + + Returns: + correction: Vector of correction values + """ + if niter > 1 and not apply: + raise ValueError("If niter > 1, 'apply' must be True") + obs = self.observables + if apply: + self.variables.get(ring=ring, initial=True) + sumcorr = np.array([0.0]) + for it, nv in zip(range(niter), np.broadcast_to(nvals, (niter,))): + print(f'step {it+1}, nvals = {nv}') + obs.evaluate(ring, **self._eval_args) + err = obs.flat_deviations + if np.any(np.isnan(err)): + raise AtError( + f"Step {it + 1}: Invalid observables, cannot compute correction" + ) + corr = self.get_correction(obs.flat_deviations, nvals=nv) + sumcorr = sumcorr + corr # non-broadcastable sumcorr + if apply: + self.variables.increment(corr, ring=ring) + return sumcorr + + def build_tracking( + self, + use_mp: bool = False, + pool_size: int | None = None, + start_method: str | None = None, + **kwargs, + ) -> FloatArray: + """Build the response matrix. + + Args: + use_mp: Use multiprocessing + pool_size: number of processes. If None, + :pycode:`min(len(self.variables, nproc)` is used + start_method: python multiprocessing start method. + :py:obj:`None` uses the python default that is considered safe. + Available values: ``'fork'``, ``'spawn'``, ``'forkserver'``. + Default for linux is ``'fork'``, default for macOS and Windows + is ``'spawn'``. ``'fork'`` may be used on macOS to speed up the + calculation, however it is considered unsafe. + + Keyword Args: + dp (float): Momentum deviation. Defaults to :py:obj:`None` + dct (float): Path lengthening. Defaults to :py:obj:`None` + df (float): Deviation from the nominal RF frequency. + Defaults to :py:obj:`None` + r_in (Orbit): Initial trajectory, used for + :py:class:`TrajectoryResponseMatrix`, Default: zeros(6) + + Returns: + response: Response matrix + """ + self._eval_args = kwargs + self.observables.evaluate(self.ring) + ring = self.ring.deepcopy() + + if use_mp: + global _globring + global _globobs + ctx = multiprocessing.get_context(start_method) + if pool_size is None: + pool_size = min(len(self.variables), os.cpu_count()) + obschunks = sequence_split(self.variables, pool_size) + if ctx.get_start_method() == "fork": + _globring = ring + _globobs = self.observables + _single_resp = partial(_resp_fork, **kwargs) + else: + _single_resp = partial(_resp, ring, self.observables, **kwargs) + with concurrent.futures.ProcessPoolExecutor( + max_workers=pool_size, + mp_context=ctx, + ) as pool: + results = list(chain(*pool.map(_single_resp, obschunks))) + _globring = None + _globobs = None + else: + results = _resp(ring, self.observables, self.variables, **kwargs) + + resp = np.stack(results, axis=-1) + self.response = resp + return resp + + def build_analytical(self) -> FloatArray: + """Build the response matrix.""" + raise NotImplementedError( + f"build_analytical not implemented for {self.__class__.__name__}" + ) + + def _on_obs(self, fun: Callable, *args, obsid: int | str = 0): + """Apply a function to the selected observable""" + if not isinstance(obsid, str): + return fun(self.observables[obsid], *args) + else: + for obs in self.observables: + if obs.name == obsid: + return fun(obs, *args) + else: + raise ValueError(f"Observable {obsid} not found") + + def get_target(self, *, obsid: int | str = 0) -> FloatArray: + r"""Return the target of the specified observable + + Args: + obsid: :py:class:`.Observable` name or index in the observable list. + + Returns: + target: observable target + """ + def _get(obs): + return obs.target + + return self._on_obs(_get, obsid=obsid) + + def set_target(self, target: npt.ArrayLike, *, obsid: int | str = 0) -> None: + r"""Set the target of the specified observable + + Args: + target: observable target. Must be broadcastable to the shape of the + observable value. + obsid: :py:class:`.Observable` name or index in the observable list. + """ + + def _set(obs, targ): + obs.target = targ + + return self._on_obs(_set, target, obsid=obsid) + + def exclude_obs(self, *, obsid: int | str = 0, refpts: Refpts = None) -> None: + # noinspection PyUnresolvedReferences + r"""Add an observable item to the set of excluded values + + After excluding observation points, the matrix must be inverted again using + :py:meth:`solve`. + + Args: + obsid: :py:class:`.Observable` name or index in the observable list. + refpts: location of elements to exclude for + :py:class:`.ElementObservable` objects, otherwise ignored. + + Raises: + ValueError: No observable with the given name. + IndexError: Observableindex out of range. + + Example: + >>> resp = OrbitResponseMatrix(ring, "h", Monitor, Corrector) + >>> resp.exclude_obs(obsid="x_orbit", refpts="BPM_02") + + Create an horizontal :py:class:`OrbitResponseMatrix` from + :py:class:`.Corrector` elements to :py:class:`.Monitor` elements, + and exclude the monitor with name "BPM_02" + """ + + def exclude(ob, msk): + inimask = msk.copy() + if isinstance(ob, ElementObservable) and not ob.summary: + boolref = self.ring.get_bool_index(refpts) + # noinspection PyProtectedMember + msk &= np.logical_not(boolref[ob._boolrefs]) + else: + msk[:] = False + if np.all(msk == inimask): + warnings.warn(AtWarning("No new excluded value"), stacklevel=3) + # Force a new computation + self.singular_values = None + + if not isinstance(obsid, str): + exclude(self.observables[obsid], self._ob[obsid]) + else: + for obs, mask in zip(self.observables, self._ob): + if obs.name == obsid: + exclude(obs, mask) + break + else: + raise ValueError(f"Observable {obsid} not found") + + @property + def excluded_obs(self) -> dict: + """Directory of excluded observables. + + The dictionary keys are the observable names, the values are the integer + indices of excluded items (empty list if no exclusion). + """ + + def ex(obs, mask): + if isinstance(obs, ElementObservable) and not obs.summary: + refpts = self.ring.get_bool_index(None) + # noinspection PyProtectedMember + refpts[obs._boolrefs] = np.logical_not(mask) + refpts = self.ring.get_uint32_index(refpts) + else: + refpts = np.arange(0 if np.all(mask) else mask.size, dtype=np.uint32) + return refpts + + return {ob.name: ex(ob, mask) for ob, mask in zip(self.observables, self._ob)} + + def exclude_vars(self, *varid: int | str) -> None: + # noinspection PyUnresolvedReferences + """Add variables to the set of excluded variables. + + Args: + *varid: :py:class:`Variable <.VariableBase>` names or variable indices + in the variable list + + After excluding variables, the matrix must be inverted again using + :py:meth:`solve`. + + Examples: + >>> resp.exclude_vars(0, "var1", -1) + + Exclude the 1st variable, the variable named "var1" and the last variable. + """ + nameset = set(nm for nm in varid if isinstance(nm, str)) + varidx = [nm for nm in varid if not isinstance(nm, str)] + mask = np.array([var.name in nameset for var in self.variables]) + mask[varidx] = True + miss = nameset - {var.name for var, ok in zip(self.variables, mask) if ok} + if miss: + raise ValueError(f"Unknown variables: {miss}") + self._varmask &= np.logical_not(mask) + + @property + def excluded_vars(self) -> list: + """List of excluded variables""" + return [var.name for var, ok in zip(self.variables, self._varmask) if not ok] + + +class OrbitResponseMatrix(ResponseMatrix): + # noinspection PyUnresolvedReferences + r"""Orbit response matrix. + + An :py:class:`OrbitResponseMatrix` applies to a single plane, horizontal or + vertical. A combined response matrix is obtained by adding horizontal and + vertical matrices. However, the resulting matrix has the :py:class:`ResponseMatrix` + class, which implies that the :py:class:`OrbitResponseMatrix` specific methods are + not available. + + Variables are a set of steerers and optionally the RF frequency. Steerer + variables are named ``xnnnn`` or ``ynnnn`` where nnnn is the index in the + lattice. The RF frequency variable is named ``RF frequency``. + + Observables are the closed orbit position at selected points, named + ``orbit[x]`` for the horizontal plane or ``orbit[y]`` for the vertical plane, + and optionally the sum of steerer angles named ``sum(h_kicks)`` or + ``sum(v_kicks)`` + + The variable elements must have the *KickAngle* attribute used for correction. + It's available for all magnets, though not present by default + except in :py:class:`.Corrector` magnets. For other magnets, the attribute + should be explicitly created. + + By default, the observables are all the :py:class:`.Monitor` elements, and the + variables are all the elements having a *KickAngle* attribute. + This is equivalent to: + + >>> resp_v = OrbitResponseMatrix( + ... ring, "v", bpmrefs=at.Monitor, steerrefs=at.checkattr("KickAngle") + ... ) + """ + + bpmrefs: Uint32Refpts #: location of position monitors + steerrefs: Uint32Refpts #: location of steerers + + def __init__( + self, + ring: Lattice, + plane: AxisDef, + bpmrefs: Refpts = Monitor, + steerrefs: Refpts = _orbit_correctors, + *, + cavrefs: Refpts = None, + bpmweight: float | Sequence[float] = 1.0, + bpmtarget: float | Sequence[float] = 0.0, + steerdelta: float | Sequence[float] = 0.0001, + cavdelta: float | None = None, + steersum: bool = False, + stsumweight: float | None = None, + ): + """ + Args: + ring: Design lattice, used to compute the response. + plane: One out of {0, 'x', 'h', 'H'} for horizontal orbit, or + one of {1, 'y', 'v', 'V'} for vertical orbit. + bpmrefs: Location of closed orbit observation points. + See ":ref:`Selecting elements in a lattice `". + Default: all :py:class:`.Monitor` elements. + steerrefs: Location of orbit steerers. Their *KickAngle* attribute + is used and must be present in the selected elements. + Default: All Elements having a *KickAngle* attribute. + cavrefs: Location of RF cavities. Their *Frequency* attribute + is used. If :py:obj:`None`, no cavity is included in the response. + Cavities must be active. Cavity variables are appended to the steerer + variables. + bpmweight: Weight of position readings. Must be broadcastable to the + number of BPMs. + bpmtarget: Target orbit position. Must be broadcastable to the number of + observation points. + cavdelta: Step on RF frequency for matrix computation [Hz]. This + is also the cavity weight. Default: automatically computed. + steerdelta: Step on steerers for matrix computation [rad]. This is + also the steerer weight. Must be broadcastable to the number of steerers. + steersum: If :py:obj:`True`, the sum of steerers is appended to the + Observables. + stsumweight: Weight on steerer summation. Default: automatically computed. + + :ivar VariableList variables: matrix variables + :ivar ObservableList observables: matrix observables + + By default, the weights of cavities and steerers summation are set to give + a factor 2 more efficiency than steerers and BPMs + + """ + + def steerer(ik, delta): + name = f"{plcode}{ik:04}" + return RefptsVariable(ik, "KickAngle", index=pl, name=name, delta=delta) + + def set_norm(): + bpm = LocalOpticsObservable(bpmrefs, "beta", plane=pl) + sts = LocalOpticsObservable(steerrefs, "beta", plane=pl) + dsp = LocalOpticsObservable(bpmrefs, "dispersion", plane=2 * pl) + tun = GlobalOpticsObservable("tune", plane=pl) + obs = ObservableList([bpm, sts, dsp, tun]) + result = obs.evaluate(ring=ring) + alpha = ring.disable_6d(copy=True).get_mcf(0) + freq = ring.get_rf_frequency(cavpts=cavrefs) + nr = np.outer( + np.sqrt(result[0]) / bpmweight, np.sqrt(result[1]) * steerdelta + ) + vv = np.mean(np.linalg.norm(nr, axis=0)) + vo = np.mean(np.linalg.norm(nr, axis=1)) + korb = 0.25 * math.sqrt(2.0) / math.sin(math.pi * result[3]) + cd = vv * korb * alpha * freq / np.linalg.norm(result[2] / bpmweight) + sw = np.linalg.norm(deltas) / vo / korb + return cd, sw + + pl = plane_(plane, key="index") + plcode = plane_(plane, key="code") + ids = ring.get_uint32_index(steerrefs) + nbsteers = len(ids) + deltas = np.broadcast_to(steerdelta, nbsteers) + if steersum and stsumweight is None or cavrefs and cavdelta is None: + cavd, stsw = set_norm() + + # Observables + bpms = OrbitObservable(bpmrefs, axis=2 * pl, target=bpmtarget, weight=bpmweight) + observables = ObservableList([bpms]) + if steersum: + # noinspection PyUnboundLocalVariable + sumobs = LatticeObservable( + steerrefs, + "KickAngle", + name=f"{plcode}_kicks", + target=0.0, + index=pl, + weight=stsumweight if stsumweight else stsw / 2.0, + statfun=np.sum, + ) + observables.append(sumobs) + + # Variables + variables = VariableList(steerer(ik, delta) for ik, delta in zip(ids, deltas)) + if cavrefs is not None: + active = (el.longt_motion for el in ring.select(cavrefs)) + if not all(active): + raise ValueError("Cavities are not active") + # noinspection PyUnboundLocalVariable + cavvar = RefptsVariable( + cavrefs, + "Frequency", + name="RF frequency", + delta=cavdelta if cavdelta else 2.0 * cavd, + ) + variables.append(cavvar) + + super().__init__(ring, variables, observables) + self.plane = pl + self.steerrefs = ids + self.nbsteers = nbsteers + self.bpmrefs = ring.get_uint32_index(bpmrefs) + + def exclude_obs(self, *, obsid: int | str = 0, refpts: Refpts = None) -> None: + # noinspection PyUnresolvedReferences + r"""Add an observable item to the set of excluded values. + + After excluding observation points, the matrix must be inverted again using + :py:meth:`solve`. + + Args: + obsid: If 0 (default), act on Monitors. Otherwise, + it must be 1 or "sum(x_kicks)" or "sum(y_kicks)" + refpts: location of Monitors to exclude + + Raises: + ValueError: No observable with the given name. + IndexError: Observableindex out of range. + + Example: + >>> resp = OrbitResponseMatrix(ring, "h") + >>> resp.exclude_obs("BPM_02") + + Create an horizontal :py:class:`OrbitResponseMatrix` from + :py:class:`.Corrector` elements to :py:class:`.Monitor` elements, + and exclude all monitors with name "BPM_02" + """ + super().exclude_obs(obsid=obsid, refpts=refpts) + + def exclude_vars(self, *varid: int | str, refpts: Refpts = None) -> None: + # noinspection PyUnresolvedReferences + """Add correctors to the set of excluded variables. + + Args: + *varid: :py:class:`Variable <.VariableBase>` names or variable indices + in the variable list + refpts: location of correctors to exclude + + After excluding correctors, the matrix must be inverted again using + :py:meth:`solve`. + + Examples: + >>> resp.exclude_vars(0, "x0097", -1) + + Exclude the 1st variable, the variable named "x0097" and the last variable. + + >>> resp.exclude_vars(refpts="SD1E") + + Exclude all variables associated with the element named "SD1E". + """ + plcode = plane_(self.plane, key="code") + names = [f"{plcode}{ik:04}" for ik in self.ring.get_uint32_index(refpts)] + super().exclude_vars(*varid, *names) + + def normalise( + self, cav_ampl: float | None = 2.0, stsum_ampl: float | None = 2.0 + ) -> None: + """Normalise the response matrix + + Adjust the RF cavity delta and/or the weight of steerer summation so that the + weighted response matrix is normalised. + + Args: + cav_ampl: Desired ratio between the cavity response and the average of + steerer responses. If :py:obj:`None`, do not normalise. + stsum_ampl: Desired inverse ratio between the weight of the steerer + summation and the average of Monitor responses. If :py:obj:`None`, + do not normalise. + + By default, the normalisation gives to the RF frequency and steerer summation + a factor 2 more efficiency than steerers and BPMs + """ + resp = self.weighted_response + normvar = np.linalg.norm(resp, axis=0) + normobs = np.linalg.norm(resp, axis=1) + if len(self.variables) > self.nbsteers and cav_ampl is not None: + self.cavdelta *= np.mean(normvar[:-1]) / normvar[-1] * cav_ampl + if len(self.observables) > 1 and stsum_ampl is not None: + self.stsumweight = ( + self.stsumweight * normobs[-1] / np.mean(normobs[:-1]) / stsum_ampl + ) + + def build_analytical(self, **kwargs) -> FloatArray: + """Build analytically the response matrix. + + Keyword Args: + dp (float): Momentum deviation. Defaults to :py:obj:`None` + dct (float): Path lengthening. Defaults to :py:obj:`None` + df (float): Deviation from the nominal RF frequency. + Defaults to :py:obj:`None` + + Returns: + response: Response matrix + + References: + .. [#Franchi] A. Franchi, S.M. Liuzzo, Z. Marti, *"Analytic formulas for + the rapid evaluation of the orbit response matrix and chromatic functions + from lattice parameters in circular accelerators"*, + arXiv:1711.06589 [physics.acc-ph] + """ + + def tauwj(muj, muw): + tau = muj - muw + if tau < 0.0: + tau += 2.0 * pi_tune + return tau - pi_tune + + ring = self.ring + pl = self.plane + _, ringdata, elemdata = ring.linopt6(All, **kwargs) + pi_tune = math.pi * ringdata.tune[pl] + dataw = elemdata[self.steerrefs] + dataj = elemdata[self.bpmrefs] + dispj = dataj.dispersion[:, 2 * pl] + dispw = dataw.dispersion[:, 2 * pl] + lw = np.array([elem.Length for elem in ring.select(self.steerrefs)]) + taufunc = np.frompyfunc(tauwj, 2, 1) + + sqbetaw = np.sqrt(dataw.beta[:, pl]) + ts = lw / sqbetaw / 2.0 + tc = sqbetaw - dataw.alpha[:, pl] * ts + twj = np.astype(taufunc.outer(dataj.mu[:, pl], dataw.mu[:, pl]), np.float64) + jcwj = tc * np.cos(twj) + ts * np.sin(twj) + coefj = np.sqrt(dataj.beta[:, pl]) / (2.0 * np.sin(pi_tune)) + resp = coefj[:, np.newaxis] * jcwj + if ring.is_6d: + alpha_c = ring.disable_6d(copy=True).get_mcf() + resp += np.outer(dispj, dispw) / (alpha_c * ring.circumference) + if len(self.variables) > self.nbsteers: + rfrsp = -dispj / (alpha_c * ring.rf_frequency) + resp = np.concatenate((resp, rfrsp[:, np.newaxis]), axis=1) + if len(self.observables) > 1: + sumst = np.ones(resp.shape[1], np.float64) + if len(self.variables) > self.nbsteers: + sumst[-1] = 0.0 + resp = np.concatenate((resp, sumst[np.newaxis]), axis=0) + self.response = resp + return resp + + @property + def bpmweight(self) -> FloatArray: + """Weight of position readings.""" + return self.observables[0].weight + + @bpmweight.setter + def bpmweight(self, value: npt.ArrayLike): + self.observables[0].weight = value + + @property + def stsumweight(self) -> FloatArray: + """Weight of steerer summation.""" + return self.observables[1].weight + + @stsumweight.setter + def stsumweight(self, value: float): + self.observables[1].weight = value + + @property + def steerdelta(self) -> FloatArray: + """Step and weight of steerers.""" + return self.variables[: self.nbsteers].deltas + + @steerdelta.setter + def steerdelta(self, value: npt.ArrayLike): + self.variables[: self.nbsteers].deltas = value + + @property + def cavdelta(self) -> FloatArray: + """Step and weight of RF frequency deviation.""" + return self.variables[self.nbsteers].delta + + @cavdelta.setter + def cavdelta(self, value: float): + self.variables[self.nbsteers].delta = value + + +class TrajectoryResponseMatrix(ResponseMatrix): + """Trajectory response matrix. + + A :py:class:`TrajectoryResponseMatrix` applies to a single plane, horizontal or + vertical. A combined response matrix is obtained by adding horizontal and vertical + matrices. However, the resulting matrix has the :py:class:`ResponseMatrix` + class, which implies that the :py:class:`OrbitResponseMatrix` specific methods are + not available. + + Variables are a set of steerers. Steerer variables are named ``xnnnn`` or + ``ynnnn`` where *nnnn* is the index in the lattice. + + Observables are the trajectory position at selected points, named ``trajectory[x]`` + for the horizontal plane or ``trajectory[y]`` for the vertical plane. + + The variable elements must have the *KickAngle* attribute used for correction. + It's available for all magnets, though not present by default + except in :py:class:`.Corrector` magnets. For other magnets, the attribute + should be explicitly created. + + By default, the observables are all the :py:class:`.Monitor` elements, and the + variables are all the elements having a *KickAngle* attribute. + + """ + + bpmrefs: Uint32Refpts + steerrefs: Uint32Refpts + _default_twiss_in: ClassVar[dict] = {"beta": np.ones(2), "alpha": np.zeros(2)} + + def __init__( + self, + ring: Lattice, + plane: AxisDef, + bpmrefs: Refpts = Monitor, + steerrefs: Refpts = _orbit_correctors, + *, + bpmweight: float = 1.0, + bpmtarget: float = 0.0, + steerdelta: float = 0.0001, + ): + """ + Args: + ring: Design lattice, used to compute the response + plane: One out of {0, 'x', 'h', 'H'} for horizontal orbit, or + one of {1, 'y', 'v', 'V'} for vertical orbit + bpmrefs: Location of closed orbit observation points. + See ":ref:`Selecting elements in a lattice `". + Default: all :py:class:`.Monitor` elements. + steerrefs: Location of orbit steerers. Their *KickAngle* attribute + is used and must be present in the selected elements. + Default: All Elements having a *KickAngle* attribute. + bpmweight: Weight on position readings. Must be broadcastable to the + number of BPMs + bpmtarget: Target position + steerdelta: Step on steerers for matrix computation [rad]. This is + also the steerer weight. Must be broadcastable to the number of steerers. + """ + + def steerer(ik, delta): + name = f"{plcode}{ik:04}" + return RefptsVariable(ik, "KickAngle", index=pl, name=name, delta=delta) + + pl = plane_(plane, key="index") + plcode = plane_(plane, key="code") + ids = ring.get_uint32_index(steerrefs) + nbsteers = len(ids) + deltas = np.broadcast_to(steerdelta, nbsteers) + # Observables + bpms = TrajectoryObservable( + bpmrefs, axis=2 * pl, target=bpmtarget, weight=bpmweight + ) + observables = ObservableList([bpms]) + # Variables + variables = VariableList(steerer(ik, delta) for ik, delta in zip(ids, deltas)) + + super().__init__(ring, variables, observables) + self.plane = pl + self.steerrefs = ids + self.nbsteers = nbsteers + self.bpmrefs = ring.get_uint32_index(bpmrefs) + + def build_analytical(self, **kwargs) -> FloatArray: + """Build analytically the response matrix. + + Keyword Args: + dp (float): Momentum deviation. Defaults to :py:obj:`None` + dct (float): Path lengthening. Defaults to :py:obj:`None` + df (float): Deviation from the nominal RF frequency. + Defaults to :py:obj:`None` + + Returns: + response: Response matrix + """ + ring = self.ring + pl = self.plane + twiss_in = self._eval_args.get("twiss_in", self._default_twiss_in) + _, _, elemdata = ring.linopt6(All, twiss_in=twiss_in, **kwargs) + dataj = elemdata[self.bpmrefs] + dataw = elemdata[self.steerrefs] + lw = np.array([elem.Length for elem in ring.select(self.steerrefs)]) + + sqbetaw = np.sqrt(dataw.beta[:, pl]) + ts = lw / sqbetaw / 2.0 + tc = sqbetaw - dataw.alpha[:, pl] * ts + twj = dataj.mu[:, pl].reshape(-1, 1) - dataw.mu[:, pl] + jswj = tc * np.sin(twj) - ts * np.cos(twj) + coefj = np.sqrt(dataj.beta[:, pl]) + resp = coefj[:, np.newaxis] * jswj + resp[twj < 0.0] = 0.0 + self.response = resp + return resp + + def exclude_obs(self, *, obsid: int | str = 0, refpts: Refpts = None) -> None: + # noinspection PyUnresolvedReferences + r"""Add a monitor to the set of excluded values. + + After excluding observation points, the matrix must be inverted again using + :py:meth:`solve`. + + Args: + refpts: location of Monitors to exclude + + Raises: + ValueError: No observable with the given name. + IndexError: Observableindex out of range. + + Example: + >>> resp = TrajectoryResponseMatrix(ring, "v") + >>> resp.exclude_obs("BPM_02") + + Create a vertical :py:class:`TrajectoryResponseMatrix` from + :py:class:`.Corrector` elements to :py:class:`.Monitor` elements, + and exclude all monitors with name "BPM_02" + """ + super().exclude_obs(obsid=0, refpts=refpts) + + def exclude_vars(self, *varid: int | str, refpts: Refpts = None) -> None: + # noinspection PyUnresolvedReferences + """Add correctors to the set of excluded variables. + + Args: + *varid: :py:class:`Variable <.VariableBase>` names or variable indices + in the variable list + refpts: location of correctors to exclude + + After excluding correctors, the matrix must be inverted again using + :py:meth:`solve`. + + Examples: + >>> resp.exclude_vars(0, "x0103", -1) + + Exclude the 1st variable, the variable named "x0103" and the last variable. + + >>> resp.exclude_vars(refpts="SD1E") + + Exclude all variables associated with the element named "SD1E". + """ + plcode = plane_(self.plane, key="code") + names = [f"{plcode}{ik:04}" for ik in self.ring.get_uint32_index(refpts)] + super().exclude_vars(*varid, *names) + + @property + def bpmweight(self) -> FloatArray: + """Weight of position readings.""" + return self.observables[0].weight + + @bpmweight.setter + def bpmweight(self, value: npt.ArrayLike): + self.observables[0].weight = value + + @property + def steerdelta(self) -> np.ndarray: + """Step and weight on steerers.""" + return self.variables.deltas + + @steerdelta.setter + def steerdelta(self, value): + self.variables.deltas = value diff --git a/pyat/at/plot/__init__.py b/pyat/at/plot/__init__.py index d810d35cb..3975fbd87 100644 --- a/pyat/at/plot/__init__.py +++ b/pyat/at/plot/__init__.py @@ -12,3 +12,4 @@ from .specific import * from .standalone import * from .resonances import * + from .response_matrix import * diff --git a/pyat/at/plot/response_matrix.py b/pyat/at/plot/response_matrix.py new file mode 100644 index 000000000..8533a3ee7 --- /dev/null +++ b/pyat/at/plot/response_matrix.py @@ -0,0 +1,113 @@ +from __future__ import annotations +from ..lattice import Lattice +from ..latticetools import ResponseMatrix +from typing import Optional +import matplotlib.pyplot as plt +from matplotlib.axes import Axes + + +def plot_norm(resp: ResponseMatrix, ax: Optional[tuple[Axes, Axes]] = None) -> None: + r"""Plot the norm of the lines and columns of the weighted response matrix + + For a stable solution, the norms should have the same order of magnitude. + If not, the weights of observables and variables should be adjusted. + + Args: + resp: Response matrix object + ax: tuple of :py:class:`~.matplotlib.axes.Axes`. If given, + plots will be drawn in these axes. + """ + obs, var = resp.check_norm() + if ax is None: + fig, (ax1, ax2) = plt.subplots(nrows=2, layout="constrained") + else: + ax1, ax2 = ax[:2] + ax1.bar(range(len(obs)), obs) + ax1.set_title("Norm of weighted observables") + ax1.set_xlabel("Observable #") + ax2.bar(range(len(var)), var) + ax2.set_title("Norm of weighted variables") + ax2.set_xlabel("Variable #") + + +def plot_singular_values( + resp: ResponseMatrix, ax: Axes = None, logscale: bool = True +) -> None: + r"""Plot the singular values of a response matrix + + Args: + resp: Response matrix object + logscale: If :py:obj:`True`, use log scale + ax: If given, plots will be drawn in these axes. + """ + if resp.singular_values is None: + resp.solve() + singvals = resp.singular_values + if ax is None: + fig, ax = plt.subplots() + ax.bar(range(len(singvals)), singvals) + if logscale: + ax.set_yscale("log") + ax.set_title("Singular values") + + +def plot_obs_analysis( + resp: ResponseMatrix, lattice: Lattice, ax: Axes = None, logscale: bool = True +) -> None: + """Plot the decomposition of an error vector on the basis of singular + vectors + + Args: + resp: Response matrix object + lattice: Lattice description. The response matrix observables + will be evaluated for this :py:class:`.Lattice` and the deviation + from target will be decomposed on the basis of singular vectors, + logscale: If :py:obj:`True`, use log scale + ax: If given, plots will be drawn in these axes. + """ + if resp.singular_values is None: + resp.solve() + obs = resp.observables + # noinspection PyProtectedMember + obs.evaluate(lattice, **resp._eval_args) + corr = resp._uh @ obs.flat_deviations + if ax is None: + fig, ax = plt.subplots() + ax.bar(range(len(corr)), corr) + if logscale: + ax.set_yscale("log") + ax.set_title("SVD decomposition") + ax.set_xlabel("Singular vector #") + + +def plot_var_analysis( + resp: ResponseMatrix, lattice: Lattice, ax: Axes = None, logscale: bool = False +) -> None: + """Plot the decomposition of a correction vector on the basis of singular + vectors + + Args: + resp: Response matrix object + lattice: Lattice description. The variables will be evaluated + for this :py:class:`.Lattice` and will be decomposed on the basis + of singular vectors, + logscale: If :py:obj:`True`, use log scale + ax: If given, plots will be drawn in these axes. + """ + if resp.singular_values is None: + resp.solve() + var = resp.variables + if ax is None: + fig, ax = plt.subplots() + corr = (resp._v * resp.singular_values).T @ var.get(lattice) + ax.bar(range(len(corr)), corr) + if logscale: + ax.set_yscale("log") + ax.set_title("SVD decomposition") + ax.set_xlabel("Singular vector #") + + +ResponseMatrix.plot_norm = plot_norm +ResponseMatrix.plot_singular_values = plot_singular_values +ResponseMatrix.plot_obs_analysis = plot_obs_analysis +ResponseMatrix.plot_var_analysis = plot_var_analysis