Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CachingUsernamePassword realm #32646

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
*/
package org.elasticsearch.xpack.security.authc.support;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
Expand All @@ -30,17 +28,17 @@

public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm implements CachingRealm {

private final Cache<String, ListenableFuture<Tuple<AuthenticationResult, UserWithHash>>> cache;
private final Cache<String, ListenableFuture<UserWithHash>> cache;
private final ThreadPool threadPool;
final Hasher cacheHasher;

protected CachingUsernamePasswordRealm(String type, RealmConfig config, ThreadPool threadPool) {
super(type, config);
cacheHasher = Hasher.resolve(CachingUsernamePasswordRealmSettings.CACHE_HASH_ALGO_SETTING.get(config.settings()));
this.threadPool = threadPool;
TimeValue ttl = CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING.get(config.settings());
final TimeValue ttl = CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING.get(config.settings());
if (ttl.getNanos() > 0) {
cache = CacheBuilder.<String, ListenableFuture<Tuple<AuthenticationResult, UserWithHash>>>builder()
cache = CacheBuilder.<String, ListenableFuture<UserWithHash>>builder()
.setExpireAfterWrite(ttl)
.setMaximumWeight(CachingUsernamePasswordRealmSettings.CACHE_MAX_USERS_SETTING.get(config.settings()))
.build();
Expand All @@ -49,13 +47,15 @@ protected CachingUsernamePasswordRealm(String type, RealmConfig config, ThreadPo
}
}

@Override
public final void expire(String username) {
if (cache != null) {
logger.trace("invalidating cache for user [{}] in realm [{}]", username, name());
cache.invalidate(username);
}
}

@Override
public final void expireAll() {
if (cache != null) {
logger.trace("invalidating cache for all users in realm [{}]", name());
Expand All @@ -72,108 +72,84 @@ public final void expireAll() {
*/
@Override
public final void authenticate(AuthenticationToken authToken, ActionListener<AuthenticationResult> listener) {
UsernamePasswordToken token = (UsernamePasswordToken) authToken;
final UsernamePasswordToken token = (UsernamePasswordToken) authToken;
try {
if (cache == null) {
doAuthenticate(token, listener);
} else {
authenticateWithCache(token, listener);
}
} catch (Exception e) {
} catch (final Exception e) {
// each realm should handle exceptions, if we get one here it should be considered fatal
listener.onFailure(e);
}
}

/**
* This validates the {@code token} while making sure there is only one inflight
* request to the authentication source. Only successful responses are cached
* and any subsequent requests, bearing the <b>same</b> password, will succeed
* without reaching to the authentication source. A different password in a
* subsequent request, however, will clear the cache and <b>try</b> to reach to
* the authentication source.
*
* @param token The authentication token
* @param listener to be called at completion
*/
private void authenticateWithCache(UsernamePasswordToken token, ActionListener<AuthenticationResult> listener) {
try {
final SetOnce<User> authenticatedUser = new SetOnce<>();
final AtomicBoolean createdAndStartedFuture = new AtomicBoolean(false);
final ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> future = cache.computeIfAbsent(token.principal(), k -> {
final ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> created = new ListenableFuture<>();
if (createdAndStartedFuture.compareAndSet(false, true) == false) {
throw new IllegalStateException("something else already started this. how?");
}
return created;
final AtomicBoolean authenticationInCache = new AtomicBoolean(true);
final ListenableFuture<UserWithHash> listenableCacheEntry = cache.computeIfAbsent(token.principal(), k -> {
authenticationInCache.set(false);
return new ListenableFuture<>();
});

if (createdAndStartedFuture.get()) {
doAuthenticate(token, ActionListener.wrap(result -> {
if (result.isAuthenticated()) {
final User user = result.getUser();
authenticatedUser.set(user);
final UserWithHash userWithHash = new UserWithHash(user, token.credentials(), cacheHasher);
future.onResponse(new Tuple<>(result, userWithHash));
} else {
future.onResponse(new Tuple<>(result, null));
}
}, future::onFailure));
}

future.addListener(ActionListener.wrap(tuple -> {
if (tuple != null) {
final UserWithHash userWithHash = tuple.v2();
final boolean performedAuthentication = createdAndStartedFuture.get() && userWithHash != null &&
tuple.v2().user == authenticatedUser.get();
handleResult(future, createdAndStartedFuture.get(), performedAuthentication, token, tuple, listener);
} else {
handleFailure(future, createdAndStartedFuture.get(), token, new IllegalStateException("unknown error authenticating"),
listener);
}
}, e -> handleFailure(future, createdAndStartedFuture.get(), token, e, listener)),
threadPool.executor(ThreadPool.Names.GENERIC));
} catch (ExecutionException e) {
listener.onResponse(AuthenticationResult.unsuccessful("", e));
}
}

private void handleResult(ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> future, boolean createdAndStartedFuture,
boolean performedAuthentication, UsernamePasswordToken token,
Tuple<AuthenticationResult, UserWithHash> result, ActionListener<AuthenticationResult> listener) {
final AuthenticationResult authResult = result.v1();
if (authResult == null) {
// this was from a lookup; clear and redo
cache.invalidate(token.principal(), future);
authenticateWithCache(token, listener);
} else if (authResult.isAuthenticated()) {
if (performedAuthentication) {
listener.onResponse(authResult);
} else {
UserWithHash userWithHash = result.v2();
if (userWithHash.verify(token.credentials())) {
if (userWithHash.user.enabled()) {
User user = userWithHash.user;
logger.debug("realm [{}] authenticated user [{}], with roles [{}]",
name(), token.principal(), user.roles());
if (authenticationInCache.get()) {
// there is a cached or an inflight authenticate request
listenableCacheEntry.addListener(ActionListener.wrap(authenticatedUserWithHash -> {
if (authenticatedUserWithHash != null && authenticatedUserWithHash.verify(token.credentials())) {
// cached credential hash matches the credential hash for this forestalled request
final User user = authenticatedUserWithHash.user;
logger.debug("realm [{}] authenticated user [{}], with roles [{}], from cache", name(), token.principal(),
user.roles());
listener.onResponse(AuthenticationResult.success(user));
} else {
// re-auth to see if user has been enabled
cache.invalidate(token.principal(), future);
// The inflight request has failed or its credential hash does not match the
// hash of the credential for this forestalled request.
// clear cache and try to reach the authentication source again because password
// might have changed there and the local cached hash got stale
cache.invalidate(token.principal(), listenableCacheEntry);
authenticateWithCache(token, listener);
}
} else {
// could be a password change?
cache.invalidate(token.principal(), future);
}, e -> {
// the inflight request failed, so try again, but first (always) make sure cache
// is cleared of the failed authentication
cache.invalidate(token.principal(), listenableCacheEntry);
authenticateWithCache(token, listener);
}
}
} else {
cache.invalidate(token.principal(), future);
if (createdAndStartedFuture) {
listener.onResponse(authResult);
}), threadPool.executor(ThreadPool.Names.GENERIC));
} else {
authenticateWithCache(token, listener);
// attempt authentication against the authentication source
doAuthenticate(token, ActionListener.wrap(authResult -> {
if (authResult.isAuthenticated() && authResult.getUser().enabled()) {
// compute the credential hash of this successful authentication request
final UserWithHash userWithHash = new UserWithHash(authResult.getUser(), token.credentials(), cacheHasher);
// notify any forestalled request listeners; they will not reach to the
// authentication request and instead will use this hash for comparison
listenableCacheEntry.onResponse(userWithHash);
} else {
// notify any forestalled request listeners; they will retry the request
listenableCacheEntry.onResponse(null);
}
// notify the listener of the inflight authentication request; this request is not retried
listener.onResponse(authResult);
}, e -> {
// notify any staved off listeners; they will retry the request
listenableCacheEntry.onFailure(e);
// notify the listener of the inflight authentication request; this request is not retried
listener.onFailure(e);
}));
}
}
}

private void handleFailure(ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> future, boolean createdAndStarted,
UsernamePasswordToken token, Exception e, ActionListener<AuthenticationResult> listener) {
cache.invalidate(token.principal(), future);
if (createdAndStarted) {
} catch (final ExecutionException e) {
listener.onFailure(e);
} else {
authenticateWithCache(token, listener);
}
}

Expand All @@ -193,38 +169,57 @@ protected int getCacheSize() {

@Override
public final void lookupUser(String username, ActionListener<User> listener) {
if (cache != null) {
try {
ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> future = cache.computeIfAbsent(username, key -> {
ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> created = new ListenableFuture<>();
doLookupUser(username, ActionListener.wrap(user -> {
if (user != null) {
UserWithHash userWithHash = new UserWithHash(user, null, null);
created.onResponse(new Tuple<>(null, userWithHash));
} else {
created.onResponse(new Tuple<>(null, null));
}
}, created::onFailure));
return created;
});

future.addListener(ActionListener.wrap(tuple -> {
if (tuple != null) {
if (tuple.v2() == null) {
cache.invalidate(username, future);
listener.onResponse(null);
} else {
listener.onResponse(tuple.v2().user);
}
try {
if (cache == null) {
doLookupUser(username, listener);
} else {
lookupWithCache(username, listener);
}
} catch (final Exception e) {
// each realm should handle exceptions, if we get one here it should be
// considered fatal
listener.onFailure(e);
}
}

private void lookupWithCache(String username, ActionListener<User> listener) {
try {
final AtomicBoolean lookupInCache = new AtomicBoolean(true);
final ListenableFuture<UserWithHash> listenableCacheEntry = cache.computeIfAbsent(username, key -> {
lookupInCache.set(false);
return new ListenableFuture<>();
});
if (false == lookupInCache.get()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we invert this condition to make it more like the authenticate version? so the actual lookup happens in the else of this if statement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lookupInCache is tailored to be similar to authenticationInCache, just that, in the if branch, there is no action (cf. the next comment https://github.com/elastic/elasticsearch/pull/32646/files#r212724950).
Should I still invert it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need

// attempt lookup against the user directory
doLookupUser(username, ActionListener.wrap(user -> {
if (user != null) {
// user found
final UserWithHash userWithHash = new UserWithHash(user, null, null);
// notify forestalled request listeners
listenableCacheEntry.onResponse(userWithHash);
} else {
listener.onResponse(null);
// user not found, invalidate cache so that subsequent requests are forwarded to
// the user directory
cache.invalidate(username, listenableCacheEntry);
// notify forestalled request listeners
listenableCacheEntry.onResponse(null);
}
}, listener::onFailure), threadPool.executor(ThreadPool.Names.GENERIC));
} catch (ExecutionException e) {
listener.onFailure(e);
}, e -> {
// the next request should be forwarded, not halted by a failed lookup attempt
cache.invalidate(username, listenableCacheEntry);
// notify forestalled listeners
listenableCacheEntry.onFailure(e);
}));
}
} else {
doLookupUser(username, listener);
listenableCacheEntry.addListener(ActionListener.wrap(userWithHash -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be in an else block IMO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, for lookup there is, and there was, no retrying; the listener for the the reaching-out-request is notified just like all the other listeners.
When a lookup returns negatively, it clears the cache, but deferred requests will not be retried, they return negatively as well.

if (userWithHash != null) {
listener.onResponse(userWithHash.user);
} else {
listener.onResponse(null);
}
}, listener::onFailure), threadPool.executor(ThreadPool.Names.GENERIC));
} catch (final ExecutionException e) {
listener.onFailure(e);
}
}

Expand Down