r"""Solves the Vlasov equation on a periodic square. ..math:: \partial_t u + a \cdot \nabla u = 0 where :math:`u: \mathbb{R}^2 \times (0, T) \to \mathbb{R}` is the unknown function, and where the velocity field is given by :math:`a(x_1, x_2) = (x_2, \sin(x_1))`. The equation is solved using the neural semi-Lagrangian scheme, with either a classical Adam optimizer, or a natural gradient preconditioning. """ # %% import time from pathlib import Path import matplotlib.pyplot as plt import numpy as np import torch from scipy.fftpack import fft, ifft from scipy.interpolate import griddata from tqdm import tqdm from scimba_torch import PI from scimba_torch.approximation_space.nn_space import NNxSpace from scimba_torch.domain.meshless_domain.domain_2d import Square2D from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.collocation_projector import ( NaturalGradientProjector, ) from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import ( Characteristic, NeuralSemiLagrangian, ) from scimba_torch.numerical_solvers.temporal_pde.time_discrete import ( TimeDiscreteCollocationProjector, ) from scimba_torch.optimizers.optimizers_data import OptimizerData from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import ( AdvectionReactionDiffusionDirichletStrongForm, ) from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor DEFAULT_BOUNDS = [[0.0, 2 * PI], [-6.0, 6.0]] DEFAULT_DOMAIN = Square2D(DEFAULT_BOUNDS, is_main_domain=True) SQRT_2_PI = (2 * PI) ** 0.5 SOL_PATH = Path.cwd() / Path("sol") SOL_PATH.mkdir(parents=True, exist_ok=True) # initial condition and reference solution def f_ini(x, mu): _, v = x.get_components() return torch.exp(-(v**2) / 2) / SQRT_2_PI def classical_scheme(T, nx=512, nv=8192, nt_per_t=200): def bspline_python(p, j, x): """Return the value at x in [0,1[ of the B-spline with integer nodes of degree p with support starting at j. Implemented recursively using the de Boor's recursion formula. Args: p: Degree of the B-spline. j: Starting support node. x: Evaluation point in [0,1[. Returns: Value of the B-spline at x. """ assert (x >= 0.0) & (x <= 1.0) assert isinstance(p, int) & isinstance(j, int) if p == 0: if j == 0: return 1.0 else: return 0.0 else: w = (x - j) / p w1 = (x - j - 1) / p b1 = bspline_python(p - 1, j, x) b2 = bspline_python(p - 1, j + 1, x) return w * b1 + (1 - w1) * b2 class BSplineNumpy: """Class to compute BSL advection of 1d function.""" def __init__(self, p, xmin, xmax, ncells): assert p & 1 == 1 # check that p is odd self.p = p self.ncells = ncells # compute eigenvalues of degree p b-spline matrix self.modes = 2 * np.pi * np.arange(ncells) / ncells self.deltax = (xmax - xmin) / ncells self.eig_bspl = bspline_python(p, -(p + 1) // 2, 0.0) for j in range(1, (p + 1) // 2): self.eig_bspl += ( bspline_python(p, j - (p + 1) // 2, 0.0) * 2 * np.cos(j * self.modes) ) self.eigalpha = np.zeros(ncells, dtype=complex) def interpolate_disp(self, f, alpha): """Compute the interpolating spline of degree p of odd degree of a function f on a periodic uniform mesh. The computation is performed at all points xi-alpha. Args: f: Function values on the mesh. alpha: Displacement parameter. Returns: Interpolated values at displaced points. """ p = self.p assert np.size(f) == self.ncells # compute eigenvalues of cubic splines evaluated at displaced points ishift = np.floor(-alpha / self.deltax) beta = -ishift - alpha / self.deltax self.eigalpha.fill(0.0) for j in range(-(p - 1) // 2, (p + 1) // 2 + 1): self.eigalpha += bspline_python(p, j - (p + 1) // 2, beta) * np.exp( (ishift + j) * 1j * self.modes ) # compute interpolating spline using fft and properties of circulant matrices return np.real(ifft(fft(f) * self.eigalpha / self.eig_bspl)) def interpolation_test(): """Test to check interpolation.""" n = 64 cs = BSplineNumpy(3, 0, 1, n) x = np.linspace(0, 1, n, endpoint=False) f = np.sin(x * 4 * np.pi) alpha = 0.2 return np.allclose( np.sin((x - alpha) * 4 * np.pi), cs.interpolate_disp(f, alpha) ) class Vlasov: def __init__(self, xmin, xmax, nx, vmin, vmax, nv): # Grid self.nx = nx self.x, self.dx = np.linspace(xmin, xmax, nx, endpoint=False, retstep=True) self.nv = nv self.v, self.dv = np.linspace(vmin, vmax, nv, endpoint=False, retstep=True) # Distribution function self.f = np.zeros((nx, nv)) # Interpolators for advection self.cs_x = BSplineNumpy(3, xmin, xmax, nx) self.cs_v = BSplineNumpy(3, vmin, vmax, nv) # Initialize distribution function self.X, self.V = np.meshgrid(self.x, self.v) self.f = np.exp(-0.5 * self.V**2) / np.sqrt(2.0 * np.pi) def advection_x(self, dt): a = dt * self.v for j in range(self.nv): self.f[j, :] = self.cs_x.interpolate_disp(self.f[j, :], a[j]) def advection_v(self, dt): a = dt * np.sin(self.x) for i in range(self.nx): self.f[:, i] = self.cs_v.interpolate_disp(self.f[:, i], a[i]) def run(self, nt, dt): self.advection_x(0.5 * dt) for _ in tqdm(range(nt)): self.advection_v(dt) self.advection_x(dt) xmin, xmax = DEFAULT_BOUNDS[0] vmin, vmax = DEFAULT_BOUNDS[1] sim = Vlasov(xmin, xmax, nx, vmin, vmax, nv) nt = int(nt_per_t * T) t, dt = np.linspace(0, T, nt, retstep=True) sol_path = SOL_PATH / Path(f"2D_FakeVlasov_{T}_{nx}_{nv}_SL.pt") try: f = torch.load(sol_path) except FileNotFoundError: sim.run(nt, dt) f = torch.tensor(sim.f) torch.save(f, sol_path) return torch.tensor(sim.X), torch.tensor(sim.V), f def sol_ref(t, x, mu, x1_ref, x2_ref, u_ref): nx_ref = int(x1_ref.shape[0]) nv_ref = int(x1_ref.shape[1]) # add a point to the grid to avoid interpolation issues dx = x1_ref[:, -1] - x1_ref[:, -2] + 1e-15 x1_ref = torch.cat((x1_ref, (x1_ref[:, -1] + dx).unsqueeze(1)), dim=1) x1_ref = torch.cat((x1_ref, x1_ref[-1, :].unsqueeze(0)), dim=0) dy = x2_ref[-1, :] - x2_ref[-2, :] + 1e-15 x2_ref = torch.cat((x2_ref, (x2_ref[-1, :] + dy).unsqueeze(0)), dim=0) x2_ref = torch.cat((x2_ref, x2_ref[:, -1].unsqueeze(1)), dim=1) u_ref = torch.cat((u_ref, u_ref[:, 0].unsqueeze(1)), dim=1) u_ref = torch.cat((u_ref, u_ref[0, :].unsqueeze(0)), dim=0) # interpolate x1_ref = x1_ref.flatten().detach().cpu() x2_ref = x2_ref.flatten().detach().cpu() u_ref = u_ref.flatten().detach().cpu() x1, x2 = x.get_components() x1 = x1.flatten().detach().cpu() x2 = x2.flatten().detach().cpu() nx = int(x1.shape[0] ** 0.5) sol_path = SOL_PATH / Path( f"2D_FakeVlasov_interpolated_{t}_{nx_ref}_{nv_ref}_{nx}_SL.pt" ) try: sol = torch.load(sol_path) except FileNotFoundError: sol = torch.tensor(griddata((x1_ref, x2_ref), u_ref, (x1, x2), method="cubic")) torch.save(sol, sol_path) return sol # plotting def force_aspect(ax, ratio=1): xleft, xright = ax.get_xlim() ybottom, ytop = ax.get_ylim() ax.set_aspect(abs((xright - xleft) / (ybottom - ytop)) * ratio) def compute_errors(w, w_exact): w_error = torch.abs(w - w_exact) L2_error = torch.sqrt(torch.mean(w_error**2)) title_error = f"Error, L2={L2_error:3.2e}" return w_error, title_error def plot_2d_contourf( fig, ax, x1, x2, w, w_exact=None, n_visu=768, n_contour=250, colormap=None, draw_contours=True, draw_exact_contours=True, n_drawn_contours=10, error=False, respect_aspect_ratio=False, ): import numpy as np if error: draw_exact_contours = False if draw_exact_contours and w_exact is None: raise ValueError("w_exact must be provided if draw_exact_contours is True") im = ax.contourf( x1, x2, w, n_contour, cmap="turbo" if colormap is None else colormap, zorder=-9, ) if draw_contours: ax.contour( im, levels=im.levels[:: n_contour // n_drawn_contours], zorder=-9, colors="white", alpha=0.5, linewidths=0.8, ) if draw_exact_contours: ax.contour( x1, x2, w_exact, n_drawn_contours, zorder=-9, colors="black", alpha=0.5, linewidths=1.2, ) if respect_aspect_ratio: ax.set_aspect("equal") else: force_aspect(ax, 1) ticks = np.linspace(w.min(), w.max(), 8) if error: cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04, ticks=ticks) cbar.ax.yaxis.get_major_formatter().set_powerlimits((0, 0)) cbar.ax.yaxis.get_major_formatter().useMathText = True else: cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04, ticks=ticks) def plot_results(scheme, T, n_visu=768, init=False): T = float(T) save_path = f"2D_FakeVlasov_{T}" fig, ax = plt.subplots(1, 2, figsize=(10, 5), tight_layout=True) # compute the exact solution x1 = torch.linspace(*DEFAULT_BOUNDS[0], n_visu) x2 = torch.linspace(*DEFAULT_BOUNDS[1], n_visu) x1, x2 = torch.meshgrid(x1, x2, indexing="ij") x = LabelTensor(torch.stack((x1.flatten(), x2.flatten()), dim=1)) mu = LabelTensor(torch.ones((n_visu**2, 0))) if init: w_exact = f_ini(x, mu).detach().cpu().reshape(n_visu, n_visu) else: w_exact = sol_ref(T, x, mu, *classical_scheme(T)) w_exact = w_exact.detach().cpu().reshape((n_visu, n_visu)) x1 = x1.detach().cpu() x2 = x2.detach().cpu() # take care of SL w_pred = scheme.pde.space.evaluate(x, mu).w.detach().cpu().reshape(n_visu, n_visu) w_error, title_error = compute_errors(w_pred, w_exact) title_error = f"SL {title_error}" print("\n" + "#" * len(title_error)) print(f"{title_error}") print("#" * len(title_error), flush=True) plot_2d_contourf(fig, ax[0], x1, x2, w_pred, w_exact, n_visu=n_visu) ax[0].set_title("prediction, SL") fig_s, ax_s = plt.subplots(1, 1, figsize=(5, 5), tight_layout=True) plot_2d_contourf(fig_s, ax_s, x1, x2, w_pred, w_exact, n_visu=n_visu) plt.gca().set_rasterization_zorder(-1) fig_s.savefig(SOL_PATH / (save_path + "_SL_prediction.pdf"), format="pdf") plt.close(fig_s) plot_2d_contourf( fig, ax[1], x1, x2, w_error, n_visu=n_visu, error=True, colormap="jet" ) ax[1].set_title(f"{title_error}") fig_s, ax_s = plt.subplots(1, 1, figsize=(5, 5), tight_layout=True) plot_2d_contourf( fig_s, ax_s, x1, x2, w_error, n_visu=n_visu, error=True, colormap="jet" ) plt.gca().set_rasterization_zorder(-1) fig_s.savefig(SOL_PATH / (save_path + "_SL_error.pdf"), format="pdf") plt.close(fig_s) # wrap everything up plt.tight_layout() plt.gca().set_rasterization_zorder(-1) plt.show() # classical optimizers def opt(): opt_1 = { "name": "adam", "optimizer_args": {"lr": 2.5e-2, "betas": (0.9, 0.999)}, } opt_2 = { "name": "lbfgs", "switch_at_epoch_ratio": 0.9, "optimizer_args": { "history_size": 50, "max_iter": 50, "tolerance_grad": 1e-11, "tolerance_change": 1e-9, }, } return OptimizerData(opt_1, opt_2) # %% class NonUniformSampler(DomainSampler): def __init__(self, f, domain, **kwargs): super().__init__(domain, **kwargs) self.f = f def sample_around_zeroes(self, n, sigma=0.05, keep=0.75): candidates = LabelTensor(self.vol_sampler.sample(10 * n)) parameters = LabelTensor(torch.zeros((10 * n, 0))) f_values = self.f(candidates, parameters).w[:, 0] weights = torch.exp(-(f_values**2) / (2 * sigma**2)) return candidates[weights > keep].x def sample_new_points(self, n: int): """Samples `n` new points from the volumetric domain and labels them by subdomains. Args: n (int): Number of points to sample. Returns: ScimbaTensor: A tensor of sampled points with their subdomain labels. """ pts_1 = self.sample_around_zeroes(10 * n, sigma=0.05) if pts_1.shape[0] >= n // 4: pts_1 = pts_1[: n // 4] pts_2 = self.sample_around_zeroes(10 * n, sigma=0.1) pts_2 = torch.cat((pts_1, pts_2), dim=0) if pts_2.shape[0] >= n // 2: pts_2 = pts_2[: n // 2] pts_3 = self.sample_around_zeroes(n, sigma=10) pts_3 = torch.cat((pts_2, pts_3), dim=0) if pts_3.shape[0] >= 3 * n // 4: pts_3 = pts_3[: 3 * n // 4] remaining = n - pts_3.shape[0] pts_unif = self.vol_sampler.sample(remaining) pts = torch.cat((pts_3, pts_unif), dim=0) labels = torch.zeros(n, dtype=int) # Label points based on subdomains for i, subdomain in enumerate(self.domain.list_subdomains): condition = subdomain.is_inside(pts) labels[condition] = i + 1 # Wrap the points and labels in a ScimbaTensor data = LabelTensor(pts, labels) data.x.requires_grad_() return data def make_sampler(space, domain, with_non_uniform_sampler): if with_non_uniform_sampler: def f(x, mu): x.x.requires_grad_() w = space.evaluate(x, mu) _, w_v = space.grad(w, x) res = 1 / (1e-1 + w_v**2) # x_ = x.x.detach().cpu() # plt.scatter(x_[:, 0], x_[:, 1], c=res.detach().cpu(), s=0.2) # plt.colorbar() # plt.show() return MultiLabelTensor(res - res.min()) sampler = NonUniformSampler(f, domain) else: sampler = DomainSampler(domain) return TensorizedSampler([sampler, UniformParametricSampler([])]) # %% def solve_with_neural_sl( T: float, dt: float, with_natural_gradient: bool = True, with_classical_projector: bool = False, with_non_uniform_sampler: bool = True, N_c: int = 10_000, epochs_init=250, epochs=150, retrain_initial_condition: bool = False, ): torch.random.manual_seed(0) domain_x = DEFAULT_DOMAIN x_sampler = DomainSampler sampler = TensorizedSampler( [ x_sampler(domain_x), UniformParametricSampler([]), ] ) def a(t, x, mu): x_, v_ = x.get_components() return torch.cat((v_, torch.sin(x_)), dim=1) if with_classical_projector: space = NNxSpace( 1, 0, GenericMLP, domain_x, sampler, layer_sizes=[60, 60, 60], activation_type="tanh", ) pde = AdvectionReactionDiffusionDirichletStrongForm( space, a=a, u0=f_ini, constant_advection=False, zero_diffusion=True ) sampler = make_sampler(space, with_non_uniform_sampler) space.integrator = sampler pde.space = space characteristic = Characteristic(pde, periodic=True) projector = TimeDiscreteCollocationProjector(pde, rhs=f_ini) scheme = NeuralSemiLagrangian(characteristic, projector) scheme.initialization(epochs=750, n_collocation=N_c, verbose=True) scheme.projector.save("ini_transport_2D_SL") # Check initialization plot_results(scheme, 0, init=True) scheme.projector = TimeDiscreteCollocationProjector( pde, rhs=f_ini, optimizers=opt() ) scheme.projector.load("ini_transport_2D_SL") scheme.projector.space.load_from_best_approx() scheme.solve(dt=dt, final_time=T, epochs=750, n_collocation=N_c, verbose=True) # Check results plot_results(scheme, T) if with_natural_gradient: space = NNxSpace( 1, 0, GenericMLP, domain_x, sampler, layer_sizes=[25, 50, 25], activation_type="regularized_hat", ) pde = AdvectionReactionDiffusionDirichletStrongForm( space, a=a, u0=f_ini, constant_advection=False, zero_diffusion=True ) sampler = make_sampler(space, domain_x, with_non_uniform_sampler) space.integrator = sampler pde.space = space characteristic = Characteristic(pde, periodic=True) if retrain_initial_condition: projector = NaturalGradientProjector(space, rhs=f_ini) scheme = NeuralSemiLagrangian(characteristic, projector) scheme.initialization(epochs=epochs_init, n_collocation=N_c, verbose=True) scheme.projector.save("ini_FakeVlasov_SL") plot_results(scheme, 0, init=True) projector = NaturalGradientProjector(space, rhs=f_ini) projector.load("ini_FakeVlasov_SL") projector.space.load_from_best_approx() scheme = NeuralSemiLagrangian(characteristic, projector) scheme.solve( dt=dt, final_time=T, epochs=epochs, n_collocation=N_c, verbose=True ) return scheme # %% T = 3.0 dt = 1.5 # %% start = time.perf_counter() scheme = solve_with_neural_sl( T, dt, with_natural_gradient=True, with_classical_projector=False, with_non_uniform_sampler=True, N_c=192**2, epochs_init=250, epochs=200, # retrain_initial_condition=True, ) print(f"Elapsed time: {time.perf_counter() - start:.2f} s") # %% plot_results(scheme, T, n_visu=768) # %%