r"""Solves the Navier-Stokes equations in 2D using a PINN. The setting is described in https://arxiv.org/pdf/2402.03153 """ from pathlib import Path from typing import OrderedDict import matplotlib.pyplot as plt import torch from scimba_torch.approximation_space.abstract_space import AbstractApproxSpace from scimba_torch.approximation_space.nn_space import NNxtSpace from scimba_torch.domain.meshless_domain.domain_2d import Disk2D, Square2D from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.integration.monte_carlo_time import UniformTimeSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.temporal_pde.pinns import ( NaturalGradientTemporalPinns, ) from scimba_torch.physical_models.temporal_pde.abstract_temporal_pde import ( TemporalPDE, ) from scimba_torch.plots.plots_nd import plot_abstract_approx_spaces from scimba_torch.utils.scimba_tensors import LabelTensor # Calcul du volume du domaine réel def compute_domain_volume(sampler, N=10000000): """ Calcule le volume du domaine réel en utilisant une méthode de Monte Carlo. Args: sampler: Le sampler tensorisé contenant le sampler spatial N: Nombre de points à échantillonner pour l'estimation : float: Le volume estimé du domaine réel """ x_sampler = sampler.list_sampler[1].vol_sampler # Récupérer le sampler spatial # Récupérer les bounds du domaine cubique bounds = x_sampler.domain.bounds # Shape: (dim, 2) # Calculer le volume du cube englobant volume_cube = torch.prod(bounds[:, 1] - bounds[:, 0]).item() # Échantillonner N points dans le cube pts = x_sampler.base_sampler.sample((N,)) # Compter combien de points sont dans le domaine réel percent_in = x_sampler.domain.is_inside(pts).sum().item() / N # Volume du domaine réel = volume du cube × proportion de points dedans volume_domaine = volume_cube * percent_in return volume_domaine torch.manual_seed(0) def uin(y, u0, h): return (4 / h**2) * (y * (h - y)) * u0 class NavierStokes2DStrongForm(TemporalPDE): def __init__(self, space: AbstractApproxSpace, **kwargs): super().__init__( space, residual_size=3, bc_residual_size=10, ic_residual_size=3, **kwargs ) self.residual_size = 3 self.bc_residual_size = 10 self.ic_residual_size = 3 spatial_dim = self.space.spatial_domain.dim assert spatial_dim == 2, ( "This class is designed for 2D Navier-Stokes equations." ) # self.fixed_params = fixed_params # self.nu = fixed_params["nu"] # Viscosity def rhs(self, w, t, x, mu): # Zero vector for the residuals x1, _ = x.get_components() zeros = torch.zeros_like(x1) return zeros, zeros, zeros def time_operator(self, w, t, x, mu): u, v, p = w.get_components() # mu_, k_f = mu.get_components() u_t = self.grad(u, t) v_t = self.grad(v, t) x1, x2 = x.get_components() r = torch.sqrt((x1 - 0.2) ** 2 + (x2 - 0.2) ** 2) rc = 0.05 x1, _ = x.get_components() zeros = torch.zeros_like(x1) return u_t * (r - rc), v_t * (r - rc), zeros def space_operator(self, w, t, x, mu): u, v, p = w.get_components() mu_ = mu.get_components() # Incompressibility condition u_x, u_y = self.grad(u, x) v_x, v_y = self.grad(v, x) op1 = u_x + v_y # Navier-Stokes equations u_xx, _ = self.grad(u_x, x) _, u_yy = self.grad(u_y, x) v_xx, _ = self.grad(v_x, x) _, v_yy = self.grad(v_y, x) p_x, p_y = self.grad(p, x) x1, x2 = x.get_components() r = torch.sqrt((x1 - 0.2) ** 2 + (x2 - 0.2) ** 2) rc = 0.05 op2a = (u * u_x + v * u_y) + p_x - mu_ * (u_xx + u_yy) op2b = (u * v_x + v * v_y) + p_y - mu_ * (v_xx + v_yy) return op2a * (r - rc), op2b * (r - rc), op1 * (r - rc) def bc_rhs(self, w, t, x, n, mu): x1_0, _ = x.get_components(label=0) x1_1, _ = x.get_components(label=1) x1_2, _ = x.get_components(label=2) _, y1_3 = x.get_components(label=3) x1_4, _ = x.get_components(label=4) return ( torch.zeros_like(x1_0), torch.zeros_like(x1_0), torch.zeros_like(x1_1), torch.zeros_like(x1_1), torch.zeros_like(x1_2), torch.zeros_like(x1_2), uin(y1_3, 0.3, 0.41), torch.zeros_like(y1_3), torch.zeros_like(x1_4), torch.zeros_like(x1_4), ) def bc_operator(self, w, t, x, n, mu): u, v, p = w.get_components() ldown, lright, ltop, lleft, circle = 0, 1, 2, 3, 4 # Dirichlet conditions u_down = w.restrict_to_labels(u, labels=[ldown]) v_down = w.restrict_to_labels(v, labels=[ldown]) u_right = w.restrict_to_labels(u, labels=[lright]) p_right = w.restrict_to_labels(p, labels=[lright]) u_up = w.restrict_to_labels(u, labels=[ltop]) v_up = w.restrict_to_labels(v, labels=[ltop]) u_left = w.restrict_to_labels(u, labels=[lleft]) v_left = w.restrict_to_labels(v, labels=[lleft]) u_circle = w.restrict_to_labels(u, labels=[circle]) v_circle = w.restrict_to_labels(v, labels=[circle]) return ( u_down, v_down, torch.zeros_like(u_right), p_right, u_up, v_up, u_left, v_left, u_circle, v_circle, ) def initial_condition(self, x, mu): x1, _ = x.get_components() zeros = torch.zeros_like(x1) return zeros, zeros, zeros def functional_time_operator(self, func, t, x, mu, theta): u_t = torch.func.jacrev(lambda *args: func(*args)[0], 0)(t, x, mu, theta) v_t = torch.func.jacrev(lambda *args: func(*args)[1], 0)(t, x, mu, theta) zeros = torch.zeros_like(x[0:1]) r = torch.sqrt((x[0:1] - 0.2) ** 2 + (x[1:2] - 0.2) ** 2) rc = 0.05 return torch.concatenate((u_t * (r - rc), v_t * (r - rc), zeros), dim=0) def functional_space_operator(self, func, t, x, mu, theta): f_vals = func(t, x, mu, theta) u, v, _ = f_vals[0], f_vals[1], f_vals[2] # Incompressibility condition grad_u = torch.func.jacrev(lambda *args: func(*args)[0], 1) grad_v = torch.func.jacrev(lambda *args: func(*args)[1], 1) u_xy = grad_u(t, x, mu, theta) v_xy = grad_v(t, x, mu, theta) u_x, u_y = u_xy[0:1], u_xy[1:2] v_x, v_y = v_xy[0:1], v_xy[1:2] op1 = u_x + v_y # Navier-Stokes equations hessian_u = torch.func.jacrev(grad_u, 1)(t, x, mu, theta) hessian_v = torch.func.jacrev(grad_v, 1)(t, x, mu, theta) u_xx, u_yy = hessian_u[0, 0], hessian_u[1, 1] v_xx, v_yy = hessian_v[0, 0], hessian_v[1, 1] p_xy = torch.func.jacrev(lambda *args: func(*args)[2], 1)(t, x, mu, theta) p_x, p_y = p_xy[0:1], p_xy[1:2] op2a = (u * u_x + v * u_y) + p_x - mu * (u_xx + u_yy) op2b = (u * v_x + v * v_y) + p_y - mu * (v_xx + v_yy) r = torch.sqrt((x[0:1] - 0.2) ** 2 + (x[1:2] - 0.2) ** 2) rc = 0.05 return torch.concatenate( (op2a * (r - rc), op2b * (r - rc), op1 * (r - rc)), dim=0 ) def functional_operator(self, func, t, x, mu, theta): res = self.functional_time_operator(func, t, x, mu, theta) res += self.functional_space_operator(func, t, x, mu, theta) return res def functional_operator_bc_right(self, func, t, x, n, mu, theta): p = func(t, x, mu, theta)[2:3] return torch.concatenate((0 * p, p), dim=0) def functional_operator_bc_other(self, func, t, x, n, mu, theta): f_vals = func(t, x, mu, theta) return f_vals[0:2] def functional_operator_bc(self) -> OrderedDict: return OrderedDict( [ ((0, 2, 3, 4), self.functional_operator_bc_other), (1, self.functional_operator_bc_right), ] ) def functional_operator_ic(self, func, x, mu, theta) -> torch.Tensor: t = torch.zeros_like(mu) return func(t, x, mu, theta) def post_processing( inputs: torch.Tensor, t: LabelTensor, xy: LabelTensor, mu: LabelTensor ) -> torch.Tensor: h = 0.41 length = 2.2 r_c = 0.05 u0 = 0.3 x, y = xy.get_components() t_ = t.get_components() r = torch.sqrt((x - 0.2) ** 2 + (y - 0.2) ** 2) u = inputs[:, 0:1] * (r - r_c) * y * (h - y) * x + uin(y, u0, h) * ( (r - r_c) / (torch.sqrt((y - 0.2) ** 2 + 0.2**2) - r_c) ) v = inputs[:, 1:2] * (r - r_c) * y * (h - y) * x p = inputs[:, 2:3] * (length - x) return t_ * torch.concatenate((u, v, p), dim=1) def functional_post_processing(func, t, xy, mu, theta) -> torch.Tensor: inputs = func(t, xy, mu, theta) h = 0.41 length = 2.2 r_c = 0.05 u0 = 0.3 x, y = xy[0], xy[1] r = torch.sqrt((x - 0.2) ** 2 + (y - 0.2) ** 2) u = inputs[0:1] * (r - r_c) * y * (h - y) * x + uin(y, u0, h) * ( (r - r_c) / (torch.sqrt((y - 0.2) ** 2 + 0.2**2) - r_c) ) v = inputs[1:2] * (r - r_c) * y * (h - y) * x p = inputs[2:3] * (length - x) return t * torch.concatenate((u, v, p), dim=-1) def run_navier_stokes_2d( param=[0.1, 0.1], plot_fig=True, save_fig=False, new_training=False ): # Domain t_min, t_max = 0.0, 12.0 domain_t = [t_min, t_max] box = [[0, 2.2], [0, 0.41]] center = (0.2, 0.2) radius = 0.05 parameter_domain = [param] domain_x = Square2D(box, is_main_domain=True) hole = Disk2D(center, radius, is_main_domain=False) domain_x.add_hole(hole) sampler = TensorizedSampler( [ UniformTimeSampler(domain_t), DomainSampler(domain_x), UniformParametricSampler(parameter_domain), ] ) # Estimer le volume du domaine volume_estimé = compute_domain_volume(sampler) print(f"Volume estimé du domaine: {volume_estimé:.6f}") # Network nb_unknowns, nb_parameters = 3, 1 tlayers = [16, 16, 20, 24] # Example layer sizes for the MLP # tlayers = [20, 40, 40, 20] space = NNxtSpace( nb_unknowns, nb_parameters, GenericMLP, domain_x, sampler, layer_sizes=tlayers, activation_type="tanh", post_processing=post_processing, random_fact_weights_bool=False, ) print("number of dof:", space.ndof) # EDP pde = NavierStokes2DStrongForm(space) pinns = NaturalGradientTemporalPinns( pde, bc_type="strong", in_weights=[1.8, 1.2, 4.0], one_loss_by_equation=True, matrix_regularization=3.0e-6, functional_post_processing=functional_post_processing, ) ## Load or train the PINN filename_extension = "ns_rectangle" default_path = Path(__file__).parent default_folder_name = "results" if new_training or not pinns.load( __file__, filename_extension, default_path, default_folder_name ): pinns.solve( epochs=1000, n_collocation=4000, verbose=True, ) pinns.save(__file__, filename_extension, default_path, default_folder_name) plot_abstract_approx_spaces( pinns.space, domain_x, parameter_domain, domain_t, time_values=[ t_max, ], n_visu=600, loss=pinns.losses, residual=pinns.pde, draw_contours=True, n_drawn_contours=20, parameters_values="mean", loss_groups=["bc"], ) if save_fig: figfilename = "navierstokes.png" fig_path = default_path / default_folder_name / figfilename plt.savefig(fig_path, bbox_inches="tight") print(f"Figure saved at {fig_path}") if plot_fig: plt.show() return pinns if __name__ == "__main__": # Run the Navier-Stokes 2D problem with the specified parameters run_navier_stokes_2d( param=[0.001, 0.001], new_training=True, plot_fig=True, save_fig=True )