-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpylbm.py
159 lines (151 loc) · 8.07 KB
/
pylbm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""simple LBM: 2D, that can be extended to 3D, multiphase, etc through callbacks
@author: Ted Tower
"""
import numpy as np
import time
class LBM():
def __init__(self,nzyx,nphase=1,tau=None):
self.dim=nzyx
self.nphase=nphase
self.ndir=9
self.c=np.array([[0,0,0, 0, 0,0, 0, 0, 0], # z -- direction vectors for D2Q9
[0,0,1, 0,-1,1, 1,-1,-1], # y
[0,1,0,-1, 0,1,-1,-1, 1]]);# x
t1=4./9;t2=1./9;t3=1./36;
self.w=np.array([t1,t2,t2,t2,t2,t3,t3,t3,t3]) # D2Q9 weights in each dir
self.fields={'tau':np.ones((*self.dim,self.nphase)), # relaxation time
'u':np.zeros((*self.dim,3)), # macroscopic velocity
'ueq':np.zeros((*self.dim,self.nphase,3)), # phase-velocity
'rho':np.ones((*self.dim,self.nphase)), # density
'ns':np.zeros((*self.dim,self.nphase)), # scattering (e.g. walls=1)
'Fin':np.zeros((*self.dim,self.nphase,self.ndir)), # incoming dist
'Fout':np.zeros((*self.dim,self.nphase,self.ndir)), # outgoing dist
'Feq':np.zeros((*self.dim,self.nphase,self.ndir)), # equilibrium dist
'Fpop':np.zeros((*self.dim,self.nphase,self.ndir)), # external forces
'flowMode':np.zeros((*self.dim,self.nphase))+2}
if(tau is not None): self.fields['tau']=tau
self.flowIdx=[]
self.getFlowIndex()
self.fluidPhases=list(range(nphase))
self.initDistribution();
self.reflected=[0,3,4,1,2,7,8,5,6] # reflection directions for D2Q9
self.step=0
self.status=(0,'ok')
def initDistribution(self): # initialize the equilibrium distribution based on rho&v
self.fields['Feq']=np.zeros((*self.dim,self.nphase,self.ndir))
self.calcFeq();
self.fields['Fout']=self.fields['Feq'].copy()
def getFlowIndex(self): # determine regions that diffuse, advect, fully N-S flow
self.flowIdx=[]
for pp in range(self.nphase):
k0=np.where(self.fields['flowMode'][...,pp]==0)
k1=np.where(self.fields['flowMode'][...,pp]==1)
k2=np.where(self.fields['flowMode'][...,pp]>=1)
self.flowIdx.append((k0,k1,k2))
def calcFeq(self): # calc the equilibrium distribution
ueq=self.fields['ueq']
for pp in range(self.nphase):
u2c=1.5*(self.fields['ueq'][...,pp,:]**2).sum(axis=-1)
#k0,k1,k2=self.flowIdx[pp]
for ii in range(self.ndir):
cuns=3*(self.c[0,ii]*ueq[...,pp,0]+self.c[1,ii]*ueq[...,pp,1]+self.c[2,ii]*ueq[...,pp,2])
self.fields['Feq'][...,pp,ii]=self.w[ii]*self.fields['rho'][...,pp]*(1+cuns+0.5*cuns**2-u2c)
#if(k0[0].size): self.fields['Feq'][k0+(pp,ii,)]=self.w[ii]*self.fields['rho'][k0+(pp,)]
#if(k1[0].size): self.fields['Feq'][k1+(pp,ii,)]=self.w[ii]*self.fields['rho'][k1+(pp,)]*(1+cuns[k1])
#if(k2[0].size): self.fields['Feq'][k2+(pp,ii,)]=self.w[ii]*self.fields['rho'][k2+(pp,)]*(1+cuns[k2]+0.5*cuns[k2]**2-u2c[k2])
def calcMacro(self): # calculate macroscopic variables (hard-coded for D2Q9)
self.fields['rho']=self.fields['Fin'].sum(axis=-1);
F=self.fields['Fin'][...,self.fluidPhases,:]
rho=self.fields['rho'][...,self.fluidPhases]
tau=self.fields['tau'][...,self.fluidPhases]
rhoOmegaTot=(rho/tau).sum(axis=-1)+1e-10
#with np.errstate(invalid='ignore'):
ux=((F[...,1]+F[...,5]+F[...,8])-(F[...,3]+F[...,6]+F[...,7]))
self.fields['u'][...,2]=(ux/tau).sum(axis=-1)/rhoOmegaTot
uy=((F[...,2]+F[...,5]+F[...,6])-(F[...,4]+F[...,7]+F[...,8]))
self.fields['u'][...,1]=(uy/tau).sum(axis=-1)/rhoOmegaTot
ksolid=np.where(self.fields['ns'][...,0]==1)
self.fields['u'][ksolid+(0,)]=0
self.fields['u'][ksolid+(1,)]=0
self.fields['u'][ksolid+(2,)]=0
def calcUeq(self): # copy the velocity field 'v' into each phase of 'ueq'
for ii in range(self.nphase):
self.fields['ueq'][...,ii,:]=self.fields['u']
def stream(self): # stream outgoing distributions into incoming
Fout=self.fields['Fout']
nz,ny,nx,nph,ndir=Fout.shape
iz=np.arange(nz)
iy=np.arange(ny)
ix=np.arange(nx)
for ii in range(self.ndir):
#self.fields['Fin'][...,ii]=np.roll(self.fields['Fout'][...,ii],
# (self.c[0,ii],self.c[1,ii],self.c[2,ii],0),axis=(0,1,2,3))
Fd=Fout[...,ii]
Fd=Fd[iz[(np.arange(nz)-self.c[0,ii])%nz],:,:,:]
Fd=Fd[:,iy[(np.arange(ny)-self.c[1,ii])%ny],:,:]
Fd=Fd[:,:,ix[(np.arange(nx)-self.c[2,ii])%nx],:]
Fout[...,ii]=Fd
self.fields['Fin']=Fout
def collide(self): # collide the incoming distributions into a new outgoing distribution
invtau=np.tile(1/self.fields['tau'][...,np.newaxis],(1,1,1,1,9))
self.fields['Fout']=invtau*self.fields['Feq']+(1-invtau)*self.fields['Fin']+self.fields['Fpop'];
def PBB(self,ON): # partial bounceback (Walsh-Bxxx-Saar)
F=self.fields['Fout']
for ii in range(self.ndir):
k=ON+(ii,); # "on" spatial locations, append index for D2Q9 direction
F[k]=F[k]+self.fields['ns'][ON]*(self.fields['Fin'][ON+(self.reflected[ii],)]-F[k])
def sim(self,steps=1,callbacks=None,verbose=False):
if(callbacks is None): callbacks={}
for k in ['init','postStream','postMacro','postUeq','postFeq','postCollision','final']:
if(k not in callbacks): callbacks[k]=[]
ON=np.where(self.fields['ns'])
self.getFlowIndex()
for cb in callbacks['init']: cb(self)
t0=time.time();
for ii in range(steps):
self.step=ii
self.stream()
for cb in callbacks['postStream']: cb(self) # modify distributions
self.calcMacro()
for cb in callbacks['postMacro']: cb(self) # BC: set v and rho, explicitly; output
self.calcUeq()
for cb in callbacks['postUeq']: cb(self) # BC: adjust ueq (e.g. Shan-Chen)
self.calcFeq();
for cb in callbacks['postFeq']: cb(self) # BC: forcing functions (e.g. gravity)
self.collide()
for cb in callbacks['postCollision']: cb(self) # BC: forcing functions (e.g. gravity)
self.PBB(ON)
if(np.any(self.fields['u']>1)):
print('ack!: velocity too high! (step %d)'%self.step);break
#self.step=-1 # simple state flag
for cb in callbacks['final']: cb(self)
if(verbose):
print('done! (%.2fmin)'%((time.time()-t0)/60))
if(__name__=='__main__'):
import matplotlib.pyplot as plt
# This is an example of the workflow for a simple single-phase flow
# with an obstacle.
#
# Dimensions specified/accessed in (nz,ny,nx) order.
# instance LBM object
S=LBM((1,30,60),nphase=1)
# specify/reassign field variables such as scattering (z,y,x,phase)
S.fields['ns'][0,10:20,10:20,0]=1
# create callback to specify velocity boundary conditions
# -- velocity is (z,y,x,3) for 3 components of velocity. In a 2D simulation
# you will see [...,1] for y-component and [...,2] for x-component.
def cb_vel(self):
self.fields['u'][0,1:-1,0,2]=.1 # specified vel from left
self.fields['u'][0,1:-1,-1,:]=self.fields['u'][0,1:-1,-2,:] # "open" right
# create a convenience function for plotting velocity
def myplot(self,prefix):
vmag=((self.fields['u'][0]**2).sum(axis=-1))**0.5
plt.imshow(vmag);plt.title('%s:|u|'%prefix);plt.colorbar()
# use that in callbacks at different stages of the simulation
def cb_postMacro(self):
if(self.step%50==0):
plt.figure();myplot(self,prefix='postMacro(%d)'%self.step);plt.show()
# gather callbacks to pass to the simulation method
cb={'postMacro':[cb_vel,cb_postMacro]}
# call the sim method to run for 500 steps
S.sim(steps=500,callbacks=cb)