r"""Solves the linearized Euler equations in 1D using a PINN. .. math:: \partial_t p + \partial_x u & = f_1 in \Omega \times (0, T) \\ \partial_t u + \partial_x p & = f_2 in \Omega \times (0, T) \\ p & = g_1 on \partial \Omega \times (0, T) \\ u & = g_2 on \partial \Omega \times (0, T) \\ p & = p_0 on \Omega \times {0} \\ u & = u_0 on \Omega \times {0} where :math:`p: \partial \Omega \times (0, T) \to \mathbb{R}` and :math:`u: \partial \Omega \times (0, T) \to \mathbb{R}` are the unknown functions, :math:`\Omega \subset \mathbb{R}` is the spatial domain and :math:`(0, T) \subset \mathbb{R}` is the time domain. Dirichlet boundary conditions are prescribed. The equation is solved on a segment domain; weak boundary and initial conditions are used. Three training strategies are compared: standard PINNs, PINNs with energy natural gradient preconditioning and PINNs with Anagram preconditioning. """ from typing import Callable, Tuple import torch from scimba_torch.approximation_space.abstract_space import AbstractApproxSpace from scimba_torch.approximation_space.nn_space import NNxtSpace from scimba_torch.domain.meshless_domain.domain_1d import Segment1D from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.integration.monte_carlo_time import UniformTimeSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.temporal_pde.pinns import ( NaturalGradientTemporalPinns, NystromNaturalGradientTemporalPinns, SketchyNaturalGradientTemporalPinns, TemporalPinns, ) from scimba_torch.optimizers.optimizers_data import OptimizerData from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor from scimba_torch.utils.typing_protocols import VarArgCallable class LinearizedEuler: def __init__( self, space: AbstractApproxSpace, init: Callable, f: Callable, g: Callable, **kwargs, ): self.space = space self.init = init self.f = f self.g = g self.linear = True self.residual_size = 2 self.bc_residual_size = 2 self.ic_residual_size = 2 def grad( self, w: torch.Tensor | MultiLabelTensor, y: torch.Tensor | LabelTensor, ) -> torch.Tensor | Tuple[torch.Tensor, ...]: return self.space.grad(w, y) def rhs( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: return self.f(t, x, mu) def bc_rhs( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, n: LabelTensor, mu: LabelTensor, ) -> Tuple[torch.Tensor]: return self.g(t, x, mu) def time_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: p, u = w.get_components() p_t = self.grad(p, t) u_t = self.grad(u, t) return p_t, u_t def space_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: p, u = w.get_components() p_x = self.grad(p, x) u_x = self.grad(u, x) return u_x, p_x def functional_operator( self, func: VarArgCallable, t: torch.Tensor, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> torch.Tensor: # space operator pu_space = torch.func.jacrev(func, 1)(t, x, mu, theta) pu_space = torch.flip(pu_space, (-1,)) # time operator pu_time = torch.func.jacrev(func, 0)(t, x, mu, theta) # sum both contributions return (pu_space + pu_time).squeeze() # Dirichlet conditions def bc_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, n: LabelTensor, mu: LabelTensor, ) -> Tuple[torch.Tensor]: p, u = w.get_components() return p, u def functional_operator_bc( self, func: VarArgCallable, t: torch.Tensor, x: torch.Tensor, n: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> torch.Tensor: return func(t, x, mu, theta) def initial_condition(self, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: return self.init(x, mu) def functional_operator_ic( self, func: VarArgCallable, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> Tuple[torch.Tensor]: t = torch.zeros_like(x) return func(t, x, mu, theta) def exact_solution(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> torch.Tensor: x = x.get_components() D = 0.02 coeff = 1 / (4 * torch.pi * D) ** 0.5 p_plus_u = coeff * torch.exp(-((x - t.x - 1) ** 2) / (4 * D)) p_minus_u = coeff * torch.exp(-((x + t.x - 1) ** 2) / (4 * D)) p = (p_plus_u + p_minus_u) / 2 u = (p_plus_u - p_minus_u) / 2 return torch.cat((p, u), dim=-1) def initial_solution(x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: sol = exact_solution(LabelTensor(torch.zeros_like(x.x)), x, mu) return sol[..., 0:1], sol[..., 1:2] def f_rhs(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: x = x.get_components() return torch.zeros_like(x), torch.zeros_like(x) def f_bc(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: x = x.get_components() return torch.zeros_like(x), torch.zeros_like(x) domain_mu = [] domain_x = Segment1D((-1.0, 3.0), is_main_domain=True) t_min, t_max = 0.0, 0.5 domain_t = (t_min, t_max) sampler = TensorizedSampler( [ UniformTimeSampler(domain_t), DomainSampler(domain_x), UniformParametricSampler(domain_mu), ] ) bc_weight = 30.0 ic_weight = 300.0 space = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde = LinearizedEuler(space, initial_solution, f_rhs, f_bc) opt_1 = { "name": "adam", "optimizer_args": {"lr": 1.8e-2, "betas": (0.9, 0.999)}, } opt_2 = { "name": "lbfgs", "switch_at_epoch_ratio": 0.7, } pinns = TemporalPinns( pde, bc_type="weak", ic_type="weak", optimizers=OptimizerData(opt_1, opt_2), bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, ) solving_mode = "load" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns" # the post_fix of the file where to load the pinn save_post_fix = "pinns" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns.solve( epochs=1000, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, ) pinns.save(__file__, save_post_fix) print(f"Training time: {pinns.training_time:.2f} seconds") print(f"Best Loss : {pinns.best_loss:.2e}") # plot_AbstractApproxSpaces( # pinns.space, # domain_x, # domain_mu, # domain_t, # time_values=[t_max,], # loss=pinns.losses, # residual=pde, # solution=exact_solution, # error=exact_solution, # derivatives=["ux", "ut"], # title="solving LinearizedEuler with TemporalPinns", # ) # # plt.show() space2 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde2 = LinearizedEuler(space2, initial_solution, f_rhs, f_bc) pinns2 = NaturalGradientTemporalPinns( pde2, bc_type="weak", ic_type="weak", bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, matrix_regularization=1e-6, ) solving_mode = "load" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns_ENG" # the post_fix of the file where to load the pinn save_post_fix = "pinns_ENG" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns2.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns2.solve( epochs=1000, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, ) pinns2.save(__file__, save_post_fix) print(f"Training time: {pinns2.training_time:.2f} seconds") print(f"Best Loss : {pinns2.best_loss:.2e}") def norm(space, t, x, mu): x.x.requires_grad_() w = space.evaluate(t, x, mu) u, p = w.get_components() return torch.sqrt(u**2 + p**2) additional_scalar_functions = {"$\\|(u,p)\\|_2$": norm} # plot_abstract_approx_spaces( # ( # pinns.space, # pinns2.space, # ), # domain_x, # domain_mu, # domain_t, # time_values=[ # t_max, # ], # # loss=( # # pinns.losses, # # pinns2.losses, # # ), # residual=( # pde, # pde2, # ), # solution=exact_solution, # error=exact_solution, # derivatives=["ux", "ut"], # additional_scalar_functions=additional_scalar_functions, # title="solving LinearizedEuler with TemporalPinns", # titles=("no preconditioning", "ENG preconditioning"), # ) # # plt.show() # space3 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[32, 32]) # pde3 = LinearizedEuler(space3, initial_solution, f_rhs, f_bc) # # pinns3 = AnagramTemporalPinns( # pde3, # bc_type="weak", # ic_type="weak", # bc_weight=bc_weight, # ic_weight=ic_weight, # one_loss_by_equation=True, # svd_threshold=5e-2, # ) # # solving_mode = "load" # # can be "new" for solve from scratch, # # "resume" for load from file and continue solving, # # or "load" for load from file # load_post_fix = "pinns_Anagram" # # the post_fix of the file where to load the pinn # save_post_fix = "pinns_Anagram" # # the post_fix of the file where to save the pinn # # new_solving = solving_mode == "new" # resume_solving = solving_mode == "resume" # try_load = resume_solving or (not new_solving) # load_ok = try_load and pinns3.load(__file__, load_post_fix) # solve = (not load_ok) or (resume_solving or new_solving) # # if solve: # pinns3.solve( # epochs=100, # n_collocation=3000, # n_bc_collocation=1000, # n_ic_collocation=1000, # ) # pinns3.save(__file__, save_post_fix) # # plot_abstract_approx_spaces( # ( # pinns2.space, # pinns3.space, # ), # domain_x, # domain_mu, # domain_t, # time_values=[ # t_max, # ], # loss=( # pinns2.losses, # pinns3.losses, # ), # residual=( # pde2, # pde3, # ), # solution=exact_solution, # error=exact_solution, # derivatives=["ux", "ut"], # title="solving LinearizedEuler with TemporalPinns", # titles=("ENG preconditioning", "ANaGram preconditioning"), # ) # # plt.show() torch.manual_seed(0) space4 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde4 = LinearizedEuler(space4, initial_solution, f_rhs, f_bc) pinns4 = SketchyNaturalGradientTemporalPinns( pde4, bc_type="weak", ic_type="weak", bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, tol=1e-7, ) solving_mode = "load" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns_SNG" # the post_fix of the file where to load the pinn save_post_fix = "pinns_SNG" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns4.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns4.solve( epochs=1000, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, ) pinns4.save(__file__, save_post_fix) print(f"Training time: {pinns4.training_time:.2f} seconds") print(f"Best Loss : {pinns4.best_loss:.2e}") # plot_abstract_approx_spaces( # ( # pinns2.space, # pinns4.space, # ), # domain_x, # domain_mu, # domain_t, # time_values=[ # t_max, # ], # loss=( # pinns2.losses, # pinns4.losses, # ), # residual=( # pde2, # pde4, # ), # solution=exact_solution, # error=exact_solution, # derivatives=["ux", "ut"], # title="solving LinearizedEuler with TemporalPinns", # titles=("ENG preconditioning", "SNG preconditioning"), # ) # # plt.show() torch.manual_seed(0) space5 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde5 = LinearizedEuler(space5, initial_solution, f_rhs, f_bc) pinns5 = NystromNaturalGradientTemporalPinns( pde5, bc_type="weak", ic_type="weak", bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, eps=1e-8, # debug=True ) solving_mode = "new" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns_NNG" # the post_fix of the file where to load the pinn save_post_fix = "pinns_NNG" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns5.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns5.solve( epochs=200, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, ) pinns5.save(__file__, save_post_fix) print(f"Training time: {pinns5.training_time:.2f} seconds") print(f"Best Loss : {pinns5.best_loss:.2e}")