r"""Solves an advection-diffusion equation in dimension 1 to 8. ..math:: \partial_t u + a \cdot \nabla u - D \Delta u = 0 The initial condition is an anisotropic, high-rank Gaussian bump. The equation is solved using the neural semi-Lagrangian scheme with a natural gradient projector. """ # %% import time import matplotlib.pyplot as plt import torch from scimba_torch.approximation_space.nn_space import NNxSpace from scimba_torch.domain.meshless_domain.domain_nd import HypercubeND from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import ( Characteristic, NeuralSemiLagrangian, ) from scimba_torch.numerical_solvers.temporal_pde.time_discrete import ( TimeDiscreteNaturalGradientProjector, ) from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import ( AdvectionReactionDiffusionDirichletStrongForm, ) from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor def identity(d): return torch.eye(d).unsqueeze(0) def advection_field(d): return torch.ones(d).unsqueeze(0) def initial_mean(d): return -0.5 * torch.ones(d).unsqueeze(0) DIFFUSION_COEFFICIENT = 5e-3 MU_SAMPLER = UniformParametricSampler([]) def initial_covariance(d): # return torch.eye(d).unsqueeze(0) * 0.05**2 # return (torch.ones(d) + (d - 1) * torch.eye(d)).unsqueeze(0) * 0.05**2 Σ0 = 2 * d * torch.eye(d) for i in range(d): for j in range(d): if i != j: Σ0[i, j] = d - abs(i - j) Σ0 = Σ0 * 0.05**2 / (2 * d) return Σ0.unsqueeze(0) def f_ini(x, mu, dim): return f_exact(0, x, mu, dim) def f_exact(t, x, mu, dim): def M(t): # noqa: N802 return initial_mean(dim) + advection_field(dim) * t def Σ(t): # noqa: N802 Σ0 = initial_covariance(dim) return Σ0 + 2 * DIFFUSION_COEFFICIENT * t[..., None] * identity(dim) x_ = x.get_components() if dim == 1: x_ = (x_,) x_ = torch.cat(x_, dim=1) if isinstance(t, (int, float)): t = t * torch.ones((x_.shape[0], 1)) if isinstance(t, torch.Tensor): t = LabelTensor(t) t_ = t.get_components() xt = x_ - M(t_) # xt = torch.remainder(xt + 1, 2) - 1 det_Σ0 = torch.linalg.det(initial_covariance(dim)) det_Σ = torch.linalg.det(Σ(t_)) arg = -0.5 * torch.einsum("bi,bi->b", xt, torch.linalg.solve(Σ(t_), xt)) res = 1 + torch.exp(arg) / torch.sqrt(det_Σ / det_Σ0) return res[:, None] def diffusion_matrix(t, x, mu): batch_size, n = x.shape[0], x.shape[1] return (DIFFUSION_COEFFICIENT * torch.eye(n)).unsqueeze(0).repeat(batch_size, 1, 1) # %% class NonUniformSampler(DomainSampler): def __init__(self, f, domain, **kwargs): super().__init__(domain, **kwargs) self.f = f self.mu_sampler = MU_SAMPLER def sample_around_zeroes(self, n, sigma=0.05, keep=0.75): candidates = LabelTensor(self.vol_sampler.sample(10 * n)) parameters = self.mu_sampler.sample(10 * n) f_values = self.f(candidates, parameters).w[:, 0] weights = torch.exp(-(f_values**2) / (2 * sigma**2)) return candidates[weights > keep].x def sample_new_points(self, n: int): """Samples `n` new points from the volumetric domain and labels them by subdomains. Args: n (int): Number of points to sample. Returns: ScimbaTensor: A tensor of sampled points with their subdomain labels. """ pts_1 = self.sample_around_zeroes(n, sigma=20) if pts_1.shape[0] >= n // 4: pts_1 = pts_1[: n // 4] pts_2 = self.sample_around_zeroes(n, sigma=100) pts_2 = torch.cat((pts_1, pts_2), dim=0) if pts_2.shape[0] >= n // 2: pts_2 = pts_2[: n // 2] pts_3 = self.sample_around_zeroes(n, sigma=5000) pts_3 = torch.cat((pts_2, pts_3), dim=0) if pts_3.shape[0] >= 3 * n // 4: pts_3 = pts_3[: 3 * n // 4] remaining = n - pts_3.shape[0] pts_unif = self.vol_sampler.sample(remaining) pts = torch.cat((pts_3, pts_unif), dim=0) labels = torch.zeros(n, dtype=int) # Label points based on subdomains for i, subdomain in enumerate(self.domain.list_subdomains): condition = subdomain.is_inside(pts) labels[condition] = i + 1 # Wrap the points and labels in a ScimbaTensor data = LabelTensor(pts, labels) data.x.requires_grad_() return data # %% def solve_with_neural_sl( T: float, dt: float, dim: int, layer_sizes: list, N_c: int, epochs: int, epochs_init: int, retrain_initial_condition=False, verbose=False, ) -> tuple: assert isinstance(dim, int) and dim > 0 if DIFFUSION_COEFFICIENT == 0: file_desc = "advection" else: file_desc = "advection_diffusion" torch.random.manual_seed(0) BOUNDS = [(-0.75, 1 / dim**1.5)] * dim DOMAIN = HypercubeND(BOUNDS, is_main_domain=True) sampler_unif = TensorizedSampler([DomainSampler(DOMAIN), MU_SAMPLER]) space = NNxSpace( 1, 0, GenericMLP, DOMAIN, sampler_unif, layer_sizes=layer_sizes, activation_type="regularized_hat", ) def grad_for_sample(x, mu): x.x.requires_grad_() w = space.evaluate(x, mu).w dw = space.grad(w, x) if dim == 1: dw = (dw,) dw = torch.cat(list(dw), dim=1) grad_w2 = (dw**2).sum(dim=1, keepdim=True) res = 1 / (1e-2 + grad_w2) return MultiLabelTensor(res) sampler = TensorizedSampler( [NonUniformSampler(grad_for_sample, DOMAIN), MU_SAMPLER] ) space.integrator = sampler def initial_condition(x, mu): return f_ini(x, mu, dim) pde = AdvectionReactionDiffusionDirichletStrongForm( space, d=diffusion_matrix, u0=initial_condition, constant_advection=True, constant_diffusion=True, ) characteristic = Characteristic( pde, periodic=True, diffusion_coefficient=DIFFUSION_COEFFICIENT, ) projector = TimeDiscreteNaturalGradientProjector(pde, rhs=initial_condition) scheme = NeuralSemiLagrangian(characteristic, projector) cpu_init = 0 if retrain_initial_condition: cpu_init = time.perf_counter() scheme.initialization(epochs=epochs_init, n_collocation=N_c, verbose=verbose) cpu_init = time.perf_counter() - cpu_init scheme.projector.save(f"ini_{file_desc}_{dim}D") projector = TimeDiscreteNaturalGradientProjector(pde, rhs=initial_condition) projector.load(f"ini_{file_desc}_{dim}D") projector.space.load_from_best_approx() scheme = NeuralSemiLagrangian(characteristic, projector) cpu_iter = time.perf_counter() scheme.solve( dt=dt, final_time=T, epochs=epochs, n_collocation=N_c, verbose=verbose, save=f"{file_desc}_{dim}D", ) cpu_iter = time.perf_counter() - cpu_iter return scheme, sampler, cpu_init, cpu_iter def compute_error(scheme, sampler, dim, t=1, n=500_000): x, mu = sampler.sample(n) u = scheme.pde.space.evaluate(x, mu).w u_exact = f_exact(t, x, mu, dim) error = torch.abs(u - u_exact) / torch.abs(u_exact) L2_error = torch.sqrt(torch.mean(error**2)).item() Linf_error = torch.max(error).item() return L2_error, Linf_error # %% # dims = list(range(1, 4)) dims = list(range(1, 2)) # dims = list(range(2, 3)) # dims = list(range(3, 4)) # dims = list(range(4, 5)) # dims = list(range(5, 6)) # dims = list(range(6, 9)) # dims = list(range(1, 9)) # dims = list(range(1, 5)) nts = [20] # nts = [2**i for i in range(1, 8)] res = [] # plot = False plot = True for dim in dims: res_ = [] for nt in nts: print("#" * 42, flush=True) print( f"Solving {dim}D advection-diffusion equation with {nt} time steps...", flush=True, ) print("#" * 42, flush=True) # epochs = 15 if dim <= 4 else 5 # epochs_init = 200 if dim <= 4 else 400 # n_collocation = 50_000 if dim <= 4 else 100_000 # layer_sizes = [10 * dim, 10 * dim] if dim <= 4 else [7 * dim, 7 * dim, 7 * dim] epochs = 10 epochs_init = 200 n_collocation = 100_000 layer_sizes = [7 * dim, 7 * dim, 7 * dim] end_time = 1 / dim dt = end_time / nt scheme, sampler, cpu_init, cpu_iter = solve_with_neural_sl( end_time, dt, dim, layer_sizes, n_collocation, epochs, epochs_init, retrain_initial_condition=True, verbose=True, ) res_.append((scheme, sampler, end_time, cpu_init, cpu_iter)) print("#" * 42, flush=True) print( f"Finished {dim}D advection-diffusion equation with {nt} time steps.", flush=True, ) print("#" * 42, flush=True) res.append(res_) # %% string = "" for i_dim, dim in enumerate(dims): for i_nt, nt in enumerate(nts): scheme, sampler, end_time, cpu_init, cpu_iter = res[i_dim][i_nt] L2_error, Linf_error = compute_error(scheme, sampler, dim, end_time) if DIFFUSION_COEFFICIENT == 0: string += f"\nPure advection, dimension {dim}, {nt} time steps:" else: string += f"\nAdvection-diffusion, dimension {dim}, {nt} time steps:\n" string += f"Elapsed time (init.): {cpu_init:.2f} s\n" string += f"Elapsed time (iter.): {cpu_iter:.2f} s\n" string += f"L2 relative error: {L2_error:.3e}\n" string += f"Linf relative error: {Linf_error:.3e}\n\n" print(string) with open("advection_diffusion_nD_Gaussian_results.txt", "w") as file: file.write(string) # %% if plot and (dim == 1): x, mu = sampler.sample(1000) u = scheme.pde.space.evaluate(x, mu).w[:, 0].detach().cpu() u_exact = f_exact(end_time, x, mu, dim).detach().cpu() u_ini = f_ini(x, mu, dim).detach().cpu() x = x.x.detach().cpu() plt.scatter(x, u_ini, label="Initial condition", s=0.25) plt.scatter(x, u_exact, label="Exact solution", s=0.25) plt.scatter(x, u, label="Approximation", s=0.1) plt.legend() plt.show() elif plot and (dim >= 2): x, mu = sampler.sample(25_000) if dim > 2: x_ = x.get_components() slice = (-0.5 + end_time) * torch.ones(x.shape[0], dim - 2) x = LabelTensor(torch.cat([x_[0], x_[1], slice], dim=1)) slice_ini = -0.5 * torch.ones(x.shape[0], dim - 2) x_ini = LabelTensor(torch.cat([x_[0], x_[1], slice_ini], dim=1)) u = scheme.pde.space.evaluate(x, mu).w[:, 0].detach().cpu() u_exact = f_exact(end_time, x, mu, dim)[:, 0].detach().cpu() u_ini = f_ini(x_ini, mu, dim)[:, 0].detach().cpu() if dim == 2: x1, x2 = x.get_components() else: x_ = x.get_components() x1, x2 = x_[0], x_[1] x1, x2 = x1.detach().cpu(), x2.detach().cpu() fig, ax = plt.subplots(2, 2, figsize=(12, 12)) ax[0, 0].scatter(x1, x2, c=u_ini, s=0.5, cmap="turbo") plt.colorbar(ax[0, 0].collections[0], ax=ax[0, 0]) ax[0, 0].set_title("Initial condition") ax[0, 1].scatter(x1, x2, c=u_exact, s=0.5, cmap="turbo") plt.colorbar(ax[0, 1].collections[0], ax=ax[0, 1]) ax[0, 1].set_title("Exact solution") ax[1, 0].scatter(x1, x2, c=u, s=0.5, cmap="turbo") plt.colorbar(ax[1, 0].collections[0], ax=ax[1, 0]) ax[1, 0].set_title("Approximate solution") error = torch.abs(u - u_exact) / torch.abs(u_exact) ax[1, 1].scatter(x1, x2, c=error, s=0.5, cmap="gist_heat") plt.colorbar(ax[1, 1].collections[0], ax=ax[1, 1]) ax[1, 1].set_title("Relative error") plt.show() # %% if plot_exact_solution := False: n_visu = 250 for dim in range(1, 9): x_plot = torch.linspace(-0.75, 1 / dim**1.5, n_visu).unsqueeze(-1) x_plot_ = x_plot.detach().cpu() t = 1 / dim x_ = (-0.5 + t) * torch.ones((n_visu, dim - 1)) x = LabelTensor(torch.cat((x_plot, x_), axis=-1)) ex = f_exact(t, x, None, dim).detach().cpu() plt.plot(x_plot_, ex, label=f"dim {dim}") plt.legend() # %%