From 765ba43438f91bf75773e9de358dbf58f34c5d87 Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Fri, 31 Jan 2025 13:33:24 +0300
Subject: [PATCH 01/18] Allow pageserver unreachable errors in
 test_scrubber_tenant_snapshot (#10585)

## Problem

test_scrubber_tenant_snapshot restarts pageservers, but log validation
fails tests on any non-whitelisted storcon warnings, making the test flaky.

## Summary of changes

Allow warnings like

2025-01-29T12:37:42.622179Z WARN reconciler{seq=1 tenant_id=2011077aea9b4e8a60e8e8a19407634c shard_id=0004}: Call to node 2 (localhost:15352) management API failed, will retry (attempt 1): receive body: error sending request for url (http://localhost:15352/v1/tenant/2011077aea9b4e8a60e8e8a19407634c-0004/location_config): client error (Connect)

ref https://github.com/neondatabase/neon/issues/10462
---
 test_runner/regress/test_storage_scrubber.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py
index 1304d302b79c..7e92cc01cd2f 100644
--- a/test_runner/regress/test_storage_scrubber.py
+++ b/test_runner/regress/test_storage_scrubber.py
@@ -32,6 +32,12 @@ def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count:
     neon_env_builder.num_pageservers = shard_count if shard_count is not None else 1
 
     env = neon_env_builder.init_start()
+    # We restart pageserver(s), which will cause storage controller
+    # requests to fail and warn.
+    env.storage_controller.allowed_errors.append(".*management API still failed.*")
+    env.storage_controller.allowed_errors.append(
+        ".*Reconcile error.*error sending request for url.*"
+    )
     tenant_id = env.initial_tenant
     timeline_id = env.initial_timeline
     branch = "main"

From f09cfd11cb3029f9395c0ad0bccd61d2a848a6db Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 31 Jan 2025 10:54:14 +0000
Subject: [PATCH 02/18] pageserver: exclude archived timelines from
 freeze+flush on shutdown (#10594)

## Problem

If offloading races with normal shutdown, we get a "failed to freeze and
flush: cannot flush frozen layers when flush_loop is not running, state is
Exited". This is harmless, but points to it being quite strange to try and
freeze and flush such a timeline: flushing on shutdown for an archived
timeline isn't useful.

Related: https://github.com/neondatabase/neon/issues/10389

## Summary of changes

- During Timeline::shutdown, ignore ShutdownMode::FreezeAndFlush if the
  timeline is archived
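In compressed form, the new decision can be modeled in a few lines (a
minimal sketch, not the verbatim patch; `is_archived` mirrors
`RemoteTimelineClient::is_archived`, and `dirty` stands in for the
layer-map check for open or frozen in-memory layers in the real diff
below):

```rust
// Minimal model of the new shutdown decision (illustrative only).
fn should_flush_on_shutdown(is_archived: Option<bool>, dirty: bool) -> bool {
    if !dirty {
        // Double-shutdown: freeze_and_flush will be a no-op anyway.
        return true;
    }
    // Flushing an archived timeline is pointless and races with
    // offloading, which also stops the flush loop.
    is_archived != Some(true)
}

fn main() {
    assert!(should_flush_on_shutdown(Some(false), true)); // normal timeline: flush
    assert!(!should_flush_on_shutdown(Some(true), true)); // archived: skip
}
```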
---
 pageserver/src/tenant/timeline.rs | 71 ++++++++++++++++++-------------
 1 file changed, 41 insertions(+), 30 deletions(-)

diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 827601fa8b86..d6a8eaa4d957 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1818,7 +1818,7 @@ impl Timeline {
         self.last_record_lsn.shutdown();
 
         if let ShutdownMode::FreezeAndFlush = mode {
-            if let Some((open, frozen)) = self
+            let do_flush = if let Some((open, frozen)) = self
                 .layers
                 .read()
                 .await
@@ -1827,43 +1827,54 @@ impl Timeline {
                 .ok()
                 .filter(|(open, frozen)| *open || *frozen > 0)
             {
-                tracing::info!(?open, frozen, "flushing and freezing on shutdown");
+                if self.remote_client.is_archived() == Some(true) {
+                    // No point flushing on shutdown for an archived timeline: it is not important
+                    // to have it nice and fresh after our restart, and trying to flush here might
+                    // race with trying to offload it (which also stops the flush loop)
+                    false
+                } else {
+                    tracing::info!(?open, frozen, "flushing and freezing on shutdown");
+                    true
+                }
             } else {
-                // this is double-shutdown, ignore it
-            }
+                // this is double-shutdown, it'll be a no-op
+                true
+            };
 
             // we shut down walreceiver above, so, we won't add anything more
             // to the InMemoryLayer; freeze it and wait for all frozen layers
             // to reach the disk & upload queue, then shut the upload queue and
            // wait for it to drain.
-            match self.freeze_and_flush().await {
-                Ok(_) => {
-                    // drain the upload queue
-                    // if we did not wait for completion here, it might be our shutdown process
-                    // didn't wait for remote uploads to complete at all, as new tasks can forever
-                    // be spawned.
-                    //
-                    // what is problematic is the shutting down of RemoteTimelineClient, because
-                    // obviously it does not make sense to stop while we wait for it, but what
-                    // about corner cases like s3 suddenly hanging up?
-                    self.remote_client.shutdown().await;
-                }
-                Err(FlushLayerError::Cancelled) => {
-                    // this is likely the second shutdown, ignore silently.
-                    // TODO: this can be removed once https://github.com/neondatabase/neon/issues/5080
-                    debug_assert!(self.cancel.is_cancelled());
-                }
-                Err(e) => {
-                    // Non-fatal. Shutdown is infallible. Failures to flush just mean that
-                    // we have some extra WAL replay to do next time the timeline starts.
-                    warn!("failed to freeze and flush: {e:#}");
+            if do_flush {
+                match self.freeze_and_flush().await {
+                    Ok(_) => {
+                        // drain the upload queue
+                        // if we did not wait for completion here, it might be our shutdown process
+                        // didn't wait for remote uploads to complete at all, as new tasks can forever
+                        // be spawned.
+                        //
+                        // what is problematic is the shutting down of RemoteTimelineClient, because
+                        // obviously it does not make sense to stop while we wait for it, but what
+                        // about corner cases like s3 suddenly hanging up?
+                        self.remote_client.shutdown().await;
+                    }
+                    Err(FlushLayerError::Cancelled) => {
+                        // this is likely the second shutdown, ignore silently.
+                        // TODO: this can be removed once https://github.com/neondatabase/neon/issues/5080
+                        debug_assert!(self.cancel.is_cancelled());
+                    }
+                    Err(e) => {
+                        // Non-fatal. Shutdown is infallible. Failures to flush just mean that
+                        // we have some extra WAL replay to do next time the timeline starts.
+                        warn!("failed to freeze and flush: {e:#}");
+                    }
                 }
-            }
 
-            // `self.remote_client.shutdown().await` above should have already flushed everything from the queue, but
-            // we also do a final check here to ensure that the queue is empty.
-            if !self.remote_client.no_pending_work() {
-                warn!("still have pending work in remote upload queue, but continuing shutting down anyways");
+                // `self.remote_client.shutdown().await` above should have already flushed everything from the queue, but
+                // we also do a final check here to ensure that the queue is empty.
+                if !self.remote_client.no_pending_work() {
+                    warn!("still have pending work in remote upload queue, but continuing shutting down anyways");
+                }
             }
         }

From 7d5c70c717c5ad543fdbd6115e422e27e0e86da9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Fri, 31 Jan 2025 12:23:12 +0100
Subject: [PATCH 03/18] Update AWS SDK crates (#10588)

We want to keep the AWS SDK up to date as that way we benefit from new
developments and improvements.

Prior update was in #10056
---
 Cargo.lock | 75 ++++++++++++++++++++++++------------------------------
 1 file changed, 33 insertions(+), 42 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cada9604ff53..6b63c3c388ce 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -290,9 +290,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 [[package]]
 name = "aws-config"
-version = "1.5.10"
+version = "1.5.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924"
+checksum = "dc47e70fc35d054c8fcd296d47a61711f043ac80534a10b4f741904f81e73a90"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
  "aws-sdk-sso",
  "aws-sdk-ssooidc",
  "aws-sdk-sts",
  "aws-smithy-async",
  "aws-smithy-http",
- "aws-smithy-json 0.60.7",
+ "aws-smithy-json",
  "aws-smithy-runtime",
  "aws-smithy-runtime-api",
  "aws-smithy-types",
@@ -332,9 +332,9 @@ dependencies = [
 [[package]]
 name = "aws-runtime"
-version = "1.4.4"
+version = "1.5.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b5ac934720fbb46206292d2c75b57e67acfc56fe7dfd34fb9a02334af08409ea"
+checksum = "bee7643696e7fdd74c10f9eb42848a87fe469d35eae9c3323f80aa98f350baac"
 dependencies = [
  "aws-credential-types",
  "aws-sigv4",
@@ -366,7 +366,7 @@ dependencies = [
 "aws-runtime",
 "aws-smithy-async",
 "aws-smithy-http",
- "aws-smithy-json 0.61.1",
+ "aws-smithy-json",
 "aws-smithy-query",
 "aws-smithy-runtime",
 "aws-smithy-runtime-api",
 "aws-smithy-types",
@@ -389,7 +389,7 @@ dependencies = [
 "aws-runtime",
 "aws-smithy-async",
 "aws-smithy-http",
- "aws-smithy-json 0.61.1",
+ "aws-smithy-json",
 "aws-smithy-runtime",
 "aws-smithy-runtime-api",
 "aws-smithy-types",
@@ -414,7 +414,7 @@ dependencies = [
 "aws-smithy-checksums",
 "aws-smithy-eventstream",
 "aws-smithy-http",
- "aws-smithy-json 0.61.1",
+ "aws-smithy-json",
 "aws-smithy-runtime",
 "aws-smithy-runtime-api",
 "aws-smithy-types",
@@ -437,15 +437,15 @@
 [[package]]
 name = "aws-sdk-sso"
-version = "1.50.0"
+version = "1.57.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05ca43a4ef210894f93096039ef1d6fa4ad3edfabb3be92b80908b9f2e4b4eab"
+checksum = "c54bab121fe1881a74c338c5f723d1592bf3b53167f80268a1274f404e1acc38"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
  "aws-smithy-async",
  "aws-smithy-http",
- "aws-smithy-json 0.61.1",
+ "aws-smithy-json",
  "aws-smithy-runtime",
  "aws-smithy-runtime-api",
  "aws-smithy-types",
 [[package]]
 name = "aws-sdk-ssooidc"
-version = "1.51.0"
+version = "1.58.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "abaf490c2e48eed0bb8e2da2fb08405647bd7f253996e0f93b981958ea0f73b0"
+checksum = "8c8234fd024f7ac61c4e44ea008029bde934250f371efe7d4a39708397b1080c"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
  "aws-smithy-async",
  "aws-smithy-http",
- "aws-smithy-json 0.61.1",
+ "aws-smithy-json",
  "aws-smithy-runtime",
  "aws-smithy-runtime-api",
  "aws-smithy-types",
@@ -481,15 +481,15 @@
 [[package]]
 name = "aws-sdk-sts"
-version = "1.51.0"
+version = "1.58.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b68fde0d69c8bfdc1060ea7da21df3e39f6014da316783336deff0a9ec28f4bf"
+checksum = "ba60e1d519d6f23a9df712c04fdeadd7872ac911c84b2f62a8bda92e129b7962"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
  "aws-smithy-async",
  "aws-smithy-http",
- "aws-smithy-json 0.61.1",
+ "aws-smithy-json",
  "aws-smithy-query",
  "aws-smithy-runtime",
  "aws-smithy-runtime-api",
@@ -504,9 +504,9 @@
 [[package]]
 name = "aws-sigv4"
-version = "1.2.6"
+version = "1.2.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d3820e0c08d0737872ff3c7c1f21ebbb6693d832312d6152bf18ef50a5471c2"
+checksum = "690118821e46967b3c4501d67d7d52dd75106a9c54cf36cefa1985cedbe94e05"
 dependencies = [
  "aws-credential-types",
  "aws-smithy-eventstream",
@@ -533,9 +533,9 @@
 [[package]]
 name = "aws-smithy-async"
-version = "1.2.1"
+version = "1.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c"
+checksum = "fa59d1327d8b5053c54bf2eaae63bf629ba9e904434d0835a28ed3c0ed0a614e"
 dependencies = [
  "futures-util",
  "pin-project-lite",
@@ -565,9 +565,9 @@
 [[package]]
 name = "aws-smithy-eventstream"
-version = "0.60.5"
+version = "0.60.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90"
+checksum = "8b18559a41e0c909b77625adf2b8c50de480a8041e5e4a3f5f7d177db70abc5a"
 dependencies = [
  "aws-smithy-types",
  "bytes",
@@ -576,9 +576,9 @@
 [[package]]
 name = "aws-smithy-http"
-version = "0.60.11"
+version = "0.60.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6"
+checksum = "7809c27ad8da6a6a68c454e651d4962479e81472aa19ae99e59f9aba1f9713cc"
 dependencies = [
  "aws-smithy-eventstream",
  "aws-smithy-runtime-api",
@@ -597,18 +597,9 @@
 [[package]]
 name = "aws-smithy-json"
-version = "0.60.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6"
-dependencies = [
- "aws-smithy-types",
-]
-
-[[package]]
-name = "aws-smithy-json"
-version = "0.61.1"
+version = "0.61.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ee4e69cc50921eb913c6b662f8d909131bb3e6ad6cb6090d3a39b66fc5c52095"
+checksum = "623a51127f24c30776c8b374295f2df78d92517386f77ba30773f15a30ce1422"
 dependencies = [
  "aws-smithy-types",
 ]
@@ -625,9 +616,9 @@ dependencies = [
 [[package]]
 name = "aws-smithy-runtime"
-version = "1.7.4"
+version = "1.7.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f20685047ca9d6f17b994a07f629c813f08b5bce65523e47124879e60103d45"
+checksum = "865f7050bbc7107a6c98a397a9fcd9413690c27fa718446967cf03b2d3ac517e"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-http",
@@ -669,9 +660,9 @@
 [[package]]
 name = "aws-smithy-types"
-version = "1.2.9"
+version = "1.2.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510"
+checksum = "a28f6feb647fb5e0d5b50f0472c19a7db9462b74e2fec01bb0b44eedcc834e97"
 dependencies = [
  "base64-simd",
  "bytes",
@@ -704,9 +695,9 @@
 [[package]]
 name = "aws-types"
-version = "1.3.3"
+version = "1.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef"
+checksum = "b0df5a18c4f951c645300d365fec53a61418bcf4650f604f85fe2a665bfaa0c2"
 dependencies = [
  "aws-credential-types",
  "aws-smithy-async",
"865f7050bbc7107a6c98a397a9fcd9413690c27fa718446967cf03b2d3ac517e" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -669,9 +660,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.9" +version = "1.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" +checksum = "a28f6feb647fb5e0d5b50f0472c19a7db9462b74e2fec01bb0b44eedcc834e97" dependencies = [ "base64-simd", "bytes", @@ -704,9 +695,9 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.3" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" +checksum = "b0df5a18c4f951c645300d365fec53a61418bcf4650f604f85fe2a665bfaa0c2" dependencies = [ "aws-credential-types", "aws-smithy-async", From afbcebe7f761dd555a3433aa34802b601367a82f Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 31 Jan 2025 12:31:58 +0100 Subject: [PATCH 04/18] test_runner: force-compact in `test_sharding_autosplit` (#10605) ## Problem This test may not fully detect data corruption during splits, since we don't force-compact the entire keyspace. ## Summary of changes Force-compact all data in `test_sharding_autosplit`. --- test_runner/performance/test_sharding_autosplit.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/test_runner/performance/test_sharding_autosplit.py b/test_runner/performance/test_sharding_autosplit.py index 76c3ad01a4a7..e5a9f17da8c2 100644 --- a/test_runner/performance/test_sharding_autosplit.py +++ b/test_runner/performance/test_sharding_autosplit.py @@ -247,7 +247,7 @@ def assert_all_split(): log.info(f"{shard_zero_id} timeline: {timeline_info}") # Run compaction for all tenants, restart endpoint so that on subsequent reads we will - # definitely hit pageserver for reads. This compaction passis expected to drop unwanted + # definitely hit pageserver for reads. This compaction pass is expected to drop unwanted # layers but not do any rewrites (we're still in the same generation) for tenant_id, tenant_state in tenants.items(): tenant_state.endpoint.stop() @@ -296,6 +296,16 @@ def assert_all_split(): for fut in pgbench_futs: fut.result() + # Run a full forced compaction, to detect any data corruption. + for tenant_id, tenant_state in tenants.items(): + for shard_id, shard_ps in tenant_get_shards(env, tenant_id): + shard_ps.http_client().timeline_compact( + shard_id, + tenant_state.timeline_id, + force_image_layer_creation=True, + force_l0_compaction=True, + ) + # Assert that some rewrites happened # TODO: uncomment this after https://github.com/neondatabase/neon/pull/7531 is merged # assert any(ps.log_contains(".*Rewriting layer after shard split.*") for ps in env.pageservers) From 89cff08354f7c2f2bbb0a92df2eca6de828fa4fe Mon Sep 17 00:00:00 2001 From: Fedor Dikarev Date: Fri, 31 Jan 2025 12:46:33 +0100 Subject: [PATCH 05/18] unify pg-build-nonroot-with-cargo base layer and config retries in curl (#10575) Ref: https://github.com/neondatabase/cloud/issues/23461 ## Problem Just made changes around and see these 2 base layers could be optimised. 
and after review comment from @myrrc setting up timeouts and retries in `alpine/curl` image ## Summary of changes --- compute/compute-node.Dockerfile | 43 +++++++++++++++------------------ 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index 1ef449f0b06c..32226c56a52d 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -825,11 +825,11 @@ RUN case "${PG_VERSION}" in "v17") \ ######################################################################################### # -# Layer "rust extensions" -# This layer is used to build `pgrx` deps +# Layer "pg build with nonroot user and cargo installed" +# This layer is base and common for layers with `pgrx` # ######################################################################################### -FROM pg-build AS rust-extensions-build +FROM pg-build AS pg-build-nonroot-with-cargo ARG PG_VERSION RUN apt update && \ @@ -847,8 +847,18 @@ RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 30 RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \ chmod +x rustup-init && \ ./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \ - rm rustup-init && \ - case "${PG_VERSION}" in \ + rm rustup-init + +######################################################################################### +# +# Layer "rust extensions" +# This layer is used to build `pgrx` deps +# +######################################################################################### +FROM pg-build-nonroot-with-cargo AS rust-extensions-build +ARG PG_VERSION + +RUN case "${PG_VERSION}" in \ 'v17') \ echo 'v17 is not supported yet by pgrx. 
Quit' && exit 0;; \ esac && \ @@ -867,26 +877,10 @@ USER root # and eventually get merged with `rust-extensions-build` # ######################################################################################### -FROM pg-build AS rust-extensions-build-pgrx12 +FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx12 ARG PG_VERSION -RUN apt update && \ - apt install --no-install-recommends --no-install-suggests -y curl libclang-dev && \ - apt clean && rm -rf /var/lib/apt/lists/* && \ - useradd -ms /bin/bash nonroot -b /home - -ENV HOME=/home/nonroot -ENV PATH="/home/nonroot/.cargo/bin:$PATH" -USER nonroot -WORKDIR /home/nonroot - -RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc - -RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \ - chmod +x rustup-init && \ - ./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \ - rm rustup-init && \ - cargo install --locked --version 0.12.9 cargo-pgrx && \ +RUN cargo install --locked --version 0.12.9 cargo-pgrx && \ /bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config' USER root @@ -1283,7 +1277,8 @@ FROM alpine/curl:${ALPINE_CURL_VERSION} AS exporters ARG TARGETARCH # Keep sql_exporter version same as in build-tools.Dockerfile and # test_runner/regress/test_compute_metrics.py -RUN if [ "$TARGETARCH" = "amd64" ]; then\ +RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc; \ + if [ "$TARGETARCH" = "amd64" ]; then\ postgres_exporter_sha256='027e75dda7af621237ff8f5ac66b78a40b0093595f06768612b92b1374bd3105';\ pgbouncer_exporter_sha256='c9f7cf8dcff44f0472057e9bf52613d93f3ffbc381ad7547a959daa63c5e84ac';\ sql_exporter_sha256='38e439732bbf6e28ca4a94d7bc3686d3fa1abdb0050773d5617a9efdb9e64d08';\ From 503bc72d31ce15620479346dfa1081771a7f0a95 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 31 Jan 2025 11:48:46 +0000 Subject: [PATCH 06/18] CI: add `diesel print-schema` check (#10527) ## Problem We want to check that `diesel print-schema` doesn't generate any changes (`storage_controller/src/schema.rs`) in comparison with the list of migration. ## Summary of changes - Add `diesel_cli` to `build-tools` image - Add `Check diesel schema` step to `build-neon` job, at this stage we have all required binaries, so don't need to compile anything additionally - Check runs only on x86 release builds to be sure we do it at least once per CI run. 
---
 .github/workflows/_build-and-test-locally.yml | 20 +++++++++++++++++++
 .github/workflows/_check-codestyle-rust.yml   |  3 +++
 build-tools.Dockerfile                        |  3 +++
 3 files changed, 26 insertions(+)

diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml
index 2daed9038688..e9483492c967 100644
--- a/.github/workflows/_build-and-test-locally.yml
+++ b/.github/workflows/_build-and-test-locally.yml
@@ -271,6 +271,26 @@ jobs:
           path: /tmp/neon
           aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
 
+      - name: Check diesel schema
+        if: inputs.build-type == 'release' && inputs.arch == 'x64'
+        env:
+          DATABASE_URL: postgresql://localhost:1235/storage_controller
+          POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
+        run: |
+          /tmp/neon/bin/neon_local init
+          /tmp/neon/bin/neon_local storage_controller start
+
+          diesel print-schema > storage_controller/src/schema.rs
+
+          if [ -n "$(git diff storage_controller/src/schema.rs)" ]; then
+            echo >&2 "Uncommitted changes in diesel schema"
+
+            git diff .
+            exit 1
+          fi
+
+          /tmp/neon/bin/neon_local storage_controller stop
+
       # XXX: keep this after the binaries.list is formed, so the coverage can properly work later
       - name: Merge and upload coverage data
         if: inputs.build-type == 'debug'
diff --git a/.github/workflows/_check-codestyle-rust.yml b/.github/workflows/_check-codestyle-rust.yml
index cbc47c640640..f7518d650027 100644
--- a/.github/workflows/_check-codestyle-rust.yml
+++ b/.github/workflows/_check-codestyle-rust.yml
@@ -16,6 +16,9 @@ defaults:
   run:
     shell: bash -euxo pipefail {0}
 
+# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
+permissions: {}
+
 jobs:
   check-codestyle-rust:
     strategy:
diff --git a/build-tools.Dockerfile b/build-tools.Dockerfile
index 9c13e480c125..dfcc7d06b493 100644
--- a/build-tools.Dockerfile
+++ b/build-tools.Dockerfile
@@ -261,6 +261,7 @@ ARG CARGO_HAKARI_VERSION=0.9.33
 ARG CARGO_DENY_VERSION=0.16.2
 ARG CARGO_HACK_VERSION=0.6.33
 ARG CARGO_NEXTEST_VERSION=0.9.85
+ARG CARGO_DIESEL_CLI_VERSION=2.2.6
 RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
     chmod +x rustup-init && \
     ./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \
@@ -274,6 +275,8 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
     cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
     cargo install cargo-hack --version ${CARGO_HACK_VERSION} && \
     cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} && \
+    cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} \
+        --features postgres-bundled --no-default-features && \
     rm -rf /home/nonroot/.cargo/registry && \
     rm -rf /home/nonroot/.cargo/git

From dce617fe070e8528aba5a5628b138e2fe3eb4c85 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Fri, 31 Jan 2025 13:40:20 +0100
Subject: [PATCH 07/18] Update to rebased rust-postgres (#10584)

Update to a rebased version of our rust-postgres patches, rebased on
[this](https://github.com/sfackler/rust-postgres/commit/98f5a11bc0a8e451552d8941ffa078c7eb6cd60c)
commit this time.

With #10280 reapplied, this means that the rust-postgres crates will be
deduplicated, as the new crate versions are finally compatible with the
requirements of diesel-async.

Earlier update: #10561

rust-postgres PR: https://github.com/neondatabase/rust-postgres/pull/39
---
 Cargo.lock | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6b63c3c388ce..cdc620e48575 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4465,8 +4465,8 @@ dependencies = [
 [[package]]
 name = "postgres"
-version = "0.19.6"
-source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#8b44892f7851e705810b2cb54504325699966070"
+version = "0.19.7"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
 dependencies = [
  "bytes",
  "fallible-iterator",
@@ -4479,9 +4479,9 @@ dependencies = [
 [[package]]
 name = "postgres-protocol"
 version = "0.6.6"
-source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#8b44892f7851e705810b2cb54504325699966070"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
 dependencies = [
- "base64 0.21.1",
+ "base64 0.22.1",
  "byteorder",
  "bytes",
  "fallible-iterator",
@@ -4513,7 +4513,7 @@ dependencies = [
 [[package]]
 name = "postgres-types"
 version = "0.2.6"
-source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#8b44892f7851e705810b2cb54504325699966070"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
 dependencies = [
  "bytes",
  "chrono",
@@ -6871,8 +6871,8 @@ dependencies = [
 [[package]]
 name = "tokio-postgres"
-version = "0.7.9"
-source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#8b44892f7851e705810b2cb54504325699966070"
+version = "0.7.10"
+source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
 dependencies = [
  "async-trait",
  "byteorder",

From 10cf5e7a38d45037f3f51b666c519b7e5c6c72a8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?JC=20Gr=C3=BCnhage?=
Date: Fri, 31 Jan 2025 14:42:59 +0100
Subject: [PATCH 08/18] Move cargo-deny into a separate workflow on a schedule
 (#10289)

## Problem

There are two (related) problems with the previous handling of
`cargo-deny`:

- When a new advisory is added to rustsec that affects a dependency,
  unrelated pull requests will fail.
- New advisories rely on pushes or PRs to be surfaced. Problems that
  already exist on main will only be found if we try to merge new things
  into main.

## Summary of changes

We split out `cargo-deny` into a separate workflow that runs on all PRs
that touch `Cargo.lock`, and on a schedule on `main`, `release`,
`release-compute` and `release-proxy` to find new advisories.
---
 .github/actionlint.yml                      |  1 +
 .github/file-filters.yaml                   |  1 +
 .github/workflows/_check-codestyle-rust.yml |  5 ---
 .github/workflows/build_and_test.yml        | 39 +++++++++++++-
 .github/workflows/cargo-deny.yml            | 57 +++++++++++++++++++++
 .github/workflows/pre-merge-checks.yml      |  3 +-
 6 files changed, 99 insertions(+), 7 deletions(-)
 create mode 100644 .github/workflows/cargo-deny.yml

diff --git a/.github/actionlint.yml b/.github/actionlint.yml
index ecff0cc70b22..2b96ce95da32 100644
--- a/.github/actionlint.yml
+++ b/.github/actionlint.yml
@@ -27,3 +27,4 @@ config-variables:
   - SLACK_ON_CALL_QA_STAGING_STREAM
   - DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
   - SLACK_ON_CALL_STORAGE_STAGING_STREAM
+  - SLACK_CICD_CHANNEL_ID
diff --git a/.github/file-filters.yaml b/.github/file-filters.yaml
index 886cd3919ac2..02ee383d5ed3 100644
--- a/.github/file-filters.yaml
+++ b/.github/file-filters.yaml
@@ -1,4 +1,5 @@
 rust_code: ['**/*.rs', '**/Cargo.toml', '**/Cargo.lock']
+rust_dependencies: ['**/Cargo.lock']
 
 v14: ['vendor/postgres-v14/**', 'Makefile', 'pgxn/**']
 v15: ['vendor/postgres-v15/**', 'Makefile', 'pgxn/**']
diff --git a/.github/workflows/_check-codestyle-rust.yml b/.github/workflows/_check-codestyle-rust.yml
index f7518d650027..c4c76914aa64 100644
--- a/.github/workflows/_check-codestyle-rust.yml
+++ b/.github/workflows/_check-codestyle-rust.yml
@@ -87,8 +87,3 @@ jobs:
         run: |
           cargo hakari generate --diff  # workspace-hack Cargo.toml is up-to-date
           cargo hakari manage-deps --dry-run  # all workspace crates depend on workspace-hack
-
-      # https://github.com/EmbarkStudios/cargo-deny
-      - name: Check rust licenses/bans/advisories/sources
-        if: ${{ !cancelled() }}
-        run: cargo deny check --hide-inclusion-graph
diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index e588fc5a0e88..1274543429ca 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -45,6 +45,26 @@ jobs:
           run cancel-previous-in-concurrency-group.yml \
             --field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
 
+  files-changed:
+    needs: [ check-permissions ]
+    runs-on: [ self-hosted, small ]
+    timeout-minutes: 3
+    outputs:
+      check-rust-dependencies: ${{ steps.files-changed.outputs.rust_dependencies }}
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          submodules: true
+
+      - name: Check for file changes
+        uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 # v3.0.2
+        id: files-changed
+        with:
+          token: ${{ secrets.GITHUB_TOKEN }}
+          filters: .github/file-filters.yaml
+
   tag:
     needs: [ check-permissions ]
     runs-on: [ self-hosted, small ]
@@ -170,6 +190,14 @@ jobs:
       archs: '["x64", "arm64"]'
     secrets: inherit
 
+  check-dependencies-rust:
+    needs: [ files-changed, build-build-tools-image ]
+    if: ${{ needs.files-changed.outputs.check-rust-dependencies == 'true' }}
+    uses: ./.github/workflows/cargo-deny.yml
+    with:
+      build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
+    secrets: inherit
+
   build-and-test-locally:
     needs: [ tag, build-build-tools-image ]
     strategy:
@@ -1332,6 +1360,8 @@ jobs:
       - build-and-test-locally
       - check-codestyle-python
       - check-codestyle-rust
+      - check-dependencies-rust
+      - files-changed
      - promote-images-dev
      - test-images
      - trigger-custom-extensions-build-and-wait
@@ -1344,4 +1374,11 @@ jobs:
     if: |
       contains(needs.*.result, 'failure')
       || contains(needs.*.result, 'cancelled')
-      || contains(needs.*.result, 'skipped')
+      || (needs.check-dependencies-rust.result == 'skipped' && needs.files-changed.outputs.check-rust-dependencies == 'true')
+      || needs.build-and-test-locally.result == 'skipped'
+      || needs.check-codestyle-python.result == 'skipped'
+      || needs.check-codestyle-rust.result == 'skipped'
+      || needs.files-changed.result == 'skipped'
+      || needs.promote-images-dev.result == 'skipped'
+      || needs.test-images.result == 'skipped'
+      || needs.trigger-custom-extensions-build-and-wait.result == 'skipped'
diff --git a/.github/workflows/cargo-deny.yml b/.github/workflows/cargo-deny.yml
new file mode 100644
index 000000000000..433b377c327e
--- /dev/null
+++ b/.github/workflows/cargo-deny.yml
@@ -0,0 +1,57 @@
+name: cargo deny checks
+
+on:
+  workflow_call:
+    inputs:
+      build-tools-image:
+        required: false
+        type: string
+  schedule:
+    - cron: '0 0 * * *'
+
+jobs:
+  cargo-deny:
+    strategy:
+      matrix:
+        ref: >-
+          ${{
+            fromJSON(
+              github.event_name == 'schedule'
+              && '["main","release","release-proxy","release-compute"]'
+              || format('["{0}"]', github.sha)
+            )
+          }}
+
+    runs-on: [self-hosted, small]
+
+    container:
+      image: ${{ inputs.build-tools-image || 'neondatabase/build-tools:pinned' }}
+      credentials:
+        username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
+        password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
+      options: --init
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          ref: ${{ matrix.ref }}
+
+      - name: Check rust licenses/bans/advisories/sources
+        env:
+          CARGO_DENY_TARGET: >-
+            ${{ github.event_name == 'schedule' && 'advisories' || 'all' }}
+        run: cargo deny check --hide-inclusion-graph $CARGO_DENY_TARGET
+
+      - name: Post to a Slack channel
+        if: ${{ github.event_name == 'schedule' && failure() }}
+        uses: slackapi/slack-github-action@v2
+        with:
+          method: chat.postMessage
+          token: ${{ secrets.SLACK_BOT_TOKEN }}
+          payload: |
+            channel: ${{ vars.SLACK_CICD_CHANNEL_ID }}
+            text: |
+              Periodic cargo-deny on ${{ matrix.ref }}: ${{ job.status }}
+              <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
+              Pinging @oncall-devprod.
diff --git a/.github/workflows/pre-merge-checks.yml b/.github/workflows/pre-merge-checks.yml
index e6dfbaeed871..e92a153db90a 100644
--- a/.github/workflows/pre-merge-checks.yml
+++ b/.github/workflows/pre-merge-checks.yml
@@ -124,6 +124,7 @@ jobs:
       - name: Fail the job if any of the dependencies do not succeed or skipped
         run: exit 1
         if: |
-          (contains(needs.check-codestyle-python.result, 'skipped') && needs.get-changed-files.outputs.python-changed == 'true')
+          (needs.check-codestyle-python.result == 'skipped' && needs.get-changed-files.outputs.python-changed == 'true')
+          || (needs.check-codestyle-rust.result == 'skipped' && needs.get-changed-files.outputs.rust-changed == 'true')
           || contains(needs.*.result, 'failure')
           || contains(needs.*.result, 'cancelled')

From a93e9f22fc0e35ad2863d1e57db9f3a01326b710 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 31 Jan 2025 17:43:31 +0000
Subject: [PATCH 09/18] pageserver: remove faulty debug assertion in compaction
 (#10610)

## Problem

This assertion is incorrect: it is legal to see another shard's data at
this point, after a shard split.

Closes: https://github.com/neondatabase/neon/issues/10609

## Summary of changes

- Remove faulty assertion
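Why the invariant no longer holds can be modeled in a few lines (an
illustrative sketch only; the shard count and hash are simplified
stand-ins for the real `ShardIdentity` logic in the diff below):

```rust
// Simplified model: a key's owner is derived from its hash modulo the
// shard count. After a split from 1 shard to 4, a child shard compacting
// an L0 layer inherited from its parent will see keys owned by siblings.
fn owner_of(key_hash: u32, shard_count: u32) -> u32 {
    key_hash % shard_count
}

fn main() {
    let my_shard = 0;
    let inherited_key_hashes = [0u32, 1, 2, 3]; // from the parent's L0 layer
    for h in inherited_key_hashes {
        let owner = owner_of(h, 4);
        if owner != my_shard {
            // Expected after a split: drop the key instead of panicking.
            println!("dropping key with hash {h} (belongs on shard {owner})");
        }
    }
}
```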
---
 pageserver/src/tenant/timeline/compaction.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 7242f73a82d8..7244e946cb79 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -1503,11 +1503,9 @@ impl Timeline {
                     .await
                     .map_err(CompactionError::Other)?;
             } else {
-                let shard = self.shard_identity.shard_index();
                 let owner = self.shard_identity.get_shard_number(&key);
-                if cfg!(debug_assertions) {
-                    panic!("key {key} does not belong on shard {shard}, owned by {owner}");
-                }
+
+                // This happens after a shard split, when we're compacting an L0 created by our parent shard
                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
             }

From aedeb1c7c277703af861894455764a8c248df9b8 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 31 Jan 2025 17:43:54 +0000
Subject: [PATCH 10/18] pageserver: revise logging of cancelled request results
 (#10604)

## Problem

When a client dropped before a request completed, and a handler returned
an ApiError, we would log that at error severity. That was excessive in
the case of a request erroring on a shutdown, and could cause test flakes.

Example: https://neon-github-public-dev.s3.amazonaws.com/reports/main/13067651123/index.html#suites/ad9c266207b45eafe19909d1020dd987/6021ce86a0d72ae7/

```
Cancelled request finished with an error: ShuttingDown
```

## Summary of changes

- Log a different, info-level message on ShuttingDown and
  ResourceUnavailable API errors from cancelled requests
---
 pageserver/src/http/routes.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index eb9cb4da0c20..94f7510a4abf 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -3393,7 +3393,17 @@ where
                     let status = response.status();
                     info!(%status, "Cancelled request finished successfully")
                 }
-                Err(e) => error!("Cancelled request finished with an error: {e:?}"),
+                Err(e) => match e {
+                    ApiError::ShuttingDown | ApiError::ResourceUnavailable(_) => {
+                        // Don't log this at error severity: they are normal during lifecycle of tenants/process
+                        info!("Cancelled request aborted for shutdown")
+                    }
+                    _ => {
+                        // Log these in a highly visible way, because we have no client to send the response to, but
+                        // would like to know that something went wrong.
+                        error!("Cancelled request finished with an error: {e:?}")
+                    }
+                },
             }
         }
         // only logging for cancelled panicked request handlers is the tracing_panic_hook,

From 48c87dc458a84fa9132ff22b1ae1fcc3d3094cda Mon Sep 17 00:00:00 2001
From: Alexander Bayandin
Date: Fri, 31 Jan 2025 18:07:26 +0000
Subject: [PATCH 11/18] CI(pre-merge-checks): fix condition (#10617)

## Problem

The Merge Queue fails if changes include Rust code.

## Summary of changes

- Fix condition for `build-build-tools-image`
- Add a couple of no-op `false ||` to make predicates look symmetric
---
 .github/workflows/pre-merge-checks.yml | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/pre-merge-checks.yml b/.github/workflows/pre-merge-checks.yml
index e92a153db90a..d39ccecac9e9 100644
--- a/.github/workflows/pre-merge-checks.yml
+++ b/.github/workflows/pre-merge-checks.yml
@@ -59,7 +59,10 @@ jobs:
           echo "${RUST_CHANGED_FILES}"
 
   build-build-tools-image:
-    if: needs.get-changed-files.outputs.python-changed == 'true'
+    if: |
+      false
+      || needs.get-changed-files.outputs.python-changed == 'true'
+      || needs.get-changed-files.outputs.rust-changed == 'true'
     needs: [ get-changed-files ]
     uses: ./.github/workflows/build-build-tools-image.yml
     with:
@@ -124,7 +127,8 @@ jobs:
       - name: Fail the job if any of the dependencies do not succeed or skipped
         run: exit 1
         if: |
-          (needs.check-codestyle-python.result == 'skipped' && needs.get-changed-files.outputs.python-changed == 'true')
-          || (needs.check-codestyle-rust.result == 'skipped' && needs.get-changed-files.outputs.rust-changed == 'true')
+          false
+          || (needs.check-codestyle-python.result == 'skipped' && needs.get-changed-files.outputs.python-changed == 'true')
+          || (needs.check-codestyle-rust.result == 'skipped' && needs.get-changed-files.outputs.rust-changed == 'true')
           || contains(needs.*.result, 'failure')
           || contains(needs.*.result, 'cancelled')

From bc7822d90c82046de709b211faa03d3f720a6931 Mon Sep 17 00:00:00 2001
From: Peter Bendel
Date: Fri, 31 Jan 2025 19:41:17 +0100
Subject: [PATCH 12/18] temporarily disable some steps and run more often to
 expose more pgbench --initialize in benchmarking workflow (#10616)

## Problem

We want to temporarily disable the steps in the benchmarking workflow
that do not initialize new projects, and instead run the remaining tests
more frequently.

Test run: https://github.com/neondatabase/neon/actions/runs/13077737888
---
 .github/workflows/benchmarking.yml | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml
index 49f23e895b22..20a8a6e2c9dc 100644
--- a/.github/workflows/benchmarking.yml
+++ b/.github/workflows/benchmarking.yml
@@ -11,7 +11,8 @@ on:
     # │ │ ┌───────────── day of the month (1 - 31)
     # │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
     # │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
-    - cron: '0 3 * * *' # run once a day, timezone is utc
+    # - cron: '0 3 * * *' # run once a day, timezone is utc
+    - cron: '0 */10 * * *' # Runs every 10 hours at minute 0
   workflow_dispatch: # adds ability to run this manually
     inputs:
       region_id:
@@ -550,6 +551,7 @@ jobs:
           SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
 
   pgbench-pgvector:
+    if: false
     permissions:
       contents: write
       statuses: write
@@ -683,7 +685,8 @@ jobs:
   #
   #   # *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
   #   # *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
-    if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
+    # if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
+    if: false
     permissions:
       contents: write
       statuses: write
@@ -810,7 +813,8 @@ jobs:
   # We might change it after https://github.com/neondatabase/neon/issues/2900.
   #
  #   # *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
-    if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
+    # if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
+    if: false
     permissions:
       contents: write
       statuses: write
@@ -929,7 +933,8 @@ jobs:
           SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
 
   user-examples-compare:
-    if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
+    # if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
+    if: false
     permissions:
       contents: write
       statuses: write

From fcd195c2b63fdfbb5323258a5422469e1e850175 Mon Sep 17 00:00:00 2001
From: Tristan Partin
Date: Fri, 31 Jan 2025 13:04:26 -0600
Subject: [PATCH 13/18] Migrate compute_ctl arg parsing to clap derive (#10497)

The primary benefit is that all the ad hoc get_matches() calls are no
longer necessary. Now all it takes to get at the CLI arguments is
referencing a struct member. It's also great that we can replace the ad
hoc CLI struct we had with this more formal solution.

Signed-off-by: Tristan Partin
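For readers unfamiliar with the two clap styles: the builder API assembles
the parser at runtime and reads values back out of an untyped
`ArgMatches`, while the derive API generates the parser from a plain
struct. A minimal sketch of the pattern adopted here (the field shown is
illustrative; the real struct follows in the diff):

```rust
use clap::Parser;

#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
    /// Path to the postgres binary (example field, mirroring `--pgbin`).
    #[arg(short = 'b', long, default_value = "postgres")]
    pgbin: String,
}

fn main() {
    // Parsing, validation, and --help are all generated from the struct.
    let cli = Cli::parse();
    println!("pgbin = {}", cli.pgbin); // typed access instead of get_matches()
}
```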
---
 compute_tools/src/bin/compute_ctl.rs | 346 +++++++++------------------
 1 file changed, 111 insertions(+), 235 deletions(-)

diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index b98cf706d343..47fc9cb7fe9d 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -34,6 +34,7 @@
 //!     -r http://pg-ext-s3-gateway \
 //! ```
 use std::collections::HashMap;
+use std::ffi::OsString;
 use std::fs::File;
 use std::path::Path;
 use std::process::exit;
@@ -44,7 +45,7 @@ use std::{thread, time::Duration};
 
 use anyhow::{Context, Result};
 use chrono::Utc;
-use clap::Arg;
+use clap::Parser;
 use compute_tools::disk_quota::set_disk_quota;
 use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
 use signal_hook::consts::{SIGQUIT, SIGTERM};
@@ -73,10 +74,75 @@ use utils::failpoint_support;
 // in-case of not-set environment var
 const BUILD_TAG_DEFAULT: &str = "latest";
 
+// Compatibility hack: if the control plane specified any remote-ext-config
+// use the default value for extension storage proxy gateway.
+// Remove this once the control plane is updated to pass the gateway URL
+fn parse_remote_ext_config(arg: &str) -> Result<String> {
+    if arg.starts_with("http") {
+        Ok(arg.trim_end_matches('/').to_string())
+    } else {
+        Ok("http://pg-ext-s3-gateway".to_string())
+    }
+}
+
+#[derive(Parser)]
+#[command(rename_all = "kebab-case")]
+struct Cli {
+    #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
+    pub pgbin: String,
+
+    #[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
+    pub remote_ext_config: Option<String>,
+
+    #[arg(long, default_value_t = 3080)]
+    pub http_port: u16,
+
+    #[arg(short = 'D', long, value_name = "DATADIR")]
+    pub pgdata: String,
+
+    #[arg(short = 'C', long, value_name = "DATABASE_URL")]
+    pub connstr: String,
+
+    #[cfg(target_os = "linux")]
+    #[arg(long, default_value = "neon-postgres")]
+    pub cgroup: String,
+
+    #[cfg(target_os = "linux")]
+    #[arg(
+        long,
+        default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
+    )]
+    pub filecache_connstr: String,
+
+    #[cfg(target_os = "linux")]
+    #[arg(long, default_value = "0.0.0.0:10301")]
+    pub vm_monitor_addr: String,
+
+    #[arg(long, action = clap::ArgAction::SetTrue)]
+    pub resize_swap_on_bind: bool,
+
+    #[arg(long)]
+    pub set_disk_quota_for_fs: Option<String>,
+
+    #[arg(short = 's', long = "spec", group = "spec")]
+    pub spec_json: Option<String>,
+
+    #[arg(short = 'S', long, group = "spec-path")]
+    pub spec_path: Option<OsString>,
+
+    #[arg(short = 'i', long, group = "compute-id", conflicts_with_all = ["spec", "spec-path"])]
+    pub compute_id: Option<String>,
+
+    #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], requires = "compute-id", value_name = "CONTROL_PLANE_API_BASE_URL")]
+    pub control_plane_uri: Option<String>,
+}
+
 fn main() -> Result<()> {
-    let scenario = failpoint_support::init();
+    let cli = Cli::parse();
 
-    let (build_tag, clap_args) = init()?;
+    let build_tag = init()?;
+
+    let scenario = failpoint_support::init();
 
     // enable core dumping for all child processes
     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -85,13 +151,11 @@ fn main() -> Result<()> {
         // Enter startup tracing context
         let _startup_context_guard = startup_context_from_env();
 
-        let cli_args = process_cli(&clap_args)?;
-
-        let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;
+        let cli_spec = try_spec_from_cli(&cli)?;
 
-        let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?;
+        let compute = wait_spec(build_tag, &cli, cli_spec)?;
 
-        start_postgres(&clap_args, wait_spec_result)?
+        start_postgres(&cli, compute)?
 
         // Startup is finished, exit the startup tracing span
     };
@@ -108,7 +172,7 @@ fn main() -> Result<()> {
     deinit_and_exit(wait_pg_result);
 }
 
-fn init() -> Result<(String, clap::ArgMatches)> {
+fn init() -> Result<String> {
     init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
 
     let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
@@ -123,66 +187,7 @@ fn init() -> Result<(String, clap::ArgMatches)> {
         .to_string();
     info!("build_tag: {build_tag}");
 
-    Ok((build_tag, cli().get_matches()))
-}
-
-fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
-    let pgbin_default = "postgres";
-    let pgbin = matches
-        .get_one::<String>("pgbin")
-        .map(|s| s.as_str())
-        .unwrap_or(pgbin_default);
-
-    let ext_remote_storage = matches
-        .get_one::<String>("remote-ext-config")
-        // Compatibility hack: if the control plane specified any remote-ext-config
-        // use the default value for extension storage proxy gateway.
-        // Remove this once the control plane is updated to pass the gateway URL
-        .map(|conf| {
-            if conf.starts_with("http") {
-                conf.trim_end_matches('/')
-            } else {
-                "http://pg-ext-s3-gateway"
-            }
-        });
-
-    let http_port = *matches
-        .get_one::<u16>("http-port")
-        .expect("http-port is required");
-    let pgdata = matches
-        .get_one::<String>("pgdata")
-        .expect("PGDATA path is required");
-    let connstr = matches
-        .get_one::<String>("connstr")
-        .expect("Postgres connection string is required");
-    let spec_json = matches.get_one::<String>("spec");
-    let spec_path = matches.get_one::<String>("spec-path");
-    let resize_swap_on_bind = matches.get_flag("resize-swap-on-bind");
-    let set_disk_quota_for_fs = matches.get_one::<String>("set-disk-quota-for-fs");
-
-    Ok(ProcessCliResult {
-        connstr,
-        pgdata,
-        pgbin,
-        ext_remote_storage,
-        http_port,
-        spec_json,
-        spec_path,
-        resize_swap_on_bind,
-        set_disk_quota_for_fs,
-    })
-}
-
-struct ProcessCliResult<'clap> {
-    connstr: &'clap str,
-    pgdata: &'clap str,
-    pgbin: &'clap str,
-    ext_remote_storage: Option<&'clap str>,
-    http_port: u16,
-    spec_json: Option<&'clap String>,
-    spec_path: Option<&'clap String>,
-    resize_swap_on_bind: bool,
-    set_disk_quota_for_fs: Option<&'clap String>,
+    Ok(build_tag)
 }
 
 fn startup_context_from_env() -> Option<opentelemetry::Context> {
@@ -235,19 +240,9 @@ fn startup_context_from_env() -> Option<opentelemetry::Context> {
     }
 }
 
-fn try_spec_from_cli(
-    matches: &clap::ArgMatches,
-    ProcessCliResult {
-        spec_json,
-        spec_path,
-        ..
-    }: &ProcessCliResult,
-) -> Result<CliSpecParams> {
-    let compute_id = matches.get_one::<String>("compute-id");
-    let control_plane_uri = matches.get_one::<String>("control-plane-uri");
-
+fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
     // First, try to get cluster spec from the cli argument
-    if let Some(spec_json) = spec_json {
+    if let Some(ref spec_json) = cli.spec_json {
         info!("got spec from cli argument {}", spec_json);
         return Ok(CliSpecParams {
             spec: Some(serde_json::from_str(spec_json)?),
@@ -256,7 +251,7 @@ fn try_spec_from_cli(
     }
 
     // Second, try to read it from the file if path is provided
-    if let Some(spec_path) = spec_path {
+    if let Some(ref spec_path) = cli.spec_path {
        let file = File::open(Path::new(spec_path))?;
        return Ok(CliSpecParams {
            spec: Some(serde_json::from_reader(file)?),
@@ -264,17 +259,20 @@ fn try_spec_from_cli(
         });
     }
 
-    let Some(compute_id) = compute_id else {
+    if cli.compute_id.is_none() {
         panic!(
             "compute spec should be provided by one of the following ways: \
                 --spec OR --spec-path OR --control-plane-uri and --compute-id"
         );
     };
-    let Some(control_plane_uri) = control_plane_uri else {
+    if cli.control_plane_uri.is_none() {
         panic!("must specify both --control-plane-uri and --compute-id or none");
     };
 
-    match get_spec_from_control_plane(control_plane_uri, compute_id) {
+    match get_spec_from_control_plane(
+        cli.control_plane_uri.as_ref().unwrap(),
+        cli.compute_id.as_ref().unwrap(),
+    ) {
         Ok(spec) => Ok(CliSpecParams {
             spec,
             live_config_allowed: true,
@@ -298,21 +296,12 @@ struct CliSpecParams {
 
 fn wait_spec(
     build_tag: String,
-    ProcessCliResult {
-        connstr,
-        pgdata,
-        pgbin,
-        ext_remote_storage,
-        resize_swap_on_bind,
-        set_disk_quota_for_fs,
-        http_port,
-        ..
-    }: ProcessCliResult,
+    cli: &Cli,
     CliSpecParams {
         spec,
         live_config_allowed,
     }: CliSpecParams,
-) -> Result<WaitSpecResult> {
+) -> Result<Arc<ComputeNode>> {
     let mut new_state = ComputeState::new();
     let spec_set;
@@ -324,7 +313,7 @@ fn wait_spec(
     } else {
         spec_set = false;
     }
-    let connstr = Url::parse(connstr).context("cannot parse connstr as a URL")?;
+    let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
     let conn_conf = postgres::config::Config::from_str(connstr.as_str())
         .context("cannot build postgres config from connstr")?;
     let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
@@ -333,14 +322,14 @@ fn wait_spec(
         connstr,
         conn_conf,
         tokio_conn_conf,
-        pgdata: pgdata.to_string(),
-        pgbin: pgbin.to_string(),
-        pgversion: get_pg_version_string(pgbin),
-        http_port,
+        pgdata: cli.pgdata.clone(),
+        pgbin: cli.pgbin.clone(),
+        pgversion: get_pg_version_string(&cli.pgbin),
+        http_port: cli.http_port,
         live_config_allowed,
         state: Mutex::new(new_state),
         state_changed: Condvar::new(),
-        ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
+        ext_remote_storage: cli.remote_ext_config.clone(),
         ext_download_progress: RwLock::new(HashMap::new()),
         build_tag,
     };
@@ -357,7 +346,7 @@ fn wait_spec(
     // Launch http service first, so that we can serve control-plane requests
     // while configuration is still in progress.
     let _http_handle =
-        launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
+        launch_http_server(cli.http_port, &compute).expect("cannot launch http endpoint thread");
 
     if !spec_set {
         // No spec provided, hang waiting for it.
@@ -389,27 +378,12 @@ fn wait_spec(
 
     launch_lsn_lease_bg_task_for_static(&compute);
 
-    Ok(WaitSpecResult {
-        compute,
-        resize_swap_on_bind,
-        set_disk_quota_for_fs: set_disk_quota_for_fs.cloned(),
-    })
-}
-
-struct WaitSpecResult {
-    compute: Arc<ComputeNode>,
-    resize_swap_on_bind: bool,
-    set_disk_quota_for_fs: Option<String>,
+    Ok(compute)
 }
 
 fn start_postgres(
-    // need to allow unused because `matches` is only used if target_os = "linux"
-    #[allow(unused_variables)] matches: &clap::ArgMatches,
-    WaitSpecResult {
-        compute,
-        resize_swap_on_bind,
-        set_disk_quota_for_fs,
-    }: WaitSpecResult,
+    cli: &Cli,
+    compute: Arc<ComputeNode>,
 ) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
     // We got all we need, update the state.
     let mut state = compute.state.lock().unwrap();
@@ -437,7 +411,7 @@ fn start_postgres(
     let mut delay_exit = false;
 
     // Resize swap to the desired size if the compute spec says so
-    if let (Some(size_bytes), true) = (swap_size_bytes, resize_swap_on_bind) {
+    if let (Some(size_bytes), true) = (swap_size_bytes, cli.resize_swap_on_bind) {
         // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
         // *before* starting postgres.
         //
@@ -464,9 +438,9 @@ fn start_postgres(
 
     // Set disk quota if the compute spec says so
     if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
-        (disk_quota_bytes, set_disk_quota_for_fs)
+        (disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
     {
-        match set_disk_quota(disk_quota_bytes, &disk_quota_fs_mountpoint) {
+        match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
             Ok(()) => {
                 let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
                 info!(%disk_quota_bytes, %size_mib, "set disk quota");
@@ -509,13 +483,7 @@ fn start_postgres(
     if #[cfg(target_os = "linux")] {
         use std::env;
         use tokio_util::sync::CancellationToken;
-        let vm_monitor_addr = matches
-            .get_one::<String>("vm-monitor-addr")
-            .expect("--vm-monitor-addr should always be set because it has a default arg");
-        let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
-        let cgroup = matches.get_one::<String>("cgroup");
-
         // Only make a runtime if we need to.
         // Note: it seems like you can make a runtime in an inner scope and
         // if you start a task in it it won't be dropped. However, make it
         // in the outermost scope just to be safe.
@@ -538,15 +506,15 @@ fn start_postgres(
         let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
             None
         } else {
-            file_cache_connstr.cloned()
+            Some(cli.filecache_connstr.clone())
         };
 
         let vm_monitor = rt.as_ref().map(|rt| {
             rt.spawn(vm_monitor::start(
                 Box::leak(Box::new(vm_monitor::Args {
-                    cgroup: cgroup.cloned(),
+                    cgroup: Some(cli.cgroup.clone()),
                     pgconnstr,
-                    addr: vm_monitor_addr.clone(),
+                    addr: cli.vm_monitor_addr.clone(),
                 })),
                 token.clone(),
             ))
@@ -702,105 +670,6 @@ fn deinit_and_exit(WaitPostgresResult { exit_code }: WaitPostgresResult) -> ! {
     exit(exit_code.unwrap_or(1))
 }
 
-fn cli() -> clap::Command {
-    // Env variable is set by `cargo`
-    let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
-    clap::Command::new("compute_ctl")
-        .version(version)
-        .arg(
-            Arg::new("http-port")
-                .long("http-port")
-                .value_name("HTTP_PORT")
-                .default_value("3080")
-                .value_parser(clap::value_parser!(u16))
-                .required(false),
-        )
-        .arg(
-            Arg::new("connstr")
-                .short('C')
-                .long("connstr")
-                .value_name("DATABASE_URL")
-                .required(true),
-        )
-        .arg(
-            Arg::new("pgdata")
-                .short('D')
-                .long("pgdata")
-                .value_name("DATADIR")
-                .required(true),
-        )
-        .arg(
-            Arg::new("pgbin")
-                .short('b')
-                .long("pgbin")
-                .default_value("postgres")
-                .value_name("POSTGRES_PATH"),
-        )
-        .arg(
-            Arg::new("spec")
-                .short('s')
-                .long("spec")
-                .value_name("SPEC_JSON"),
-        )
-        .arg(
-            Arg::new("spec-path")
-                .short('S')
-                .long("spec-path")
-                .value_name("SPEC_PATH"),
-        )
-        .arg(
-            Arg::new("compute-id")
-                .short('i')
-                .long("compute-id")
-                .value_name("COMPUTE_ID"),
-        )
-        .arg(
-            Arg::new("control-plane-uri")
-                .short('p')
-                .long("control-plane-uri")
-                .value_name("CONTROL_PLANE_API_BASE_URI"),
-        )
-        .arg(
-            Arg::new("remote-ext-config")
-                .short('r')
-                .long("remote-ext-config")
-                .value_name("REMOTE_EXT_CONFIG"),
-        )
-        // TODO(fprasx): we currently have default arguments because the cloud PR
-        // to pass them in hasn't been merged yet. We should get rid of them once
-        // the PR is merged.
-        .arg(
-            Arg::new("vm-monitor-addr")
-                .long("vm-monitor-addr")
-                .default_value("0.0.0.0:10301")
-                .value_name("VM_MONITOR_ADDR"),
-        )
-        .arg(
-            Arg::new("cgroup")
-                .long("cgroup")
-                .default_value("neon-postgres")
-                .value_name("CGROUP"),
-        )
-        .arg(
-            Arg::new("filecache-connstr")
-                .long("filecache-connstr")
-                .default_value(
-                    "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
-                )
-                .value_name("FILECACHE_CONNSTR"),
-        )
-        .arg(
-            Arg::new("resize-swap-on-bind")
-                .long("resize-swap-on-bind")
-                .action(clap::ArgAction::SetTrue),
-        )
-        .arg(
-            Arg::new("set-disk-quota-for-fs")
-                .long("set-disk-quota-for-fs")
-                .value_name("SET_DISK_QUOTA_FOR_FS")
-        )
-}
-
 /// When compute_ctl is killed, send also termination signal to sync-safekeepers
 /// to prevent leakage. TODO: it is better to convert compute_ctl to async and
 /// wait for termination which would be easy then.
@@ -810,7 +679,14 @@ fn handle_exit_signal(sig: i32) {
     exit(1);
 }
 
-#[test]
-fn verify_cli() {
-    cli().debug_assert()
+#[cfg(test)]
+mod test {
+    use clap::CommandFactory;
+
+    use super::Cli;
+
+    #[test]
+    fn verify_cli() {
+        Cli::command().debug_assert()
+    }
 }

From ad1a41157affa94c9e818239b7c2d9fd26bb3de6 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Fri, 31 Jan 2025 19:14:27 +0000
Subject: [PATCH 14/18] feat(proxy): optimizing the chances of large write in
 copy_bidirectional (#10608)

We forked copy_bidirectional to solve some issues like fast-shutdown
(disallowing half-open connections) and to introduce better error
tracking (which side of the conn closed down).

A change recently made its way upstream offering performance
improvements: https://github.com/tokio-rs/tokio/pull/6532. These seem
applicable to our fork, thus it makes sense to apply them here as well.
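The core of the change is the buffer-refill condition: instead of reading
only once the buffer is fully drained, the copy loop now tops the buffer
up whenever there is spare capacity, so each write to the other side can
be as large as possible. A minimal, self-contained model of the two
policies (illustrative only; the real `CopyBuffer` lives in the diff
below):

```rust
// Simplified model of the buffering policy change.
struct CopyBuffer {
    buf: Vec<u8>,
    cap: usize, // bytes currently buffered
    pos: usize, // bytes already written out
}

impl CopyBuffer {
    // Old policy: read only when the buffer is completely drained.
    fn should_read_old(&self) -> bool {
        self.pos == self.cap
    }
    // New policy: read whenever there is spare capacity, maximizing the
    // chances of a single large write.
    fn should_read_new(&self) -> bool {
        self.cap < self.buf.len()
    }
}

fn main() {
    let b = CopyBuffer { buf: vec![0; 8192], cap: 4096, pos: 1024 };
    assert!(!b.should_read_old()); // old: wait until drained
    assert!(b.should_read_new()); // new: top up now
}
```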
- if self.pos == self.cap && self.read_done { + if self.read_done { ready!(writer.as_mut().poll_flush(cx)).map_err(ErrorDirection::Write)?; return Poll::Ready(Ok(self.amt)); } From 6dd48ba148af2eaf90c9d8b5505a760a9995f173 Mon Sep 17 00:00:00 2001 From: Stefan Radig Date: Fri, 31 Jan 2025 21:32:57 +0100 Subject: [PATCH 15/18] feat(proxy): Implement access control with VPC endpoint checks and block for public internet / VPC (#10143) - Wired up filtering on VPC endpoints - Wired up block access from public internet / VPC depending on per project flag - Added cache invalidation for VPC endpoints (partially based on PR from Raphael) - Removed BackendIpAllowlist trait --------- Co-authored-by: Ivan Efremov --- proxy/src/auth/backend/console_redirect.rs | 32 ++- proxy/src/auth/backend/mod.rs | 152 ++++++++---- proxy/src/auth/mod.rs | 25 ++ proxy/src/bin/local_proxy.rs | 1 + proxy/src/bin/proxy.rs | 1 + proxy/src/cache/project_info.rs | 224 +++++++++++++++++- proxy/src/cancellation.rs | 55 ++++- proxy/src/config.rs | 1 + proxy/src/console_redirect_proxy.rs | 3 +- proxy/src/context/mod.rs | 11 +- .../control_plane/client/cplane_proxy_v1.rs | 174 +++++++++++++- proxy/src/control_plane/client/mock.rs | 42 +++- proxy/src/control_plane/client/mod.rs | 51 +++- proxy/src/control_plane/messages.rs | 26 +- proxy/src/control_plane/mod.rs | 33 ++- proxy/src/intern.rs | 22 +- proxy/src/metrics.rs | 13 + proxy/src/proxy/mod.rs | 3 +- proxy/src/proxy/tests/mod.rs | 16 +- proxy/src/redis/notifications.rs | 48 +++- proxy/src/serverless/backend.rs | 42 +++- proxy/src/types.rs | 2 + 22 files changed, 844 insertions(+), 133 deletions(-) diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index 1cbf91d3ae73..9be29c38c938 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -7,8 +7,8 @@ use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{info, info_span}; -use super::{ComputeCredentialKeys, ControlPlaneApi}; -use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo}; +use super::ComputeCredentialKeys; +use crate::auth::backend::ComputeUserInfo; use crate::auth::IpPattern; use crate::cache::Cached; use crate::config::AuthenticationConfig; @@ -84,26 +84,15 @@ pub(crate) fn new_psql_session_id() -> String { hex::encode(rand::random::<[u8; 8]>()) } -#[async_trait] -impl BackendIpAllowlist for ConsoleRedirectBackend { - async fn get_allowed_ips( - &self, - ctx: &RequestContext, - user_info: &ComputeUserInfo, - ) -> auth::Result> { - self.api - .get_allowed_ips_and_secret(ctx, user_info) - .await - .map(|(ips, _)| ips.as_ref().clone()) - .map_err(|e| e.into()) - } -} - impl ConsoleRedirectBackend { pub fn new(console_uri: reqwest::Url, api: cplane_proxy_v1::NeonControlPlaneClient) -> Self { Self { console_uri, api } } + pub(crate) fn get_api(&self) -> &cplane_proxy_v1::NeonControlPlaneClient { + &self.api + } + pub(crate) async fn authenticate( &self, ctx: &RequestContext, @@ -191,6 +180,15 @@ async fn authenticate( } } + // Check if the access over the public internet is allowed, otherwise block. Note that + // the console redirect is not behind the VPC service endpoint, so we don't need to check + // the VPC endpoint ID. 
+ if let Some(public_access_allowed) = db_info.public_access_allowed { + if !public_access_allowed { + return Err(auth::AuthError::NetworkNotAllowed); + } + } + client.write_message_noflush(&Be::NoticeResponse("Connecting to database."))?; // This config should be self-contained, because we won't diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index d17d91a56d96..7ef096207aed 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -26,10 +26,12 @@ use crate::context::RequestContext; use crate::control_plane::client::ControlPlaneClient; use crate::control_plane::errors::GetAuthInfoError; use crate::control_plane::{ - self, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, + self, AccessBlockerFlags, AuthSecret, CachedAccessBlockerFlags, CachedAllowedIps, + CachedAllowedVpcEndpointIds, CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, }; use crate::intern::EndpointIdInt; use crate::metrics::Metrics; +use crate::protocol2::ConnectionInfoExtra; use crate::proxy::connect_compute::ComputeConnectBackend; use crate::proxy::NeonOptions; use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter}; @@ -99,6 +101,13 @@ impl Backend<'_, T> { Self::Local(l) => Backend::Local(MaybeOwned::Borrowed(l)), } } + + pub(crate) fn get_api(&self) -> &ControlPlaneClient { + match self { + Self::ControlPlane(api, _) => api, + Self::Local(_) => panic!("Local backend has no API"), + } + } } impl<'a, T> Backend<'a, T> { @@ -247,15 +256,6 @@ impl AuthenticationConfig { } } -#[async_trait::async_trait] -pub(crate) trait BackendIpAllowlist { - async fn get_allowed_ips( - &self, - ctx: &RequestContext, - user_info: &ComputeUserInfo, - ) -> auth::Result>; -} - /// True to its name, this function encapsulates our current auth trade-offs. /// Here, we choose the appropriate auth flow based on circumstances. /// @@ -282,23 +282,51 @@ async fn auth_quirks( Ok(info) => (info, None), }; - debug!("fetching user's authentication info"); - let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?; + debug!("fetching authentication info and allowlists"); // check allowed list - if config.ip_allowlist_check_enabled - && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) - { - return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr())); + let allowed_ips = if config.ip_allowlist_check_enabled { + let allowed_ips = api.get_allowed_ips(ctx, &info).await?; + if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) { + return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr())); + } + allowed_ips + } else { + Cached::new_uncached(Arc::new(vec![])) + }; + + // check if a VPC endpoint ID is coming in and if yes, if it's allowed + let access_blocks = api.get_block_public_or_vpc_access(ctx, &info).await?; + if config.is_vpc_acccess_proxy { + if access_blocks.vpc_access_blocked { + return Err(AuthError::NetworkNotAllowed); + } + + let incoming_vpc_endpoint_id = match ctx.extra() { + None => return Err(AuthError::MissingEndpointName), + Some(ConnectionInfoExtra::Aws { vpce_id }) => { + // Convert the vcpe_id to a string + String::from_utf8(vpce_id.to_vec()).unwrap_or_default() + } + Some(ConnectionInfoExtra::Azure { link_id }) => link_id.to_string(), + }; + let allowed_vpc_endpoint_ids = api.get_allowed_vpc_endpoint_ids(ctx, &info).await?; + // TODO: For now an empty VPC endpoint ID list means all are allowed. We should replace that. 
+ if !allowed_vpc_endpoint_ids.is_empty() + && !allowed_vpc_endpoint_ids.contains(&incoming_vpc_endpoint_id) + { + return Err(AuthError::vpc_endpoint_id_not_allowed( + incoming_vpc_endpoint_id, + )); + } + } else if access_blocks.public_access_blocked { + return Err(AuthError::NetworkNotAllowed); } if !endpoint_rate_limiter.check(info.endpoint.clone().into(), 1) { return Err(AuthError::too_many_connections()); } - let cached_secret = match maybe_secret { - Some(secret) => secret, - None => api.get_role_secret(ctx, &info).await?, - }; + let cached_secret = api.get_role_secret(ctx, &info).await?; let (cached_entry, secret) = cached_secret.take_value(); let secret = if let Some(secret) = secret { @@ -440,34 +468,38 @@ impl Backend<'_, ComputeUserInfo> { } } - pub(crate) async fn get_allowed_ips_and_secret( + pub(crate) async fn get_allowed_ips( + &self, + ctx: &RequestContext, + ) -> Result { + match self { + Self::ControlPlane(api, user_info) => api.get_allowed_ips(ctx, user_info).await, + Self::Local(_) => Ok(Cached::new_uncached(Arc::new(vec![]))), + } + } + + pub(crate) async fn get_allowed_vpc_endpoint_ids( &self, ctx: &RequestContext, - ) -> Result<(CachedAllowedIps, Option), GetAuthInfoError> { + ) -> Result { match self { Self::ControlPlane(api, user_info) => { - api.get_allowed_ips_and_secret(ctx, user_info).await + api.get_allowed_vpc_endpoint_ids(ctx, user_info).await } - Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)), + Self::Local(_) => Ok(Cached::new_uncached(Arc::new(vec![]))), } } -} -#[async_trait::async_trait] -impl BackendIpAllowlist for Backend<'_, ()> { - async fn get_allowed_ips( + pub(crate) async fn get_block_public_or_vpc_access( &self, ctx: &RequestContext, - user_info: &ComputeUserInfo, - ) -> auth::Result> { - let auth_data = match self { - Self::ControlPlane(api, ()) => api.get_allowed_ips_and_secret(ctx, user_info).await, - Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)), - }; - - auth_data - .map(|(ips, _)| ips.as_ref().clone()) - .map_err(|e| e.into()) + ) -> Result { + match self { + Self::ControlPlane(api, user_info) => { + api.get_block_public_or_vpc_access(ctx, user_info).await + } + Self::Local(_) => Ok(Cached::new_uncached(AccessBlockerFlags::default())), + } } } @@ -514,7 +546,10 @@ mod tests { use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern}; use crate::config::AuthenticationConfig; use crate::context::RequestContext; - use crate::control_plane::{self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret}; + use crate::control_plane::{ + self, AccessBlockerFlags, CachedAccessBlockerFlags, CachedAllowedIps, + CachedAllowedVpcEndpointIds, CachedNodeInfo, CachedRoleSecret, + }; use crate::proxy::NeonOptions; use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo}; use crate::scram::threadpool::ThreadPool; @@ -523,6 +558,8 @@ mod tests { struct Auth { ips: Vec, + vpc_endpoint_ids: Vec, + access_blocker_flags: AccessBlockerFlags, secret: AuthSecret, } @@ -535,17 +572,31 @@ mod tests { Ok(CachedRoleSecret::new_uncached(Some(self.secret.clone()))) } - async fn get_allowed_ips_and_secret( + async fn get_allowed_ips( + &self, + _ctx: &RequestContext, + _user_info: &super::ComputeUserInfo, + ) -> Result { + Ok(CachedAllowedIps::new_uncached(Arc::new(self.ips.clone()))) + } + + async fn get_allowed_vpc_endpoint_ids( + &self, + _ctx: &RequestContext, + _user_info: &super::ComputeUserInfo, + ) -> Result { + Ok(CachedAllowedVpcEndpointIds::new_uncached(Arc::new( + self.vpc_endpoint_ids.clone(), + ))) + } + + 
async fn get_block_public_or_vpc_access( &self, _ctx: &RequestContext, _user_info: &super::ComputeUserInfo, - ) -> Result< - (CachedAllowedIps, Option), - control_plane::errors::GetAuthInfoError, - > { - Ok(( - CachedAllowedIps::new_uncached(Arc::new(self.ips.clone())), - Some(CachedRoleSecret::new_uncached(Some(self.secret.clone()))), + ) -> Result { + Ok(CachedAccessBlockerFlags::new_uncached( + self.access_blocker_flags.clone(), )) } @@ -575,6 +626,7 @@ mod tests { rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET), rate_limit_ip_subnet: 64, ip_allowlist_check_enabled: true, + is_vpc_acccess_proxy: false, is_auth_broker: false, accept_jwts: false, console_redirect_confirmation_timeout: std::time::Duration::from_secs(5), @@ -642,6 +694,8 @@ mod tests { let ctx = RequestContext::test(); let api = Auth { ips: vec![], + vpc_endpoint_ids: vec![], + access_blocker_flags: AccessBlockerFlags::default(), secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), }; @@ -722,6 +776,8 @@ mod tests { let ctx = RequestContext::test(); let api = Auth { ips: vec![], + vpc_endpoint_ids: vec![], + access_blocker_flags: AccessBlockerFlags::default(), secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), }; @@ -774,6 +830,8 @@ mod tests { let ctx = RequestContext::test(); let api = Auth { ips: vec![], + vpc_endpoint_ids: vec![], + access_blocker_flags: AccessBlockerFlags::default(), secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), }; diff --git a/proxy/src/auth/mod.rs b/proxy/src/auth/mod.rs index 0198cc306e08..6082695a6b1b 100644 --- a/proxy/src/auth/mod.rs +++ b/proxy/src/auth/mod.rs @@ -55,6 +55,12 @@ pub(crate) enum AuthError { )] MissingEndpointName, + #[error( + "VPC endpoint ID is not specified. \ + This endpoint requires a VPC endpoint ID to connect." + )] + MissingVPCEndpointId, + #[error("password authentication failed for user '{0}'")] PasswordFailed(Box), @@ -69,6 +75,15 @@ pub(crate) enum AuthError { )] IpAddressNotAllowed(IpAddr), + #[error("This connection is trying to access this endpoint from a blocked network.")] + NetworkNotAllowed, + + #[error( + "This VPC endpoint id {0} is not allowed to connect to this endpoint. \ + Please add it to the allowed list in the Neon console." + )] + VpcEndpointIdNotAllowed(String), + #[error("Too many connections to this endpoint. 
Please try again later.")] TooManyConnections, @@ -95,6 +110,10 @@ impl AuthError { AuthError::IpAddressNotAllowed(ip) } + pub(crate) fn vpc_endpoint_id_not_allowed(id: String) -> Self { + AuthError::VpcEndpointIdNotAllowed(id) + } + pub(crate) fn too_many_connections() -> Self { AuthError::TooManyConnections } @@ -122,8 +141,11 @@ impl UserFacingError for AuthError { Self::BadAuthMethod(_) => self.to_string(), Self::MalformedPassword(_) => self.to_string(), Self::MissingEndpointName => self.to_string(), + Self::MissingVPCEndpointId => self.to_string(), Self::Io(_) => "Internal error".to_string(), Self::IpAddressNotAllowed(_) => self.to_string(), + Self::NetworkNotAllowed => self.to_string(), + Self::VpcEndpointIdNotAllowed(_) => self.to_string(), Self::TooManyConnections => self.to_string(), Self::UserTimeout(_) => self.to_string(), Self::ConfirmationTimeout(_) => self.to_string(), @@ -142,8 +164,11 @@ impl ReportableError for AuthError { Self::BadAuthMethod(_) => crate::error::ErrorKind::User, Self::MalformedPassword(_) => crate::error::ErrorKind::User, Self::MissingEndpointName => crate::error::ErrorKind::User, + Self::MissingVPCEndpointId => crate::error::ErrorKind::User, Self::Io(_) => crate::error::ErrorKind::ClientDisconnect, Self::IpAddressNotAllowed(_) => crate::error::ErrorKind::User, + Self::NetworkNotAllowed => crate::error::ErrorKind::User, + Self::VpcEndpointIdNotAllowed(_) => crate::error::ErrorKind::User, Self::TooManyConnections => crate::error::ErrorKind::RateLimit, Self::UserTimeout(_) => crate::error::ErrorKind::User, Self::ConfirmationTimeout(_) => crate::error::ErrorKind::User, diff --git a/proxy/src/bin/local_proxy.rs b/proxy/src/bin/local_proxy.rs index ee8b3d4ef579..7a855bf54b41 100644 --- a/proxy/src/bin/local_proxy.rs +++ b/proxy/src/bin/local_proxy.rs @@ -284,6 +284,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig rate_limiter: BucketRateLimiter::new(vec![]), rate_limit_ip_subnet: 64, ip_allowlist_check_enabled: true, + is_vpc_acccess_proxy: false, is_auth_broker: false, accept_jwts: true, console_redirect_confirmation_timeout: Duration::ZERO, diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index e1affe8391a6..de685a82c627 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -630,6 +630,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()), rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet, ip_allowlist_check_enabled: !args.is_private_access_proxy, + is_vpc_acccess_proxy: args.is_private_access_proxy, is_auth_broker: args.is_auth_broker, accept_jwts: args.is_auth_broker, console_redirect_confirmation_timeout: args.webauth_confirmation_timeout, diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index a5e71f1a8744..7651eb71a2e0 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -15,13 +15,16 @@ use tracing::{debug, info}; use super::{Cache, Cached}; use crate::auth::IpPattern; use crate::config::ProjectInfoCacheOptions; -use crate::control_plane::AuthSecret; -use crate::intern::{EndpointIdInt, ProjectIdInt, RoleNameInt}; +use crate::control_plane::{AccessBlockerFlags, AuthSecret}; +use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt}; use crate::types::{EndpointId, RoleName}; #[async_trait] pub(crate) trait ProjectInfoCache { fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt); + fn 
invalidate_allowed_vpc_endpoint_ids_for_projects(&self, project_ids: Vec); + fn invalidate_allowed_vpc_endpoint_ids_for_org(&self, account_id: AccountIdInt); + fn invalidate_block_public_or_vpc_access_for_project(&self, project_id: ProjectIdInt); fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt); async fn decrement_active_listeners(&self); async fn increment_active_listeners(&self); @@ -51,6 +54,8 @@ impl From for Entry { struct EndpointInfo { secret: std::collections::HashMap>>, allowed_ips: Option>>>, + block_public_or_vpc_access: Option>, + allowed_vpc_endpoint_ids: Option>>>, } impl EndpointInfo { @@ -92,9 +97,52 @@ impl EndpointInfo { } None } + pub(crate) fn get_allowed_vpc_endpoint_ids( + &self, + valid_since: Instant, + ignore_cache_since: Option, + ) -> Option<(Arc>, bool)> { + if let Some(allowed_vpc_endpoint_ids) = &self.allowed_vpc_endpoint_ids { + if valid_since < allowed_vpc_endpoint_ids.created_at { + return Some(( + allowed_vpc_endpoint_ids.value.clone(), + Self::check_ignore_cache( + ignore_cache_since, + allowed_vpc_endpoint_ids.created_at, + ), + )); + } + } + None + } + pub(crate) fn get_block_public_or_vpc_access( + &self, + valid_since: Instant, + ignore_cache_since: Option, + ) -> Option<(AccessBlockerFlags, bool)> { + if let Some(block_public_or_vpc_access) = &self.block_public_or_vpc_access { + if valid_since < block_public_or_vpc_access.created_at { + return Some(( + block_public_or_vpc_access.value.clone(), + Self::check_ignore_cache( + ignore_cache_since, + block_public_or_vpc_access.created_at, + ), + )); + } + } + None + } + pub(crate) fn invalidate_allowed_ips(&mut self) { self.allowed_ips = None; } + pub(crate) fn invalidate_allowed_vpc_endpoint_ids(&mut self) { + self.allowed_vpc_endpoint_ids = None; + } + pub(crate) fn invalidate_block_public_or_vpc_access(&mut self) { + self.block_public_or_vpc_access = None; + } pub(crate) fn invalidate_role_secret(&mut self, role_name: RoleNameInt) { self.secret.remove(&role_name); } @@ -111,6 +159,8 @@ pub struct ProjectInfoCacheImpl { cache: ClashMap, project2ep: ClashMap>, + // FIXME(stefan): we need a way to GC the account2ep map. 
+ account2ep: ClashMap>, config: ProjectInfoCacheOptions, start_time: Instant, @@ -120,6 +170,63 @@ pub struct ProjectInfoCacheImpl { #[async_trait] impl ProjectInfoCache for ProjectInfoCacheImpl { + fn invalidate_allowed_vpc_endpoint_ids_for_projects(&self, project_ids: Vec) { + info!( + "invalidating allowed vpc endpoint ids for projects `{}`", + project_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(", ") + ); + for project_id in project_ids { + let endpoints = self + .project2ep + .get(&project_id) + .map(|kv| kv.value().clone()) + .unwrap_or_default(); + for endpoint_id in endpoints { + if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) { + endpoint_info.invalidate_allowed_vpc_endpoint_ids(); + } + } + } + } + + fn invalidate_allowed_vpc_endpoint_ids_for_org(&self, account_id: AccountIdInt) { + info!( + "invalidating allowed vpc endpoint ids for org `{}`", + account_id + ); + let endpoints = self + .account2ep + .get(&account_id) + .map(|kv| kv.value().clone()) + .unwrap_or_default(); + for endpoint_id in endpoints { + if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) { + endpoint_info.invalidate_allowed_vpc_endpoint_ids(); + } + } + } + + fn invalidate_block_public_or_vpc_access_for_project(&self, project_id: ProjectIdInt) { + info!( + "invalidating block public or vpc access for project `{}`", + project_id + ); + let endpoints = self + .project2ep + .get(&project_id) + .map(|kv| kv.value().clone()) + .unwrap_or_default(); + for endpoint_id in endpoints { + if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) { + endpoint_info.invalidate_block_public_or_vpc_access(); + } + } + } + fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt) { info!("invalidating allowed ips for project `{}`", project_id); let endpoints = self @@ -178,6 +285,7 @@ impl ProjectInfoCacheImpl { Self { cache: ClashMap::new(), project2ep: ClashMap::new(), + account2ep: ClashMap::new(), config, ttl_disabled_since_us: AtomicU64::new(u64::MAX), start_time: Instant::now(), @@ -226,6 +334,49 @@ impl ProjectInfoCacheImpl { } Some(Cached::new_uncached(value)) } + pub(crate) fn get_allowed_vpc_endpoint_ids( + &self, + endpoint_id: &EndpointId, + ) -> Option>>> { + let endpoint_id = EndpointIdInt::get(endpoint_id)?; + let (valid_since, ignore_cache_since) = self.get_cache_times(); + let endpoint_info = self.cache.get(&endpoint_id)?; + let value = endpoint_info.get_allowed_vpc_endpoint_ids(valid_since, ignore_cache_since); + let (value, ignore_cache) = value?; + if !ignore_cache { + let cached = Cached { + token: Some(( + self, + CachedLookupInfo::new_allowed_vpc_endpoint_ids(endpoint_id), + )), + value, + }; + return Some(cached); + } + Some(Cached::new_uncached(value)) + } + pub(crate) fn get_block_public_or_vpc_access( + &self, + endpoint_id: &EndpointId, + ) -> Option> { + let endpoint_id = EndpointIdInt::get(endpoint_id)?; + let (valid_since, ignore_cache_since) = self.get_cache_times(); + let endpoint_info = self.cache.get(&endpoint_id)?; + let value = endpoint_info.get_block_public_or_vpc_access(valid_since, ignore_cache_since); + let (value, ignore_cache) = value?; + if !ignore_cache { + let cached = Cached { + token: Some(( + self, + CachedLookupInfo::new_block_public_or_vpc_access(endpoint_id), + )), + value, + }; + return Some(cached); + } + Some(Cached::new_uncached(value)) + } + pub(crate) fn insert_role_secret( &self, project_id: ProjectIdInt, @@ -256,6 +407,43 @@ impl ProjectInfoCacheImpl { self.insert_project2endpoint(project_id, 
endpoint_id); self.cache.entry(endpoint_id).or_default().allowed_ips = Some(allowed_ips.into()); } + pub(crate) fn insert_allowed_vpc_endpoint_ids( + &self, + account_id: Option, + project_id: ProjectIdInt, + endpoint_id: EndpointIdInt, + allowed_vpc_endpoint_ids: Arc>, + ) { + if self.cache.len() >= self.config.size { + // If there are too many entries, wait until the next gc cycle. + return; + } + if let Some(account_id) = account_id { + self.insert_account2endpoint(account_id, endpoint_id); + } + self.insert_project2endpoint(project_id, endpoint_id); + self.cache + .entry(endpoint_id) + .or_default() + .allowed_vpc_endpoint_ids = Some(allowed_vpc_endpoint_ids.into()); + } + pub(crate) fn insert_block_public_or_vpc_access( + &self, + project_id: ProjectIdInt, + endpoint_id: EndpointIdInt, + access_blockers: AccessBlockerFlags, + ) { + if self.cache.len() >= self.config.size { + // If there are too many entries, wait until the next gc cycle. + return; + } + self.insert_project2endpoint(project_id, endpoint_id); + self.cache + .entry(endpoint_id) + .or_default() + .block_public_or_vpc_access = Some(access_blockers.into()); + } + fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) { if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) { endpoints.insert(endpoint_id); @@ -264,6 +452,14 @@ impl ProjectInfoCacheImpl { .insert(project_id, HashSet::from([endpoint_id])); } } + fn insert_account2endpoint(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt) { + if let Some(mut endpoints) = self.account2ep.get_mut(&account_id) { + endpoints.insert(endpoint_id); + } else { + self.account2ep + .insert(account_id, HashSet::from([endpoint_id])); + } + } fn get_cache_times(&self) -> (Instant, Option) { let mut valid_since = Instant::now() - self.config.ttl; // Only ignore cache if ttl is disabled. 
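The `project2ep` and `account2ep` maps above act as reverse indexes: a Redis invalidation message carries only a project or account ID, and the cache fans it out to every affected endpoint entry. A minimal sketch of that fan-out pattern, using plain `HashMap`s and hypothetical integer IDs in place of the interned types and `ClashMap`:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for the interned ID types.
type AccountId = u32;
type EndpointId = u32;

#[derive(Default)]
struct EndpointInfo {
    // `None` means "not cached": the next lookup refetches from the control plane.
    allowed_vpc_endpoint_ids: Option<Vec<String>>,
}

#[derive(Default)]
struct Cache {
    cache: HashMap<EndpointId, EndpointInfo>,
    // Reverse index: which endpoints belong to which account.
    account2ep: HashMap<AccountId, HashSet<EndpointId>>,
}

impl Cache {
    fn insert(&mut self, account: AccountId, ep: EndpointId, vpc_ids: Vec<String>) {
        self.account2ep.entry(account).or_default().insert(ep);
        self.cache.entry(ep).or_default().allowed_vpc_endpoint_ids = Some(vpc_ids);
    }

    // Org-wide invalidation: fan out from the account to all of its endpoints.
    fn invalidate_for_org(&mut self, account: AccountId) {
        for ep in self.account2ep.get(&account).cloned().unwrap_or_default() {
            if let Some(info) = self.cache.get_mut(&ep) {
                info.allowed_vpc_endpoint_ids = None;
            }
        }
    }
}

fn main() {
    let mut c = Cache::default();
    c.insert(1, 10, vec!["vpce-a".into()]);
    c.insert(1, 11, vec!["vpce-b".into()]);
    c.invalidate_for_org(1);
    assert!(c.cache[&10].allowed_vpc_endpoint_ids.is_none());
    assert!(c.cache[&11].allowed_vpc_endpoint_ids.is_none());
}
```

Resetting the cached field to `None` instead of evicting the whole entry keeps the other cached values (role secrets, IP allowlists) intact while forcing a refetch of only the invalidated data, which matches what the invalidation methods above do.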
@@ -334,11 +530,25 @@ impl CachedLookupInfo { lookup_type: LookupType::AllowedIps, } } + pub(self) fn new_allowed_vpc_endpoint_ids(endpoint_id: EndpointIdInt) -> Self { + Self { + endpoint_id, + lookup_type: LookupType::AllowedVpcEndpointIds, + } + } + pub(self) fn new_block_public_or_vpc_access(endpoint_id: EndpointIdInt) -> Self { + Self { + endpoint_id, + lookup_type: LookupType::BlockPublicOrVpcAccess, + } + } } enum LookupType { RoleSecret(RoleNameInt), AllowedIps, + AllowedVpcEndpointIds, + BlockPublicOrVpcAccess, } impl Cache for ProjectInfoCacheImpl { @@ -360,6 +570,16 @@ impl Cache for ProjectInfoCacheImpl { endpoint_info.invalidate_allowed_ips(); } } + LookupType::AllowedVpcEndpointIds => { + if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) { + endpoint_info.invalidate_allowed_vpc_endpoint_ids(); + } + } + LookupType::BlockPublicOrVpcAccess => { + if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) { + endpoint_info.invalidate_block_public_or_vpc_access(); + } + } } } } diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 9a0b954341bb..4d919f374a2d 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -12,13 +12,15 @@ use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, info}; -use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo}; +use crate::auth::backend::ComputeUserInfo; use crate::auth::{check_peer_addr_is_in_list, AuthError}; use crate::config::ComputeConfig; use crate::context::RequestContext; +use crate::control_plane::ControlPlaneApi; use crate::error::ReportableError; use crate::ext::LockExt; use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind}; +use crate::protocol2::ConnectionInfoExtra; use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::keys::KeyPrefix; use crate::redis::kv_ops::RedisKVClient; @@ -133,6 +135,9 @@ pub(crate) enum CancelError { #[error("IP is not allowed")] IpNotAllowed, + #[error("VPC endpoint id is not allowed to connect")] + VpcEndpointIdNotAllowed, + #[error("Authentication backend error")] AuthError(#[from] AuthError), @@ -152,8 +157,9 @@ impl ReportableError for CancelError { } CancelError::Postgres(_) => crate::error::ErrorKind::Compute, CancelError::RateLimit => crate::error::ErrorKind::RateLimit, - CancelError::IpNotAllowed => crate::error::ErrorKind::User, - CancelError::NotFound => crate::error::ErrorKind::User, + CancelError::IpNotAllowed + | CancelError::VpcEndpointIdNotAllowed + | CancelError::NotFound => crate::error::ErrorKind::User, CancelError::AuthError(_) => crate::error::ErrorKind::ControlPlane, CancelError::InternalError => crate::error::ErrorKind::Service, } @@ -265,11 +271,12 @@ impl CancellationHandler { /// Will fetch IP allowlist internally. 
/// /// return Result primarily for tests - pub(crate) async fn cancel_session( + pub(crate) async fn cancel_session( &self, key: CancelKeyData, ctx: RequestContext, - check_allowed: bool, + check_ip_allowed: bool, + check_vpc_allowed: bool, auth_backend: &T, ) -> Result<(), CancelError> { let subnet_key = match ctx.peer_addr() { @@ -304,11 +311,11 @@ impl CancellationHandler { return Err(CancelError::NotFound); }; - if check_allowed { + if check_ip_allowed { let ip_allowlist = auth_backend .get_allowed_ips(&ctx, &cancel_closure.user_info) .await - .map_err(CancelError::AuthError)?; + .map_err(|e| CancelError::AuthError(e.into()))?; if !check_peer_addr_is_in_list(&ctx.peer_addr(), &ip_allowlist) { // log it here since cancel_session could be spawned in a task @@ -320,6 +327,40 @@ impl CancellationHandler { } } + // check if a VPC endpoint ID is coming in and if yes, if it's allowed + let access_blocks = auth_backend + .get_block_public_or_vpc_access(&ctx, &cancel_closure.user_info) + .await + .map_err(|e| CancelError::AuthError(e.into()))?; + + if check_vpc_allowed { + if access_blocks.vpc_access_blocked { + return Err(CancelError::AuthError(AuthError::NetworkNotAllowed)); + } + + let incoming_vpc_endpoint_id = match ctx.extra() { + None => return Err(CancelError::AuthError(AuthError::MissingVPCEndpointId)), + Some(ConnectionInfoExtra::Aws { vpce_id }) => { + // Convert the vcpe_id to a string + String::from_utf8(vpce_id.to_vec()).unwrap_or_default() + } + Some(ConnectionInfoExtra::Azure { link_id }) => link_id.to_string(), + }; + + let allowed_vpc_endpoint_ids = auth_backend + .get_allowed_vpc_endpoint_ids(&ctx, &cancel_closure.user_info) + .await + .map_err(|e| CancelError::AuthError(e.into()))?; + // TODO: For now an empty VPC endpoint ID list means all are allowed. We should replace that. 
+ if !allowed_vpc_endpoint_ids.is_empty() + && !allowed_vpc_endpoint_ids.contains(&incoming_vpc_endpoint_id) + { + return Err(CancelError::VpcEndpointIdNotAllowed); + } + } else if access_blocks.public_access_blocked { + return Err(CancelError::VpcEndpointIdNotAllowed); + } + Metrics::get() .proxy .cancellation_requests_total diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 8502edcfab09..1dcd37712ea2 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -68,6 +68,7 @@ pub struct AuthenticationConfig { pub rate_limiter: AuthRateLimiter, pub rate_limit_ip_subnet: u8, pub ip_allowlist_check_enabled: bool, + pub is_vpc_acccess_proxy: bool, pub jwks_cache: JwkCache, pub is_auth_broker: bool, pub accept_jwts: bool, diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 78bfb6deacc3..c4548a7ddd95 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -182,7 +182,8 @@ pub(crate) async fn handle_client( cancel_key_data, ctx, config.authentication_config.ip_allowlist_check_enabled, - backend, + config.authentication_config.is_vpc_acccess_proxy, + backend.get_api(), ) .await .inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok(); diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index a9fb513d3ceb..3236b2e1bfb0 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -19,7 +19,7 @@ use crate::intern::{BranchIdInt, ProjectIdInt}; use crate::metrics::{ ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting, }; -use crate::protocol2::ConnectionInfo; +use crate::protocol2::{ConnectionInfo, ConnectionInfoExtra}; use crate::types::{DbName, EndpointId, RoleName}; pub mod parquet; @@ -312,6 +312,15 @@ impl RequestContext { .ip() } + pub(crate) fn extra(&self) -> Option { + self.0 + .try_lock() + .expect("should not deadlock") + .conn_info + .extra + .clone() + } + pub(crate) fn cold_start_info(&self) -> ColdStartInfo { self.0 .try_lock() diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index ece03156d1fa..ef6621fc598a 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -22,7 +22,8 @@ use crate::control_plane::errors::{ use crate::control_plane::locks::ApiLocks; use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason}; use crate::control_plane::{ - AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo, + AccessBlockerFlags, AuthInfo, AuthSecret, CachedAccessBlockerFlags, CachedAllowedIps, + CachedAllowedVpcEndpointIds, CachedNodeInfo, CachedRoleSecret, NodeInfo, }; use crate::metrics::{CacheOutcome, Metrics}; use crate::rate_limiter::WakeComputeRateLimiter; @@ -137,9 +138,6 @@ impl NeonControlPlaneClient { } }; - // Ivan: don't know where it will be used, so I leave it here - let _endpoint_vpc_ids = body.allowed_vpc_endpoint_ids.unwrap_or_default(); - let secret = if body.role_secret.is_empty() { None } else { @@ -153,10 +151,23 @@ impl NeonControlPlaneClient { .proxy .allowed_ips_number .observe(allowed_ips.len() as f64); + let allowed_vpc_endpoint_ids = body.allowed_vpc_endpoint_ids.unwrap_or_default(); + Metrics::get() + .proxy + .allowed_vpc_endpoint_ids + .observe(allowed_vpc_endpoint_ids.len() as f64); + let block_public_connections = body.block_public_connections.unwrap_or_default(); + let block_vpc_connections = 
body.block_vpc_connections.unwrap_or_default(); Ok(AuthInfo { secret, allowed_ips, + allowed_vpc_endpoint_ids, project_id: body.project_id, + account_id: body.account_id, + access_blocker_flags: AccessBlockerFlags { + public_access_blocked: block_public_connections, + vpc_access_blocked: block_vpc_connections, + }, }) } .inspect_err(|e| tracing::debug!(error = ?e)) @@ -299,6 +310,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { return Ok(role_secret); } let auth_info = self.do_get_auth_info(ctx, user_info).await?; + let account_id = auth_info.account_id; if let Some(project_id) = auth_info.project_id { let normalized_ep_int = normalized_ep.into(); self.caches.project_info.insert_role_secret( @@ -312,24 +324,35 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { normalized_ep_int, Arc::new(auth_info.allowed_ips), ); + self.caches.project_info.insert_allowed_vpc_endpoint_ids( + account_id, + project_id, + normalized_ep_int, + Arc::new(auth_info.allowed_vpc_endpoint_ids), + ); + self.caches.project_info.insert_block_public_or_vpc_access( + project_id, + normalized_ep_int, + auth_info.access_blocker_flags, + ); ctx.set_project_id(project_id); } // When we just got a secret, we don't need to invalidate it. Ok(Cached::new_uncached(auth_info.secret)) } - async fn get_allowed_ips_and_secret( + async fn get_allowed_ips( &self, ctx: &RequestContext, user_info: &ComputeUserInfo, - ) -> Result<(CachedAllowedIps, Option), GetAuthInfoError> { + ) -> Result { let normalized_ep = &user_info.endpoint.normalize(); if let Some(allowed_ips) = self.caches.project_info.get_allowed_ips(normalized_ep) { Metrics::get() .proxy - .allowed_ips_cache_misses + .allowed_ips_cache_misses // TODO SR: Should we rename this variable to something like allowed_ip_cache_stats? 
.inc(CacheOutcome::Hit); - return Ok((allowed_ips, None)); + return Ok(allowed_ips); } Metrics::get() .proxy @@ -337,7 +360,10 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { .inc(CacheOutcome::Miss); let auth_info = self.do_get_auth_info(ctx, user_info).await?; let allowed_ips = Arc::new(auth_info.allowed_ips); + let allowed_vpc_endpoint_ids = Arc::new(auth_info.allowed_vpc_endpoint_ids); + let access_blocker_flags = auth_info.access_blocker_flags; let user = &user_info.user; + let account_id = auth_info.account_id; if let Some(project_id) = auth_info.project_id { let normalized_ep_int = normalized_ep.into(); self.caches.project_info.insert_role_secret( @@ -351,12 +377,136 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { normalized_ep_int, allowed_ips.clone(), ); + self.caches.project_info.insert_allowed_vpc_endpoint_ids( + account_id, + project_id, + normalized_ep_int, + allowed_vpc_endpoint_ids.clone(), + ); + self.caches.project_info.insert_block_public_or_vpc_access( + project_id, + normalized_ep_int, + access_blocker_flags, + ); + ctx.set_project_id(project_id); + } + Ok(Cached::new_uncached(allowed_ips)) + } + + async fn get_allowed_vpc_endpoint_ids( + &self, + ctx: &RequestContext, + user_info: &ComputeUserInfo, + ) -> Result { + let normalized_ep = &user_info.endpoint.normalize(); + if let Some(allowed_vpc_endpoint_ids) = self + .caches + .project_info + .get_allowed_vpc_endpoint_ids(normalized_ep) + { + Metrics::get() + .proxy + .vpc_endpoint_id_cache_stats + .inc(CacheOutcome::Hit); + return Ok(allowed_vpc_endpoint_ids); + } + + Metrics::get() + .proxy + .vpc_endpoint_id_cache_stats + .inc(CacheOutcome::Miss); + + let auth_info = self.do_get_auth_info(ctx, user_info).await?; + let allowed_ips = Arc::new(auth_info.allowed_ips); + let allowed_vpc_endpoint_ids = Arc::new(auth_info.allowed_vpc_endpoint_ids); + let access_blocker_flags = auth_info.access_blocker_flags; + let user = &user_info.user; + let account_id = auth_info.account_id; + if let Some(project_id) = auth_info.project_id { + let normalized_ep_int = normalized_ep.into(); + self.caches.project_info.insert_role_secret( + project_id, + normalized_ep_int, + user.into(), + auth_info.secret.clone(), + ); + self.caches.project_info.insert_allowed_ips( + project_id, + normalized_ep_int, + allowed_ips.clone(), + ); + self.caches.project_info.insert_allowed_vpc_endpoint_ids( + account_id, + project_id, + normalized_ep_int, + allowed_vpc_endpoint_ids.clone(), + ); + self.caches.project_info.insert_block_public_or_vpc_access( + project_id, + normalized_ep_int, + access_blocker_flags, + ); + ctx.set_project_id(project_id); + } + Ok(Cached::new_uncached(allowed_vpc_endpoint_ids)) + } + + async fn get_block_public_or_vpc_access( + &self, + ctx: &RequestContext, + user_info: &ComputeUserInfo, + ) -> Result { + let normalized_ep = &user_info.endpoint.normalize(); + if let Some(access_blocker_flags) = self + .caches + .project_info + .get_block_public_or_vpc_access(normalized_ep) + { + Metrics::get() + .proxy + .access_blocker_flags_cache_stats + .inc(CacheOutcome::Hit); + return Ok(access_blocker_flags); + } + + Metrics::get() + .proxy + .access_blocker_flags_cache_stats + .inc(CacheOutcome::Miss); + + let auth_info = self.do_get_auth_info(ctx, user_info).await?; + let allowed_ips = Arc::new(auth_info.allowed_ips); + let allowed_vpc_endpoint_ids = Arc::new(auth_info.allowed_vpc_endpoint_ids); + let access_blocker_flags = auth_info.access_blocker_flags; + let user = &user_info.user; + let account_id = 
auth_info.account_id; + if let Some(project_id) = auth_info.project_id { + let normalized_ep_int = normalized_ep.into(); + self.caches.project_info.insert_role_secret( + project_id, + normalized_ep_int, + user.into(), + auth_info.secret.clone(), + ); + self.caches.project_info.insert_allowed_ips( + project_id, + normalized_ep_int, + allowed_ips.clone(), + ); + self.caches.project_info.insert_allowed_vpc_endpoint_ids( + account_id, + project_id, + normalized_ep_int, + allowed_vpc_endpoint_ids.clone(), + ); + self.caches.project_info.insert_block_public_or_vpc_access( + project_id, + normalized_ep_int, + access_blocker_flags.clone(), + ); ctx.set_project_id(project_id); } - Ok(( - Cached::new_uncached(allowed_ips), - Some(Cached::new_uncached(auth_info.secret)), - )) + Ok(Cached::new_uncached(access_blocker_flags)) } #[tracing::instrument(skip_all)] diff --git a/proxy/src/control_plane/client/mock.rs b/proxy/src/control_plane/client/mock.rs index 5f8bda0f35ae..1e6cde8fb080 100644 --- a/proxy/src/control_plane/client/mock.rs +++ b/proxy/src/control_plane/client/mock.rs @@ -13,12 +13,14 @@ use crate::auth::backend::ComputeUserInfo; use crate::auth::IpPattern; use crate::cache::Cached; use crate::context::RequestContext; -use crate::control_plane::client::{CachedAllowedIps, CachedRoleSecret}; +use crate::control_plane::client::{ + CachedAllowedIps, CachedAllowedVpcEndpointIds, CachedRoleSecret, +}; use crate::control_plane::errors::{ ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError, }; use crate::control_plane::messages::MetricsAuxInfo; -use crate::control_plane::{AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo}; +use crate::control_plane::{AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo}; use crate::error::io_error; use crate::intern::RoleNameInt; use crate::types::{BranchId, EndpointId, ProjectId, RoleName}; @@ -121,7 +123,10 @@ impl MockControlPlane { Ok(AuthInfo { secret, allowed_ips, + allowed_vpc_endpoint_ids: vec![], project_id: None, + account_id: None, + access_blocker_flags: AccessBlockerFlags::default(), }) } @@ -214,16 +219,35 @@ impl super::ControlPlaneApi for MockControlPlane { )) } - async fn get_allowed_ips_and_secret( + async fn get_allowed_ips( + &self, + _ctx: &RequestContext, + user_info: &ComputeUserInfo, + ) -> Result { + Ok(Cached::new_uncached(Arc::new( + self.do_get_auth_info(user_info).await?.allowed_ips, + ))) + } + + async fn get_allowed_vpc_endpoint_ids( + &self, + _ctx: &RequestContext, + user_info: &ComputeUserInfo, + ) -> Result { + Ok(Cached::new_uncached(Arc::new( + self.do_get_auth_info(user_info) + .await? 
+ .allowed_vpc_endpoint_ids, + ))) + } + + async fn get_block_public_or_vpc_access( &self, _ctx: &RequestContext, user_info: &ComputeUserInfo, - ) -> Result<(CachedAllowedIps, Option), GetAuthInfoError> { - Ok(( - Cached::new_uncached(Arc::new( - self.do_get_auth_info(user_info).await?.allowed_ips, - )), - None, + ) -> Result { + Ok(Cached::new_uncached( + self.do_get_auth_info(user_info).await?.access_blocker_flags, )) } diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index b879f3a59ff4..a06943726e50 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -17,7 +17,8 @@ use crate::cache::project_info::ProjectInfoCacheImpl; use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions}; use crate::context::RequestContext; use crate::control_plane::{ - errors, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, NodeInfoCache, + errors, CachedAccessBlockerFlags, CachedAllowedIps, CachedAllowedVpcEndpointIds, + CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, NodeInfoCache, }; use crate::error::ReportableError; use crate::metrics::ApiLockMetrics; @@ -55,17 +56,45 @@ impl ControlPlaneApi for ControlPlaneClient { } } - async fn get_allowed_ips_and_secret( + async fn get_allowed_ips( &self, ctx: &RequestContext, user_info: &ComputeUserInfo, - ) -> Result<(CachedAllowedIps, Option), errors::GetAuthInfoError> { + ) -> Result { match self { - Self::ProxyV1(api) => api.get_allowed_ips_and_secret(ctx, user_info).await, + Self::ProxyV1(api) => api.get_allowed_ips(ctx, user_info).await, #[cfg(any(test, feature = "testing"))] - Self::PostgresMock(api) => api.get_allowed_ips_and_secret(ctx, user_info).await, + Self::PostgresMock(api) => api.get_allowed_ips(ctx, user_info).await, #[cfg(test)] - Self::Test(api) => api.get_allowed_ips_and_secret(), + Self::Test(api) => api.get_allowed_ips(), + } + } + + async fn get_allowed_vpc_endpoint_ids( + &self, + ctx: &RequestContext, + user_info: &ComputeUserInfo, + ) -> Result { + match self { + Self::ProxyV1(api) => api.get_allowed_vpc_endpoint_ids(ctx, user_info).await, + #[cfg(any(test, feature = "testing"))] + Self::PostgresMock(api) => api.get_allowed_vpc_endpoint_ids(ctx, user_info).await, + #[cfg(test)] + Self::Test(api) => api.get_allowed_vpc_endpoint_ids(), + } + } + + async fn get_block_public_or_vpc_access( + &self, + ctx: &RequestContext, + user_info: &ComputeUserInfo, + ) -> Result { + match self { + Self::ProxyV1(api) => api.get_block_public_or_vpc_access(ctx, user_info).await, + #[cfg(any(test, feature = "testing"))] + Self::PostgresMock(api) => api.get_block_public_or_vpc_access(ctx, user_info).await, + #[cfg(test)] + Self::Test(api) => api.get_block_public_or_vpc_access(), } } @@ -102,9 +131,15 @@ impl ControlPlaneApi for ControlPlaneClient { pub(crate) trait TestControlPlaneClient: Send + Sync + 'static { fn wake_compute(&self) -> Result; - fn get_allowed_ips_and_secret( + fn get_allowed_ips(&self) -> Result; + + fn get_allowed_vpc_endpoint_ids( + &self, + ) -> Result; + + fn get_block_public_or_vpc_access( &self, - ) -> Result<(CachedAllowedIps, Option), errors::GetAuthInfoError>; + ) -> Result; fn dyn_clone(&self) -> Box; } diff --git a/proxy/src/control_plane/messages.rs b/proxy/src/control_plane/messages.rs index d068614b24df..5883d02b92c7 100644 --- a/proxy/src/control_plane/messages.rs +++ b/proxy/src/control_plane/messages.rs @@ -4,7 +4,7 @@ use measured::FixedCardinalityLabel; use serde::{Deserialize, Serialize}; use 
crate::auth::IpPattern; -use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt}; +use crate::intern::{AccountIdInt, BranchIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt}; use crate::proxy::retry::CouldRetry; /// Generic error response with human-readable description. @@ -227,8 +227,11 @@ pub(crate) struct UserFacingMessage { pub(crate) struct GetEndpointAccessControl { pub(crate) role_secret: Box, pub(crate) allowed_ips: Option>, + pub(crate) allowed_vpc_endpoint_ids: Option>, pub(crate) project_id: Option, - pub(crate) allowed_vpc_endpoint_ids: Option>, + pub(crate) account_id: Option, + pub(crate) block_public_connections: Option, + pub(crate) block_vpc_connections: Option, } /// Response which holds compute node's `host:port` pair. @@ -282,6 +285,10 @@ pub(crate) struct DatabaseInfo { pub(crate) aux: MetricsAuxInfo, #[serde(default)] pub(crate) allowed_ips: Option>, + #[serde(default)] + pub(crate) allowed_vpc_endpoint_ids: Option>, + #[serde(default)] + pub(crate) public_access_allowed: Option, } // Manually implement debug to omit sensitive info. @@ -293,6 +300,7 @@ impl fmt::Debug for DatabaseInfo { .field("dbname", &self.dbname) .field("user", &self.user) .field("allowed_ips", &self.allowed_ips) + .field("allowed_vpc_endpoint_ids", &self.allowed_vpc_endpoint_ids) .finish_non_exhaustive() } } @@ -457,19 +465,31 @@ mod tests { #[test] fn parse_get_role_secret() -> anyhow::Result<()> { - // Empty `allowed_ips` field. + // Empty `allowed_ips` and `allowed_vpc_endpoint_ids` field. + let json = json!({ + "role_secret": "secret", + }); + serde_json::from_str::(&json.to_string())?; + let json = json!({ + "role_secret": "secret", + "allowed_ips": ["8.8.8.8"], + }); + serde_json::from_str::(&json.to_string())?; let json = json!({ "role_secret": "secret", + "allowed_vpc_endpoint_ids": ["vpce-0abcd1234567890ef"], }); serde_json::from_str::(&json.to_string())?; let json = json!({ "role_secret": "secret", "allowed_ips": ["8.8.8.8"], + "allowed_vpc_endpoint_ids": ["vpce-0abcd1234567890ef"], }); serde_json::from_str::(&json.to_string())?; let json = json!({ "role_secret": "secret", "allowed_ips": ["8.8.8.8"], + "allowed_vpc_endpoint_ids": ["vpce-0abcd1234567890ef"], "project_id": "project", }); serde_json::from_str::(&json.to_string())?; diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index 1dca26d6866c..f92e4f3f6055 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -19,6 +19,7 @@ use crate::cache::{Cached, TimedLru}; use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo}; +use crate::intern::AccountIdInt; use crate::intern::ProjectIdInt; use crate::types::{EndpointCacheKey, EndpointId}; use crate::{compute, scram}; @@ -52,8 +53,14 @@ pub(crate) struct AuthInfo { pub(crate) secret: Option, /// List of IP addresses allowed for the autorization. pub(crate) allowed_ips: Vec, + /// List of VPC endpoints allowed for the autorization. + pub(crate) allowed_vpc_endpoint_ids: Vec, /// Project ID. This is used for cache invalidation. pub(crate) project_id: Option, + /// Account ID. This is used for cache invalidation. + pub(crate) account_id: Option, + /// Are public connections or VPC connections blocked? + pub(crate) access_blocker_flags: AccessBlockerFlags, } /// Info for establishing a connection to a compute node. 
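The `AccessBlockerFlags` carried in `AuthInfo` drive the same allow/deny decision in three places in this patch: password authentication, query cancellation, and the serverless backend. A condensed sketch of that shared logic, with simplified types and a hypothetical `check_access` helper rather than the proxy's actual functions:

```rust
#[derive(Clone, Default)]
struct AccessBlockerFlags {
    public_access_blocked: bool,
    vpc_access_blocked: bool,
}

enum Deny {
    NetworkNotAllowed,
    MissingVpcEndpointId,
    VpcEndpointIdNotAllowed(String),
}

fn check_access(
    is_vpc_access_proxy: bool,      // does this proxy instance serve VPC traffic?
    flags: &AccessBlockerFlags,
    incoming_vpce_id: Option<&str>, // extracted from the PROXY-protocol extra data
    allowed_vpce_ids: &[String],
) -> Result<(), Deny> {
    if is_vpc_access_proxy {
        if flags.vpc_access_blocked {
            return Err(Deny::NetworkNotAllowed);
        }
        let id = incoming_vpce_id.ok_or(Deny::MissingVpcEndpointId)?;
        // Per the TODOs in the patch, an empty allowlist currently means
        // "all VPC endpoints allowed".
        if !allowed_vpce_ids.is_empty() && !allowed_vpce_ids.iter().any(|a| a.as_str() == id) {
            return Err(Deny::VpcEndpointIdNotAllowed(id.to_owned()));
        }
    } else if flags.public_access_blocked {
        return Err(Deny::NetworkNotAllowed);
    }
    Ok(())
}

fn main() {
    // Public proxy with public access blocked: deny.
    let blocked = AccessBlockerFlags { public_access_blocked: true, ..Default::default() };
    assert!(check_access(false, &blocked, None, &[]).is_err());
    // VPC proxy, nothing blocked, empty allowlist: currently allowed.
    assert!(check_access(true, &AccessBlockerFlags::default(), Some("vpce-123"), &[]).is_ok());
}
```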
@@ -95,11 +102,21 @@ impl NodeInfo { } } +#[derive(Clone, Default, Eq, PartialEq, Debug)] +pub(crate) struct AccessBlockerFlags { + pub public_access_blocked: bool, + pub vpc_access_blocked: bool, +} + pub(crate) type NodeInfoCache = TimedLru>>; pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>; pub(crate) type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option>; pub(crate) type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc>>; +pub(crate) type CachedAllowedVpcEndpointIds = + Cached<&'static ProjectInfoCacheImpl, Arc>>; +pub(crate) type CachedAccessBlockerFlags = + Cached<&'static ProjectInfoCacheImpl, AccessBlockerFlags>; /// This will allocate per each call, but the http requests alone /// already require a few allocations, so it should be fine. @@ -113,11 +130,23 @@ pub(crate) trait ControlPlaneApi { user_info: &ComputeUserInfo, ) -> Result; - async fn get_allowed_ips_and_secret( + async fn get_allowed_ips( + &self, + ctx: &RequestContext, + user_info: &ComputeUserInfo, + ) -> Result; + + async fn get_allowed_vpc_endpoint_ids( + &self, + ctx: &RequestContext, + user_info: &ComputeUserInfo, + ) -> Result; + + async fn get_block_public_or_vpc_access( &self, ctx: &RequestContext, user_info: &ComputeUserInfo, - ) -> Result<(CachedAllowedIps, Option), errors::GetAuthInfoError>; + ) -> Result; async fn get_endpoint_jwks( &self, diff --git a/proxy/src/intern.rs b/proxy/src/intern.rs index 79c6020302af..0d1382679c81 100644 --- a/proxy/src/intern.rs +++ b/proxy/src/intern.rs @@ -7,7 +7,7 @@ use std::sync::OnceLock; use lasso::{Capacity, MemoryLimits, Spur, ThreadedRodeo}; use rustc_hash::FxHasher; -use crate::types::{BranchId, EndpointId, ProjectId, RoleName}; +use crate::types::{AccountId, BranchId, EndpointId, ProjectId, RoleName}; pub trait InternId: Sized + 'static { fn get_interner() -> &'static StringInterner; @@ -206,6 +206,26 @@ impl From for ProjectIdInt { } } +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub struct AccountIdTag; +impl InternId for AccountIdTag { + fn get_interner() -> &'static StringInterner { + static ROLE_NAMES: OnceLock> = OnceLock::new(); + ROLE_NAMES.get_or_init(Default::default) + } +} +pub type AccountIdInt = InternedString; +impl From<&AccountId> for AccountIdInt { + fn from(value: &AccountId) -> Self { + AccountIdTag::get_interner().get_or_intern(value) + } +} +impl From for AccountIdInt { + fn from(value: AccountId) -> Self { + AccountIdTag::get_interner().get_or_intern(&value) + } +} + #[cfg(test)] #[expect(clippy::unwrap_used)] mod tests { diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index f3d281a26b59..25bcc81108b6 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -96,6 +96,16 @@ pub struct ProxyMetrics { #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))] pub allowed_ips_number: Histogram<10>, + /// Number of cache hits/misses for VPC endpoint IDs. + pub vpc_endpoint_id_cache_stats: CounterVec>, + + /// Number of cache hits/misses for access blocker flags. + pub access_blocker_flags_cache_stats: CounterVec>, + + /// Number of allowed VPC endpoints IDs + #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))] + pub allowed_vpc_endpoint_ids: Histogram<10>, + /// Number of connections (per sni). 
pub accepted_connections_by_sni: CounterVec>, @@ -570,6 +580,9 @@ pub enum RedisEventsCount { CancelSession, PasswordUpdate, AllowedIpsUpdate, + AllowedVpcEndpointIdsUpdateForProjects, + AllowedVpcEndpointIdsUpdateForAllProjectsInOrg, + BlockPublicOrVpcAccessUpdate, } pub struct ThreadPoolWorkers(usize); diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index ab173bd0d052..8a407c811971 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -283,7 +283,8 @@ pub(crate) async fn handle_client( cancel_key_data, ctx, config.authentication_config.ip_allowlist_check_enabled, - auth_backend, + config.authentication_config.is_vpc_acccess_proxy, + auth_backend.get_api(), ) .await .inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok(); diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index 10db2bcb303f..d8c00a9b4177 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -26,7 +26,7 @@ use crate::config::{ComputeConfig, RetryConfig}; use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient}; use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status}; use crate::control_plane::{ - self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo, NodeInfoCache, + self, CachedAllowedIps, CachedAllowedVpcEndpointIds, CachedNodeInfo, NodeInfo, NodeInfoCache, }; use crate::error::ErrorKind; use crate::tls::client_config::compute_client_config_with_certs; @@ -526,9 +526,19 @@ impl TestControlPlaneClient for TestConnectMechanism { } } - fn get_allowed_ips_and_secret( + fn get_allowed_ips(&self) -> Result { + unimplemented!("not used in tests") + } + + fn get_allowed_vpc_endpoint_ids( + &self, + ) -> Result { + unimplemented!("not used in tests") + } + + fn get_block_public_or_vpc_access( &self, - ) -> Result<(CachedAllowedIps, Option), control_plane::errors::GetAuthInfoError> + ) -> Result { unimplemented!("not used in tests") } diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index 19fdd3280dfc..1a7024588aa1 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -10,7 +10,7 @@ use uuid::Uuid; use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider; use crate::cache::project_info::ProjectInfoCache; -use crate::intern::{ProjectIdInt, RoleNameInt}; +use crate::intern::{AccountIdInt, ProjectIdInt, RoleNameInt}; use crate::metrics::{Metrics, RedisErrors, RedisEventsCount}; const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates"; @@ -86,9 +86,7 @@ pub(crate) struct BlockPublicOrVpcAccessUpdated { #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub(crate) struct AllowedVpcEndpointsUpdatedForOrg { - // TODO: change type once the implementation is more fully fledged. - // See e.g. https://github.com/neondatabase/neon/pull/10073. - account_id: ProjectIdInt, + account_id: AccountIdInt, } #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] @@ -205,6 +203,24 @@ impl MessageHandler { .proxy .redis_events_count .inc(RedisEventsCount::PasswordUpdate); + } else if matches!( + msg, + Notification::AllowedVpcEndpointsUpdatedForProjects { .. } + ) { + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::AllowedVpcEndpointIdsUpdateForProjects); + } else if matches!(msg, Notification::AllowedVpcEndpointsUpdatedForOrg { .. 
}) { + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::AllowedVpcEndpointIdsUpdateForAllProjectsInOrg); + } else if matches!(msg, Notification::BlockPublicOrVpcAccessUpdated { .. }) { + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::BlockPublicOrVpcAccessUpdate); } // TODO: add additional metrics for the other event types. @@ -230,20 +246,26 @@ fn invalidate_cache(cache: Arc, msg: Notification) { Notification::AllowedIpsUpdate { allowed_ips_update } => { cache.invalidate_allowed_ips_for_project(allowed_ips_update.project_id); } + Notification::BlockPublicOrVpcAccessUpdated { + block_public_or_vpc_access_updated, + } => cache.invalidate_block_public_or_vpc_access_for_project( + block_public_or_vpc_access_updated.project_id, + ), + Notification::AllowedVpcEndpointsUpdatedForOrg { + allowed_vpc_endpoints_updated_for_org, + } => cache.invalidate_allowed_vpc_endpoint_ids_for_org( + allowed_vpc_endpoints_updated_for_org.account_id, + ), + Notification::AllowedVpcEndpointsUpdatedForProjects { + allowed_vpc_endpoints_updated_for_projects, + } => cache.invalidate_allowed_vpc_endpoint_ids_for_projects( + allowed_vpc_endpoints_updated_for_projects.project_ids, + ), Notification::PasswordUpdate { password_update } => cache .invalidate_role_secret_for_project( password_update.project_id, password_update.role_name, ), - Notification::BlockPublicOrVpcAccessUpdated { .. } => { - // https://github.com/neondatabase/neon/pull/10073 - } - Notification::AllowedVpcEndpointsUpdatedForOrg { .. } => { - // https://github.com/neondatabase/neon/pull/10073 - } - Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => { - // https://github.com/neondatabase/neon/pull/10073 - } Notification::UnknownTopic => unreachable!(), } } diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 6d5fb13681e9..0fb4a8a6cc70 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -30,6 +30,7 @@ use crate::control_plane::locks::ApiLocks; use crate::control_plane::CachedNodeInfo; use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::intern::EndpointIdInt; +use crate::protocol2::ConnectionInfoExtra; use crate::proxy::connect_compute::ConnectMechanism; use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute}; use crate::rate_limiter::EndpointRateLimiter; @@ -57,23 +58,52 @@ impl PoolingBackend { let user_info = user_info.clone(); let backend = self.auth_backend.as_ref().map(|()| user_info.clone()); - let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?; + let allowed_ips = backend.get_allowed_ips(ctx).await?; + if self.config.authentication_config.ip_allowlist_check_enabled && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) { return Err(AuthError::ip_address_not_allowed(ctx.peer_addr())); } + + let access_blocker_flags = backend.get_block_public_or_vpc_access(ctx).await?; + if self.config.authentication_config.is_vpc_acccess_proxy { + if access_blocker_flags.vpc_access_blocked { + return Err(AuthError::NetworkNotAllowed); + } + + let extra = ctx.extra(); + let incoming_endpoint_id = match extra { + None => String::new(), + Some(ConnectionInfoExtra::Aws { vpce_id }) => { + // Convert the vcpe_id to a string + String::from_utf8(vpce_id.to_vec()).unwrap_or_default() + } + Some(ConnectionInfoExtra::Azure { link_id }) => link_id.to_string(), + }; + + if incoming_endpoint_id.is_empty() { + return Err(AuthError::MissingVPCEndpointId); + } + + let allowed_vpc_endpoint_ids 
+            // TODO: For now an empty VPC endpoint ID list means all are allowed. We should replace that.
+            if !allowed_vpc_endpoint_ids.is_empty()
+                && !allowed_vpc_endpoint_ids.contains(&incoming_endpoint_id)
+            {
+                return Err(AuthError::vpc_endpoint_id_not_allowed(incoming_endpoint_id));
+            }
+        } else if access_blocker_flags.public_access_blocked {
+            return Err(AuthError::NetworkNotAllowed);
+        }
+
         if !self
             .endpoint_rate_limiter
             .check(user_info.endpoint.clone().into(), 1)
         {
             return Err(AuthError::too_many_connections());
         }
-        let cached_secret = match maybe_secret {
-            Some(secret) => secret,
-            None => backend.get_role_secret(ctx).await?,
-        };
-
+        let cached_secret = backend.get_role_secret(ctx).await?;
         let secret = match cached_secret.value.clone() {
             Some(secret) => self.config.authentication_config.check_rate_limit(
                 ctx,
diff --git a/proxy/src/types.rs b/proxy/src/types.rs
index 6e0bd61c9442..d5952d1d8b0a 100644
--- a/proxy/src/types.rs
+++ b/proxy/src/types.rs
@@ -97,6 +97,8 @@ smol_str_wrapper!(EndpointId);
 smol_str_wrapper!(BranchId);
 // 90% of project strings are 23 characters or less.
 smol_str_wrapper!(ProjectId);
+// 90% of account strings are 23 characters or less.
+smol_str_wrapper!(AccountId);
 // will usually equal endpoint ID
 smol_str_wrapper!(EndpointCacheKey);

From 6318828c639ba85e66da321f7b32828ee945de12 Mon Sep 17 00:00:00 2001
From: Folke Behrens
Date: Fri, 31 Jan 2025 21:52:17 +0100
Subject: [PATCH 16/18] Update rust to 1.84.1 (#10618)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We keep the compiler up to date, pointing to the latest release, as many
other projects in the Rust ecosystem do.
[Release notes](https://releases.rs/docs/1.84.1/).

The prior update was in https://github.com/neondatabase/neon/pull/10328.

Co-authored-by: Arpad Müller
---
 build-tools.Dockerfile | 2 +-
 rust-toolchain.toml    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build-tools.Dockerfile b/build-tools.Dockerfile
index dfcc7d06b493..f744b44808c6 100644
--- a/build-tools.Dockerfile
+++ b/build-tools.Dockerfile
@@ -253,7 +253,7 @@ WORKDIR /home/nonroot
 # Rust
 # Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
-ENV RUSTC_VERSION=1.84.0
+ENV RUSTC_VERSION=1.84.1
 ENV RUSTUP_HOME="/home/nonroot/.rustup"
 ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
 ARG RUSTFILT_VERSION=0.2.1
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index 06746d3e1dd5..38a7f202ba0a 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,5 +1,5 @@
 [toolchain]
-channel = "1.84.0"
+channel = "1.84.1"
 profile = "default"
 # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
 # https://rust-lang.github.io/rustup/concepts/profiles.html

From b9e1a6724628aad5cf62737f6acab60ec23ff09b Mon Sep 17 00:00:00 2001
From: Peter Bendel
Date: Sat, 1 Feb 2025 12:09:45 +0100
Subject: [PATCH 17/18] fix generate matrix for olap for saturdays (#10622)

## Problem

When introducing pg17 for the job step `Generate matrix for OLAP benchmarks`, I introduced a syntax error that only hits on Saturdays.
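For illustration only (not part of this patch): strict JSON parsers reject trailing commas, so the matrix fragment generated by this job step only parses once the stray comma is removed. A minimal Rust sketch of the failure mode, assuming the `serde_json` crate:

```rust
// Editorial sketch, not repository code: demonstrates that a trailing
// comma makes an otherwise valid JSON object unparseable.
fn main() {
    // Mirrors the shape of the generated matrix, with the stray comma.
    let broken = r#"{ "pg_version": [ 16, 17 ], }"#; // before the fix
    let fixed = r#"{ "pg_version": [ 16, 17 ] }"#; // after the fix

    assert!(serde_json::from_str::<serde_json::Value>(broken).is_err());
    assert!(serde_json::from_str::<serde_json::Value>(fixed).is_ok());
}
```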
## Summary of changes

Remove the trailing comma.

## Successful test run

https://github.com/neondatabase/neon/actions/runs/13086363907
---
 .github/workflows/benchmarking.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml
index 20a8a6e2c9dc..413af90deca8 100644
--- a/.github/workflows/benchmarking.yml
+++ b/.github/workflows/benchmarking.yml
@@ -341,7 +341,7 @@ jobs:
             ],
             "pg_version" : [
               16,17
-            ],
+            ]
           }'

           if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then

From 8ae6f656a694a2d6892ce6ebd1475d1b831ba917 Mon Sep 17 00:00:00 2001
From: Arpad Müller
Date: Mon, 3 Feb 2025 05:11:06 +0100
Subject: [PATCH 18/18] Don't require partial backup semaphore capacity for
 deletions (#10628)

In the safekeeper, we block deletions on the timeline's gate closing, and
any `WalResidentTimeline` keeps the gate open (because it owns a gate lock
object). Thus, until the `main_task` function of a partial backup returns,
we can't delete the associated timeline.

To make these tasks exit early, we trigger the timeline's cancellation
token on shutdown. However, the partial backup task wasn't checking for
cancellation while waiting to acquire a partial backup permit.

On a staging safekeeper, we previously hit a situation where the semaphore
stayed empty for many hours, so no attempted deletion could proceed until a
restart reset the semaphore:
https://neondb.slack.com/archives/C03H1K0PGKH/p1738416586442029
---
 safekeeper/src/wal_backup_partial.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs
index 4e5b34a9bf65..5ecb23e8e04b 100644
--- a/safekeeper/src/wal_backup_partial.rs
+++ b/safekeeper/src/wal_backup_partial.rs
@@ -535,6 +535,10 @@ pub async fn main_task(
     // limit concurrent uploads
     let _upload_permit = tokio::select! {
         acq = limiter.acquire_partial_backup() => acq,
+        _ = backup.tli.cancel.cancelled() => {
+            info!("timeline canceled");
+            return None;
+        }
         _ = cancel.cancelled() => {
             info!("task canceled");
             return None;
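The pattern the fix applies generalizes: any potentially long wait for a semaphore permit should be raced against the relevant cancellation tokens, so the task releases its gate guard promptly on shutdown. Below is a minimal, self-contained sketch of that pattern (hypothetical names, assuming the `tokio` and `tokio-util` crates; not the safekeeper's actual types):

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::CancellationToken;

// Acquire a permit, but give up as soon as either the per-timeline or the
// task-level cancellation token fires. Without the extra select! arms, a
// task stuck waiting on an exhausted semaphore would hold the timeline's
// gate open indefinitely, blocking deletion.
async fn acquire_or_cancel(
    limiter: Arc<Semaphore>,
    timeline_cancel: &CancellationToken,
    task_cancel: &CancellationToken,
) -> Option<OwnedSemaphorePermit> {
    tokio::select! {
        permit = limiter.acquire_owned() => permit.ok(),
        _ = timeline_cancel.cancelled() => None,
        _ = task_cancel.cancelled() => None,
    }
}

#[tokio::main]
async fn main() {
    // Deliberately empty semaphore: a plain acquire would wait forever.
    let limiter = Arc::new(Semaphore::new(0));
    let timeline_cancel = CancellationToken::new();
    let task_cancel = CancellationToken::new();

    let tl = timeline_cancel.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        tl.cancel(); // simulate timeline shutdown
    });

    // Returns None promptly instead of blocking deletion indefinitely.
    let permit = acquire_or_cancel(limiter, &timeline_cancel, &task_cancel).await;
    assert!(permit.is_none());
}
```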