core/mr_cache: Do not hold lock when building new cache entry
When we build a new cache entry (via util_mr_cache_create), we
allocate memory and register the region with the underlying
provider.  This can generate monitor notifications, for example by
intercepting the allocation calls.  Because the notifications
acquire the cache lock in order to flush unusable entries, we
cannot hold that same lock while building the entry, or deadlock
can occur.

This has been seen by applications.  See issue ofiwg#5687.

To handle this, we build new cache entries outside of the lock, and
only acquire the lock when inserting them back into the cache. 
This opens a race condition where a conflicting entry can be inserted
into the cache between the first find() call and the insert() call.
We expect such occurrences to be rare, as they require a multi-threaded
app to post transfers referencing the same region simultaneously from
multiple threads.

In order to handle the race, we need to duplicate the find() check
after building the new entry prior to inserting it.  If a conflict
is found, we abort the insertion and restart the entire higher-level
search operation.
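
To illustrate, here is a minimal, self-contained sketch of the
double-checked pattern described above.  It is illustration only, not
the libfabric code or API: the region_cache/cache_entry types and the
find_locked()/create_entry()/search() helpers are hypothetical, and
the real cache keys entries by iovec and access flags rather than a
bare base/length pair.

/* Hypothetical, simplified types -- not the libfabric structures. */
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>

struct cache_entry {
	void *base;
	size_t len;
	struct cache_entry *next;
};

struct region_cache {
	pthread_mutex_t lock;
	struct cache_entry *head;
};

/* Must be called with cache->lock held. */
static struct cache_entry *
find_locked(struct region_cache *cache, void *base, size_t len)
{
	struct cache_entry *e;

	for (e = cache->head; e; e = e->next) {
		if (e->base == base && e->len == len)
			return e;
	}
	return NULL;
}

static int
create_entry(struct region_cache *cache, void *base, size_t len,
	     struct cache_entry **entry)
{
	struct cache_entry *e;

	/* Build the entry with no lock held: in the real code this is
	 * where memory is allocated and the region is registered, which
	 * may trigger monitor notifications that take the cache lock.
	 */
	e = calloc(1, sizeof(*e));
	if (!e)
		return -ENOMEM;
	e->base = base;
	e->len = len;

	/* Re-check for a conflicting entry under the lock before inserting. */
	pthread_mutex_lock(&cache->lock);
	if (find_locked(cache, base, len)) {
		pthread_mutex_unlock(&cache->lock);
		free(e);
		return -EAGAIN;	/* lost the race; caller restarts the search */
	}
	e->next = cache->head;
	cache->head = e;
	pthread_mutex_unlock(&cache->lock);

	*entry = e;
	return 0;
}

static int
search(struct region_cache *cache, void *base, size_t len,
       struct cache_entry **entry)
{
	int ret;

	do {
		pthread_mutex_lock(&cache->lock);
		*entry = find_locked(cache, base, len);
		pthread_mutex_unlock(&cache->lock);
		if (*entry)
			return 0;

		ret = create_entry(cache, base, len, entry);
	} while (ret == -EAGAIN);	/* conflict: restart the whole search */

	return ret;
}

The diff below applies the same shape to util_mr_cache_create() and
ofi_mr_cache_search().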

Signed-off-by: Sean Hefty <[email protected]>
shefty committed Mar 16, 2020
1 parent b75552e commit a1de3c8
Showing 1 changed file with 58 additions and 47 deletions.
prov/util/src/util_mr_cache.c: 58 additions & 47 deletions
@@ -213,42 +213,60 @@ void ofi_mr_cache_delete(struct ofi_mr_cache *cache, struct ofi_mr_entry *entry)
 	pthread_mutex_unlock(&cache->monitor->lock);
 }
 
+/*
+ * We cannot hold the monitor lock when allocating and registering the
+ * mr_entry without creating a potential deadlock situation with the
+ * memory monitor needing to acquire the same lock. The underlying
+ * calls may allocate memory, which can result in the monitor needing
+ * to handle address mapping changes. To handle this, we build the
+ * new entry, then check under lock that a conflict with another thread
+ * hasn't occurred. If a conflict occurred, we return -EAGAIN and
+ * restart the entire operation.
+ */
 static int
-util_mr_cache_create(struct ofi_mr_cache *cache, const struct iovec *iov,
-		     uint64_t access, struct ofi_mr_entry **entry)
+util_mr_cache_create(struct ofi_mr_cache *cache, const struct ofi_mr_info *info,
+		     struct ofi_mr_entry **entry)
 {
+	struct ofi_mr_entry *cur;
 	int ret;
 
 	FI_DBG(cache->domain->prov, FI_LOG_MR, "create %p (len: %zu)\n",
-	       iov->iov_base, iov->iov_len);
+	       info->iov.iov_base, info->iov.iov_len);
 
 	*entry = util_mr_entry_alloc(cache);
 	if (!*entry)
 		return -FI_ENOMEM;
 
 	(*entry)->storage_context = NULL;
-	(*entry)->info.iov = *iov;
+	(*entry)->info = *info;
 	(*entry)->use_cnt = 1;
 
 	ret = cache->add_region(cache, *entry);
 	if (ret)
-		goto err;
+		goto free;
 
+	pthread_mutex_lock(&cache->monitor->lock);
+	cur = cache->storage.find(&cache->storage, info);
+	if (cur) {
+		ret = -FI_EAGAIN;
+		goto unlock;
+	}
+
 	if ((cache->cached_cnt >= cache_params.max_cnt) ||
 	    (cache->cached_size >= cache_params.max_size)) {
 		cache->uncached_cnt++;
-		cache->uncached_size += iov->iov_len;
+		cache->uncached_size += info->iov.iov_len;
 	} else {
 		if (cache->storage.insert(&cache->storage,
 					  &(*entry)->info, *entry)) {
 			ret = -FI_ENOMEM;
-			goto err;
+			goto unlock;
 		}
 		cache->cached_cnt++;
-		cache->cached_size += iov->iov_len;
+		cache->cached_size += info->iov.iov_len;
 
-		ret = ofi_monitor_subscribe(cache->monitor, iov->iov_base,
-					    iov->iov_len);
+		ret = ofi_monitor_subscribe(cache->monitor, info->iov.iov_base,
+					    info->iov.iov_len);
 		if (ret) {
 			util_mr_uncache_entry_storage(cache, *entry);
 			cache->uncached_cnt++;
@@ -257,10 +275,12 @@ util_mr_cache_create(struct ofi_mr_cache *cache, const struct iovec *iov,
 			(*entry)->subscribed = 1;
 		}
 	}
-
+	pthread_mutex_unlock(&cache->monitor->lock);
 	return 0;
 
-err:
+unlock:
+	pthread_mutex_unlock(&cache->monitor->lock);
+free:
 	util_mr_free_entry(cache, *entry);
 	return ret;
 }
@@ -269,62 +289,53 @@ int ofi_mr_cache_search(struct ofi_mr_cache *cache, const struct fi_mr_attr *attr,
 			struct ofi_mr_entry **entry)
 {
 	struct ofi_mr_info info;
-	int ret = 0;
+	int ret;
 
 	assert(attr->iov_count == 1);
 	FI_DBG(cache->domain->prov, FI_LOG_MR, "search %p (len: %zu)\n",
 	       attr->mr_iov->iov_base, attr->mr_iov->iov_len);
 
-	pthread_mutex_lock(&cache->monitor->lock);
-	cache->search_cnt++;
+	info.iov = *attr->mr_iov;
 
-	if ((cache->cached_cnt >= cache_params.max_cnt) ||
-	    (cache->cached_size >= cache_params.max_size)) {
-		pthread_mutex_unlock(&cache->monitor->lock);
-		ofi_mr_cache_flush(cache);
+	do {
 		pthread_mutex_lock(&cache->monitor->lock);
-	}
 
-	info.iov = *attr->mr_iov;
-retry:
-	*entry = cache->storage.find(&cache->storage, &info);
-	if (!*entry) {
-		ret = util_mr_cache_create(cache, attr->mr_iov,
-					   attr->access, entry);
-		if (ret) {
+		if ((cache->cached_cnt >= cache_params.max_cnt) ||
+		    (cache->cached_size >= cache_params.max_size)) {
 			pthread_mutex_unlock(&cache->monitor->lock);
-			if (!ofi_mr_cache_flush(cache))
-				return ret;
-
+			ofi_mr_cache_flush(cache);
 			pthread_mutex_lock(&cache->monitor->lock);
-			goto retry;
 		}
-		goto unlock;
-	}
 
-	/* This branch may be taken even if the new region encloses a previously
-	 * cached smaller region. In this case, we need to see if other smaller
-	 * regions may also be enclosed and release them all.
-	 */
-	if (!ofi_iov_within(attr->mr_iov, &(*entry)->info.iov)) {
-		do {
+		cache->search_cnt++;
+		*entry = cache->storage.find(&cache->storage, &info);
+		if (*entry && ofi_iov_within(attr->mr_iov, &(*entry)->info.iov))
+			goto hit;
+
+		/* Purge regions that overlap with new region */
+		while (*entry) {
 			/* New entry will expand range of subscription */
 			(*entry)->subscribed = 0;
 			util_mr_uncache_entry(cache, *entry);
-		} while ((*entry = cache->storage.find(&cache->storage, &info)));
+			*entry = cache->storage.find(&cache->storage, &info);
+		}
+		pthread_mutex_unlock(&cache->monitor->lock);
 
-		ret = util_mr_cache_create(cache, attr->mr_iov,
-					   attr->access, entry);
-		goto unlock;
-	}
+		ret = util_mr_cache_create(cache, &info, entry);
+		if (ret && ret != -FI_EAGAIN) {
+			if (ofi_mr_cache_flush(cache))
+				ret = -FI_EAGAIN;
+		}
+	} while (ret == -FI_EAGAIN);
 
+	return ret;
+
+hit:
 	cache->hit_cnt++;
 	if ((*entry)->use_cnt++ == 0)
 		dlist_remove_init(&(*entry)->list_entry);
-
-unlock:
 	pthread_mutex_unlock(&cache->monitor->lock);
-	return ret;
+	return 0;
 }
 
 struct ofi_mr_entry *ofi_mr_cache_find(struct ofi_mr_cache *cache,
