diff --git a/docs/changelog/94420.yaml b/docs/changelog/94420.yaml new file mode 100644 index 0000000000000..70699c55f7295 --- /dev/null +++ b/docs/changelog/94420.yaml @@ -0,0 +1,5 @@ +pr: 94420 +summary: Secondary credentials used with transforms should only require source and destination index privileges, not transform privileges +area: Transform +type: bug +issues: [] diff --git a/x-pack/plugin/transform/qa/multi-node-tests/build.gradle b/x-pack/plugin/transform/qa/multi-node-tests/build.gradle index 718dc1372a184..f57ba8cbd2e5a 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/build.gradle +++ b/x-pack/plugin/transform/qa/multi-node-tests/build.gradle @@ -49,4 +49,5 @@ testClusters.matching { it.name == 'javaRestTest' }.configureEach { user username: "x_pack_rest_user", password: "x-pack-test-password" user username: "john_junior", password: "x-pack-test-password", role: "transform_admin" user username: "bill_senior", password: "x-pack-test-password", role: "transform_admin,source_index_access" + user username: "not_a_transform_admin", password: "x-pack-test-password", role: "source_index_access" } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java index a7c30a0efe4b7..4c118f1b6dbb6 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java @@ -44,6 +44,8 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase { private static final String JUNIOR_HEADER = basicAuthHeaderValue(JUNIOR_USERNAME, TEST_PASSWORD_SECURE_STRING); private static final String SENIOR_USERNAME = "bill_senior"; private static final String SENIOR_HEADER = basicAuthHeaderValue(SENIOR_USERNAME, TEST_PASSWORD_SECURE_STRING); + private static final String NOT_A_TRANSFORM_ADMIN = "not_a_transform_admin"; + private static final String NOT_A_TRANSFORM_ADMIN_HEADER = basicAuthHeaderValue(NOT_A_TRANSFORM_ADMIN, TEST_PASSWORD_SECURE_STRING); private static final int NUM_USERS = 28; @@ -87,7 +89,6 @@ private void testTransformPermissionsNoDeferValidation(boolean unattended) throw .build() ) ); - assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); assertThat( e.getMessage(), @@ -171,6 +172,40 @@ public void testTransformPermissionsDeferValidationNoUnattended() throws Excepti assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); } + /** + * defer_validation = true + * unattended = false + */ + @SuppressWarnings("unchecked") + public void testNoTransformAdminRoleInSecondaryAuth() throws Exception { + String transformId = "transform-permissions-no-admin-role"; + String sourceIndexName = transformId + "-index"; + String destIndexName = sourceIndexName + "-dest"; + createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + + TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, false); + + // PUT with defer_validation should work even though the secondary auth does not have transform_admin role + putTransform( + transformId, + Strings.toString(config), + RequestOptions.DEFAULT.toBuilder() + .addHeader(SECONDARY_AUTH_KEY, NOT_A_TRANSFORM_ADMIN_HEADER) + .addParameter("defer_validation", String.valueOf(true)) + .build() + ); + + // _update should work even though the secondary auth does not have transform_admin role + updateConfig( + transformId, + "{}", + RequestOptions.DEFAULT.toBuilder().addHeader(SECONDARY_AUTH_KEY, NOT_A_TRANSFORM_ADMIN_HEADER).build() + ); + + // _start works because user not_a_transform_admin has data access + startTransform(config.getId(), RequestOptions.DEFAULT); + } + /** * defer_validation = true * unattended = true @@ -212,6 +247,37 @@ public void testTransformPermissionsDeferValidationUnattended() throws Exception assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); } + public void testPreviewRequestFailsPermissionsCheck() throws Exception { + String transformId = "transform-permissions-preview"; + String sourceIndexName = transformId + "-index"; + String destIndexName = sourceIndexName + "-dest"; + createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + + TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, false); + + ResponseException e = expectThrows( + ResponseException.class, + () -> previewTransform(Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, JUNIOR_HEADER).build()) + ); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat( + e.getMessage(), + containsString( + String.format( + Locale.ROOT, + "Cannot preview transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName + ) + ) + ); + + previewTransform(Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()); + } + @Override protected Settings restAdminSettings() { return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", TEST_ADMIN_HEADER).build(); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 012b35a28afc1..d44b642ebcb69 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -562,6 +562,15 @@ protected static Map getTransforms(int from, int size) throws IO return entityAsMap(response); } + protected Map getTransformConfig(String transformId, String authHeader) throws IOException { + Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, authHeader); + Map transforms = entityAsMap(client().performRequest(getRequest)); + assertEquals(1, XContentMapValues.extractValue("count", transforms)); + @SuppressWarnings("unchecked") + Map transformConfig = ((List>) transforms.get("transforms")).get(0); + return transformConfig; + } + protected static String getTransformState(String transformId) throws IOException { Map transformStatsAsMap = getTransformStateAndStats(transformId); return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java index 11f12c38b2aba..3ea469c405957 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java @@ -117,12 +117,9 @@ public void testUpdateDeprecatedSettings() throws Exception { createTransformRequest.setJsonEntity(config); Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); - assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_USER); - Map transforms = entityAsMap(client().performRequest(getRequest)); - assertEquals(1, XContentMapValues.extractValue("count", transforms)); - Map transform = ((List>) XContentMapValues.extractValue("transforms", transforms)).get(0); + + Map transform = getTransformConfig(transformId, BASIC_AUTH_VALUE_TRANSFORM_USER); assertThat(XContentMapValues.extractValue("pivot.max_page_search_size", transform), equalTo(555)); final Request updateRequest = createRequestWithAuth( @@ -137,10 +134,7 @@ public void testUpdateDeprecatedSettings() throws Exception { assertNull(XContentMapValues.extractValue("pivot.max_page_search_size", updateResponse)); assertThat(XContentMapValues.extractValue("settings.max_page_search_size", updateResponse), equalTo(555)); - getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_USER); - transforms = entityAsMap(client().performRequest(getRequest)); - assertEquals(1, XContentMapValues.extractValue("count", transforms)); - transform = ((List>) XContentMapValues.extractValue("transforms", transforms)).get(0); + transform = getTransformConfig(transformId, BASIC_AUTH_VALUE_TRANSFORM_USER); assertNull(XContentMapValues.extractValue("pivot.max_page_search_size", transform)); assertThat(XContentMapValues.extractValue("settings.max_page_search_size", transform), equalTo(555)); @@ -210,14 +204,10 @@ private void updateTransferRightsTester(boolean useSecondaryAuthHeaders) throws createTransformRequest.setJsonEntity(config); Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); - assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2); - Map transforms = entityAsMap(client().performRequest(getRequest)); - assertEquals(1, XContentMapValues.extractValue("count", transforms)); + + Map transformConfig = getTransformConfig(transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2); // Confirm the roles were recorded as expected in the stored headers - @SuppressWarnings("unchecked") - Map transformConfig = ((List>) transforms.get("transforms")).get(0); assertThat(transformConfig.get("authorization"), equalTo(Map.of("roles", List.of("transform_admin", DATA_ACCESS_ROLE_2)))); // create a 2nd, identical one @@ -231,16 +221,14 @@ private void updateTransferRightsTester(boolean useSecondaryAuthHeaders) throws // getting the transform with the just deleted admin 2 user should fail try { - client().performRequest(getRequest); + getTransformConfig(transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2); fail("request should have failed"); } catch (ResponseException e) { assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(401)); } // get the transform with admin 1 - getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); - transforms = entityAsMap(client().performRequest(getRequest)); - assertEquals(1, XContentMapValues.extractValue("count", transforms)); + transformConfig = getTransformConfig(transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); // start using admin 1, but as the header is still admin 2 // This fails as the stored header is still admin 2 @@ -278,9 +266,7 @@ private void updateTransferRightsTester(boolean useSecondaryAuthHeaders) throws assertOK(client().performRequest(updateRequest)); // get should still work - getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformIdCloned, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); - transforms = entityAsMap(client().performRequest(getRequest)); - assertEquals(1, XContentMapValues.extractValue("count", transforms)); + getTransformConfig(transformIdCloned, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); // start with updated configuration should succeed if (useSecondaryAuthHeaders) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java index eb015b80451a7..a0d06066afb35 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java @@ -204,10 +204,9 @@ private static void validateTransform( TimeValue timeout, ActionListener> listener ) { - ClientHelper.executeWithHeadersAsync( - config.getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, + ClientHelper.executeAsyncWithOrigin( client, + ClientHelper.TRANSFORM_ORIGIN, ValidateTransformAction.INSTANCE, new ValidateTransformAction.Request(config, deferValidation, timeout), ActionListener.wrap(response -> listener.onResponse(response.getDestIndexMappings()), listener::onFailure) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index 626d2b9158afe..47d768e01daf0 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -38,7 +38,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -65,7 +64,7 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.DUMMY_DEST_INDEX_FOR_PREVIEW; -import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; +import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.getSecurityHeadersPreferringSecondary; public class TransportPreviewTransformAction extends HandledTransportAction { @@ -138,19 +137,16 @@ protected void doExecute(Task task, Request request, ActionListener li // <4> Validate transform query ActionListener validateConfigListener = ActionListener.wrap( - validateConfigResponse -> useSecondaryAuthIfAvailable( - securityContext, - () -> getPreview( - parentTaskId, - request.timeout(), - config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null - function, - config.getSource(), - config.getDestination().getPipeline(), - config.getDestination().getIndex(), - config.getSyncConfig(), - listener - ) + validateConfigResponse -> getPreview( + parentTaskId, + request.timeout(), + config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null + function, + config.getSource(), + config.getDestination().getPipeline(), + config.getDestination().getIndex(), + config.getSyncConfig(), + listener ), listener::onFailure ); @@ -209,6 +205,12 @@ private void getPreview( final SetOnce> mappings = new SetOnce<>(); + final Map filteredHeaders = getSecurityHeadersPreferringSecondary( + threadPool, + securityContext, + clusterService.state() + ); + ActionListener pipelineResponseActionListener = ActionListener.wrap(simulatePipelineResponse -> { List> docs = new ArrayList<>(simulatePipelineResponse.getResults().size()); List> errors = new ArrayList<>(); @@ -276,7 +278,7 @@ private void getPreview( function.preview( parentTaskAssigningClient, timeout, - ClientHelper.getPersistableSafeSecurityHeaders(threadPool.getThreadContext(), clusterService.state()), + filteredHeaders, source, deducedMappings, NUMBER_OF_PREVIEW_BUCKETS, @@ -284,6 +286,6 @@ private void getPreview( ); }, listener::onFailure); - function.deduceMappings(parentTaskAssigningClient, source, deduceMappingsListener); + function.deduceMappings(parentTaskAssigningClient, filteredHeaders, source, deduceMappingsListener); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 877cd6ee2b7dd..780aac0c1ee5b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -44,10 +44,9 @@ import java.time.Instant; import java.util.List; -import java.util.Map; import static org.elasticsearch.core.Strings.format; -import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; +import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.getSecurityHeadersPreferringSecondary; public class TransportPutTransformAction extends AcknowledgedTransportMasterNodeAction { @@ -92,61 +91,52 @@ public TransportPutTransformAction( @Override protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener listener) { XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); - useSecondaryAuthIfAvailable(securityContext, () -> { - // set headers to run transform as calling user - Map filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders( - threadPool.getThreadContext(), - clusterService.state() - ); - TransformConfig config = request.getConfig() - .setHeaders(filteredHeaders) - .setCreateTime(Instant.now()) - .setVersion(Version.CURRENT); - - String transformId = config.getId(); - // quick check whether a transform has already been created under that name - if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, transformId) != null) { - listener.onFailure( - new ResourceAlreadyExistsException( - TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformId) - ) - ); - return; - } + TransformConfig config = request.getConfig().setCreateTime(Instant.now()).setVersion(Version.CURRENT); + config.setHeaders(getSecurityHeadersPreferringSecondary(threadPool, securityContext, clusterState)); - // <3> Create the transform - ActionListener validateTransformListener = ActionListener.wrap( - validationResponse -> putTransform(request, listener), - listener::onFailure + String transformId = config.getId(); + // quick check whether a transform has already been created under that name + if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, transformId) != null) { + listener.onFailure( + new ResourceAlreadyExistsException(TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformId)) ); + return; + } - // <2> Validate source and destination indices - ActionListener checkPrivilegesListener = ActionListener.wrap( - aVoid -> client.execute( - ValidateTransformAction.INSTANCE, - new ValidateTransformAction.Request(config, request.isDeferValidation(), request.timeout()), - validateTransformListener - ), - listener::onFailure - ); + // <3> Create the transform + ActionListener validateTransformListener = ActionListener.wrap( + validationResponse -> putTransform(request, listener), + listener::onFailure + ); - // <1> Early check to verify that the user can create the destination index and can read from the source - if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) { - TransformPrivilegeChecker.checkPrivileges( - "create", - securityContext, - indexNameExpressionResolver, - clusterState, - client, - config, - true, - checkPrivilegesListener - ); - } else { // No security enabled, just move on - checkPrivilegesListener.onResponse(null); - } - }); + // <2> Validate source and destination indices + ActionListener checkPrivilegesListener = ActionListener.wrap( + aVoid -> ClientHelper.executeAsyncWithOrigin( + client, + ClientHelper.TRANSFORM_ORIGIN, + ValidateTransformAction.INSTANCE, + new ValidateTransformAction.Request(config, request.isDeferValidation(), request.timeout()), + validateTransformListener + ), + listener::onFailure + ); + + // <1> Early check to verify that the user can create the destination index and can read from the source + if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) { + TransformPrivilegeChecker.checkPrivileges( + "create", + securityContext, + indexNameExpressionResolver, + clusterState, + client, + config, + true, + checkPrivilegesListener + ); + } else { // No security enabled, just move on + checkPrivilegesListener.onResponse(null); + } } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java index 98918bb7dcc88..57a740995b16a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java @@ -42,7 +42,6 @@ import java.util.List; import static org.elasticsearch.core.Strings.format; -import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; public class TransportScheduleNowTransformAction extends TransportTasksAction { @@ -83,48 +82,40 @@ protected void doExecute(Task task, Request request, ActionListener li final ClusterState clusterState = clusterService.state(); XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); - useSecondaryAuthIfAvailable(securityContext, () -> { - ActionListener getTransformListener = ActionListener.wrap(unusedConfig -> { - PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( - request.getId(), - clusterState - ); - - // to send a request to schedule now the transform at runtime, several requirements must be met: - // - transform must be running, meaning a task exists - // - transform is not failed (stopped transforms do not have a task) - if (transformTask != null - && transformTask.isAssigned() - && transformTask.getState() instanceof TransformState - && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED) { - - ActionListener taskScheduleNowListener = ActionListener.wrap(listener::onResponse, e -> { - // benign: A transform might have been stopped meanwhile, this is not a problem - if (e instanceof TransformTaskDisappearedDuringScheduleNowException) { - logger.debug( - () -> format("[%s] transform task disappeared during schedule_now, ignoring.", request.getId()), - e - ); - listener.onResponse(Response.TRUE); - return; - } - if (e instanceof TransformTaskScheduleNowException) { - logger.warn(() -> format("[%s] failed to schedule now the running transform.", request.getId()), e); - listener.onResponse(Response.TRUE); - return; - } - listener.onFailure(e); - }); - request.setNodes(transformTask.getExecutorNode()); - super.doExecute(task, request, taskScheduleNowListener); - } else { - listener.onResponse(Response.TRUE); - } - }, listener::onFailure); + ActionListener getTransformListener = ActionListener.wrap(unusedConfig -> { + PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask(request.getId(), clusterState); + + // to send a request to schedule now the transform at runtime, several requirements must be met: + // - transform must be running, meaning a task exists + // - transform is not failed (stopped transforms do not have a task) + if (transformTask != null + && transformTask.isAssigned() + && transformTask.getState() instanceof TransformState + && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED) { + + ActionListener taskScheduleNowListener = ActionListener.wrap(listener::onResponse, e -> { + // benign: A transform might have been stopped meanwhile, this is not a problem + if (e instanceof TransformTaskDisappearedDuringScheduleNowException) { + logger.debug(() -> format("[%s] transform task disappeared during schedule_now, ignoring.", request.getId()), e); + listener.onResponse(Response.TRUE); + return; + } + if (e instanceof TransformTaskScheduleNowException) { + logger.warn(() -> format("[%s] failed to schedule now the running transform.", request.getId()), e); + listener.onResponse(Response.TRUE); + return; + } + listener.onFailure(e); + }); + request.setNodes(transformTask.getExecutorNode()); + super.doExecute(task, request, taskScheduleNowListener); + } else { + listener.onResponse(Response.TRUE); + } + }, listener::onFailure); - // <1> Get the config to verify it exists and is valid - transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener); - }); + // <1> Get the config to verify it exists and is valid + transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index cc9463de310e6..abe8450b3a374 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -234,10 +234,9 @@ protected void masterOperation( ) ); transformConfigHolder.set(config); - ClientHelper.executeWithHeadersAsync( - config.getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, + ClientHelper.executeAsyncWithOrigin( client, + ClientHelper.TRANSFORM_ORIGIN, ValidateTransformAction.INSTANCE, new ValidateTransformAction.Request(config, false, request.timeout()), validationListener diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index 204e6488f2dc2..ef410affcc4b1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -47,10 +46,9 @@ import org.elasticsearch.xpack.transform.transforms.TransformTask; import java.util.List; -import java.util.Map; import static org.elasticsearch.core.Strings.format; -import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; +import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.getSecurityHeadersPreferringSecondary; public class TransportUpdateTransformAction extends TransportTasksAction { @@ -117,95 +115,88 @@ protected void doExecute(Task task, Request request, ActionListener li } return; } - useSecondaryAuthIfAvailable(securityContext, () -> { - // set headers to run transform as calling user - Map filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders( - threadPool.getThreadContext(), - clusterService.state() - ); - TransformConfigUpdate update = request.getUpdate(); - update.setHeaders(filteredHeaders); - - // GET transform and attempt to update - // We don't want the update to complete if the config changed between GET and INDEX - transformConfigManager.getTransformConfigurationForUpdate( - request.getId(), - ActionListener.wrap( - configAndVersion -> TransformUpdater.updateTransform( - securityContext, - indexNameExpressionResolver, - clusterState, - settings, - client, - transformConfigManager, - configAndVersion.v1(), - update, - configAndVersion.v2(), - request.isDeferValidation(), - false, // dryRun - true, // checkAccess - request.getTimeout(), - ActionListener.wrap(updateResponse -> { - TransformConfig updatedConfig = updateResponse.getConfig(); - auditor.info(updatedConfig.getId(), "Updated transform."); - logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus()); - - checkTransformConfigAndLogWarnings(updatedConfig); - - if (update.changesSettings(configAndVersion.v1())) { - PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( - request.getId(), - clusterState - ); - - // to send a request to apply new settings at runtime, several requirements must be met: - // - transform must be running, meaning a task exists - // - transform is not failed (stopped transforms do not have a task) - if (transformTask != null - && transformTask.isAssigned() - && transformTask.getState() instanceof TransformState - && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED) { - - ActionListener taskUpdateListener = ActionListener.wrap(listener::onResponse, e -> { - // benign: A transform might be stopped meanwhile, this is not a problem - if (e instanceof TransformTaskDisappearedDuringUpdateException) { - logger.debug("[{}] transform task disappeared during update, ignoring", request.getId()); - listener.onResponse(new Response(updatedConfig)); - return; - } - - if (e instanceof TransformTaskUpdateException) { - // BWC: only log a warning as response object can not be changed - logger.warn( - () -> format( - "[%s] failed to notify running transform task about update. " - + "New settings will be applied after next checkpoint.", - request.getId() - ), - e - ); - - listener.onResponse(new Response(updatedConfig)); - return; - } - - listener.onFailure(e); - }); - - request.setNodes(transformTask.getExecutorNode()); - request.setConfig(updatedConfig); - super.doExecute(task, request, taskUpdateListener); - return; - } + TransformConfigUpdate update = request.getUpdate(); + update.setHeaders(getSecurityHeadersPreferringSecondary(threadPool, securityContext, clusterState)); + + // GET transform and attempt to update + // We don't want the update to complete if the config changed between GET and INDEX + transformConfigManager.getTransformConfigurationForUpdate( + request.getId(), + ActionListener.wrap( + configAndVersion -> TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + clusterState, + settings, + client, + transformConfigManager, + configAndVersion.v1(), + update, + configAndVersion.v2(), + request.isDeferValidation(), + false, // dryRun + true, // checkAccess + request.getTimeout(), + ActionListener.wrap(updateResponse -> { + TransformConfig updatedConfig = updateResponse.getConfig(); + auditor.info(updatedConfig.getId(), "Updated transform."); + logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus()); + + checkTransformConfigAndLogWarnings(updatedConfig); + + if (update.changesSettings(configAndVersion.v1())) { + PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( + request.getId(), + clusterState + ); + + // to send a request to apply new settings at runtime, several requirements must be met: + // - transform must be running, meaning a task exists + // - transform is not failed (stopped transforms do not have a task) + if (transformTask != null + && transformTask.isAssigned() + && transformTask.getState() instanceof TransformState + && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED) { + + ActionListener taskUpdateListener = ActionListener.wrap(listener::onResponse, e -> { + // benign: A transform might be stopped meanwhile, this is not a problem + if (e instanceof TransformTaskDisappearedDuringUpdateException) { + logger.debug("[{}] transform task disappeared during update, ignoring", request.getId()); + listener.onResponse(new Response(updatedConfig)); + return; + } + + if (e instanceof TransformTaskUpdateException) { + // BWC: only log a warning as response object can not be changed + logger.warn( + () -> format( + "[%s] failed to notify running transform task about update. " + + "New settings will be applied after next checkpoint.", + request.getId() + ), + e + ); + + listener.onResponse(new Response(updatedConfig)); + return; + } + + listener.onFailure(e); + }); + + request.setNodes(transformTask.getExecutorNode()); + request.setConfig(updatedConfig); + super.doExecute(task, request, taskUpdateListener); + return; } - listener.onResponse(new Response(updatedConfig)); - }, listener::onFailure) - ), - listener::onFailure - ) - ); - }); + } + listener.onResponse(new Response(updatedConfig)); + }, listener::onFailure) + ), + listener::onFailure + ) + ); } private void checkTransformConfigAndLogWarnings(TransformConfig config) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java index 5d0e54c3a2fda..c7b8a0c7b8634 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java @@ -127,7 +127,7 @@ protected void doExecute(Task task, Request request, ActionListener li if (request.isDeferValidation()) { deduceMappingsListener.onResponse(null); } else { - function.deduceMappings(client, config.getSource(), deduceMappingsListener); + function.deduceMappings(client, config.getHeaders(), config.getSource(), deduceMappingsListener); } }, listener::onFailure); @@ -136,7 +136,7 @@ protected void doExecute(Task task, Request request, ActionListener li if (request.isDeferValidation()) { validateQueryListener.onResponse(true); } else { - function.validateQuery(client, config.getSource(), request.timeout(), validateQueryListener); + function.validateQuery(client, config.getHeaders(), config.getSource(), request.timeout(), validateQueryListener); } }, listener::onFailure); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index c105d3d1c9906..df6951b5132f8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -275,10 +275,9 @@ void doGetFieldMappings(ActionListener> fieldMappingsListene } void validate(ActionListener listener) { - ClientHelper.executeWithHeadersAsync( - transformConfig.getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, + ClientHelper.executeAsyncWithOrigin( client, + ClientHelper.TRANSFORM_ORIGIN, ValidateTransformAction.INSTANCE, new ValidateTransformAction.Request(transformConfig, false, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT), ActionListener.wrap(response -> listener.onResponse(null), listener::onFailure) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java index de23cc77623cd..e49fd65296825 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java @@ -117,10 +117,16 @@ interface ChangeCollector { * Deduce mappings based on the input mappings and the known configuration. * * @param client a client instance for querying the source mappings + * @param headers headers to be used to query only for what the caller is allowed to * @param sourceConfig the source configuration * @param listener listener to take the deduced mapping */ - void deduceMappings(Client client, SourceConfig sourceConfig, ActionListener> listener); + void deduceMappings( + Client client, + Map headers, + SourceConfig sourceConfig, + ActionListener> listener + ); /** * Create a preview of the function. @@ -178,11 +184,18 @@ void preview( * Runtime validation by querying the source and checking if source and config fit. * * @param client a client instance for querying the source + * @param headers headers to be used to query only for what the caller is allowed to * @param sourceConfig the source configuration * @param timeout search query timeout * @param listener the result listener */ - void validateQuery(Client client, SourceConfig sourceConfig, @Nullable TimeValue timeout, ActionListener listener); + void validateQuery( + Client client, + Map headers, + SourceConfig sourceConfig, + @Nullable TimeValue timeout, + ActionListener listener + ); /** * Create a change collector instance and return it diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java index 0fed9ded47b0f..df6f0552e1669 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java @@ -104,31 +104,44 @@ public void preview( } @Override - public void validateQuery(Client client, SourceConfig sourceConfig, TimeValue timeout, ActionListener listener) { + public void validateQuery( + Client client, + Map headers, + SourceConfig sourceConfig, + TimeValue timeout, + ActionListener listener + ) { SearchRequest searchRequest = buildSearchRequest(sourceConfig, timeout, TEST_QUERY_PAGE_SIZE); - client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> { - if (response == null) { - listener.onFailure(new ValidationException().addValidationError("Unexpected null response from test query")); - return; - } - if (response.status() != RestStatus.OK) { + ClientHelper.executeWithHeadersAsync( + headers, + ClientHelper.TRANSFORM_ORIGIN, + client, + SearchAction.INSTANCE, + searchRequest, + ActionListener.wrap(response -> { + if (response == null) { + listener.onFailure(new ValidationException().addValidationError("Unexpected null response from test query")); + return; + } + if (response.status() != RestStatus.OK) { + listener.onFailure( + new ValidationException().addValidationError( + format("Unexpected status from response of test query: %s", response.status()) + ) + ); + return; + } + listener.onResponse(true); + }, e -> { + Throwable unwrapped = ExceptionsHelper.unwrapCause(e); + RestStatus status = unwrapped instanceof ElasticsearchException + ? ((ElasticsearchException) unwrapped).status() + : RestStatus.SERVICE_UNAVAILABLE; listener.onFailure( - new ValidationException().addValidationError( - format("Unexpected status from response of test query: %s", response.status()) - ) + new ValidationException(unwrapped).addValidationError(format("Failed to test query, received status: %s", status)) ); - return; - } - listener.onResponse(true); - }, e -> { - Throwable unwrapped = ExceptionsHelper.unwrapCause(e); - RestStatus status = unwrapped instanceof ElasticsearchException - ? ((ElasticsearchException) unwrapped).status() - : RestStatus.SERVICE_UNAVAILABLE; - listener.onFailure( - new ValidationException(unwrapped).addValidationError(format("Failed to test query, received status: %s", status)) - ); - })); + }) + ); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java index 8b09328dec753..3d8b61fc10b23 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java @@ -113,7 +113,12 @@ public List getPerformanceCriticalFields() { } @Override - public void deduceMappings(Client client, SourceConfig sourceConfig, ActionListener> listener) { + public void deduceMappings( + Client client, + Map headers, + SourceConfig sourceConfig, + ActionListener> listener + ) { listener.onResponse(emptyMap()); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index ce875acb8ec32..56f101e518536 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -87,12 +87,17 @@ public List getPerformanceCriticalFields() { } @Override - public void deduceMappings(Client client, SourceConfig sourceConfig, final ActionListener> listener) { + public void deduceMappings( + Client client, + Map headers, + SourceConfig sourceConfig, + final ActionListener> listener + ) { if (Boolean.FALSE.equals(settings.getDeduceMappings())) { listener.onResponse(emptyMap()); return; } - SchemaUtil.deduceMappings(client, config, sourceConfig.getIndex(), sourceConfig.getRuntimeMappings(), listener); + SchemaUtil.deduceMappings(client, headers, config, sourceConfig.getIndex(), sourceConfig.getRuntimeMappings(), listener); } /** diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java index 4054ac7e2d770..14259bffdb43d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java @@ -94,6 +94,7 @@ public static Object dropFloatingPointComponentIfTypeRequiresIt(String type, dou */ public static void deduceMappings( final Client client, + final Map headers, final PivotConfig config, final String[] sourceIndex, final Map runtimeMappings, @@ -142,6 +143,7 @@ public static void deduceMappings( getSourceFieldMappings( client, + headers, sourceIndex, allFieldNames.values().stream().filter(Objects::nonNull).toArray(String[]::new), runtimeMappings, @@ -244,6 +246,7 @@ private static Map resolveMappings( */ static void getSourceFieldMappings( Client client, + Map headers, String[] index, String[] fields, Map runtimeMappings, @@ -257,7 +260,10 @@ static void getSourceFieldMappings( .fields(fields) .runtimeFields(runtimeMappings) .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - client.execute( + ClientHelper.executeWithHeadersAsync( + headers, + ClientHelper.TRANSFORM_ORIGIN, + client, FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap(response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtils.java index b61ed60381d04..780f871fea2c3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtils.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtils.java @@ -7,13 +7,38 @@ package org.elasticsearch.xpack.transform.utils; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.authc.support.SecondaryAuthentication; +import java.util.Map; + public final class SecondaryAuthorizationUtils { private SecondaryAuthorizationUtils() {} + /** + * Returns security headers preferring secondary auth if it exists. + */ + public static Map getSecurityHeadersPreferringSecondary( + ThreadPool threadPool, + SecurityContext securityContext, + ClusterState clusterState + ) { + SetOnce> filteredHeadersHolder = new SetOnce<>(); + useSecondaryAuthIfAvailable(securityContext, () -> { + Map filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders( + threadPool.getThreadContext(), + clusterState + ); + filteredHeadersHolder.set(filteredHeaders); + }); + return filteredHeadersHolder.get(); + } + /** * This executes the supplied runnable inside the secondary auth context if it exists; */ @@ -23,9 +48,10 @@ public static void useSecondaryAuthIfAvailable(SecurityContext securityContext, return; } SecondaryAuthentication secondaryAuth = securityContext.getSecondaryAuthentication(); - if (secondaryAuth != null) { - runnable = secondaryAuth.wrap(runnable); + if (secondaryAuth == null) { + runnable.run(); + return; } - runnable.run(); + secondaryAuth.wrap(runnable).run(); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationSchemaAndResultTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationSchemaAndResultTests.java index 17dad261bbda0..519fa8d771146 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationSchemaAndResultTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationSchemaAndResultTests.java @@ -142,7 +142,7 @@ public void testBasic() throws InterruptedException { .count(); this.>assertAsync( - listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, emptyMap(), listener), + listener -> SchemaUtil.deduceMappings(client, emptyMap(), pivotConfig, new String[] { "source-index" }, emptyMap(), listener), mappings -> { assertEquals("Mappings were: " + mappings, numGroupsWithoutScripts + 15, mappings.size()); assertEquals("long", mappings.get("max_rating")); @@ -214,7 +214,7 @@ public void testNested() throws InterruptedException { .count(); this.>assertAsync( - listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, emptyMap(), listener), + listener -> SchemaUtil.deduceMappings(client, emptyMap(), pivotConfig, new String[] { "source-index" }, emptyMap(), listener), mappings -> { assertEquals(numGroupsWithoutScripts + 12, mappings.size()); assertEquals("long", mappings.get("filter_1")); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java index 6919cc0a982d9..43cfa8a18b86c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java @@ -65,6 +65,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyMap; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.contains; @@ -449,7 +450,7 @@ private static void assertInvalidTransform(Client client, SourceConfig source, F private static void validate(Client client, SourceConfig source, Function pivot, boolean expectValid) throws Exception { CountDownLatch latch = new CountDownLatch(1); final AtomicReference exceptionHolder = new AtomicReference<>(); - pivot.validateQuery(client, source, null, ActionListener.wrap(validity -> { + pivot.validateQuery(client, emptyMap(), source, null, ActionListener.wrap(validity -> { assertEquals(expectValid, validity); latch.countDown(); }, e -> { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java index d0e89b6a5b2c0..eefa7bf377d83 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java @@ -98,7 +98,14 @@ public void testGetSourceFieldMappings() throws InterruptedException { try (Client client = new FieldCapsMockClient(getTestName())) { // fields is null this.>assertAsync( - listener -> SchemaUtil.getSourceFieldMappings(client, new String[] { "index-1", "index-2" }, null, emptyMap(), listener), + listener -> SchemaUtil.getSourceFieldMappings( + client, + emptyMap(), + new String[] { "index-1", "index-2" }, + null, + emptyMap(), + listener + ), mappings -> { assertNotNull(mappings); assertTrue(mappings.isEmpty()); @@ -109,6 +116,7 @@ public void testGetSourceFieldMappings() throws InterruptedException { this.>assertAsync( listener -> SchemaUtil.getSourceFieldMappings( client, + emptyMap(), new String[] { "index-1", "index-2" }, new String[] {}, emptyMap(), @@ -122,7 +130,14 @@ public void testGetSourceFieldMappings() throws InterruptedException { // indices is null this.>assertAsync( - listener -> SchemaUtil.getSourceFieldMappings(client, null, new String[] { "field-1", "field-2" }, emptyMap(), listener), + listener -> SchemaUtil.getSourceFieldMappings( + client, + emptyMap(), + null, + new String[] { "field-1", "field-2" }, + emptyMap(), + listener + ), mappings -> { assertNotNull(mappings); assertTrue(mappings.isEmpty()); @@ -133,6 +148,7 @@ public void testGetSourceFieldMappings() throws InterruptedException { this.>assertAsync( listener -> SchemaUtil.getSourceFieldMappings( client, + emptyMap(), new String[] {}, new String[] { "field-1", "field-2" }, emptyMap(), @@ -148,6 +164,7 @@ public void testGetSourceFieldMappings() throws InterruptedException { this.>assertAsync( listener -> SchemaUtil.getSourceFieldMappings( client, + emptyMap(), new String[] { "index-1", "index-2" }, new String[] { "field-1", "field-2" }, emptyMap(), @@ -174,6 +191,7 @@ public void testGetSourceFieldMappingsWithRuntimeMappings() throws InterruptedEx this.>assertAsync( listener -> SchemaUtil.getSourceFieldMappings( client, + emptyMap(), new String[] { "index-1", "index-2" }, new String[] { "field-1", "field-2" }, runtimeMappings, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtilsTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtilsTests.java new file mode 100644 index 0000000000000..d2c35a74b4de8 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtilsTests.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.utils; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.security.authc.AuthenticationField; +import org.elasticsearch.xpack.core.security.authc.support.SecondaryAuthentication; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SecondaryAuthorizationUtilsTests extends ESTestCase { + + private static final String AUTH_KEY = AuthenticationField.AUTHENTICATION_KEY; + private static final String SECONDARY_AUTH_KEY = SecondaryAuthentication.THREAD_CTX_KEY; + private static final String NOT_AN_AUTH_KEY = "not-an-auth-key"; + + private static final String JOHN_HEADER = + "45XtAwAXdHJhbnNmb3JtX2FkbWluX25vX2RhdGEBD3RyYW5zZm9ybV9hZG1pbgoAAAABAA5qYXZhUmVzdFRlc3QtMA5kZWZhdWx0X25hdGl2ZQZuYXRpdmUAAAAA"; + private static final String BILL_HEADER = + "45XtAwARdHJhbnNmb3JtX2FkbWluXzICD3RyYW5zZm9ybV9hZG1pbhJ0ZXN0X2RhdGFfYWNjZXNzXzIKAAAAAQAOamF2YVJlc3RUZXN0LTAOZGVmYXVsdF9uYXRp" + + "dmUGbmF0aXZlAAAAAA=="; + private static final String NOT_AN_AUTH_HEADER = "not-an-auth-header"; + + public void testGetSecurityHeadersPreferringSecondary() { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(threadContext); + SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); + + Map filteredHeaders = SecondaryAuthorizationUtils.getSecurityHeadersPreferringSecondary( + threadPool, + securityContext, + ClusterState.EMPTY_STATE + ); + assertThat(filteredHeaders.keySet(), is(empty())); + + threadContext.setHeaders( + Tuple.tuple(Map.of(AUTH_KEY, JOHN_HEADER, SECONDARY_AUTH_KEY, BILL_HEADER, NOT_AN_AUTH_KEY, NOT_AN_AUTH_HEADER), Map.of()) + ); + filteredHeaders = SecondaryAuthorizationUtils.getSecurityHeadersPreferringSecondary( + threadPool, + securityContext, + ClusterState.EMPTY_STATE + ); + assertThat(filteredHeaders.keySet(), contains(AUTH_KEY)); + } + + public void testUseSecondaryAuthIfAvailable() { + // Counter used to verify that the runnable has been run + AtomicInteger counter = new AtomicInteger(0); + + SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable(null, () -> counter.incrementAndGet()); + assertThat(counter.get(), is(equalTo(1))); + + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); + + SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable(securityContext, () -> counter.incrementAndGet()); + assertThat(counter.get(), is(equalTo(2))); + + threadContext.setHeaders( + Tuple.tuple(Map.of(AUTH_KEY, JOHN_HEADER, SECONDARY_AUTH_KEY, BILL_HEADER, NOT_AN_AUTH_KEY, NOT_AN_AUTH_HEADER), Map.of()) + ); + SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable(securityContext, () -> { + counter.incrementAndGet(); + // The only remaining header is the secondary auth header under the auth key + assertThat(threadContext.getHeaders(), is(equalTo(Map.of(AUTH_KEY, BILL_HEADER)))); + }); + assertThat(counter.get(), is(equalTo(3))); + // The headers are restored + assertThat( + threadContext.getHeaders(), + is(equalTo(Map.of(AUTH_KEY, JOHN_HEADER, SECONDARY_AUTH_KEY, BILL_HEADER, NOT_AN_AUTH_KEY, NOT_AN_AUTH_HEADER))) + ); + } +}