r"""Solves the linearized Euler equations in 1D using a PINN. .. math:: \partial_t p + \partial_x u & = f_1 in \Omega \times (0, T) \\ \partial_t u + \partial_x p & = f_2 in \Omega \times (0, T) \\ p & = g_1 on \partial \Omega \times (0, T) \\ u & = g_2 on \partial \Omega \times (0, T) \\ p & = p_0 on \Omega \times {0} \\ u & = u_0 on \Omega \times {0} where :math:`p: \partial \Omega \times (0, T) \to \mathbb{R}` and :math:`u: \partial \Omega \times (0, T) \to \mathbb{R}` are the unknown functions, :math:`\Omega \subset \mathbb{R}` is the spatial domain and :math:`(0, T) \subset \mathbb{R}` is the time domain. Dirichlet boundary conditions are prescribed. The equation is solved on a segment domain; weak boundary and initial conditions are used. Three training strategies are compared: standard PINNs, PINNs with energy natural gradient preconditioning and PINNs with Anagram preconditioning. """ from typing import Callable, Tuple import matplotlib.pyplot as plt import torch from scimba_torch.approximation_space.abstract_space import AbstractApproxSpace from scimba_torch.approximation_space.nn_space import NNxtSpace from scimba_torch.domain.meshless_domain.domain_1d import Segment1D from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.integration.monte_carlo_time import UniformTimeSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.temporal_pde.pinns import ( AnagramTemporalPinns, NaturalGradientTemporalPinns, TemporalPinns, ) from scimba_torch.optimizers.optimizers_data import OptimizerData from scimba_torch.plots.plots_nd import plot_abstract_approx_spaces from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor from scimba_torch.utils.typing_protocols import VarArgCallable class LinearizedEuler: def __init__( self, space: AbstractApproxSpace, init: Callable, f: Callable, g: Callable, **kwargs, ): self.space = space self.init = init self.f = f self.g = g self.linear = True self.residual_size = 2 self.bc_residual_size = 2 self.ic_residual_size = 2 def grad( self, w: torch.Tensor | MultiLabelTensor, y: torch.Tensor | LabelTensor, ) -> torch.Tensor | Tuple[torch.Tensor, ...]: return self.space.grad(w, y) def rhs( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: return self.f(t, x, mu) def bc_rhs( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, n: LabelTensor, mu: LabelTensor, ) -> Tuple[torch.Tensor]: return self.g(t, x, mu) def time_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: p, u = w.get_components() p_t = self.grad(p, t) u_t = self.grad(u, t) return p_t, u_t def space_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, mu: LabelTensor ) -> Tuple[torch.Tensor]: p, u = w.get_components() p_x = self.grad(p, x) u_x = self.grad(u, x) return u_x, p_x def functional_operator( self, func: VarArgCallable, t: torch.Tensor, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> torch.Tensor: # space operator pu_space = torch.func.jacrev(func, 1)(t, x, mu, theta) pu_space = torch.flip(pu_space, (-1,)) # time operator pu_time = torch.func.jacrev(func, 0)(t, x, mu, theta) # sum both contributions return (pu_space + pu_time).squeeze() # Dirichlet conditions def bc_operator( self, w: MultiLabelTensor, t: LabelTensor, x: LabelTensor, n: LabelTensor, mu: LabelTensor, ) -> Tuple[torch.Tensor]: p, u = w.get_components() return p, u def functional_operator_bc( self, func: VarArgCallable, t: torch.Tensor, x: torch.Tensor, n: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> torch.Tensor: return func(t, x, mu, theta) def initial_condition(self, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: return self.init(x, mu) def functional_operator_ic( self, func: VarArgCallable, x: torch.Tensor, mu: torch.Tensor, theta: torch.Tensor, ) -> Tuple[torch.Tensor]: t = torch.zeros_like(x) return func(t, x, mu, theta) def exact_solution(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> torch.Tensor: x = x.get_components() D = 0.02 coeff = 1 / (4 * torch.pi * D) ** 0.5 p_plus_u = coeff * torch.exp(-((x - t.x - 1) ** 2) / (4 * D)) p_minus_u = coeff * torch.exp(-((x + t.x - 1) ** 2) / (4 * D)) p = (p_plus_u + p_minus_u) / 2 u = (p_plus_u - p_minus_u) / 2 return torch.cat((p, u), dim=-1) def initial_solution(x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: sol = exact_solution(LabelTensor(torch.zeros_like(x.x)), x, mu) return sol[..., 0:1], sol[..., 1:2] def f_rhs(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: x = x.get_components() return torch.zeros_like(x), torch.zeros_like(x) def f_bc(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> Tuple[torch.Tensor]: x = x.get_components() return torch.zeros_like(x), torch.zeros_like(x) domain_mu = [] domain_x = Segment1D((-1.0, 3.0), is_main_domain=True) t_min, t_max = 0.0, 0.5 domain_t = (t_min, t_max) sampler = TensorizedSampler( [ UniformTimeSampler(domain_t), DomainSampler(domain_x), UniformParametricSampler(domain_mu), ] ) bc_weight = 30.0 ic_weight = 300.0 # space = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[80, 80, 80, 40, 20, 10]) space = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde = LinearizedEuler(space, initial_solution, f_rhs, f_bc) opt_1 = { "name": "adam", "optimizer_args": {"lr": 1.8e-2, "betas": (0.9, 0.999)}, } opt_2 = { "name": "lbfgs", "switch_at_epoch_ratio": 0.7, } pinns = TemporalPinns( pde, bc_type="weak", ic_type="weak", optimizers=OptimizerData(opt_1, opt_2), bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, ) solving_mode = "load" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns" # the post_fix of the file where to load the pinn save_post_fix = "pinns" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns.solve( epochs=1000, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, verbose=True, ) pinns.save(__file__, save_post_fix) pinns.space.load_from_best_approx() # plot_AbstractApproxSpaces( # pinns.space, # domain_x, # domain_mu, # domain_t, # time_values=[t_max,], # loss=pinns.losses, # residual=pde, # solution=exact_solution, # error=exact_solution, # derivatives=["ux", "ut"], # title="solving LinearizedEuler with TemporalPinns", # ) # # plt.show() space2 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde2 = LinearizedEuler(space2, initial_solution, f_rhs, f_bc) pinns2 = NaturalGradientTemporalPinns( pde2, bc_type="weak", ic_type="weak", bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, matrix_regularization=1e-6, ) solving_mode = "load" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns_ENG" # the post_fix of the file where to load the pinn save_post_fix = "pinns_ENG" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns2.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns2.solve( epochs=1000, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, verbose=True, ) pinns2.save(__file__, save_post_fix) plot_abstract_approx_spaces( ( pinns.space, pinns2.space, ), domain_x, domain_mu, domain_t, time_values=[ t_max, ], # loss=( # pinns.losses, # pinns2.losses, # ), residual=( pde, pde2, ), solution=exact_solution, error=exact_solution, derivatives=["ux", "ut"], title="solving LinearizedEuler with TemporalPinns", ) plt.show() space3 = NNxtSpace(2, 0, GenericMLP, domain_x, sampler, layer_sizes=[64]) pde3 = LinearizedEuler(space3, initial_solution, f_rhs, f_bc) pinns3 = AnagramTemporalPinns( pde3, bc_type="weak", ic_type="weak", bc_weight=bc_weight, ic_weight=ic_weight, one_loss_by_equation=True, svd_threshold=5e-1, ) solving_mode = "new" # can be "new" for solve from scratch, # "resume" for load from file and continue solving, # or "load" for load from file load_post_fix = "pinns_Anagram" # the post_fix of the file where to load the pinn save_post_fix = "pinns_Anagram" # the post_fix of the file where to save the pinn new_solving = solving_mode == "new" resume_solving = solving_mode == "resume" try_load = resume_solving or (not new_solving) load_ok = try_load and pinns3.load(__file__, load_post_fix) solve = (not load_ok) or (resume_solving or new_solving) if solve: pinns3.solve( epochs=1000, n_collocation=3000, n_bc_collocation=1000, n_ic_collocation=1000, verbose=True, ) pinns3.save(__file__, save_post_fix) plot_abstract_approx_spaces( ( pinns2.space, pinns3.space, ), domain_x, domain_mu, domain_t, time_values=[ t_max, ], # loss=( # pinns2.losses, # pinns3.losses, # ), residual=( pde2, pde3, ), solution=exact_solution, error=exact_solution, derivatives=["ux", "ut"], title="solving LinearizedEuler with TemporalPinns", ) plt.show()