PINNs: Navier Stokes equationΒΆ
Solves a 2D Navier Stokes equation on a square domain using PINNs.
[1]:
from pathlib import Path
from typing import Callable, OrderedDict
import matplotlib.pyplot as plt
import torch
from scimba_torch.approximation_space.abstract_space import AbstractApproxSpace
from scimba_torch.approximation_space.nn_space import NNxSpace
from scimba_torch.domain.meshless_domain.domain_2d import Square2D
from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler
from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler
from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP
from scimba_torch.numerical_solvers.elliptic_pde.pinns import (
NaturalGradientPinnsElliptic,
)
from scimba_torch.physical_models.elliptic_pde.abstract_elliptic_pde import (
StrongFormEllipticPDE,
)
from scimba_torch.plots.plots_nd import plot_abstract_approx_spaces
from scimba_torch.utils.scimba_tensors import LabelTensor
torch.manual_seed(0)
class NavierStokes2DStrongForm(StrongFormEllipticPDE):
def __init__(self, space: AbstractApproxSpace, fixed_params, **kwargs):
super().__init__(
space, linear=True, residual_size=4, bc_residual_size=4, **kwargs
)
spatial_dim = self.space.spatial_domain.dim
assert spatial_dim == 2, (
"This class is designed for 2D Navier-Stokes equations."
)
self.fixed_params = fixed_params
self.Cp = fixed_params["Cp"] # Specific heat capacity
self.beta = fixed_params["beta"] # Thermal expansion coefficient
self.rho = fixed_params["rho"] # Density
self.gravity = fixed_params["g"] # Gravitational acceleration
def rhs(self, w, x, mu):
# Zero vector for the residuals
x1, _ = x.get_components()
zeros = torch.zeros_like(x1)
return zeros, zeros, zeros, zeros
def operator(self, w, x, mu):
u, v, p, T = w.get_components()
mu_, k_f = mu.get_components()
# Incompressibility condition
u_x, u_y = self.grad(u, x)
v_x, v_y = self.grad(v, x)
op1 = u_x + v_y
# Navier-Stokes equations
u_xx, _ = self.grad(u_x, x)
_, u_yy = self.grad(u_y, x)
v_xx, _ = self.grad(v_x, x)
_, v_yy = self.grad(v_y, x)
p_x, p_y = self.grad(p, x)
op2a = self.rho * (u * u_x + v * u_y) + p_x - mu_ * (u_xx + u_yy)
op2b = (
self.rho * (u * v_x + v * v_y)
+ p_y
- mu_ * (v_xx + v_yy)
- self.gravity * (1 + self.beta * T)
)
# Energy equation
T_x, T_y = self.grad(T, x)
T_xx, _ = self.grad(T_x, x)
_, T_yy = self.grad(T_y, x)
op3 = self.rho * (u * T_x + v * T_y) - k_f / self.Cp * (T_xx + T_yy)
return op1, op2a, op2b, op3
def bc_rhs(self, w, x, n, mu):
x1_0, _ = x.get_components(
label=0
) # Assuming label 0 corresponds to the first component
x1_1, _ = x.get_components(label=1)
x1_2, _ = x.get_components(
label=2
) # Assuming label 2 corresponds to the third component
x1_3, _ = x.get_components(label=3)
return (
torch.zeros_like(x1_0),
torch.zeros_like(x1_1),
torch.zeros_like(x1_2),
torch.zeros_like(x1_3),
)
def bc_operator(self, w, x, n, mu):
_, _, _, T = w.get_components()
ldown, _, ltop, _ = 0, 1, 2, 3
# Neumann condition (top and bottom)
T_x, T_y = self.grad(T, x)
T_x_top = w.restrict_to_labels(T_x, labels=[ltop])
T_y_top = w.restrict_to_labels(T_y, labels=[ltop])
nx, ny = n.get_components(label=ltop)
T_top = T_x_top * nx + T_y_top * ny
T_x_down = w.restrict_to_labels(T_x, labels=[ldown])
T_y_down = w.restrict_to_labels(T_y, labels=[ldown])
nx, ny = n.get_components(label=ldown)
T_down = T_x_down * nx + T_y_down * ny
x1_1, _ = x.get_components(label=1)
x1_3, _ = x.get_components(label=3)
T_right = torch.zeros_like(x1_1)
T_left = torch.zeros_like(x1_3)
return T_down, T_right, T_top, T_left
def functional_operator_bc_top_down(self, func, x, n, mu, theta):
T_xy = torch.func.jacrev(lambda *args: func(*args)[3], 0)(x, mu, theta)
return (T_xy @ n)[None]
def functional_operator_bc_left_right(self, func, x, n, mu, theta):
return torch.zeros_like(x[0])[None]
def functional_operator_bc(self) -> OrderedDict:
return OrderedDict(
[
(0, self.functional_operator_bc_top_down),
(1, self.functional_operator_bc_left_right),
(2, self.functional_operator_bc_top_down),
(3, self.functional_operator_bc_left_right),
]
)
def functional_operator(
self,
func: Callable,
x: torch.Tensor,
mu: torch.Tensor,
theta: torch.Tensor,
) -> torch.Tensor:
mu_, k_f = mu[0], mu[1]
f_vals = func(x, mu, theta)
u, v, _, T = f_vals[0], f_vals[1], f_vals[2], f_vals[3]
# Incompressibility condition
grad_u = torch.func.jacrev(lambda *args: func(*args)[0], 0)
grad_v = torch.func.jacrev(lambda *args: func(*args)[1], 0)
u_xy = grad_u(x, mu, theta)
v_xy = grad_v(x, mu, theta)
u_x, u_y = u_xy[0:1], u_xy[1:2]
v_x, v_y = v_xy[0:1], v_xy[1:2]
op1 = u_x + v_y
# Navier-Stokes equations
hessian_u = torch.func.jacrev(grad_u, 0)(x, mu, theta)
hessian_v = torch.func.jacrev(grad_v, 0)(x, mu, theta)
u_xx, u_yy = hessian_u[0, 0], hessian_u[1, 1]
v_xx, v_yy = hessian_v[0, 0], hessian_v[1, 1]
p_xy = torch.func.jacrev(lambda *args: func(*args)[2], 0)(x, mu, theta)
p_x, p_y = p_xy[0:1], p_xy[1:2]
op2a = self.rho * (u * u_x + v * u_y) + p_x - mu_ * (u_xx + u_yy)
op2b = self.rho * (u * v_x + v * v_y) + p_y - mu_ * (v_xx + v_yy)
op2b -= self.gravity * (1 + self.beta * T)
# Energy equation
grad_T = torch.func.jacrev(lambda *args: func(*args)[3], 0)
hessian_T = torch.func.jacrev(grad_T, 0)(x, mu, theta)
T_xy = grad_T(x, mu, theta)
T_x, T_y = T_xy[0:1], T_xy[1:2]
T_xx, T_yy = hessian_T[0, 0], hessian_T[1, 1]
op3 = self.rho * (u * T_x + v * T_y) - k_f / self.Cp * (T_xx + T_yy)
return torch.concatenate((op1, op2a, op2b, op3), dim=0)
def phi_fct(box, x):
x1, x2 = x
a, b = box[0]
a2, b2 = box[1]
return (x1 - a) * (b - x1) * (x2 - a2) * (b2 - x2)
def functional_phi_fct(box, x: torch.Tensor) -> torch.Tensor:
a, b = box[0]
a2, b2 = box[1]
return (x[0] - a) * (b - x[0]) * (x[1] - a2) * (b2 - x[1])
def post_proc(box):
def post_processing(inputs: torch.Tensor, x: LabelTensor, mu: LabelTensor):
x1, x2 = x.get_components()
a, b = box[0]
phi = torch.ones_like(inputs)
phi_square = phi_fct(box, [x1, x2])
phi[:, 0] = phi_square[:, 0]
phi[:, 1] = phi_square[:, 0]
phi[:, -1:] = (x1 - a) * (b - x1)
g = torch.zeros_like(inputs)
g[:, -1:] = b - (x1 - a)
return inputs * phi + g # return the modified inputs
return post_processing
def functional_post_proc(box):
def functional_post_processing(
func, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor
) -> torch.Tensor:
a, b = box[0]
inputs = func(x, mu, theta)
# print("inputs.shape: ", inputs.shape)
phi = torch.ones_like(inputs)
phi_square = functional_phi_fct(box, x)
# print("phi_square.shape: ", phi_square.shape)
phi[0] = phi_square
phi[1] = phi_square
phi[-1:] = (x[0] - a) * (b - x[0])
g = torch.zeros_like(inputs)
g[-1:] = b - (x[0] - a)
# print("res shape: ", inputs * phi + g)
return inputs * phi + g # return the modified inputs
return functional_post_processing
def run_navier_stokes_2d(param=[0.1, 0.1], save_fig=False, new_training=False):
# Domain
box = [[-1.0, 1.0], [-1.0, 1.0]]
parameter_domain = [[0.01, 0.1], [0.01, 0.1]]
domain_x = Square2D(box, is_main_domain=True)
full_bc_domain = domain_x.full_bc_domain()
for bc in full_bc_domain:
domain_x.add_bc_domain(bc)
sampler = TensorizedSampler(
[DomainSampler(domain_x), UniformParametricSampler(parameter_domain)]
)
# Network
nb_unknowns, nb_parameters = 4, 2
tlayers = [40, 60, 60, 60, 40] # Example layer sizes for the MLP
tlayers = [16, 32, 16] # Example layer sizes for the MLP
space = NNxSpace(
nb_unknowns,
nb_parameters,
GenericMLP,
domain_x,
sampler,
layer_sizes=tlayers,
activation_type="sine",
post_processing=post_proc(box),
)
# EDP
fixed_params = {"rho": 1.0, "Cp": 1.0, "g": 9.81, "beta": 0.1}
pde = NavierStokes2DStrongForm(space, fixed_params=fixed_params)
pinns = NaturalGradientPinnsElliptic(
pde,
bc_type="weak",
bc_weight=20.0,
one_loss_by_equation=True,
functional_post_processing=functional_post_proc(box),
matrix_regularization=1e-6,
)
## Load or train the PINN
filename_extension = "ns_square"
default_path = Path(__file__).parent
default_folder_name = "results"
if new_training or not pinns.load(
__file__, filename_extension, default_path, default_folder_name
):
# pinns.solve(epochs=10000, n_collocation=40000, n_bc_collocation=30000, verbose=True)
pinns.solve(
epochs=100, n_collocation=16000, n_bc_collocation=12000, verbose=True
)
pinns.save(__file__, filename_extension, default_path, default_folder_name)
# for plotting pinns
if save_fig:
plot_abstract_approx_spaces(
pinns.space,
domain_x,
parameter_domain,
n_visu=200,
loss=pinns.losses,
residual=pinns.pde,
draw_contours=True,
n_drawn_contours=20,
parameters_values=(param),
loss_groups=["bc"],
)
#plt.show()
figfilename = "navierstokes.png"
fig_path = default_path / default_folder_name / figfilename
plt.savefig(fig_path, bbox_inches="tight")
print(f"Figure saved at {fig_path}")
return pinns
Uncomment the following to run the script.
[2]:
#run_navier_stokes_2d(param=[0.05, 0.05], new_training=True, save_fig=True)