from pathlib import Path from typing import Callable, OrderedDict import matplotlib.pyplot as plt import torch from scimba_torch.approximation_space.abstract_space import AbstractApproxSpace from scimba_torch.approximation_space.nn_space import NNxSpace from scimba_torch.domain.meshless_domain.domain_2d import Square2D from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.elliptic_pde.pinns import ( NaturalGradientPinnsElliptic, ) from scimba_torch.physical_models.elliptic_pde.abstract_elliptic_pde import ( StrongFormEllipticPDE, ) from scimba_torch.plots.plots_nd import plot_abstract_approx_spaces from scimba_torch.utils.scimba_tensors import LabelTensor torch.manual_seed(0) class NavierStokes2DStrongForm(StrongFormEllipticPDE): def __init__(self, space: AbstractApproxSpace, fixed_params, **kwargs): super().__init__( space, linear=True, residual_size=4, bc_residual_size=4, **kwargs ) spatial_dim = self.space.spatial_domain.dim assert spatial_dim == 2, ( "This class is designed for 2D Navier-Stokes equations." ) self.fixed_params = fixed_params self.Cp = fixed_params["Cp"] # Specific heat capacity self.beta = fixed_params["beta"] # Thermal expansion coefficient self.rho = fixed_params["rho"] # Density self.gravity = fixed_params["g"] # Gravitational acceleration def rhs(self, w, x, mu): # Zero vector for the residuals x1, _ = x.get_components() zeros = torch.zeros_like(x1) return zeros, zeros, zeros, zeros def operator(self, w, x, mu): u, v, p, T = w.get_components() mu_, k_f = mu.get_components() # Incompressibility condition u_x, u_y = self.grad(u, x) v_x, v_y = self.grad(v, x) op1 = u_x + v_y # Navier-Stokes equations u_xx, _ = self.grad(u_x, x) _, u_yy = self.grad(u_y, x) v_xx, _ = self.grad(v_x, x) _, v_yy = self.grad(v_y, x) p_x, p_y = self.grad(p, x) op2a = self.rho * (u * u_x + v * u_y) + p_x - mu_ * (u_xx + u_yy) op2b = ( self.rho * (u * v_x + v * v_y) + p_y - mu_ * (v_xx + v_yy) - self.gravity * (1 + self.beta * T) ) # Energy equation T_x, T_y = self.grad(T, x) T_xx, _ = self.grad(T_x, x) _, T_yy = self.grad(T_y, x) op3 = self.rho * (u * T_x + v * T_y) - k_f / self.Cp * (T_xx + T_yy) return op1, op2a, op2b, op3 def bc_rhs(self, w, x, n, mu): x1_0, _ = x.get_components( label=0 ) # Assuming label 0 corresponds to the first component x1_1, _ = x.get_components(label=1) x1_2, _ = x.get_components( label=2 ) # Assuming label 2 corresponds to the third component x1_3, _ = x.get_components(label=3) return ( torch.zeros_like(x1_0), torch.zeros_like(x1_1), torch.zeros_like(x1_2), torch.zeros_like(x1_3), ) def bc_operator(self, w, x, n, mu): _, _, _, T = w.get_components() ldown, _, ltop, _ = 0, 1, 2, 3 # Neumann condition (top and bottom) T_x, T_y = self.grad(T, x) T_x_top = w.restrict_to_labels(T_x, labels=[ltop]) T_y_top = w.restrict_to_labels(T_y, labels=[ltop]) nx, ny = n.get_components(label=ltop) T_top = T_x_top * nx + T_y_top * ny T_x_down = w.restrict_to_labels(T_x, labels=[ldown]) T_y_down = w.restrict_to_labels(T_y, labels=[ldown]) nx, ny = n.get_components(label=ldown) T_down = T_x_down * nx + T_y_down * ny x1_1, _ = x.get_components(label=1) x1_3, _ = x.get_components(label=3) T_right = torch.zeros_like(x1_1) T_left = torch.zeros_like(x1_3) return T_down, T_right, T_top, T_left def functional_operator_bc_top_down(self, func, x, n, mu, theta): T_xy = torch.func.jacrev(lambda *args: func(*args)[3], 0)(x, mu, theta) return (T_xy @ n)[None] def functional_operator_bc_left_right(self, func, x, n, mu, theta): return torch.zeros_like(x[0])[None] def functional_operator_bc(self) -> OrderedDict: return OrderedDict( [ (0, self.functional_operator_bc_top_down), (1, self.functional_operator_bc_left_right), (2, self.functional_operator_bc_top_down), (3, self.functional_operator_bc_left_right), ] ) def functional_operator( self, func: Callable, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> torch.Tensor: mu_, k_f = mu[0], mu[1] f_vals = func(x, mu, theta) u, v, _, T = f_vals[0], f_vals[1], f_vals[2], f_vals[3] # Incompressibility condition grad_u = torch.func.jacrev(lambda *args: func(*args)[0], 0) grad_v = torch.func.jacrev(lambda *args: func(*args)[1], 0) u_xy = grad_u(x, mu, theta) v_xy = grad_v(x, mu, theta) u_x, u_y = u_xy[0:1], u_xy[1:2] v_x, v_y = v_xy[0:1], v_xy[1:2] op1 = u_x + v_y # Navier-Stokes equations hessian_u = torch.func.jacrev(grad_u, 0)(x, mu, theta) hessian_v = torch.func.jacrev(grad_v, 0)(x, mu, theta) u_xx, u_yy = hessian_u[0, 0], hessian_u[1, 1] v_xx, v_yy = hessian_v[0, 0], hessian_v[1, 1] p_xy = torch.func.jacrev(lambda *args: func(*args)[2], 0)(x, mu, theta) p_x, p_y = p_xy[0:1], p_xy[1:2] op2a = self.rho * (u * u_x + v * u_y) + p_x - mu_ * (u_xx + u_yy) op2b = self.rho * (u * v_x + v * v_y) + p_y - mu_ * (v_xx + v_yy) op2b -= self.gravity * (1 + self.beta * T) # Energy equation grad_T = torch.func.jacrev(lambda *args: func(*args)[3], 0) hessian_T = torch.func.jacrev(grad_T, 0)(x, mu, theta) T_xy = grad_T(x, mu, theta) T_x, T_y = T_xy[0:1], T_xy[1:2] T_xx, T_yy = hessian_T[0, 0], hessian_T[1, 1] op3 = self.rho * (u * T_x + v * T_y) - k_f / self.Cp * (T_xx + T_yy) return torch.concatenate((op1, op2a, op2b, op3), dim=0) def phi_fct(box, x): x1, x2 = x a, b = box[0] a2, b2 = box[1] return (x1 - a) * (b - x1) * (x2 - a2) * (b2 - x2) def functional_phi_fct(box, x: torch.Tensor) -> torch.Tensor: a, b = box[0] a2, b2 = box[1] return (x[0] - a) * (b - x[0]) * (x[1] - a2) * (b2 - x[1]) def post_proc(box): def post_processing(inputs: torch.Tensor, x: LabelTensor, mu: LabelTensor): x1, x2 = x.get_components() a, b = box[0] phi = torch.ones_like(inputs) phi_square = phi_fct(box, [x1, x2]) phi[:, 0] = phi_square[:, 0] phi[:, 1] = phi_square[:, 0] phi[:, -1:] = (x1 - a) * (b - x1) g = torch.zeros_like(inputs) g[:, -1:] = b - (x1 - a) return inputs * phi + g # return the modified inputs return post_processing def functional_post_proc(box): def functional_post_processing( func, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor ) -> torch.Tensor: a, b = box[0] inputs = func(x, mu, theta) # print("inputs.shape: ", inputs.shape) phi = torch.ones_like(inputs) phi_square = functional_phi_fct(box, x) # print("phi_square.shape: ", phi_square.shape) phi[0] = phi_square phi[1] = phi_square phi[-1:] = (x[0] - a) * (b - x[0]) g = torch.zeros_like(inputs) g[-1:] = b - (x[0] - a) # print("res shape: ", inputs * phi + g) return inputs * phi + g # return the modified inputs return functional_post_processing def run_navier_stokes_2d(param=[0.1, 0.1], save_fig=False, new_training=False): # Domain box = [[-1.0, 1.0], [-1.0, 1.0]] parameter_domain = [[0.01, 0.1], [0.01, 0.1]] domain_x = Square2D(box, is_main_domain=True) full_bc_domain = domain_x.full_bc_domain() for bc in full_bc_domain: domain_x.add_bc_domain(bc) sampler = TensorizedSampler( [DomainSampler(domain_x), UniformParametricSampler(parameter_domain)] ) # Network nb_unknowns, nb_parameters = 4, 2 tlayers = [40, 60, 60, 60, 40] # Example layer sizes for the MLP tlayers = [16, 32, 16] # Example layer sizes for the MLP space = NNxSpace( nb_unknowns, nb_parameters, GenericMLP, domain_x, sampler, layer_sizes=tlayers, activation_type="sine", post_processing=post_proc(box), ) # EDP fixed_params = {"rho": 1.0, "Cp": 1.0, "g": 9.81, "beta": 0.1} pde = NavierStokes2DStrongForm(space, fixed_params=fixed_params) # losses = GenericLosses( # [ # ("incomp", torch.nn.MSELoss(), 1.0), # ("NS 1", torch.nn.MSELoss(), 1.0), # ("NS 2", torch.nn.MSELoss(), 1.0), # ("energy", torch.nn.MSELoss(), 1.0), # ("bc Td", torch.nn.MSELoss(), 2.0), # ("bc Tr", torch.nn.MSELoss(), 2.0), # ("bc Tt", torch.nn.MSELoss(), 2.0), # ("bc Tl", torch.nn.MSELoss(), 2.0), # ], # ) # opt_1 = { # "name": "adam", # "optimizer_args": {"lr": 7e-3, "betas": (0.9, 0.999)}, # # "scheduler": # } # opt_2 = { # "name": "lbfgs", # "switch_at_epoch": 3000, # # "switch_at_plateau_ratio": 3.0, # } # opt = OptimizerData(opt_1, opt_2) # pinns = PinnsElliptic(pde, bc_type="weak", optimizers=opt, losses=losses) # one_loss_by_equation=True pinns = NaturalGradientPinnsElliptic( pde, bc_type="weak", bc_weight=20.0, one_loss_by_equation=True, functional_post_processing=functional_post_proc(box), matrix_regularization=1e-6, ) # one_loss_by_equation=True ## Load or train the PINN filename_extension = "ns_square" default_path = Path(__file__).parent default_folder_name = "results" if new_training or not pinns.load( __file__, filename_extension, default_path, default_folder_name ): # pinns.solve(epochs=10000, n_collocation=40000, n_bc_collocation=30000, verbose=True) pinns.solve( epochs=100, n_collocation=16000, n_bc_collocation=12000, verbose=True ) pinns.save(__file__, filename_extension, default_path, default_folder_name) # for plotting pinns if save_fig: plot_abstract_approx_spaces( pinns.space, domain_x, parameter_domain, n_visu=200, loss=pinns.losses, residual=pinns.pde, draw_contours=True, n_drawn_contours=20, parameters_values=(param), loss_groups=["bc"], ) plt.show() figfilename = "navierstokes.png" fig_path = default_path / default_folder_name / figfilename plt.savefig(fig_path, bbox_inches="tight") print(f"Figure saved at {fig_path}") return pinns if __name__ == "__main__": # Run the Navier-Stokes 2D problem with the specified parameters run_navier_stokes_2d(param=[0.05, 0.05], new_training=True, save_fig=True)