Parallelization: switch from multiprocessing to joblib #137

Merged 5 commits on Dec 4, 2023
47 changes: 26 additions & 21 deletions mgwr/gwr.py
@@ -3,6 +3,7 @@
__author__ = "Taylor Oshan [email protected]"

import copy
import os
from typing import Optional
import numpy as np
import numpy.linalg as la
@@ -13,10 +14,10 @@
from spglm.glm import GLM, GLMResults
from spglm.iwls import iwls, _compute_betas_gwr
from spglm.utils import cache_readonly
from joblib import Parallel, delayed
from .diagnostics import get_AIC, get_AICc, get_BIC, corr
from .kernels import *
from .summary import *
import multiprocessing as mp


class GWR(GLM):
@@ -86,6 +87,10 @@ class GWR(GLM):
name_x : list of strings
Names of independent variables for use in output

n_jobs : integer
The number of jobs (default 1) to run in parallel. -1 means using all processors.
Member:

I know I am coming late to the party, but would you consider using -1 as the default? That is quite common across the ML world and it is what users generally expect.

Member:

I was curious what others' opinions were on this... I tend to default to -1 personally, but joblib itself [indirectly] defaults to 1, and in esda we do both (join counts are conservative, G defaults to -1, etc.).

Member:

I think scikit usually defaults to 1, so I don't think there's a standard expectation in the ML world.

Member:

I don't mind either as long as it is documented (which it is). But for heavily parallelisable code like this one, I tend to prefer parallel execution by default.

Member Author:

Yeah, I was only looking at scikit-learn and adapted to what they have. I actually personally prefer -1 as the default.
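For reference, a minimal usage sketch of the new keyword as merged here (the toy data is hypothetical; only the `GWR(..., n_jobs=...)` signature is taken from this diff, with `-1` meaning all processors):

```python
import numpy as np
from mgwr.gwr import GWR

# Hypothetical toy data: 50 locations, one predictor.
coords = np.random.rand(50, 2)
y = np.random.rand(50, 1)
X = np.random.rand(50, 1)

# bw=20 is an arbitrary adaptive bandwidth chosen only for illustration.
results_sequential = GWR(coords, y, X, bw=20, n_jobs=1).fit()   # default, one worker
results_parallel = GWR(coords, y, X, bw=20, n_jobs=-1).fit()    # all available cores
```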



Attributes
----------
coords : array-like
@@ -210,7 +215,7 @@ class GWR(GLM):

def __init__(self, coords, y, X, bw, family=Gaussian(), offset=None,
sigma2_v1=True, kernel='bisquare', fixed=False, constant=True,
spherical=False, hat_matrix=False, name_x=None):
spherical=False, hat_matrix=False, name_x=None,n_jobs=1):
"""
Initialize class
"""
@@ -234,6 +239,7 @@ def __init__(self, coords, y, X, bw, family=Gaussian(), offset=None,
self.spherical = spherical
self.hat_matrix = hat_matrix
self.name_x = name_x
self.n_jobs = n_jobs

def _build_wi(self, i, bw):

@@ -285,7 +291,7 @@ def _local_fit(self, i):
return influ, resid, predy, betas.reshape(-1), w, Si, tr_STS_i, CCT

def fit(self, ini_params=None, tol=1.0e-5, max_iter=20, solve='iwls',
lite=False, pool=None):
Member:

This is a hard-breaking change we should avoid. I suggest keeping the keyword and warning when it is not None.

lite=False):
"""
Method that fits a model with a particular estimation routine.

@@ -312,7 +318,6 @@ def fit(self, ini_params=None, tol=1.0e-5, max_iter=20, solve='iwls',
bandwidth selection (could speed up
bandwidth selection for GWR) or to estimate
a full GWR. Default is False.
pool : A multiprocessing Pool object to enable parallel fitting; default is None.

Returns
-------
@@ -335,11 +340,7 @@ def fit(self, ini_params=None, tol=1.0e-5, max_iter=20, solve='iwls',
else:
m = self.points.shape[0]

if pool:
rslt = pool.map(self._local_fit,
range(m)) #parallel using mp.Pool
else:
rslt = map(self._local_fit, range(m)) #sequential
rslt = Parallel(n_jobs=self.n_jobs)(delayed(self._local_fit)(i) for i in range(m))

rslt_list = list(zip(*rslt))
influ = np.array(rslt_list[0]).reshape(-1, 1)
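The `Parallel(...)` line above is the heart of the migration. As a standalone illustration (a generic function, not mgwr's actual `_local_fit`), the old `pool.map` pattern and its joblib replacement produce the same results:

```python
import multiprocessing as mp
from joblib import Parallel, delayed

def local_fit(i):
    # Stand-in for an independent per-observation computation such as GWR._local_fit.
    return i * i

if __name__ == "__main__":
    m = 8

    # Old pattern: the caller constructs and manages a multiprocessing Pool.
    with mp.Pool(processes=4) as pool:
        old = pool.map(local_fit, range(m))

    # New pattern: joblib owns the workers; n_jobs=1 runs sequentially, -1 uses all cores.
    new = Parallel(n_jobs=4)(delayed(local_fit)(i) for i in range(m))

    assert old == new
```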
@@ -1492,6 +1493,9 @@ class MGWR(GWR):
name_x : list of strings
Names of independent variables for use in output

n_jobs : integer
The number of jobs (default 1) to run in parallel. -1 means using all processors.

Examples
--------

@@ -1521,7 +1525,7 @@ class MGWR(GWR):

def __init__(self, coords, y, X, selector, sigma2_v1=True,
kernel='bisquare', fixed=False, constant=True,
spherical=False, hat_matrix=False, name_x=None):
spherical=False, hat_matrix=False, name_x=None,n_jobs=1):
"""
Initialize class
"""
@@ -1544,6 +1548,7 @@ def __init__(self, coords, y, X, selector, sigma2_v1=True,
self.exog_scale = None
self_fit_params = None
self.name_x = name_x
self.n_jobs = n_jobs

def _chunk_compute_R(self, chunk_id=0):
"""
@@ -1599,7 +1604,7 @@ def _chunk_compute_R(self, chunk_id=0):
return ENP_j, CCT, pR
return ENP_j, CCT

def fit(self, n_chunks=1, pool=None):
def fit(self, n_chunks=1):
"""
Compute MGWR inference by chunk to reduce memory footprint.
See Li and Fotheringham, 2020, IJGIS.
@@ -1610,7 +1615,6 @@ def fit(self, n_chunks=1, pool=None):
n_chunks : integer, optional
A number of chunks parameter to reduce memory usage.
e.g. n_chunks=2 should reduce overall memory usage by 2.
pool : A multiprocessing Pool object to enable parallel fitting; default is None.

Returns
-------
@@ -1627,15 +1631,16 @@ def tqdm(x, total=0,
desc=''): #otherwise, just passthrough the range
return x

if pool:
self.n_chunks = pool._processes * n_chunks
rslt = tqdm(
pool.imap(self._chunk_compute_R, range(self.n_chunks)),
total=self.n_chunks, desc='Inference')
if self.n_jobs == -1:
max_processors = os.cpu_count()
self.n_chunks = max_processors * n_chunks
else:
self.n_chunks = n_chunks
rslt = map(self._chunk_compute_R,
tqdm(range(self.n_chunks), desc='Inference'))
self.n_chunks = self.n_jobs * n_chunks

# Using joblib for parallel processing with a tqdm progress bar
rslt = tqdm(Parallel(n_jobs=self.n_jobs)(
delayed(self._chunk_compute_R)(i) for i in range(self.n_chunks)),
total=self.n_chunks, desc='Inference')

rslt_list = list(zip(*rslt))
ENP_j = np.sum(np.array(rslt_list[0]), axis=0)
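A small sketch of the chunk-count arithmetic introduced above (hypothetical values; `os.cpu_count()` is what the merged code queries when `n_jobs` is -1):

```python
import os

def total_chunks(n_jobs, n_chunks):
    # Mirrors the branch above: with n_jobs=-1 every available core gets
    # n_chunks pieces of work; otherwise each of the n_jobs workers does.
    workers = os.cpu_count() if n_jobs == -1 else n_jobs
    return workers * n_chunks

# On an 8-core machine, n_jobs=-1 with n_chunks=2 yields 16 chunks of R.
print(total_chunks(n_jobs=-1, n_chunks=2))
print(total_chunks(n_jobs=4, n_chunks=2))  # -> 8
```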
@@ -1666,7 +1671,7 @@ def exact_fit(self):
Q = []
I = np.eye(self.n)
for j1 in range(self.k):
Aj = GWR(self.coords,self.y,self.X[:,j1].reshape(-1,1),bw=self.bws[j1],hat_matrix=True,constant=False).fit().S
Aj = GWR(self.coords,self.y,self.X[:,j1].reshape(-1,1),bw=self.bws[j1],hat_matrix=True,constant=False,n_jobs=self.n_jobs).fit().S
Pj = []
for j2 in range(self.k):
if j1 == j2:
22 changes: 10 additions & 12 deletions mgwr/sel_bw.py
@@ -6,7 +6,6 @@

import spreg.user_output as USER
import numpy as np
import multiprocessing as mp
from scipy.spatial.distance import pdist
from scipy.optimize import minimize_scalar
from spglm.family import Gaussian, Poisson, Binomial
@@ -59,6 +58,8 @@ class Sel_BW(object):
spherical : boolean
True for spherical coordinates (long-lat),
False for projected coordinates (default).
n_jobs : integer
The number of jobs (default 1) to run in parallel. -1 means using all processors.

Attributes
----------
@@ -175,7 +176,7 @@ class Sel_BW(object):

def __init__(self, coords, y, X_loc, X_glob=None, family=Gaussian(),
offset=None, kernel='bisquare', fixed=False, multi=False,
constant=True, spherical=False):
constant=True, spherical=False,n_jobs=1):
self.coords = np.array(coords)
self.y = y
self.X_loc = X_loc
@@ -194,14 +195,15 @@ def __init__(self, coords, y, X_loc, X_glob=None, family=Gaussian(),
self._functions = []
self.constant = constant
self.spherical = spherical
self.n_jobs = n_jobs
self.search_params = {}

def search(self, search_method='golden_section', criterion='AICc',
bw_min=None, bw_max=None, interval=0.0, tol=1.0e-6,
max_iter=200, init_multi=None, tol_multi=1.0e-5,
rss_score=False, max_iter_multi=200, multi_bw_min=[None],
multi_bw_max=[None
], bws_same_times=5, pool=None, verbose=False):
], bws_same_times=5, verbose=False):
"""
Method to select one unique bandwidth for a gwr model or a
bandwidth vector for a mgwr model.
@@ -247,8 +249,6 @@ def search(self, search_method='golden_section', criterion='AICc',
bws_same_times : If bandwidths keep the same between iterations for
bws_same_times (default 5) in backfitting, then use the
current set of bandwidths as final bandwidths.
pool : A multiprocessing Pool object to enbale parallel fitting;
default is None
verbose : Boolean
If true, bandwidth searching history is printed out; default is False.

@@ -268,7 +268,6 @@
self.bw_min = bw_min
self.bw_max = bw_max
self.bws_same_times = bws_same_times
self.pool = pool
self.verbose = verbose

if len(multi_bw_min) == k:
@@ -319,14 +318,13 @@ def search(self, search_method='golden_section', criterion='AICc',
self._bw()
self.sel_hist = self.bw[-1]

self.pool = None
return self.bw[0]

def _bw(self):
gwr_func = lambda bw: getDiag[self.criterion](GWR(
self.coords, self.y, self.X_loc, bw, family=self.family, kernel=
self.kernel, fixed=self.fixed, constant=self.constant, offset=self.
offset, spherical=self.spherical).fit(lite=True, pool=self.pool))
offset, spherical=self.spherical, n_jobs=self.n_jobs).fit(lite=True))

self._optimized_function = gwr_func

@@ -381,20 +379,20 @@ def _mbw(self):
def gwr_func(y, X, bw):
return GWR(coords, y, X, bw, family=family, kernel=kernel,
fixed=fixed, offset=offset, constant=False,
spherical=self.spherical, hat_matrix=False).fit(
lite=True, pool=self.pool)
spherical=self.spherical, hat_matrix=False,n_jobs=self.n_jobs).fit(
lite=True)

def bw_func(y, X):
selector = Sel_BW(coords, y, X, X_glob=[], family=family,
kernel=kernel, fixed=fixed, offset=offset,
constant=False, spherical=self.spherical)
constant=False, spherical=self.spherical,n_jobs=self.n_jobs)
return selector

def sel_func(bw_func, bw_min=None, bw_max=None):
return bw_func.search(
search_method=search_method, criterion=criterion,
bw_min=bw_min, bw_max=bw_max, interval=interval, tol=tol,
max_iter=max_iter, pool=self.pool, verbose=False)
max_iter=max_iter, verbose=False)

self.bw = multi_bw(self.init_multi, y, X, n, k, family, self.tol_multi,
self.max_iter_multi, self.rss_score, gwr_func,
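Taken together, a hedged end-to-end sketch of how `n_jobs` now threads from bandwidth selection into the final fit (hypothetical data; the `Sel_BW` and `GWR` signatures are as shown in this diff):

```python
import numpy as np
from mgwr.gwr import GWR
from mgwr.sel_bw import Sel_BW

# Hypothetical toy data: 100 locations, two predictors.
coords = np.random.rand(100, 2)
y = np.random.rand(100, 1)
X = np.random.rand(100, 2)

# n_jobs is stored on the selector and forwarded to every internal
# GWR(...).fit(lite=True) call made during the bandwidth search.
bw = Sel_BW(coords, y, X, n_jobs=-1).search()

# The selected bandwidth is then used for the full, parallel fit.
results = GWR(coords, y, X, bw, n_jobs=-1).fit()
```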