Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-6122 POC new cancel handler for Collections api #3033

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.api.endpoint;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.QueryParam;
import org.apache.solr.client.api.model.SolrJerseyResponse;

/** V2 API definition for canceling CollectionApi Call. */
@Path("/cluster/cancel/{requestid}")
public interface CancelCollectionApiCallApi {
@PUT
@Operation(
summary = "Cancel collections api call",
tags = {"cluster-cancel"})
SolrJerseyResponse cancelCollectionApiCall(
@Parameter(description = "The name of the collection api to be canceled.", required = true)
@PathParam("requestid")
String requestId,
@Parameter(description = "An ID to track the request asynchronously") @QueryParam("async")
String asyncId)
throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.api.model;

import com.fasterxml.jackson.annotation.JsonProperty;

public class CancelCollectionApiResponse extends SolrJerseyResponse {
@JsonProperty("msg")
public String msg;

@JsonProperty("asyncid")
public String asyncid;

@JsonProperty("status")
public String status;
}
2 changes: 1 addition & 1 deletion solr/core/src/java/org/apache/solr/cloud/Overseer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stat
return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
}

/* Internal map for failed tasks, not to be used outside of the Overseer */
/* Internal map for running tasks, not to be used outside of the Overseer */
static DistributedMap getRunningMap(final SolrZkClient zkClient) {
return new DistributedMap(zkClient, "/overseer/collection-map-running");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;

import java.io.IOException;
import java.util.Objects;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.Utils;

public class OverseerAsyncIdSerializer {

/**
* This method serializes the content of an {@code OverseerSolrResponse}. Note that:
*
* <ul>
* <li>The elapsed time is not serialized
* <li>"Unknown" elements for the Javabin format will be serialized as Strings. See {@link
* org.apache.solr.common.util.JavaBinCodec#writeVal}
* </ul>
*/
public static byte[] serialize(String asyncId) throws IOException {
Objects.requireNonNull(asyncId);
try {
return Utils.toJavabin(asyncId).readAllBytes();
} catch (IOException | RuntimeException e) {
throw new SolrException(
ErrorCode.SERVER_ERROR, "Exception serializing asyncId to Javabin", e);
}
}

public static String deserialize(byte[] asyncIdBytes) throws IOException {
Objects.requireNonNull(asyncIdBytes);
try {
@SuppressWarnings("unchecked")
String asyncId = (String) Utils.fromJavabin(asyncIdBytes);
return asyncId;
} catch (IOException | RuntimeException e) {
throw new SolrException(
ErrorCode.SERVER_ERROR, "Exception deserializing response from Javabin", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.apache.solr.cloud;

import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
import static org.apache.solr.cloud.api.collections.OverseerCancelMessageHandler.CANCEL_PREFIX;

import java.io.IOException;
import org.apache.solr.cloud.api.collections.OverseerCancelMessageHandler;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
Expand Down Expand Up @@ -110,17 +112,23 @@ private static OverseerMessageHandlerSelector getOverseerMessageHandlerSelector(
// coreContainer is passed instead of configSetService as configSetService is loaded late
final OverseerConfigSetMessageHandler configMessageHandler =
new OverseerConfigSetMessageHandler(zkStateReader, overseer.getCoreContainer());

final OverseerCancelMessageHandler overseerCancelMessageHandler =
new OverseerCancelMessageHandler(shardHandlerFactory, stats, overseer);
return new OverseerMessageHandlerSelector() {
@Override
public void close() throws IOException {
IOUtils.closeQuietly(collMessageHandler);
}

@Override
public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) {
public OverseerMessageHandler selectOverseerMessageHandler(
ZkNodeProps message, OverseerTaskProcessor overseerTaskProcessor) {
String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
return configMessageHandler;
} else if (operation != null && operation.startsWith((CANCEL_PREFIX))) {
return overseerCancelMessageHandler.withOverseerTaskProcessor(overseerTaskProcessor);
}
return collMessageHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package org.apache.solr.cloud;

import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.getCollectionAction;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.ID;

import com.codahale.metrics.Timer;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -39,11 +41,13 @@
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -339,7 +343,8 @@ public void run() {
workQueue.remove(head);
continue;
}
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
OverseerMessageHandler messageHandler =
selector.selectOverseerMessageHandler(message, this);
OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, batchSessionId);
if (lock == null) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -519,11 +524,13 @@ public boolean isClosed() {
}

private void markTaskAsRunning(QueueEvent head, String asyncId)
throws KeeperException, InterruptedException {
throws KeeperException, InterruptedException, IOException {
runningZKTasks.add(head.getId());
runningTasks.add(head.getId());

if (asyncId != null) runningMap.put(asyncId, null);
if (asyncId != null) {
runningMap.put(asyncId, OverseerAsyncIdSerializer.serialize(head.getId()));
}
}

protected class Runner implements Runnable {
Expand Down Expand Up @@ -693,13 +700,62 @@ String getId() {
return myId;
}

public boolean isAsyncTaskInProgress(String asyncId)
throws KeeperException, InterruptedException {
// Check if the task is currently running or still in the work queue
return runningMap.contains(asyncId);
}

public boolean isAsyncTaskInSubmitted(String asyncId)
throws KeeperException, InterruptedException {
return workQueue.containsTaskWithRequestId(ASYNC, asyncId);
}

public boolean removeSubmittedTask(String asyncId) throws KeeperException, InterruptedException {
// Remove the task from the work queue if it hasn't started yet
if (workQueue.containsTaskWithRequestId(ASYNC, asyncId)) {
workQueue.removeTaskWithRequestId(ASYNC, asyncId);

// clear blockedTasks in case submitted task is in blocked tasks
blockedTasks.clear();

return true;
}
return false;
}

public boolean cancelInProgressAsyncTask(String asyncId)
throws KeeperException, InterruptedException, IOException {
String workQueueZkId = OverseerAsyncIdSerializer.deserialize(runningMap.get(asyncId));
QueueEvent taskData = workQueue.get(workQueueZkId);
final ZkNodeProps message = ZkNodeProps.load(taskData.getBytes());

String operation = message.getStr(Overseer.QUEUE_OPERATION);
CollectionParams.CollectionAction action = getCollectionAction(operation);
CollectionsHandler.CollectionOperation collectionOperationToCancel =
CollectionsHandler.CollectionOperation.get(action);
boolean isInProgressCancelable = collectionOperationToCancel.isInProgressCancelable();

if (isInProgressCancelable) {
if (runningMap.contains(asyncId)) {
runningMap.remove(asyncId);
runningTasks.remove(workQueueZkId);
workQueue.removeTaskWithRequestId(ASYNC, asyncId);
return true;
}
}

return false;
}

/**
* An interface to determine which {@link OverseerMessageHandler} handles a given message. This
* could be a single OverseerMessageHandler for the case where a single type of message is handled
* (e.g. collection messages only) , or a different handler could be selected based on the
* contents of the message.
*/
public interface OverseerMessageHandlerSelector extends Closeable {
OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
OverseerMessageHandler selectOverseerMessageHandler(
ZkNodeProps message, OverseerTaskProcessor overseerTaskProcessor);
}
}
59 changes: 53 additions & 6 deletions solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,32 +73,67 @@ public void allowOverseerPendingTasksToComplete() {
}
}

/** Returns true if the queue contains a task with the specified async id. */
public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
/**
* Helper method to find the path of a first task with a specific request ID.
*
* @param requestIdKey The key of the request ID in the task message.
* @param requestId The request ID to look for.
* @return The path of the task if found, or null if not found.
*/
private String findTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {

List<String> childNames = zookeeper.getChildren(dir, null, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
if (childName != null && childName.startsWith(PREFIX)) {
String path = dir + "/" + childName;
try {
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
byte[] data = zookeeper.getData(path, null, null, true);
if (data != null) {
ZkNodeProps message = ZkNodeProps.load(data);
if (message.containsKey(requestIdKey)) {
if (log.isDebugEnabled()) {
log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
log.debug(
"Looking for requestId '{}', found '{}'", requestId, message.get(requestIdKey));
}
if (message.get(requestIdKey).equals(requestId)) {
return path;
}
if (message.get(requestIdKey).equals(requestId)) return true;
}
}
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
}
}
return null;
}

/** Returns true if the queue contains a task with the specified request ID. */
public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {

String path = findTaskWithRequestId(requestIdKey, requestId);
return path != null;
}

return false;
/** Removes the first task with the specified request ID from the queue. */
public void removeTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {

String path = findTaskWithRequestId(requestIdKey, requestId);
if (path != null) {
try {
zookeeper.delete(path, -1, true);
log.info("Removed task with requestId '{}' at path {}", requestId, path);
} catch (KeeperException.NoNodeException e) {
// Node might have been removed already
log.warn("Node at path {} does not exist. It might have been removed already.", path);
}
} else {
log.info("No task with requestId '{}' was found in the queue.", requestId);
}
}

/** Remove the event and save the response into the other path. */
Expand All @@ -125,6 +160,18 @@ public void remove(QueueEvent event) throws KeeperException, InterruptedExceptio
}
}

/** Return event based on ZK path within queue * */
public QueueEvent get(String path) throws KeeperException, InterruptedException {
try {
byte[] data = zookeeper.getData(path, null, null, true);
return new QueueEvent(path, data, null);
} catch (KeeperException.NoNodeException ignored) {
// we must handle the race case where the node no longer exists
log.info("ZK path: {} doesn't exist. Requestor may have disconnected from ZooKeeper", path);
}
return null;
}

/** Watcher that blocks until a WatchedEvent occurs for a znode. */
static final class LatchWatcher implements Watcher {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class CollApiCmds {
* Interface implemented by all Collection API commands. Collection API commands are defined in
* classes whose names ends in {@code Cmd}.
*/
protected interface CollectionApiCommand {
public interface CollectionApiCommand {
void call(ClusterState state, ZkNodeProps message, NamedList<Object> results) throws Exception;
}

Expand Down
Loading
Loading