r"""Solves a 1D parametric advection equation using two time-sequential schemes. ..math:: \partial_t u + a \partial_x u = 0 The function :math:`u` depends on space and on two parameters: the velocity :math:`a` and the variance of the initial Gaussian bump. The equation is solved using the neural semi-Lagrangian scheme and a discrete PINN. """ # %% import datetime import matplotlib.pyplot as plt import numpy as np import torch from scimba_torch.approximation_space.nn_space import NNxSpace from scimba_torch.domain.mesh_based_domain.cuboid import Cuboid from scimba_torch.domain.meshless_domain.domain_1d import Segment1D from scimba_torch.integration.mesh_based_quadrature import RectangleMethod from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.neural_nets.coordinates_based_nets.features import PeriodicMLP from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.temporal_pde.discrete_pinn import ( DiscretePINN, ) from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import ( Characteristic, NeuralSemiLagrangian, ) from scimba_torch.numerical_solvers.temporal_pde.time_discrete import ( TimeDiscreteCollocationProjector, TimeDiscreteNaturalGradientProjector, ) from scimba_torch.optimizers.optimizers_data import OptimizerData from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import ( AdvectionReactionDiffusionDirichletStrongForm, ) from scimba_torch.physical_models.temporal_pde.transport_equation import Transport1D from scimba_torch.utils.scimba_tensors import LabelTensor torch.random.manual_seed(0) def f_rhs(w, t, x, mu): x_ = x.get_components() return 0 * x_ def f_bc(w, t, x, n, mu): x_ = x.get_components() return 0 * x_ def f_ini(x, mu): return exact(LabelTensor(torch.zeros_like(x.x)), x, mu) def exact(t: LabelTensor, x: LabelTensor, mu: LabelTensor) -> torch.Tensor: x_ = x.get_components() t_ = t.get_components() a, v = mu.get_components() x_ = x_ - a * t_ return 1 + torch.exp(-((x_ - 0.5) ** 2) / (2 * v**2)) def f_a(t, x, mu): a, v = mu.get_components() return a # %% def make_projector(pde, adam=True): opt_adam = { "name": "adam", "optimizer_args": {"lr": 2.5e-2, "betas": (0.9, 0.999)}, } opt_lbfgs = { "name": "lbfgs", "switch_at_epoch_ratio": 0.9, "optimizer_args": { "history_size": 50, "max_iter": 50, "tolerance_grad": 1e-11, "tolerance_change": 1e-9, }, } if adam: opt = OptimizerData(opt_adam, opt_lbfgs) else: opt = OptimizerData(opt_lbfgs) return TimeDiscreteCollocationProjector(pde, optimizers=opt, rhs=f_ini) # %% def solve_with_discrete_pinn(T: float, dt: float, scheme: str): torch.random.manual_seed(0) domain_x = Segment1D((0, 2), is_main_domain=True) sampler = TensorizedSampler( [ DomainSampler(domain_x), UniformParametricSampler([(0.5, 1.0), (0.05, 0.15)]), ] ) space = NNxSpace( 1, 2, PeriodicMLP, domain_x, sampler, layer_sizes=[40, 60, 80, 60, 40] ) pde = Transport1D( space, a=f_a, init=f_ini, f=f_rhs, g=f_bc, exact_solution=exact, ) projector = make_projector(pde) scheme = DiscretePINN(pde, projector, scheme=scheme) scheme.initialization(epochs=2500, verbose=True, n_collocation=10000) scheme.projector.save("ini_transport") plot(scheme, T=0) scheme.projector = make_projector(pde, adam=False) scheme.projector.load("ini_transport") scheme.projector.space.load_from_best_approx() scheme.solve(dt=dt, T=T, epochs=100, n_collocation=10000, verbose=True, 
def solve_with_neural_sl(
    T: float,
    dt: float,
    with_natural_gradient: bool = True,
    with_classical_projector: bool = False,
    use_rectangle_method: bool = False,
):
    torch.random.manual_seed(0)

    if use_rectangle_method:
        domain_x = Cuboid([(0, 2)], is_main_domain=True)
        x_sampler = RectangleMethod
    else:
        domain_x = Segment1D((0, 2), is_main_domain=True)
        x_sampler = DomainSampler

    sampler = TensorizedSampler(
        [
            x_sampler(domain_x),
            UniformParametricSampler([]),
        ]
    )

    def a(t, x, mu):
        return 1.0

    if with_classical_projector:
        space = NNxSpace(
            1, 0, PeriodicMLP, domain_x, sampler, layer_sizes=[20, 40, 20]
        )
        pde = AdvectionReactionDiffusionDirichletStrongForm(
            space,
            u0=f_ini,
            constant_advection=True,
            zero_diffusion=True,
            exact_solution=exact,
        )
        characteristic = Characteristic(pde, periodic=True)

        projector = TimeDiscreteCollocationProjector(pde, rhs=f_ini)
        scheme = NeuralSemiLagrangian(characteristic, projector)
        scheme.initialization(epochs=750, verbose=True, n_collocation=3000)
        scheme.projector.save("ini_transport_SL")
        plot(scheme, T=0)

        scheme.projector = TimeDiscreteCollocationProjector(pde, rhs=f_ini)
        scheme.projector.load("ini_transport_SL")
        scheme.projector.space.load_from_best_approx()
        scheme.solve(dt=dt, final_time=T, epochs=250, n_collocation=2000, verbose=True)
        plot(scheme, T=T)

    if with_natural_gradient:
        space = NNxSpace(
            1, 0, GenericMLP, domain_x, sampler, layer_sizes=[16, 16, 16]
        )
        pde = AdvectionReactionDiffusionDirichletStrongForm(
            space,
            u0=f_ini,
            constant_advection=True,
            zero_diffusion=True,
            exact_solution=exact,
        )
        characteristic = Characteristic(pde, periodic=True)

        projector = TimeDiscreteNaturalGradientProjector(pde, rhs=f_ini)
        scheme = NeuralSemiLagrangian(characteristic, projector)
        scheme.initialization(epochs=100, verbose=True, n_collocation=3000)
        scheme.projector.save("ini_transport_SL")
        plot(scheme, T=0)
        scheme.solve(dt=dt, final_time=T, epochs=100, n_collocation=2000, verbose=True)
        plot(scheme, T=T)

    return scheme


# %%
def get_spline(x: torch.Tensor, y: torch.Tensor):
    """Return a spline interpolation of the data (x, y)."""
    import scipy.interpolate as spi

    # add random noise to make sure the spline is well defined
    x = x[:, 0].detach().cpu().numpy() + 1e-10 * np.random.randn(len(x))
    y = y[:, 0].detach().cpu().numpy()
    sorter = np.argsort(x)
    return spi.CubicSpline(x[sorter], y[sorter], bc_type="natural")


def compute_output(scheme: DiscretePINN, T: float, a: float, v: float, n_visu=5_000):
    x_plot = torch.linspace(0, 2, 250).detach().cpu()
    x_plot_ = x_plot[:, None]
    a_ = torch.ones_like(x_plot) * a
    v_ = torch.ones_like(x_plot) * v
    mu_plot = LabelTensor(torch.stack((a_, v_), dim=1))
    T_plot = LabelTensor(torch.ones_like(x_plot_) * T)
    w_exact_plot = exact(T_plot, LabelTensor(x_plot_), mu_plot)[:, 0]

    x, _ = scheme.pde.space.integrator.sample(n_visu)
    a_ = torch.ones_like(x.x) * a
    v_ = torch.ones_like(x.x) * v
    mu = LabelTensor(torch.cat((a_, v_), dim=1))
    w_pred = scheme.pde.space.evaluate(x, mu)
    w_plot = get_spline(x.x, w_pred.w)(x_plot)
    w_error = w_exact_plot - w_plot

    return x_plot, w_plot, w_exact_plot, w_error
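
# %%
# ``compute_output`` evaluates the network at scattered Monte Carlo points and
# uses ``get_spline`` to resample onto the uniform comparison grid. A small
# self-contained illustration of that resampling step on synthetic data; the
# function name and the sine target are stand-ins, not part of the workflow:
def _spline_resampling_demo(n_scatter: int = 200, n_grid: int = 250) -> float:
    x_scatter = 2 * torch.rand(n_scatter, 1)
    y_scatter = torch.sin(np.pi * x_scatter)
    spline = get_spline(x_scatter, y_scatter)
    x_grid = np.linspace(0, 2, n_grid)
    # maximum resampling error against the known target on the uniform grid
    return float(np.abs(spline(x_grid) - np.sin(np.pi * x_grid)).max())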
def plot(scheme: DiscretePINN, T=0, n_visu=2_000, iter=None):
    all_a = [0.55, 0.75, 0.95]
    all_v = [0.075, 0.1, 0.125]
    for a_, v_ in zip(all_a, all_v):
        x_plot, w_plot, w_exact_plot, w_error = compute_output(
            scheme, T, a_, v_, n_visu
        )

        fig, axs = plt.subplots(1, 2, figsize=(12, 6))

        # Plot predictions and expected values
        axs[0].scatter(x_plot, w_plot, label="pred")
        axs[0].scatter(x_plot, w_exact_plot, label="true", s=0.5)
        axs[0].legend()
        axs[0].set_title("Predictions vs Expected")

        # Plot error
        axs[1].scatter(x_plot, w_error, label="error")
        axs[1].legend()
        axs[1].set_title("Error")

        plt.show()


def save_output(scheme, n_visu=5_000):
    all_a = [0.55, 0.75, 0.95]
    all_v = [0.075, 0.1, 0.125]
    for a_, v_ in zip(all_a, all_v):
        out = np.array(compute_output(scheme, 1, a_, v_, n_visu))
        out = np.delete(out, 2, axis=0)  # drop the exact solution column
        # relies on the module-level ``dt`` set in the __main__ block below
        outfile_path = f"1D_param_advection_{dt}_[{v_}, {a_}]_dPINN.csv"
        np.savetxt(
            outfile_path,
            np.array(out).T,
            fmt="%.3e",
            header="x, prediction dPINN, error dPINN",
            comments="",
            delimiter=",",
        )


# %%
if __name__ == "__main__":
    dt = 0.025
    print(f"dt={dt}", flush=True)
    print(f"start at {datetime.datetime.now()}", flush=True)
    scheme = solve_with_discrete_pinn(1, dt, "rk4")
    print(f"end at {datetime.datetime.now()}", flush=True)


# %%
def local_error(scheme: DiscretePINN, T: float, mu: np.ndarray, n_visu: int = 5_000):
    x, _ = scheme.pde.space.integrator.sample(n_visu)
    mu_ = LabelTensor(torch.tensor(mu).repeat(n_visu, 1))
    w_pred = scheme.pde.space.evaluate(x, mu_)
    # evaluate the exact solution at the requested final time T
    w_exact = exact(LabelTensor(T * torch.ones_like(x.x)), x, mu_)
    return w_exact[:, 0] - w_pred.w[:, 0]


def get_error(
    scheme: DiscretePINN, T: float, all_mus: np.ndarray, n_visu: int = 5_000
) -> torch.Tensor:
    import concurrent.futures

    from tqdm import tqdm

    def compute_error(mu):
        return local_error(scheme, T, mu, n_visu)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        res = list(tqdm(executor.map(compute_error, all_mus), total=len(all_mus)))

    return torch.stack(res)


def process_error_wrt_parameters(res):
    errors = np.sqrt((res.detach().numpy() ** 2).mean(axis=1))
    min_error = errors.min()
    avg_error = errors.mean()
    max_error = errors.max()
    std_error = errors.std()
    return min_error, avg_error, max_error, std_error


# %%
if __name__ == "__main__":
    np.random.seed(0)
    torch.random.manual_seed(0)

    n_random = 2_000
    all_mus = np.vstack(
        (
            np.random.uniform(0.5, 1.0, n_random),
            np.random.uniform(0.05, 0.15, n_random),
        )
    ).T

    res = get_error(scheme, 1, all_mus, n_visu=5_000)
    stats = process_error_wrt_parameters(res)

# %%
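
# %%
# Optional sanity check (plain torch autograd, not part of the scimba_torch
# workflow above): the closed-form solution implemented by ``exact`` should
# satisfy u_t + a u_x = 0 identically, so the residual below should sit at
# machine precision for any admissible (a, v). The function name is an
# illustrative addition; the formula is copied from ``exact`` on raw tensors.
def check_exact_residual(n: int = 64) -> float:
    t = torch.rand(n, 1).requires_grad_(True)
    x = (2 * torch.rand(n, 1)).requires_grad_(True)
    a = torch.empty(n, 1).uniform_(0.5, 1.0)
    v = torch.empty(n, 1).uniform_(0.05, 0.15)
    u = 1 + torch.exp(-((x - a * t - 0.5) ** 2) / (2 * v**2))
    u_t = torch.autograd.grad(u.sum(), t, retain_graph=True)[0]
    u_x = torch.autograd.grad(u.sum(), x)[0]
    return float((u_t + a * u_x).abs().max())


if __name__ == "__main__":
    print(f"max PDE residual of exact solution: {check_exact_residual():.2e}")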