r"""Solves a 1D advection equation using two time-sequential schemes. ..math:: \partial_t u + a \partial_x u = 0 The initial condition is a bump function. The equation is solved using the neural semi-Lagrangian scheme and a discrete PINN. """ # %% import datetime import matplotlib.pyplot as plt import torch from scimba_torch.approximation_space.nn_space import NNxSpace from scimba_torch.domain.mesh_based_domain.cuboid import Cuboid from scimba_torch.domain.meshless_domain.domain_1d import Segment1D from scimba_torch.integration.mesh_based_quadrature import RectangleMethod from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.neural_nets.coordinates_based_nets.features import PeriodicMLP from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.temporal_pde.discrete_pinn import DiscretePINN from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import ( Characteristic, NeuralSemiLagrangian, ) from scimba_torch.numerical_solvers.temporal_pde.time_discrete import ( TimeDiscreteCollocationProjector, TimeDiscreteNaturalGradientProjector, ) from scimba_torch.optimizers.optimizers_data import OptimizerData from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import ( AdvectionReactionDiffusionDirichletStrongForm, ) from scimba_torch.physical_models.temporal_pde.transport_equation import Transport1D from scimba_torch.plots.plot_time_discrete_scheme import plot_time_discrete_scheme from scimba_torch.utils.scimba_tensors import LabelTensor def f_rhs(w, t, x, mu): x_ = x.get_components() return 0 * x_ def f_bc(w, t, x, n, mu): x_ = x.get_components() return 0 * x_ def f_ini(x, mu): return exact(LabelTensor(torch.zeros_like(x.x)), x, mu) def exact(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> torch.Tensor: x_ = x.get_components() t_ = t.get_components() a = 1 v = sig0 x_ = x_ - a * t_ return 1 + torch.exp(-((x_ - 0.5) ** 2) / (2 * v**2)) # %% def make_projector(pde, adam=True): opt_adam = { "name": "adam", "optimizer_args": {"lr": 2.5e-2, "betas": (0.9, 0.999)}, } opt_lbfgs = { "name": "lbfgs", "switch_at_epoch_ratio": 0.9, "optimizer_args": { "history_size": 50, "max_iter": 50, "tolerance_grad": 1e-11, "tolerance_change": 1e-9, }, } if adam: opt = OptimizerData(opt_adam, opt_lbfgs) else: opt = OptimizerData(opt_lbfgs) return TimeDiscreteCollocationProjector(pde, optimizers=opt, rhs=f_ini) # %% def solve_with_discrete_pinn(T: float, dt: float, scheme: str): torch.random.manual_seed(0) domain_x = Segment1D((0, 2), is_main_domain=True) domain_mu = [] sampler = TensorizedSampler( [ DomainSampler(domain_x), UniformParametricSampler(domain_mu), ] ) space = NNxSpace(1, 0, PeriodicMLP, domain_x, sampler, layer_sizes=[40, 40, 40]) pde = Transport1D( space, init=f_ini, f=f_rhs, g=f_bc, exact_solution=exact, ) projector = make_projector(pde) scheme = DiscretePINN(pde, projector, scheme=scheme) scheme.initialization(epochs=1500, verbose=True, n_collocation=5000) scheme.projector.save("ini_transport") # plot(scheme.pde.space.evaluate, scheme.pde.space.integrator.sample, T=0) # scheme.projector = make_projector(space, adam=False) scheme.projector = make_projector(pde) scheme.projector.load("ini_transport") scheme.projector.space.load_from_best_approx() scheme.solve(dt=dt, T=T, epochs=200, n_collocation=5000, verbose=False) return scheme def solve_with_neural_sl( T: float, dt: float, 
def solve_with_neural_sl(
    T: float,
    dt: float,
    with_natural_gradient: bool = True,
    with_classical_projector: bool = False,
    use_rectangle_method: bool = False,
):
    """Solve the transport equation with the neural semi-Lagrangian scheme."""
    torch.random.manual_seed(0)

    if use_rectangle_method:
        domain_x = Cuboid([(0, 2)], is_main_domain=True)
        x_sampler = RectangleMethod
    else:
        domain_x = Segment1D((0, 2), is_main_domain=True)
        x_sampler = DomainSampler

    domain_mu = []
    sampler = TensorizedSampler(
        [
            x_sampler(domain_x),
            UniformParametricSampler(domain_mu),
        ]
    )

    def a(t, x, mu):
        # constant advection velocity
        return 1.0

    # variant 1: collocation projector with a periodic MLP
    if with_classical_projector:
        space = NNxSpace(
            1, 0, PeriodicMLP, domain_x, sampler, layer_sizes=[20, 40, 20]
        )
        pde = AdvectionReactionDiffusionDirichletStrongForm(
            space,
            u0=f_ini,
            constant_advection=True,
            zero_diffusion=True,
            exact_solution=exact,
        )
        characteristic = Characteristic(pde, periodic=True)
        projector = TimeDiscreteCollocationProjector(pde, rhs=f_ini)
        scheme = NeuralSemiLagrangian(characteristic, projector)

        scheme.initialization(epochs=750, verbose=True, n_collocation=3000)
        scheme.projector.save("ini_transport_SL")

        # reload the initial projection with a fresh projector before time stepping
        scheme.projector = TimeDiscreteCollocationProjector(pde, rhs=f_ini)
        scheme.projector.load("ini_transport_SL")
        scheme.projector.space.load_from_best_approx()

        scheme.solve(dt=dt, final_time=T, epochs=250, n_collocation=2000, verbose=True)

    # variant 2: natural gradient projector with a generic MLP
    if with_natural_gradient:
        space = NNxSpace(1, 0, GenericMLP, domain_x, sampler, layer_sizes=[16, 16, 16])
        pde = AdvectionReactionDiffusionDirichletStrongForm(
            space,
            u0=f_ini,
            constant_advection=True,
            zero_diffusion=True,
            exact_solution=exact,
        )
        characteristic = Characteristic(pde, periodic=True)
        projector = TimeDiscreteNaturalGradientProjector(pde, rhs=f_ini)
        scheme = NeuralSemiLagrangian(characteristic, projector)

        scheme.initialization(epochs=100, verbose=True, n_collocation=3000)
        scheme.projector.save("ini_transport_SL")

        scheme.solve(dt=dt, final_time=T, epochs=100, n_collocation=2000, verbose=True)

    return scheme
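

# %%
# Illustrative sketch, not part of scimba_torch: a semi-Lagrangian scheme traces
# the characteristic dx/dt = a(t, x) backward over one time step and evaluates
# the previous solution at the foot of the characteristic; the scheme above
# couples such a characteristic tracking (Characteristic) with a projector onto
# the neural space.  For constant advection a = 1 on the periodic domain [0, 2],
# the foot is simply (x - a*dt) mod 2.  The helper below is hypothetical and
# grid-free; `u_prev` can be any callable x -> u, e.g. the previously trained
# network.


def _semi_lagrangian_step(u_prev, x: torch.Tensor, dt_: float, a: float = 1.0):
    # foot of the characteristic, wrapped back onto the periodic domain [0, 2]
    foot = torch.remainder(x - a * dt_, 2.0)
    return u_prev(foot)

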
# %%
def get_spline(x: torch.Tensor, y: torch.Tensor):
    """Return a cubic spline interpolation of the data (x, y)."""
    import numpy as np
    import scipy.interpolate as spi

    # add a tiny random perturbation so that the abscissae are strictly distinct
    x = x[:, 0].detach().cpu().numpy() + 1e-10 * np.random.randn(len(x))
    y = y[:, 0].detach().cpu().numpy()
    sorter = np.argsort(x)

    return spi.CubicSpline(x[sorter], y[sorter], bc_type="natural")


def compute_output(scheme, n_visu=5_000):
    """Return the plotting grid, the predicted values, and the error at t = 1."""
    x_plot = torch.linspace(0, 2, 250).detach().cpu()
    w_exact_plot = exact(
        LabelTensor(torch.ones_like(x_plot)[:, None]),
        LabelTensor(x_plot[:, None]),
        LabelTensor(torch.zeros_like(x_plot)[:, None]),
    )[:, 0]
    w_exact_plot = w_exact_plot.detach().cpu().numpy()

    x, mu = scheme.pde.space.integrator.sample(n_visu)
    w_pred = scheme.pde.space.evaluate(x, mu)
    w_plot = get_spline(x.x, w_pred.w)(x_plot)

    w_error = w_exact_plot - w_plot

    return x_plot, w_plot, w_error


def save_output(scheme, n_visu=5_000):
    """Save the final profiles and the time history of the error to CSV files."""
    import numpy as np

    out = np.array(compute_output(scheme, n_visu))
    outfile_path = f"1D_constant_advection_{sig0}_{dt}_dPINN.csv"
    np.savetxt(
        outfile_path,
        out.T,
        fmt="%.3e",
        header="x, prediction dPINN, error dPINN",
        comments="",
        delimiter=",",
    )

    out = scheme.errors[:, 0::2].detach().cpu()
    outfile_path = f"dPINN_1D_constant_advection_{sig0}_{dt}.csv"
    np.savetxt(
        outfile_path,
        out,
        fmt="%.3e",
        header="t, error",
        comments="",
        delimiter=",",
    )


sig0 = 0.1  # width of the Gaussian bump
dt = 0.02  # time step of the discrete PINN run

# %%
if __name__ == "__main__":
    print(f"dt={dt}, sig0={sig0}", flush=True)

    print(f"start at {datetime.datetime.now()}", flush=True)
    scheme = solve_with_discrete_pinn(1.0, dt, "rk4")
    print(f"end at {datetime.datetime.now()}", flush=True)

    save_output(scheme)

    plot_time_discrete_scheme(
        scheme,
        solution=exact,
        error=exact,
    )
    plt.show()

# %%
if __name__ == "__main__":
    scheme = solve_with_neural_sl(
        0.2,
        0.1,
        with_natural_gradient=True,
        with_classical_projector=False,
        use_rectangle_method=True,
    )

    plot_time_discrete_scheme(
        scheme,
        solution=exact,
        error=exact,
    )
    plt.show()

# %%