r"""Solves the linearized Euler equations in 1D using a PINN.

.. math::
    \partial_t p + \partial_x u & = f_1 in \Omega \times (0, T) \\
    \partial_t u + \partial_x p & = f_2 in \Omega \times (0, T) \\
    p & = g_1 on \partial \Omega \times (0, T) \\
    u & = g_2 on \partial \Omega \times (0, T) \\
    p & = p_0 on \Omega \times {0} \\
    u & = u_0 on \Omega \times {0}

where :math:`p: \partial \Omega \times (0, T) \to \mathbb{R}` and
:math:`u: \partial \Omega \times (0, T) \to \mathbb{R}` are the unknown
functions, :math:`\Omega \subset \mathbb{R}` is the spatial domain and
:math:`(0, T) \subset \mathbb{R}` is the time domain.

Dirichlet boundary conditions are prescribed. The equation is solved on a
segment domain; weak boundary and initial conditions are used.

Three training strategies are compared: standard PINNs, PINNs with energy
natural gradient preconditioning and PINNs with Anagram preconditioning.
"""

from typing import Callable, Tuple

import numpy as np
import torch

from scimba_torch.approximation_space.abstract_space import AbstractApproxSpace
from scimba_torch.approximation_space.nn_space import NNxtSpace
from scimba_torch.domain.meshless_domain.domain_1d import Segment1D
from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler
from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler
from scimba_torch.integration.monte_carlo_time import UniformTimeSampler
from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP
from scimba_torch.numerical_solvers.temporal_pde.pinns import (
    AnagramTemporalPinns,
    NaturalGradientTemporalPinns,
    NystromNaturalGradientTemporalPinns,
    SketchyNaturalGradientTemporalPinns,
    TemporalPinns,
)
from scimba_torch.optimizers.losses import DataLoss
from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor
from scimba_torch.utils.typing_protocols import VarArgCallable


class LinearizedEuler:
    """First-order formulation of the 1D linearized Euler (acoustic) system.

    The unknowns are the pressure ``p`` and the velocity ``u``; the residual
    couples them through :math:`p_t + u_x` and :math:`u_t + p_x`.

    Args:
        space: approximation space representing the pair ``(p, u)``.
        init: callable ``(x, mu) -> (p_0, u_0)`` giving the initial condition.
        f: callable ``(t, x, mu) -> (f_1, f_2)`` giving the interior rhs.
        g: callable ``(t, x, mu) -> (g_1, g_2)`` giving the Dirichlet data.
        **kwargs: accepted for interface compatibility; ignored.
    """

    def __init__(
        self,
        space: AbstractApproxSpace,
        init: Callable,
        f: Callable,
        g: Callable,
        **kwargs,
    ):
        self.space = space
        self.init = init
        self.f = f
        self.g = g
        # The operator is linear in (p, u); solvers may exploit this flag.
        self.linear = True
        # Two coupled scalar equations in the interior, on the boundary and
        # at the initial time.
        self.residual_size = 2
        self.bc_residual_size = 2
        self.ic_residual_size = 2

    def grad(
        self,
        w: torch.Tensor | MultiLabelTensor,
        y: torch.Tensor | LabelTensor,
    ) -> torch.Tensor | Tuple[torch.Tensor, ...]:
        """Differentiate ``w`` with respect to ``y`` via the approximation space."""
        return self.space.grad(w, y)

    def rhs(
        self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor
    ) -> Tuple[torch.Tensor, ...]:
        """Interior right-hand side ``(f_1, f_2)`` at the collocation points."""
        return self.f(t, x, mu)

    def bc_rhs(
        self,
        w: MultiLabelTensor,
        t: LabelTensor,
        x: LabelTensor,
        n: LabelTensor,
        mu: LabelTensor,
    ) -> Tuple[torch.Tensor, ...]:
        """Dirichlet boundary data ``(g_1, g_2)``; the normal ``n`` is unused."""
        return self.g(t, x, mu)

    def time_operator(
        self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor
    ) -> Tuple[torch.Tensor, ...]:
        """Time derivatives ``(p_t, u_t)`` of the two components."""
        p, u = w.get_components()
        p_t = self.grad(p, t)
        u_t = self.grad(u, t)
        return p_t, u_t

    def space_operator(
        self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor
    ) -> Tuple[torch.Tensor, ...]:
        """Spatial part ``(u_x, p_x)``: the components are cross-coupled."""
        p, u = w.get_components()
        p_x = self.grad(p, x)
        u_x = self.grad(u, x)
        return u_x, p_x

    def functional_operator(
        self,
        func: VarArgCallable,
        t: torch.Tensor,
        x: torch.Tensor,
        mu: torch.Tensor,
        theta: torch.Tensor,
    ) -> torch.Tensor:
        """Functional (per-sample) form of the full residual operator.

        Used by the preconditioned solvers, which need the operator expressed
        through ``torch.func`` transforms on the raw network ``func``.
        """
        # space operator: jacrev w.r.t. x gives (p_x, u_x); flipping the last
        # axis swaps them into (u_x, p_x) to realize the cross-coupling.
        pu_space = torch.func.jacrev(func, 1)(t, x, mu, theta)
        pu_space = torch.flip(pu_space, (-1,))
        # time operator: (p_t, u_t)
        pu_time = torch.func.jacrev(func, 0)(t, x, mu, theta)
        # sum both contributions
        return (pu_space + pu_time).squeeze()

    # Dirichlet conditions
    def bc_operator(
        self,
        w: MultiLabelTensor,
        t: LabelTensor,
        x: LabelTensor,
        n: LabelTensor,
        mu: LabelTensor,
    ) -> Tuple[torch.Tensor, ...]:
        """Boundary operator: the traces of ``p`` and ``u`` themselves."""
        p, u = w.get_components()
        return p, u

    def functional_operator_bc(
        self,
        func: VarArgCallable,
        t: torch.Tensor,
        x: torch.Tensor,
        n: torch.Tensor,
        mu: torch.Tensor,
        theta: torch.Tensor,
    ) -> torch.Tensor:
        """Functional form of the boundary operator (identity on the trace)."""
        return func(t, x, mu, theta)

    def initial_condition(
        self, x: LabelTensor, mu: LabelTensor
    ) -> Tuple[torch.Tensor, ...]:
        """Initial state ``(p_0, u_0)`` at ``t = 0``."""
        return self.init(x, mu)

    def functional_operator_ic(
        self,
        func: VarArgCallable,
        x: torch.Tensor,
        mu: torch.Tensor,
        theta: torch.Tensor,
    ) -> Tuple[torch.Tensor, ...]:
        """Functional form of the initial-condition operator (evaluation at t=0)."""
        t = torch.zeros_like(x)
        return func(t, x, mu, theta)


def exact_solution(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> torch.Tensor:
    """Exact solution: two Gaussian pulses advected along the characteristics.

    The Riemann invariants ``p + u`` and ``p - u`` are transported right and
    left at unit speed, starting from a Gaussian centered at x = 1.
    """
    xs = x.get_components()
    D = 0.02  # width parameter of the Gaussian profiles
    coeff = 1 / (4 * torch.pi * D) ** 0.5
    p_plus_u = coeff * torch.exp(-((xs - t.x - 1) ** 2) / (4 * D))
    p_minus_u = coeff * torch.exp(-((xs + t.x - 1) ** 2) / (4 * D))
    p = (p_plus_u + p_minus_u) / 2
    u = (p_plus_u - p_minus_u) / 2
    return torch.cat((p, u), dim=-1)


def initial_solution(x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor, ...]:
    """Initial condition ``(p_0, u_0)``: the exact solution evaluated at t = 0."""
    sol = exact_solution(LabelTensor(torch.zeros_like(x.x)), x, mu)
    return sol[..., 0:1], sol[..., 1:2]


def f_rhs(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor, ...]:
    """Homogeneous interior right-hand side ``(f_1, f_2) = (0, 0)``."""
    xs = x.get_components()
    return torch.zeros_like(xs), torch.zeros_like(xs)


def f_bc(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor, ...]:
    """Homogeneous Dirichlet boundary data ``(g_1, g_2) = (0, 0)``."""
    xs = x.get_components()
    return torch.zeros_like(xs), torch.zeros_like(xs)


domain_mu = []
domain_x = Segment1D((-1.0, 3.0), is_main_domain=True)
t_min, t_max = 0.0, 0.5
domain_t = (t_min, t_max)
sampler = TensorizedSampler(
    [
        UniformTimeSampler(domain_t),
        DomainSampler(domain_x),
        UniformParametricSampler(domain_mu),
    ]
)

### generate data
n_samples_per_dim = 50
# Time range FIRST so that column 0 of ``mesh`` holds t and column 1 holds x,
# matching the slicing below.  (The original ordered x first, which fed
# x-coordinates in as times when building the supervised data.)
box = [domain_t] + [(-1.0, 3.0)] + domain_mu
linspaces = [np.linspace(b[0], b[1], n_samples_per_dim) for b in box]
meshgrid = np.meshgrid(*linspaces)
# One row per grid point, one column per coordinate of ``box``.
mesh = torch.tensor(np.stack(meshgrid, axis=-1).reshape((-1, len(box))))
# print(mesh)
t_data = LabelTensor(mesh[:, :1])
x_data = LabelTensor(mesh[:, 1:2])
mu = torch.zeros((t_data.shape[0], 0))  # no physical parameters in this example
mu_data = LabelTensor(mu)
y_data = exact_solution(t_data, x_data, mu_data)
# print(y_data)
dloss = DataLoss((mesh[:, :1], mesh[:, 1:2], mu), y_data, torch.nn.MSELoss())

bc_weight = 30.0
ic_weight = 300.0


def _run_training(pinns, solving_mode: str, post_fix: str, epochs: int) -> None:
    """Load and/or train a PINN solver, then report timing and best loss.

    Args:
        pinns: any of the temporal PINN solvers used in this script.
        solving_mode: "new" to solve from scratch, "resume" to load from file
            and continue solving, or "load" to only load from file (training
            happens anyway when loading fails).
        post_fix: post_fix of the file where to load/save the pinn.
        epochs: number of training epochs when solving.
    """
    new_solving = solving_mode == "new"
    resume_solving = solving_mode == "resume"
    try_load = resume_solving or (not new_solving)
    load_ok = try_load and pinns.load(__file__, post_fix)
    solve = (not load_ok) or (resume_solving or new_solving)
    if solve:
        pinns.solve(
            epochs=epochs,
            n_collocation=3000,
            n_bc_collocation=1000,
            n_ic_collocation=1000,
        )
        pinns.save(__file__, post_fix)
    print(f"Training time: {pinns.training_time:.2f} seconds")
    print(f"Best Loss : {pinns.best_loss:.2e}")
    print("########################################################\n")


torch.manual_seed(0)
print(
    "############################# No preconditioning, SS-BFGS ###########################"
)
space = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64])
pde = LinearizedEuler(space, initial_solution, f_rhs, f_bc)
pinns = TemporalPinns(
    pde,
    bc_type="weak",
    ic_type="weak",
    optimizers="ssbfgs",
    bc_weight=bc_weight,
    ic_weight=ic_weight,
    one_loss_by_equation=True,
    data_losses=[dloss],
    dl_weights=[1.0],
)
_run_training(pinns, "load", "pinns", epochs=1000)

# plot_AbstractApproxSpaces(
#     pinns.space,
#     domain_x,
#     domain_mu,
#     domain_t,
#     time_values=[t_max,],
#     loss=pinns.losses,
#     residual=pde,
#     solution=exact_solution,
#     error=exact_solution,
#     derivatives=["ux", "ut"],
#     title="solving LinearizedEuler with TemporalPinns",
# )
#
# plt.show()

torch.manual_seed(0)
print("############################# ENG preconditioning ###########################")
space2 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64])
pde2 = LinearizedEuler(space2, initial_solution, f_rhs, f_bc)
pinns2 = NaturalGradientTemporalPinns(
    pde2,
    bc_type="weak",
    ic_type="weak",
    bc_weight=bc_weight,
    ic_weight=ic_weight,
    one_loss_by_equation=True,
    data_losses=[dloss],
    dl_weights=[1.0],
    matrix_regularization=1e-6,
)
_run_training(pinns2, "load", "pinns_ENG", epochs=600)

# plot_abstract_approx_spaces(
#     (
#         pinns.space,
#         pinns2.space,
#     ),
#     domain_x,
#     domain_mu,
#     domain_t,
#     time_values=[
#         t_max,
#     ],
#     loss=(
#         pinns.losses,
#         pinns2.losses,
#     ),
#     residual=(
#         pde,
#         pde2,
#     ),
#     solution=exact_solution,
#     error=exact_solution,
#     derivatives=["ux", "ut"],
#     title="solving LinearizedEuler with TemporalPinns",
#     titles=("no preconditioning", "ENG preconditioning"),
# )
#
# plt.show()

torch.manual_seed(0)
print(
    "############################# ANaGRAM preconditioning ###########################"
)
space3 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[16, 16])
pde3 = LinearizedEuler(space3, initial_solution, f_rhs, f_bc)
pinns3 = AnagramTemporalPinns(
    pde3,
    bc_type="weak",
    ic_type="weak",
    bc_weight=bc_weight,
    ic_weight=ic_weight,
    data_losses=[dloss],
    dl_weights=[1.0],
    one_loss_by_equation=True,
    svd_threshold=5e-6,
)
_run_training(pinns3, "load", "pinns_Anagram", epochs=600)

torch.manual_seed(0)
print(
    "############################# Sketchy preconditioning ###########################"
)
space4 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[16, 16])
pde4 = LinearizedEuler(space4, initial_solution, f_rhs, f_bc)
pinns4 = SketchyNaturalGradientTemporalPinns(
    pde4,
    bc_type="weak",
    ic_type="weak",
    bc_weight=bc_weight,
    ic_weight=ic_weight,
    data_losses=[dloss],
    dl_weights=[1.0],
    one_loss_by_equation=True,
    tol=1e-7,
)
_run_training(pinns4, "load", "pinns_SNG", epochs=600)

torch.manual_seed(0)
print(
    "############################# Nystrom preconditioning ###########################"
)
space5 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[16, 16])
pde5 = LinearizedEuler(space5, initial_solution, f_rhs, f_bc)
pinns5 = NystromNaturalGradientTemporalPinns(
    pde5,
    bc_type="weak",
    ic_type="weak",
    bc_weight=bc_weight,
    ic_weight=ic_weight,
    data_losses=[dloss],
    dl_weights=[1.0],
    one_loss_by_equation=True,
    eps=1e-10,
    debug=True,
)
# NOTE: this section intentionally solves from scratch ("new") while the
# others default to loading a previously saved state.
_run_training(pinns5, "new", "pinns_NNG", epochs=600)

# plot_abstract_approx_spaces(
#     (
#         pinns2.space,
#         pinns3.space,
#     ),
#     domain_x,
#     domain_mu,
#     domain_t,
#     time_values=[
#         t_max,
#     ],
#     loss=(
#         pinns2.losses,
#         pinns3.losses,
#     ),
#     residual=(
#         pde2,
#         pde3,
#     ),
#     solution=exact_solution,
#     error=exact_solution,
#     derivatives=["ux", "ut"],
#     title="solving LinearizedEuler with TemporalPinns",
# )
#
# plt.show()