r"""Solves the linearized Euler equations in 1D using a PINN. .. math:: \partial_t p + \partial_x u & = f_1 in \Omega \times (0, T) \\ \partial_t u + \partial_x p & = f_2 in \Omega \times (0, T) \\ p & = g_1 on \partial \Omega \times (0, T) \\ u & = g_2 on \partial \Omega \times (0, T) \\ p & = p_0 on \Omega \times {0} \\ u & = u_0 on \Omega \times {0} where :math:`p: \partial \Omega \times (0, T) \to \mathbb{R}` and :math:`u: \partial \Omega \times (0, T) \to \mathbb{R}` are the unknown functions, :math:`\Omega \subset \mathbb{R}` is the spatial domain and :math:`(0, T) \subset \mathbb{R}` is the time domain. Dirichlet boundary conditions are prescribed. The equation is solved on a segment domain; weak boundary and initial conditions are used. Three training strategies are compared: standard PINNs, PINNs with energy natural gradient preconditioning and PINNs with Anagram preconditioning. """ from typing import Callable, Tuple import matplotlib.pyplot as plt import numpy as np import torch from scimba_torch.approximation_space.abstract_space import AbstractApproxSpace from scimba_torch.approximation_space.nn_space import NNxtSpace from scimba_torch.domain.meshless_domain.domain_1d import Segment1D from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.integration.monte_carlo_time import UniformTimeSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.temporal_pde.pinns import ( NaturalGradientTemporalPinns, TemporalPinns, ) from scimba_torch.optimizers.losses import DataLoss from scimba_torch.plots.plots_nd import plot_abstract_approx_spaces from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor from scimba_torch.utils.typing_protocols import VarArgCallable class LinearizedEuler: def __init__( self, space: AbstractApproxSpace, init: Callable, f: Callable, g: Callable, **kwargs, ): self.space = space self.init = init self.f = f self.g = g self.linear = True self.residual_size = 2 self.bc_residual_size = 2 self.ic_residual_size = 2 def grad( self, w: torch.Tensor | MultiLabelTensor, y: torch.Tensor | LabelTensor, ) -> torch.Tensor | Tuple[torch.Tensor, ...]: return self.space.grad(w, y) def rhs( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: return self.f(t, x, mu) def bc_rhs( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, n: LabelTensor, mu: LabelTensor, ) -> Tuple[torch.Tensor]: return self.g(t, x, mu) def time_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: p, u = w.get_components() p_t = self.grad(p, t) u_t = self.grad(u, t) return p_t, u_t def space_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: p, u = w.get_components() p_x = self.grad(p, x) u_x = self.grad(u, x) return u_x, p_x def functional_operator( self, func: VarArgCallable, t: torch.Tensor, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> torch.Tensor: # space operator pu_space = torch.func.jacrev(func, 1)(t, x, mu, theta) pu_space = torch.flip(pu_space, (-1,)) # time operator pu_time = torch.func.jacrev(func, 0)(t, x, mu, theta) # sum both contributions return (pu_space + pu_time).squeeze() # Dirichlet conditions def bc_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, n: LabelTensor, mu: LabelTensor, ) -> Tuple[torch.Tensor]: p, u = w.get_components() return p, u def functional_operator_bc( self, func: VarArgCallable, t: torch.Tensor, x: torch.Tensor, n: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> torch.Tensor: return func(t, x, mu, theta) def initial_condition(self, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: return self.init(x, mu) def functional_operator_ic( self, func: VarArgCallable, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> Tuple[torch.Tensor]: t = torch.zeros_like(x) return func(t, x, mu, theta) def exact_solution(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> torch.Tensor: x = x.get_components() D = 0.02 coeff = 1 / (4 * torch.pi * D) ** 0.5 p_plus_u = coeff * torch.exp(-((x - t.x - 1) ** 2) / (4 * D)) p_minus_u = coeff * torch.exp(-((x + t.x - 1) ** 2) / (4 * D)) p = (p_plus_u + p_minus_u) / 2 u = (p_plus_u - p_minus_u) / 2 return torch.cat((p, u), dim=-1) def initial_solution(x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: sol = exact_solution(LabelTensor(torch.zeros_like(x.x)), x, mu) return sol[..., 0:1], sol[..., 1:2] def f_rhs(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: x = x.get_components() return torch.zeros_like(x), torch.zeros_like(x) def f_bc(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: x = x.get_components() return torch.zeros_like(x), torch.zeros_like(x) domain_mu = [] domain_x = Segment1D((-1.0, 3.0), is_main_domain=True) t_min, t_max = 0.0, 0.5 domain_t = (t_min, t_max) sampler = TensorizedSampler( [ UniformTimeSampler(domain_t), DomainSampler(domain_x), UniformParametricSampler(domain_mu), ] ) ### generate data n_samples_per_dim = 50 box = [(-1.0, 3.0)] + domain_mu + [domain_t] linspaces = [np.linspace(b[0], b[1], n_samples_per_dim) for b in box] meshgrid = np.meshgrid(*linspaces) mesh = torch.tensor(np.stack(meshgrid, axis=-1).reshape((n_samples_per_dim**2, 2))) # print(mesh) t_data = LabelTensor(mesh[:, :1]) x_data = LabelTensor(mesh[:, 1:2]) mu = torch.zeros((t_data.shape[0], 0)) mu_data = LabelTensor(mu) y_data = exact_solution(t_data, x_data, mu_data) # print(y_data) dloss = DataLoss((mesh[:, :1], mesh[:, 1:2], mu), y_data, torch.nn.MSELoss()) bc_weight = 30.0 ic_weight = 300.0 # space = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[80, 80, 80, 40, 20, 10]) space = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde = LinearizedEuler(space, initial_solution, f_rhs, f_bc) pinns = TemporalPinns( pde, bc_type="weak", ic_type="weak", optimizers="ssbfgs", bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, data_losses=[dloss], dl_weights=[1.0], ) solving_mode = "new" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns" # the post_fix of the file where to load the pinn save_post_fix = "pinns" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns.solve( epochs=1000, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, ) pinns.save(__file__, save_post_fix) # plot_AbstractApproxSpaces( # pinns.space, # domain_x, # domain_mu, # domain_t, # time_values=[t_max,], # loss=pinns.losses, # residual=pde, # solution=exact_solution, # error=exact_solution, # derivatives=["ux", "ut"], # title="solving LinearizedEuler with TemporalPinns", # ) # # plt.show() space2 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde2 = LinearizedEuler(space2, initial_solution, f_rhs, f_bc) pinns2 = NaturalGradientTemporalPinns( pde2, bc_type="weak", ic_type="weak", bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, data_losses=[dloss], dl_weights=[1.0], matrix_regularization=1e-6, ) solving_mode = "new" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns_ENG" # the post_fix of the file where to load the pinn save_post_fix = "pinns_ENG" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns2.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns2.solve( epochs=600, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, ) pinns2.save(__file__, save_post_fix) plot_abstract_approx_spaces( ( pinns.space, pinns2.space, ), domain_x, domain_mu, domain_t, time_values=[ t_max, ], loss=( pinns.losses, pinns2.losses, ), residual=( pde, pde2, ), solution=exact_solution, error=exact_solution, derivatives=["ux", "ut"], title="solving LinearizedEuler with TemporalPinns", titles=("SS-BFGS", "ENG preconditioning"), ) plt.show() space3 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[16, 16]) pde3 = LinearizedEuler(space3, initial_solution, f_rhs, f_bc) pinns3 = NaturalGradientTemporalPinns( pde3, bc_type="weak", ic_type="weak", bc_weight=bc_weight, ic_weight=ic_weight, data_losses=[dloss], dl_weights=[1.0], one_loss_by_equation=True, ng_algo="ANaGRAM", svd_threshold=5e-1, ) solving_mode = "new" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns_ANaGRAM" # the post_fix of the file where to load the pinn save_post_fix = "pinns_ANaGRAM" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns3.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns3.solve( epochs=100, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, ) pinns3.save(__file__, save_post_fix) plot_abstract_approx_spaces( ( pinns2.space, pinns3.space, ), domain_x, domain_mu, domain_t, time_values=[ t_max, ], loss=( pinns2.losses, pinns3.losses, ), residual=( pde2, pde3, ), solution=exact_solution, error=exact_solution, derivatives=["ux", "ut"], title="solving LinearizedEuler with TemporalPinns", ) plt.show()