# Source: scimba_torch.numerical_solvers.temporal_pde.discrete_pinn

"""Implement a discrete PINN for solving temporal PDEs."""

import copy
import math
from typing import Callable

import torch

from scimba_torch.numerical_solvers.temporal_pde.time_discrete import (
    ExplicitTimeDiscreteScheme,
    TimeDiscreteCollocationProjector,
)
from scimba_torch.physical_models.kinetic_pde.abstract_kinetic_pde import KineticPDE
from scimba_torch.physical_models.temporal_pde.abstract_temporal_pde import TemporalPDE
from scimba_torch.utils.scimba_tensors import LabelTensor

TYPE_ARG = LabelTensor
TYPE_FUNC_RES = (
    Callable[[TYPE_ARG, TYPE_ARG], torch.Tensor]
    | Callable[[TYPE_ARG, TYPE_ARG, TYPE_ARG], torch.Tensor]
)
TYPE_FUNC_BC_RES = (
    Callable[[TYPE_ARG, TYPE_ARG, TYPE_ARG], torch.Tensor]
    | Callable[[TYPE_ARG, TYPE_ARG, TYPE_ARG, TYPE_ARG], torch.Tensor]
)
TYPE_PDES = TemporalPDE | KineticPDE


class DiscretePINN(ExplicitTimeDiscreteScheme):
    """A discrete PINN for solving a differential equation.

    It uses linear and/or nonlinear spaces. The class supports initialization
    of the model with a target function, computation of the right-hand side
    (RHS) from the model, and stepping through time using a projector.

    Args:
        pde: The PDE model, which can be either a TemporalPDE or KineticPDE.
        projector: The projector for training the model, typically a
            TimeDiscreteCollocationProjector.
        scheme: The time scheme used for updating the model. Options include
            "euler_exp", "rk2", and "rk4".
        **kwargs: Additional hyperparameters for the scheme.

    Attributes:
        model (nn.Module): The neural network model used for computing the
            solution.
        projector (ProjectorPINNs): The projector used for training the model.
        scheme (str): The time scheme used for updating the model.
    """

    def __init__(
        self,
        pde: TYPE_PDES,
        projector: TimeDiscreteCollocationProjector,
        scheme: str = "euler_exp",
        **kwargs,
    ):
        super().__init__(pde, projector, **kwargs)
        self.scheme = scheme
        # Only explicit schemes are supported by this base class.
        assert scheme in ("euler_exp", "rk2", "rk4"), (
            "Unknown scheme in DiscretePINN"
        )
[docs] def construct_rhs_ee(self, pde_n: TYPE_PDES, t: float, dt: float) -> TYPE_FUNC_RES: r"""Returns :math:`u_n + \Delta t f(u_n)` as a function. Args: pde_n: The PDE model at the current time step. t: The current time. dt: The time step size. Returns: A function representing the right-hand side (RHS) for the explicit Euler scheme. """ if isinstance(pde_n, TemporalPDE): def res_temporal_pde(x: LabelTensor, mu: LabelTensor) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu) f_u_n = pde_n.space_operator(u_n, t_, x, mu) s_u_n = pde_n.rhs(u_n, t_, x, mu) return (u_n.w - dt * f_u_n + dt * s_u_n).detach() return res_temporal_pde else: def res_kinetic_pde( x: LabelTensor, v: LabelTensor, mu: LabelTensor ) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, v, mu) f_u_n = pde_n.space_operator(u_n, t_, x, v, mu) s_u_n = pde_n.rhs(u_n, t_, x, v, mu) return (u_n.w - dt * f_u_n + dt * s_u_n).detach() return res_kinetic_pde
    def construct_rhs_ee_weak_bc(
        self, pde_n: TYPE_PDES, t: float, dt: float
    ) -> tuple[TYPE_FUNC_RES, TYPE_FUNC_BC_RES]:
        r"""Build the explicit-Euler RHS and boundary residual as functions.

        The interior function evaluates
        :math:`u_n - \Delta t\, F(u_n) + \Delta t\, S(u_n)`; the boundary
        function evaluates the boundary-condition right-hand side at the
        current state.

        Args:
            pde_n: The PDE model at the current time step.
            t: The current time.
            dt: The time step size.

        Returns:
            A tuple containing two functions: one for the right-hand side
            (RHS) and another for the boundary condition (BC) residuals for
            the explicit Euler scheme with weak boundary conditions.
        """
        if isinstance(pde_n, TemporalPDE):

            def res_temporal_pde(x: LabelTensor, mu: LabelTensor) -> torch.Tensor:
                # Scalar time broadcast over the batch of collocation points.
                t_ = LabelTensor(t * torch.ones((x.shape[0], 1)))
                u_n = pde_n.space.evaluate(x, mu)
                f_u_n = pde_n.space_operator(u_n, t_, x, mu)
                s_u_n = pde_n.rhs(u_n, t_, x, mu)
                # Detached: this is a fixed target for the projector.
                return (u_n.w - dt * f_u_n + dt * s_u_n).detach()

            def bc_res_temporal_pde(
                x: LabelTensor, n: LabelTensor, mu: LabelTensor
            ) -> torch.Tensor:
                t_ = LabelTensor(t * torch.ones((x.shape[0], 1)))
                u_n = pde_n.space.evaluate(x, mu)
                s_u_n = pde_n.bc_rhs(u_n, t_, x, n, mu)
                return s_u_n.detach()  # because we will minimize u - bc_res

            return res_temporal_pde, bc_res_temporal_pde
        else:

            def res_kinetic_pde(
                x: LabelTensor, v: LabelTensor, mu: LabelTensor
            ) -> torch.Tensor:
                t_ = LabelTensor(t * torch.ones((x.shape[0], 1)))
                u_n = pde_n.space.evaluate(x, v, mu)
                f_u_n = pde_n.space_operator(u_n, t_, x, v, mu)
                s_u_n = pde_n.rhs(u_n, t_, x, v, mu)
                return (u_n.w - dt * f_u_n + dt * s_u_n).detach()

            def bc_res_kinetic_pde(
                x: LabelTensor, v: LabelTensor, n: LabelTensor, mu: LabelTensor
            ) -> torch.Tensor:
                t_ = LabelTensor(t * torch.ones((x.shape[0], 1)))
                u_n = pde_n.space.evaluate(x, v, mu)
                s_u_n = pde_n.bc_rhs(u_n, t_, x, v, n, mu)
                return s_u_n.detach()  # because we will minimize u - bc_res

            return res_kinetic_pde, bc_res_kinetic_pde
[docs] def construct_rhs_rk2( self, pde_n: TYPE_PDES, pde_nph: TYPE_PDES, t: float, dt: float ) -> TYPE_FUNC_RES: r"""Returns :math:`u_n + \Delta t f(u_n)` as a function. Args: pde_n: The PDE model at the current time step. pde_nph: The PDE model at the next half time step. t: The current time. dt: The time step size. Returns: A function representing the right-hand side (RHS) for the second-order Runge-Kutta (RK2) scheme. """ if isinstance(pde_n, TemporalPDE): assert isinstance(pde_nph, TemporalPDE) def res_temporal_pde(x: LabelTensor, mu: LabelTensor) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu) u_nph = pde_nph.space.evaluate(x, mu) f_u_nph = pde_nph.space_operator(u_nph, t_, x, mu) s_u_nph = pde_nph.rhs(u_n, t_, x, mu) return (u_n.w - dt * f_u_nph + dt * s_u_nph).detach() return res_temporal_pde else: assert isinstance(pde_nph, KineticPDE) def res_kinetic_pde( x: LabelTensor, v: LabelTensor, mu: LabelTensor ) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, v, mu) u_nph = pde_nph.space.evaluate(x, v, mu) f_u_nph = pde_nph.space_operator(u_nph, t_, x, v, mu) s_u_nph = pde_nph.rhs(u_n, t_, x, v, mu) return (u_n.w - dt * f_u_nph + dt * s_u_nph).detach() return res_kinetic_pde
[docs] def construct_rhs_rk2_weak_bc( self, pde_n: TYPE_PDES, pde_nph: TYPE_PDES, t: float, dt: float ) -> tuple[TYPE_FUNC_RES, TYPE_FUNC_BC_RES]: r"""Returns :math:`u_n + \Delta t f(u_n)` as a function. Args: pde_n: The PDE model at the current time step. pde_nph: The PDE model at the next half time step. t: The current time. dt: The time step size. Returns: A tuple containing two functions: one for the right-hand side (RHS) and another for the boundary condition (BC) residuals for the second-order Runge-Kutta (RK2) scheme with weak boundary conditions. """ if isinstance(pde_n, TemporalPDE): assert isinstance(pde_nph, TemporalPDE) def res_temporal_pde(x: LabelTensor, mu: LabelTensor) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu) u_nph = pde_nph.space.evaluate(x, mu) f_u_nph = pde_nph.space_operator(u_nph, t_, x, mu) s_u_nph = pde_nph.rhs(u_n, t_, x, mu) return (u_n.w - dt * f_u_nph + dt * s_u_nph).detach() def bc_res_temporal_pde( x: LabelTensor, n: LabelTensor, mu: LabelTensor ) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_nph = pde_nph.space.evaluate(x, mu) s_u_nph = pde_nph.bc_rhs(u_nph, t_, x, n, mu) return s_u_nph.detach() ## car on va mininiser u- bc_res return res_temporal_pde, bc_res_temporal_pde else: assert isinstance(pde_nph, KineticPDE) def res_kinetic_pde( x: LabelTensor, v: LabelTensor, mu: LabelTensor ) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, v, mu) u_nph = pde_nph.space.evaluate(x, v, mu) f_u_nph = pde_nph.space_operator(u_nph, t_, x, v, mu) s_u_nph = pde_nph.rhs(u_n, t_, x, v, mu) return (u_n.w - dt * f_u_nph + dt * s_u_nph).detach() def bc_res_kinetic_pde( x: LabelTensor, v: LabelTensor, n: LabelTensor, mu: LabelTensor ) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_nph = pde_nph.space.evaluate(x, v, mu) s_u_nph = pde_n.bc_rhs(u_nph, t_, x, v, n, mu) return s_u_nph.detach() ## car on va 
mininiser u- bc_res return res_kinetic_pde, bc_res_kinetic_pde
[docs] def construct_rhs_rk4( self, pde_n: TYPE_PDES, pde_1: TYPE_PDES, pde_2: TYPE_PDES, pde_3: TYPE_PDES, t: float, dt: float, ) -> TYPE_FUNC_RES: r"""Returns :math:`u_n + \Delta t f(u_n)` as a function. Args: pde_n: The PDE model at the current time step. pde_1: The PDE model at the first intermediate step. pde_2: The PDE model at the second intermediate step. pde_3: The PDE model at the third intermediate step. t: The current time. dt: The time step size. Returns: A function representing the right-hand side (RHS) of the RK4 scheme. Raises: NotImplementedError: If the PDE models are not of type TemporalPDE, as RK4 is not implemented for KineticPDE. """ if isinstance(pde_n, TemporalPDE): assert isinstance(pde_1, TemporalPDE) assert isinstance(pde_2, TemporalPDE) assert isinstance(pde_3, TemporalPDE) def res_temporal_pde(x: LabelTensor, mu: LabelTensor) -> torch.Tensor: t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu) u_1 = pde_1.space.evaluate(x, mu) u_2 = pde_2.space.evaluate(x, mu) u_3 = pde_3.space.evaluate(x, mu) f_u_3 = pde_3.space_operator(u_3, t_, x, mu) s_u_3 = pde_3.rhs(u_n, t_, x, mu) u_ = (-u_n.w + u_1.w + 2 * u_2.w + u_3.w) / 3 return (u_ - dt * f_u_3 + dt * s_u_3).detach() return res_temporal_pde else: raise NotImplementedError( "construct_rhs_RK4 not implemented for KineticPDE" )
    def update(self, t: float, dt: float, **kwargs):
        r"""Compute the new parameters :math:`\theta_{n+1}` from :math:`\theta_n`.

        Uses the time step dt and the chosen time scheme. Each stage of a
        multi-stage scheme deep-copies the current PDE (freezing the stage
        state) and runs one projector solve; ``best_loss`` is reset before
        every solve so the projector does not keep the previous stage's
        best checkpoint.

        Args:
            t: The current time.
            dt: The time step size.
            **kwargs: Additional arguments for the projector's solve method.

        Raises:
            NotImplementedError: If the RK4 scheme is chosen with weak
                boundary conditions, as it is not implemented.
        """
        # Reset the projector's best-loss tracker for this time step.
        self.projector.best_loss = 1e10
        # Frozen copy of the current state u_n used by all stage RHS closures.
        pde_n = copy.deepcopy(self.pde)
        if self.scheme == "euler_exp":
            # Single explicit-Euler stage over the full step dt.
            if self.bool_weak_bc:
                self.projector.rhs, self.projector.bc_rhs = (
                    self.construct_rhs_ee_weak_bc(pde_n, t, dt)
                )
            else:
                self.projector.rhs = self.construct_rhs_ee(pde_n, t, dt)
            self.projector.solve(**kwargs)
        elif self.scheme == "rk2":
            # Stage 1: half Euler step to obtain u_{n+1/2}.
            if self.bool_weak_bc:
                self.projector.rhs, self.projector.bc_rhs = (
                    self.construct_rhs_ee_weak_bc(pde_n, t, dt / 2)
                )
            else:
                self.projector.rhs = self.construct_rhs_ee(pde_n, t, dt / 2)
            self.projector.solve(**kwargs)
            self.projector.best_loss = 1e10
            # Freeze the half-step state for the midpoint update.
            pde_nph = copy.deepcopy(self.pde)
            # Stage 2: full midpoint step from u_n using u_{n+1/2}.
            if self.bool_weak_bc:
                self.projector.rhs, self.projector.bc_rhs = (
                    self.construct_rhs_rk2_weak_bc(pde_n, pde_nph, t + dt / 2, dt)
                )
            else:
                self.projector.rhs = self.construct_rhs_rk2(
                    pde_n, pde_nph, t + dt / 2, dt
                )
            self.projector.solve(**kwargs)
        elif self.scheme == "rk4":
            if self.bool_weak_bc:
                raise NotImplementedError("RK4 not implemented for weak BC conditions")
            # Stage 1: half Euler step -> u_1.
            self.projector.rhs = self.construct_rhs_ee(pde_n, t, dt / 2)
            self.projector.solve(**kwargs)
            self.projector.best_loss = 1e10
            pde_1 = copy.deepcopy(self.pde)
            # Stage 2: half midpoint step using u_1 -> u_2.
            self.projector.rhs = self.construct_rhs_rk2(
                pde_n, pde_1, t + dt / 2, dt / 2
            )
            self.projector.solve(**kwargs)
            self.projector.best_loss = 1e10
            pde_2 = copy.deepcopy(self.pde)
            # Stage 3: full midpoint step using u_2 -> u_3.
            self.projector.rhs = self.construct_rhs_rk2(pde_n, pde_2, t + dt / 2, dt)
            self.projector.solve(**kwargs)
            self.projector.best_loss = 1e10
            pde_3 = copy.deepcopy(self.pde)
            # Final stage: classical RK4 combination with scaled step dt/6.
            self.projector.rhs = self.construct_rhs_rk4(
                pde_n, pde_1, pde_2, pde_3, t + dt, dt / 6
            )
            self.projector.solve(**kwargs)
class DiscretePINNImplicit(DiscretePINN):
    """Feature-branch DiscretePINN_Implicit that extends the base DiscretePINN.

    This subclass keeps all base behaviour (explicit schemes) and adds
    implicit schemes (Euler-imp, Crank–Nicolson, SDIRK2) using an extra PDE
    instance `pde_imp` and a `type_scheme` switch.

    Args:
        pde: The PDE model, which can be either a TemporalPDE or KineticPDE.
        pde_imp: The PDE for implicit scheme.
        projector: The projector for training the model, typically a
            TimeDiscreteCollocationProjector.
        type_scheme: a switch explicit/implicit.
        scheme: The time scheme used for updating the model. Options include
            "euler_exp", "rk2", and "rk4".
        **kwargs: Additional hyperparameters for the scheme.

    Raises:
        ValueError: If the time_scheme is not implicit or explicit.
    """

    def __init__(
        self,
        pde: TYPE_PDES,
        pde_imp: TYPE_PDES,
        projector: TimeDiscreteCollocationProjector,
        type_scheme: str = "implicit",
        scheme: str = "euler_imp",
        **kwargs,
    ):
        # The base class validates against explicit schemes, so initialize it
        # with a known-good explicit scheme and override afterwards.
        super().__init__(pde, projector, scheme="euler_exp", **kwargs)
        self.type_scheme = type_scheme
        self.scheme = scheme
        self.pde_imp = pde_imp
        # SDIRK2 diagonal coefficient gamma = 1 - 1/sqrt(2).
        self.gamma_sdirk2 = 1.0 - 1.0 / math.sqrt(2.0)

        if type_scheme == "explicit":
            assert scheme in ("euler_exp", "rk2", "rk4"), (
                f"Unknown explicit scheme: {scheme}"
            )
        elif type_scheme == "implicit":
            assert scheme in ("euler_imp", "crank_nicolson", "sdirk2"), (
                f"Unknown implicit scheme: {scheme}"
            )
        else:
            raise ValueError(f"Unknown type_scheme: {type_scheme}")
[docs] def construct_rhs_ei(self, pde_n: TYPE_PDES, t: float, dt: float) -> TYPE_FUNC_RES: r"""Construct the right-hand side of the implicit Euler scheme. Returns `u_n + \Delta t \, S(u_n)` as a function, corresponding to the right-hand side (RHS) of the implicit Euler scheme. Args: pde_n: The PDE model at the current time step. t: The current time. dt: The time step size. Returns: A function representing the right-hand side (RHS) for the implicit Euler scheme, where `S(u_n)` denotes the source or reaction term of the PDE. """ if isinstance(pde_n, TemporalPDE): def res_temporal_pde(x, mu): t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu) s_un = pde_n.rhs(u_n, t_, x, mu) return (u_n.w + dt * s_un).detach() return res_temporal_pde else: def res_kinetic_pde(x, mu, v): t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu, v) s_un = pde_n.rhs(u_n, t_, x, v, mu) return (u_n.w + dt * s_un).detach() return res_kinetic_pde
[docs] def construct_lhs_ei( self, pde_np1: TYPE_PDES, t: float, dt: float ) -> TYPE_FUNC_RES: r"""Construct the left-hand side of the implicit Euler scheme. Returns `u_{n+1} - \Delta t \, F(u_{n+1})` as a function, corresponding to the left-hand side (LHS) of the implicit Euler scheme. Args: pde_np1: The PDE model evaluated at the next time step :math:`t_{n+1}`. t: The current time. dt: The time step size. Returns: A function representing the left-hand side (LHS) for the implicit Euler scheme, where `F(u_{n+1})` denotes the spatial operator of the PDE. """ if isinstance(pde_np1, TemporalPDE): def res_temporal_pde(x, mu, with_last_layer: bool = True): t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_np1 = pde_np1.space.evaluate(x, mu, with_last_layer=with_last_layer) f_u_np1 = pde_np1.space_operator(u_np1, t_, x, mu) # s_u_np1 = pde_np1.rhs(u_np1, t_, x, mu) return u_np1.w + dt * f_u_np1 return res_temporal_pde else: def res_kinetic_pde(x, mu, v, with_last_layer: bool = True): t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_np1 = pde_np1.space.evaluate( x, mu, v, with_last_layer=with_last_layer ) f_u_np1 = pde_np1.space_operator(u_np1, t_, x, v, mu) # s_u_np1 = pde_np1.rhs(u_np1, t_, x, v, mu) return u_np1.w + dt * f_u_np1 return res_kinetic_pde
[docs] def construct_rhs_cn(self, pde_n: TYPE_PDES, t: float, dt: float) -> TYPE_FUNC_RES: r"""Construct the right-hand side of the Crank–Nicolson scheme. Returns `u^n - \tfrac{1}{2} \Delta t \, (F(u^n) + S(u^n))` as a function, corresponding to the right-hand side (RHS) of the Crank–Nicolson scheme. Args: pde_n: The PDE model at the current time step. t: The current time. dt: The time step size. Returns: A function representing the right-hand side (RHS) for the Crank–Nicolson scheme, combining both the spatial operator `F(u^n)` and the source term `S(u^n)` evaluated at the current time step. """ if isinstance(pde_n, TemporalPDE): def res_temporal_pde(x, mu): t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu) f_u_n = pde_n.space_operator(u_n, t_, x, mu) s_u_n = pde_n.rhs(u_n, t_, x, mu) return (u_n.w - 0.5 * dt * f_u_n - 0.5 * dt * s_u_n).detach() return res_temporal_pde else: def res_kinetic_pde(x, mu, v): t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu, v) f_u_n = pde_n.space_operator(u_n, t_, x, v, mu) s_u_n = pde_n.rhs(u_n, t_, x, v, mu) return (u_n.w - 0.5 * dt * f_u_n - 0.5 * dt * s_u_n).detach() return res_kinetic_pde
[docs] def construct_lhs_cn( self, pde_np1: TYPE_PDES, t: float, dt: float ) -> TYPE_FUNC_RES: r"""Construct the left-hand side of the Crank–Nicolson scheme. Returns `u^{n+1} + \tfrac{1}{2} \Delta t \, (F(u^{n+1}) - S(u^{n+1}))` as a function, corresponding to the left-hand side (LHS) of the Crank–Nicolson scheme. Args: pde_np1: The PDE model evaluated at the next time step `t_{n+1}`. t: The current time. dt: The time step size. Returns: A function representing the left-hand side (LHS) for the Crank–Nicolson scheme, where `F(u^{n+1})` and `S(u^{n+1})` denote respectively the spatial operator and the source term of the PDE, both evaluated at the next time step. """ if isinstance(pde_np1, TemporalPDE): def res_temporal_pde(x, mu, with_last_layer: bool = True): t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_np1 = pde_np1.space.evaluate(x, mu, with_last_layer=with_last_layer) f_u_np1 = pde_np1.space_operator(u_np1, t_, x, mu) s_u_np1 = pde_np1.rhs(u_np1, t_, x, mu) return u_np1.w + 0.5 * dt * f_u_np1 - 0.5 * dt * s_u_np1 return res_temporal_pde else: def res_kinetic_pde(x, mu, v, with_last_layer: bool = True): t_ = LabelTensor(t * torch.ones((x.shape[0], 1))) u_np1 = pde_np1.space.evaluate( x, mu, v, with_last_layer=with_last_layer ) f_u_np1 = pde_np1.space_operator(u_np1, t_, x, v, mu) s_u_np1 = pde_np1.rhs(u_np1, t_, x, v, mu) return u_np1.w + 0.5 * dt * f_u_np1 - 0.5 * dt * s_u_np1 return res_kinetic_pde
[docs] def construct_rhs_sdirk2_step1( self, pde_n: TYPE_PDES, t: float, dt: float ) -> TYPE_FUNC_RES: r"""Construct the right-hand side of the first stage of the SDIRK2 scheme. Returns `u^n` as a function, corresponding to the right-hand side (RHS) of the first stage of the SDIRK2 scheme. Args: pde_n: The PDE model at the current time step. t: The current time. dt: The time step size. Returns: A function representing the RHS for the first stage of the SDIRK2 scheme, where the initial value :math:`u^n` is used as input for the implicit solve. """ if isinstance(pde_n, TemporalPDE): def res_temporal_pde(x, mu): u_n = pde_n.space.evaluate(x, mu) return u_n.w.detach() return res_temporal_pde else: def res_kinetic_pde(x, mu, v): u_n = pde_n.space.evaluate(x, mu, v) return u_n.w.detach() return res_kinetic_pde
[docs] def construct_lhs_sdirk2_step1( self, pde: TYPE_PDES, t: float, dt: float ) -> TYPE_FUNC_RES: r"""Construct the left-hand side of the first stage of the SDIRK2 scheme. Returns `u_1 - \Delta t \, \gamma \, (F(u_1) - S(u_1))` as a function, corresponding to the left-hand side (LHS) of the first stage of the SDIRK2 scheme. Args: pde: The PDE model evaluated at the first stage. t: The current time. dt: The time step size. Returns: A function representing the LHS for the first stage of the SDIRK2 scheme, where `\gamma` is the SDIRK2 coefficient and `F` and `S` denote respectively the spatial and source operators of the PDE. """ t_stage = t + self.gamma_sdirk2 * dt if isinstance(pde, TemporalPDE): def res_temporal_pde(x, mu, with_last_layer: bool = True): t_ = LabelTensor(t_stage * torch.ones((x.shape[0], 1))) u1 = pde.space.evaluate(x, mu, with_last_layer=with_last_layer) f_u1 = pde.space_operator(u1, t_, x, mu) s_u1 = pde.rhs(u1, t_, x, mu) return ( u1.w + dt * self.gamma_sdirk2 * f_u1 - dt * self.gamma_sdirk2 * s_u1 ) return res_temporal_pde else: def res_kinetic_pde(x, mu, v, with_last_layer: bool = True): t_ = LabelTensor(t_stage * torch.ones((x.shape[0], 1))) u1 = pde.space.evaluate(x, mu, v, with_last_layer=with_last_layer) f_u1 = pde.space_operator(u1, t_, x, v, mu) s_u1 = pde.rhs(u1, t_, x, v, mu) return ( u1.w + dt * self.gamma_sdirk2 * f_u1 - dt * self.gamma_sdirk2 * s_u1 ) return res_kinetic_pde
[docs] def construct_rhs_sdirk2_step2( self, pde_n: TYPE_PDES, u1_copy: TYPE_PDES, t: float, dt: float ) -> TYPE_FUNC_RES: r"""Construct the right-hand side of the second stage of the SDIRK2 scheme. Returns `u^n - \Delta t \, (1 - \gamma) \, (F(u_1) - S(u_1))` as a function, corresponding to the right-hand side (RHS) of the second stage of the SDIRK2 scheme. Args: pde_n: The PDE model at the current time step. u1_copy: The PDE model containing the intermediate stage solution `u_1`. t: The current time. dt: The time step size. Returns: A function representing the RHS for the second stage of the SDIRK2 scheme, using the already computed intermediate solution `u_1`. """ t_stage = t + self.gamma_sdirk2 * dt if isinstance(pde_n, TemporalPDE): def res_temporal_pde(x, mu): t_ = LabelTensor(t_stage * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu) u1 = u1_copy.space.evaluate(x, mu) f_u1 = pde_n.space_operator(u1, t_, x, mu) s_u1 = pde_n.rhs(u1, t_, x, mu) return ( u_n.w - dt * (1 - self.gamma_sdirk2) * f_u1 + dt * (1 - self.gamma_sdirk2) * s_u1 ).detach() return res_temporal_pde else: def res_kinetic_pde(x, mu, v): t_ = LabelTensor(t_stage * torch.ones((x.shape[0], 1))) u_n = pde_n.space.evaluate(x, mu, v) u1 = u1_copy.space.evaluate(x, mu, v) f_u1 = pde_n.space_operator(u1, t_, x, v, mu) s_u1 = pde_n.rhs(u1, t_, x, v, mu) return ( u_n.w - dt * (1 - self.gamma_sdirk2) * f_u1 + dt * (1 - self.gamma_sdirk2) * s_u1 ).detach() return res_kinetic_pde
[docs] def construct_lhs_sdirk2_step2( self, pde: TYPE_PDES, t: float, dt: float ) -> TYPE_FUNC_RES: r"""Construct the left-hand side of the second stage of the SDIRK2 scheme. Returns `u_2 - \Delta t \, \gamma \, (F(u_2) - S(u_2))` as a function, corresponding to the left-hand side (LHS) of the second stage of the SDIRK2 scheme. Args: pde: The PDE model evaluated at the second stage. t: The current time. dt: The time step size. Returns: A function representing the LHS for the second stage of the SDIRK2 scheme, where `\gamma` is the SDIRK2 coefficient and `F` and `S` denote respectively the spatial and source operators of the PDE. """ t_stage = t + self.gamma_sdirk2 * dt if isinstance(pde, TemporalPDE): def res_temporal_pde(x, mu, with_last_layer: bool = True): t_ = LabelTensor(t_stage * torch.ones((x.shape[0], 1))) u2 = pde.space.evaluate(x, mu, with_last_layer=with_last_layer) f_u2 = pde.space_operator(u2, t_, x, mu) s_u2 = pde.rhs(u2, t_, x, mu) return ( u2.w + dt * self.gamma_sdirk2 * f_u2 - dt * self.gamma_sdirk2 * s_u2 ) return res_temporal_pde else: def res_kinetic_pde(x, mu, v, with_last_layer: bool = True): t_ = LabelTensor(t_stage * torch.ones((x.shape[0], 1))) u2 = pde.space.evaluate(x, mu, v, with_last_layer=with_last_layer) f_u2 = pde.space_operator(u2, t_, x, v, mu) s_u2 = pde.rhs(u2, t_, x, v, mu) return ( u2.w + dt * self.gamma_sdirk2 * f_u2 - dt * self.gamma_sdirk2 * s_u2 ) return res_kinetic_pde
[docs] def update(self, t: float, dt: float, **kwargs): r"""Compute \theta_{n+1} from \theta_n. Compute the new parameters \theta_{n+1} from \theta_n using either explicit or implicit time schemes. Dispatches to : update (explicit) or update_implicit (implicit) depending on ``type_scheme``. Args: t: The current time. dt: The time step size. **kwargs: Additional arguments for the projector's ``solve`` method. Raises: ValueError: If ``type_scheme`` is neither ``explicit`` nor ``implicit``. """ if self.type_scheme == "explicit": super().update(t, dt, **kwargs) elif self.type_scheme == "implicit": self.update_implicit(t, dt, **kwargs) else: raise ValueError(f"Unknown scheme_type: {self.type_scheme}")
    def update_implicit(self, t: float, dt: float, **kwargs):
        r"""Compute \theta_{n+1} from \theta_n for implicit schemes.

        Compute the new parameters `\theta_{n+1}` from `\theta_n` for
        implicit schemes, using `self.pde_imp`. Uses the time step ``dt`` and
        the chosen implicit scheme. Each solve fits ``lhs`` (trainable, at
        the new state) against ``rhs`` (detached, frozen at the old state);
        ``best_loss`` is reset so the projector does not keep an earlier
        stage's best checkpoint.

        Args:
            t: The current time.
            dt: The time step size.
            **kwargs: Additional arguments for the projector's ``solve``
                method.
        """
        # Reset the projector's best-loss tracker for this time step.
        self.projector.best_loss = 1e10
        # Frozen copy of the current state u_n; pde_np1 is the live model
        # whose parameters the implicit solve updates in place.
        pde_n = copy.deepcopy(self.pde_imp)
        pde_np1 = self.pde_imp
        if self.scheme == "euler_imp":
            # Solve u_{n+1} + dt*F(u_{n+1}) = u_n + dt*S(u_n).
            self.projector.rhs = self.construct_rhs_ei(pde_n, t, dt)
            self.projector.lhs = self.construct_lhs_ei(pde_np1, t + dt, dt)
            self.projector.solve(**kwargs)
        elif self.scheme == "crank_nicolson":
            # Trapezoidal split: half the operator on each side.
            self.projector.rhs = self.construct_rhs_cn(pde_n, t, dt)
            self.projector.lhs = self.construct_lhs_cn(pde_np1, t + dt, dt)
            self.projector.solve(**kwargs)
        elif self.scheme == "sdirk2":
            # Stage 1: implicit solve for u_1 at t + gamma*dt.
            self.projector.best_loss = 1e10
            pde_n = copy.deepcopy(self.pde_imp)
            self.projector.rhs = self.construct_rhs_sdirk2_step1(pde_n, t, dt)
            self.projector.lhs = self.construct_lhs_sdirk2_step1(self.pde_imp, t, dt)
            self.projector.solve(**kwargs)
            self.projector.best_loss = 1e10
            # Freeze the stage-1 solution u_1 before stage 2 overwrites it.
            u1_copy = copy.deepcopy(self.pde_imp)
            # Stage 2: implicit solve for u_2 combining u_n and u_1.
            self.projector.rhs = self.construct_rhs_sdirk2_step2(pde_n, u1_copy, t, dt)
            self.projector.lhs = self.construct_lhs_sdirk2_step2(self.pde_imp, t, dt)
            self.projector.solve(**kwargs)