Split up particles over multiple tiles for OpenMP #862

Open
wants to merge 15 commits into base: development
Changes from 10 commits
2 changes: 0 additions & 2 deletions examples/fodo_programmable/run_fodo_programmable.py
@@ -117,7 +117,6 @@ def my_ref_drift(pge, refpart):
pge1.beam_particles = lambda pti, refpart: my_drift(pge1, pti, refpart)
pge1.ref_particle = lambda refpart: my_ref_drift(pge1, refpart)
pge1.ds = 0.25
pge1.threadsafe = True # allow OpenMP threading for speed

# attention: assignment is a reference for pge2 = pge1

Expand All @@ -126,7 +125,6 @@ def my_ref_drift(pge, refpart):
pge2.beam_particles = lambda pti, refpart: my_drift(pge2, pti, refpart)
pge2.ref_particle = lambda refpart: my_ref_drift(pge2, refpart)
pge2.ds = 0.5
pge2.threadsafe = True # allow OpenMP threading for speed

# add beam diagnostics
monitor = elements.BeamMonitor("monitor", backend="h5")
10 changes: 4 additions & 6 deletions src/ImpactX.cpp
@@ -94,13 +94,11 @@ namespace impactx {
// init blocks / grids & MultiFabs
amr_data->InitFromScratch(0.0);

// alloc particle containers
// have to resize here, not in the constructor because grids have not
// prepare particle containers
// have to do this here, not in the constructor because grids have not
been built when the constructor was called.
amr_data->track_particles.m_particle_container->reserveData();
amr_data->track_particles.m_particle_container->resizeData();
amr_data->track_particles.m_particles_lost->reserveData();
amr_data->track_particles.m_particles_lost->resizeData();
amr_data->track_particles.m_particle_container->prepare();
amr_data->track_particles.m_particles_lost->prepare();

// register shortcut
amr_data->track_particles.m_particle_container->SetLostParticleContainer(amr_data->track_particles.m_particles_lost.get());
7 changes: 7 additions & 0 deletions src/particles/ImpactXParticleContainer.H
@@ -143,6 +143,13 @@ namespace impactx
//! Destruct a particle container
virtual ~ImpactXParticleContainer() = default;

/** Prepares the container for use.
*
* This sets the tile size to a sensible default and resizes the
* underlying data structures.
*/
void prepare ();

/** Add new particles to the container for fixed s.
*
* Note: This can only be used *after* the initialization (grids) have
183 changes: 131 additions & 52 deletions src/particles/ImpactXParticleContainer.cpp
@@ -13,6 +13,7 @@

#include <ablastr/constant.H>
#include <ablastr/particles/ParticleMoments.H>
#include <ablastr/warn_manager/WarnManager.H>

#include <AMReX.H>
#include <AMReX_AmrCore.H>
@@ -107,6 +108,52 @@ namespace impactx
SetParticleShape(v);
}

void
ImpactXParticleContainer::prepare ()
{
// make sure we have at least one box with enough tiles for each OpenMP thread

// make sure level 0, grid 0 exists
int lid = 0, gid = 0;
{
const auto& pmap = ParticleDistributionMap(lid).ProcessorMap();
auto it = std::find(pmap.begin(), pmap.end(), amrex::ParallelDescriptor::MyProc());
if (it == std::end(pmap)) {
amrex::Abort("Particle container needs to have at least one grid.");
} else {
gid = *it;
}
}

int nthreads = 1;
#if defined(AMREX_USE_OMP)
nthreads = omp_get_max_threads();
#endif
/** When running without space charge but with OpenMP parallelization,
 * we need to make sure we have enough tiles on level 0, grid 0 to
 * thread over. We try to set the tile_size appropriately here. The
 * tiles start as (long, 8, 8) in (x, y, z), so we halve the tile size
 * in the y and z directions alternately until there are enough tiles
 * for the number of threads.
 */
const auto& ba = ParticleBoxArray(lid);
auto n_logical = numTilesInBox(ba[gid], true, tile_size);

int ntry = 0;
while ((n_logical < nthreads) && (ntry++ < 6)) {
int idim = (ntry % 2) + 1; // alternate between 1 and 2
tile_size[idim] /= 2;
AMREX_ALWAYS_ASSERT(tile_size[idim] > 0);
n_logical = numTilesInBox(ba[gid], true, tile_size);
}

if (n_logical < nthreads) {
amrex::Abort("ImpactParticleContainer::prepare() "
Copy link
Member

@ax3l ax3l Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atmyers I added more more concrete guidance to users here, can you please double check it?

In which situations would we not be able to find enough tiles? Is there other guidance we should give or can we make it a high warning and "just" be less parallel (i.e., less parallely efficient)?

Copy link
Contributor Author

@atmyers atmyers Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the code will not just be efficient, it will give incorrect results if the number of tiles according to the tile_size is fewer than the number of threads. The issue is that the copyParticles routine in AMReX does not copy the ones that aren't on a valid tile. This resulted in some of the particles not getting written out some examples (since we us copyParticles to a pinned_pc in IO).

This can be changed in AMReX, I'd suggest changing this to a warning rather than an Abort at that point.

"could not find good tile size for the number of OpenMP threads. "
"Consider lowering the number of OpenMP threads via the environment variable OMP_NUM_THREADS."
);
}

reserveData();
resizeData();
}
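
For intuition, here is a minimal standalone sketch of the tile-count logic above. The box extent, thread count, and the simple ceiling-division tile counter are illustrative assumptions, not the AMReX numTilesInBox implementation:

#include <array>
#include <cassert>
#include <iostream>

int main ()
{
    std::array<int, 3> box  = {64, 16, 16};  // assumed box extent in cells
    std::array<int, 3> tile = {1024, 8, 8};  // default tile_size: "long" in x
    int const nthreads = 16;                 // assumed OMP_NUM_THREADS

    // count logical tiles of size `tile` in `box` (ceiling division per dim)
    auto num_tiles = [&] () {
        int n = 1;
        for (int d = 0; d < 3; ++d) {
            n *= (box[d] + tile[d] - 1) / tile[d];
        }
        return n;
    };

    int ntry = 0;
    int n_logical = num_tiles();
    while (n_logical < nthreads && ntry++ < 6) {
        int const idim = (ntry % 2) + 1;  // alternate between dims 1 and 2
        tile[idim] /= 2;
        assert(tile[idim] > 0);
        n_logical = num_tiles();
    }

    // (1024,8,8) -> 4 tiles; halve z: (1024,8,4) -> 8; halve y: (1024,4,4) -> 16
    std::cout << "tile_size (" << tile[0] << "," << tile[1] << "," << tile[2]
              << ") gives " << n_logical << " tiles\n";
}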

void
ImpactXParticleContainer::AddNParticles (
amrex::Gpu::DeviceVector<amrex::ParticleReal> const & x,
@@ -130,8 +177,8 @@
// number of particles to add
int const np = x.size();

// we add particles to lev 0, tile 0 of the first box assigned to this proc
int lid = 0, gid = 0, tid = 0;
// we add particles to lev 0, grid 0
int lid = 0, gid = 0;
{
const auto& pmap = ParticleDistributionMap(lid).ProcessorMap();
auto it = std::find(pmap.begin(), pmap.end(), amrex::ParallelDescriptor::MyProc());
@@ -141,61 +188,93 @@
gid = *it;
}
}
auto& particle_tile = DefineAndReturnParticleTile(lid, gid, tid);

auto old_np = particle_tile.numParticles();
auto new_np = old_np + np;
particle_tile.resize(new_np);

// Update NextID to include particles created in this function
int pid;
#ifdef AMREX_USE_OMP
#pragma omp critical (add_beam_nextid)
int nthreads = 1;
#if defined(AMREX_USE_OMP)
nthreads = omp_get_max_threads();
#endif
{
pid = ParticleType::NextID();
ParticleType::NextID(pid+np);

// split up particles over nthreads tiles
AMREX_ALWAYS_ASSERT(numTilesInBox(ParticleBoxArray(lid)[gid], true, tile_size) >= nthreads);
for (int ithr = 0; ithr < nthreads; ++ithr) {
DefineAndReturnParticleTile(lid, gid, ithr);
}

int pid = ParticleType::NextID();
ParticleType::NextID(pid + np);
AMREX_ALWAYS_ASSERT_WITH_MESSAGE(
static_cast<amrex::Long>(pid) + static_cast<amrex::Long>(np) < amrex::LongParticleIds::LastParticleID,
"ERROR: overflow on particle id numbers");

const int cpuid = amrex::ParallelDescriptor::MyProc();

auto & soa = particle_tile.GetStructOfArrays().GetRealData();
amrex::ParticleReal * const AMREX_RESTRICT x_arr = soa[RealSoA::x].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT y_arr = soa[RealSoA::y].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT t_arr = soa[RealSoA::t].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT px_arr = soa[RealSoA::px].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT py_arr = soa[RealSoA::py].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT pt_arr = soa[RealSoA::pt].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT qm_arr = soa[RealSoA::qm].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT w_arr = soa[RealSoA::w ].dataPtr();

uint64_t * const AMREX_RESTRICT idcpu_arr = particle_tile.GetStructOfArrays().GetIdCPUData().dataPtr();

amrex::ParticleReal const * const AMREX_RESTRICT x_ptr = x.data();
amrex::ParticleReal const * const AMREX_RESTRICT y_ptr = y.data();
amrex::ParticleReal const * const AMREX_RESTRICT t_ptr = t.data();
amrex::ParticleReal const * const AMREX_RESTRICT px_ptr = px.data();
amrex::ParticleReal const * const AMREX_RESTRICT py_ptr = py.data();
amrex::ParticleReal const * const AMREX_RESTRICT pt_ptr = pt.data();

amrex::ParallelFor(np,
[=] AMREX_GPU_DEVICE (int i) noexcept
static_cast<amrex::Long>(pid) + static_cast<amrex::Long>(np) < amrex::LongParticleIds::LastParticleID,
"ERROR: overflow on particle id numbers"
);

#if defined(AMREX_USE_OMP)
#pragma omp parallel if (amrex::Gpu::notInLaunchRegion())
#endif
{
idcpu_arr[old_np+i] = amrex::SetParticleIDandCPU(pid + i, cpuid);

x_arr[old_np+i] = x_ptr[i];
y_arr[old_np+i] = y_ptr[i];
t_arr[old_np+i] = t_ptr[i];

px_arr[old_np+i] = px_ptr[i];
py_arr[old_np+i] = py_ptr[i];
pt_arr[old_np+i] = pt_ptr[i];
qm_arr[old_np+i] = qm;
w_arr[old_np+i] = bchchg/ablastr::constant::SI::q_e/np;
});
int tid = 0;
#if defined(AMREX_USE_OMP)
tid = omp_get_thread_num();
#endif

// we split up the np particles over multiple tiles:
// the first nlft tiles get nr+1 particles each, the rest get nr.
int nr = np / nthreads;
int nlft = np - nr*nthreads;

int num_to_add = 0; /* how many particles this tile will get */
int my_index = 0;

if (tid < nlft) { // get nr+1 items
my_index = tid * (nr + 1);
num_to_add = nr + 1;
} else { // get nr items
my_index = tid * nr + nlft;
num_to_add = nr;
}

auto& particle_tile = ParticlesAt(lid, gid, tid);
auto old_np = particle_tile.numParticles();
auto new_np = old_np + num_to_add;
particle_tile.resize(new_np);

const int cpuid = amrex::ParallelDescriptor::MyProc();

auto & soa = particle_tile.GetStructOfArrays().GetRealData();
amrex::ParticleReal * const AMREX_RESTRICT x_arr = soa[RealSoA::x].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT y_arr = soa[RealSoA::y].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT t_arr = soa[RealSoA::t].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT px_arr = soa[RealSoA::px].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT py_arr = soa[RealSoA::py].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT pt_arr = soa[RealSoA::pt].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT qm_arr = soa[RealSoA::qm].dataPtr();
amrex::ParticleReal * const AMREX_RESTRICT w_arr = soa[RealSoA::w ].dataPtr();

uint64_t * const AMREX_RESTRICT idcpu_arr = particle_tile.GetStructOfArrays().GetIdCPUData().dataPtr();

amrex::ParticleReal const * const AMREX_RESTRICT x_ptr = x.data();
amrex::ParticleReal const * const AMREX_RESTRICT y_ptr = y.data();
amrex::ParticleReal const * const AMREX_RESTRICT t_ptr = t.data();
amrex::ParticleReal const * const AMREX_RESTRICT px_ptr = px.data();
amrex::ParticleReal const * const AMREX_RESTRICT py_ptr = py.data();
amrex::ParticleReal const * const AMREX_RESTRICT pt_ptr = pt.data();

amrex::ParallelFor(num_to_add,
[=] AMREX_GPU_DEVICE (int i) noexcept
{
idcpu_arr[old_np+i] = amrex::SetParticleIDandCPU(pid + my_index + i, cpuid);

x_arr[old_np+i] = x_ptr[my_index+i];
y_arr[old_np+i] = y_ptr[my_index+i];
t_arr[old_np+i] = t_ptr[my_index+i];

px_arr[old_np+i] = px_ptr[my_index+i];
py_arr[old_np+i] = py_ptr[my_index+i];
pt_arr[old_np+i] = pt_ptr[my_index+i];

qm_arr[old_np+i] = qm;
w_arr[old_np+i] = bchchg/ablastr::constant::SI::q_e/np;
});
}

// safety first: in case passed attribute arrays were temporary, we
// want to make sure the ParallelFor has ended here
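
As a quick sanity check of the nr/nlft partition in AddNParticles above, a standalone sketch (np and nthreads values are assumed for illustration) showing that the per-thread ranges tile [0, np) exactly:

#include <iostream>

int main ()
{
    int const np = 10;       // assumed number of particles to add
    int const nthreads = 4;  // assumed number of OpenMP threads

    int const nr = np / nthreads;        // baseline particles per tile: 2
    int const nlft = np - nr * nthreads; // leftover particles: 2

    for (int tid = 0; tid < nthreads; ++tid) {
        int my_index = 0, num_to_add = 0;
        if (tid < nlft) {                // first nlft tiles get nr+1
            my_index = tid * (nr + 1);
            num_to_add = nr + 1;
        } else {                         // remaining tiles get nr
            my_index = tid * nr + nlft;
            num_to_add = nr;
        }
        std::cout << "tid " << tid << " fills [" << my_index << ", "
                  << my_index + num_to_add << ")\n";
    }
    // prints: [0,3) [3,6) [6,8) [8,10) -- a contiguous cover of [0, np)
}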