r"""Solves an advection-diffusion equation in dimension 1 to 8. ..math:: \partial_t u + a \cdot \nabla u - D \Delta u = 0 The initial condition is a high-rank mixture of sine functions. The equation is solved using the neural semi-Lagrangian scheme with a natural gradient projector. """ # %% import time import matplotlib.pyplot as plt import torch from scimba_torch import PI from scimba_torch.approximation_space.nn_space import NNxSpace from scimba_torch.domain.meshless_domain.domain_nd import HypercubeND from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.neural_nets.coordinates_based_nets.features import FourierMLP from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import ( Characteristic, NeuralSemiLagrangian, ) from scimba_torch.numerical_solvers.temporal_pde.time_discrete import ( TimeDiscreteNaturalGradientProjector, ) from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import ( AdvectionReactionDiffusionDirichletStrongForm, ) from scimba_torch.utils.scimba_tensors import LabelTensor MU_SAMPLER = UniformParametricSampler([]) LOG_2 = torch.log(torch.tensor(2.0)).item() DIFFUSION_COEFFICIENT = 0.1 FREQUENCY = 1 * PI def f_ini(x, mu, dim): return f_exact(0, x, mu, dim) def f_exact(t, x, mu, dim): x_ = x.get_components() if dim == 1: x_ = (x_,) if isinstance(t, (int, float)): t = t * torch.ones((x_[0].shape[0], 1)) if isinstance(t, torch.Tensor): t = LabelTensor(t) t_ = t.get_components() a = [1] * dim d = DIFFUSION_COEFFICIENT s = torch.linspace(0, 1 - 1 / dim, dim) # sol = 1 # for i in range(dim): # sol *= torch.sin(FREQUENCY * (x_[i] - s[i] - a[i] * t_)) # sol *= torch.exp(-dim * d * FREQUENCY**2 * t_) arg = 0 for i in range(dim): arg = arg + x_[i] - s[i] - a[i] * t_ sin = torch.sin(FREQUENCY * arg) exp = torch.exp(-dim * d * FREQUENCY**2 * t_) return 2 + sin * exp def diffusion_matrix(t, x, mu): d = DIFFUSION_COEFFICIENT batch_size, n = x.shape[0], x.shape[1] return (d * torch.eye(n)).unsqueeze(0).repeat(batch_size, 1, 1) # %% class NonUniformSampler(DomainSampler): def __init__(self, f, domain, **kwargs): super().__init__(domain, **kwargs) self.f = f self.mu_sampler = MU_SAMPLER def sample_around_zeroes(self, n, sigma=0.05, keep=0.75): candidates = LabelTensor(self.vol_sampler.sample(10 * n)) parameters = self.mu_sampler.sample(10 * n) f_values = self.f(candidates, parameters).w[:, 0] weights = torch.exp(-(f_values**2) / (2 * sigma**2)) return candidates[weights > keep].x def sample_new_points(self, n: int): """Samples `n` new points from the volumetric domain and labels them by subdomains. Args: n: Number of points to sample. Returns: ScimbaTensor: A tensor of sampled points with their subdomain labels. """ pts_1 = self.sample_around_zeroes(n, sigma=20) if pts_1.shape[0] >= n // 4: pts_1 = pts_1[: n // 4] pts_2 = self.sample_around_zeroes(n, sigma=100) pts_2 = torch.cat((pts_1, pts_2), dim=0) if pts_2.shape[0] >= n // 2: pts_2 = pts_2[: n // 2] pts_3 = self.sample_around_zeroes(n, sigma=5000) pts_3 = torch.cat((pts_2, pts_3), dim=0) if pts_3.shape[0] >= 3 * n // 4: pts_3 = pts_3[: 3 * n // 4] remaining = n - pts_3.shape[0] pts_unif = self.vol_sampler.sample(remaining) pts = torch.cat((pts_3, pts_unif), dim=0) labels = torch.zeros(n, dtype=int) # Label points based on subdomains for i, subdomain in enumerate(self.domain.list_subdomains): condition = subdomain.is_inside(pts) labels[condition] = i + 1 # Wrap the points and labels in a ScimbaTensor data = LabelTensor(pts, labels) data.x.requires_grad_() return data # %% def solve_with_neural_sl( T: float, dt: float, dim: int, layer_sizes: list, N_c: int, epochs: int, epochs_init: int, retrain_initial_condition=False, verbose=False, use_Fourier=True, ) -> tuple: assert isinstance(dim, int) and dim > 0 if DIFFUSION_COEFFICIENT == 0: file_desc = "advection" else: file_desc = "advection_diffusion" torch.random.manual_seed(0) bounds = [(-1, 1)] * dim domain_x = HypercubeND(bounds, is_main_domain=True) # sampler_unif = TensorizedSampler([DomainSampler(domain_x), MU_SAMPLER]) sampler = TensorizedSampler([DomainSampler(domain_x), MU_SAMPLER]) net = FourierMLP if use_Fourier else GenericMLP space = NNxSpace( 1, 0, net, domain_x, sampler, layer_sizes=layer_sizes, nb_features=5 * dim, mean=0.0, std=FREQUENCY * dim**0.5, activation_type="sine", # activation_type="tanh", ) # def grad_for_sample(x, mu): # x.x.requires_grad_() # w = space.evaluate(x, mu).w # dw = space.grad(w, x) # if dim == 1: # dw = (dw,) # dw = torch.cat(list(dw), dim=1) # grad_w2 = (dw**2).sum(dim=1, keepdim=True) # res = 1 / (1e-2 + grad_w2) # return MultiLabelTensor(res) # sampler = TensorizedSampler( # [NonUniformSampler(grad_for_sample, domain_x), MU_SAMPLER] # ) # space.integrator = sampler def initial_condition(x, mu): return f_ini(x, mu, dim) pde = AdvectionReactionDiffusionDirichletStrongForm( space, d=diffusion_matrix, u0=initial_condition, constant_advection=True, constant_diffusion=True, ) characteristic = Characteristic( pde, periodic=True, diffusion_coefficient=DIFFUSION_COEFFICIENT, ) projector = TimeDiscreteNaturalGradientProjector(pde, rhs=initial_condition) scheme = NeuralSemiLagrangian(characteristic, projector) cpu_init = 0 if retrain_initial_condition: cpu_init = time.perf_counter() scheme.initialization(epochs=epochs_init, n_collocation=N_c, verbose=verbose) cpu_init = time.perf_counter() - cpu_init scheme.projector.save(f"ini_{file_desc}_{dim}D") projector = TimeDiscreteNaturalGradientProjector(pde, rhs=initial_condition) projector.load(f"ini_{file_desc}_{dim}D") projector.space.load_from_best_approx() scheme = NeuralSemiLagrangian(characteristic, projector) cpu_iter = time.perf_counter() scheme.solve( dt=dt, final_time=T, epochs=epochs, n_collocation=N_c, verbose=verbose, # save=f"{file_desc}_{dim}D", save=None, ) cpu_iter = time.perf_counter() - cpu_iter return scheme, sampler, cpu_init, cpu_iter def compute_error(scheme, sampler, dim, t=1, n=500_000): x, mu = sampler.sample(n) u = scheme.pde.space.evaluate(x, mu).w u_exact = f_exact(t, x, mu, dim) error = torch.abs(u - u_exact) / torch.abs(u_exact) L2_error = torch.sqrt(torch.mean(error**2)).item() Linf_error = torch.max(error).item() return L2_error, Linf_error # %% # dims = list(range(1, 2)) # dims = list(range(1, 2)) dims = list(range(2, 3)) # dims = list(range(3, 4)) # dims = list(range(4, 5)) # dims = list(range(5, 6)) # dims = list(range(6, 9)) # dims = list(range(6, 7)) # dims = list(range(8, 9)) # dims = list(range(1, 9)) # dims = list(range(1, 5)) # nts = [20] nts = [32] # nts = [2**i for i in range(1, 8)] res = [] plot = False for dim in dims: res_ = [] for nt in nts: print("#" * 42, flush=True) print( f"Solving {dim}D advection-diffusion equation with {nt} time steps...", flush=True, ) print("#" * 42, flush=True) epochs = 20 # epochs_init = 250 epochs_init = 100 # n_collocation = 10_000 * dim # n_collocation = 50_000 * dim n_collocation = 10_000 # n_collocation = 100_000 # n_collocation = 25_000 # layer_sizes = [30] * 3 layer_sizes = [7 * dim] * 3 # layer_sizes = [6 * dim] * 4 # layer_sizes = [5 * dim] * 5 if dim == 3 and nt != 20: end_time = 1 else: end_time = LOG_2 / (dim * DIFFUSION_COEFFICIENT * FREQUENCY**2) dt = end_time / nt scheme, sampler, cpu_init, cpu_iter = solve_with_neural_sl( end_time, dt, dim, layer_sizes, n_collocation, epochs, epochs_init, retrain_initial_condition=nt == 20 or nt == 2 or nt == 6, # retrain_initial_condition=True, # retrain_initial_condition=False, # verbose=False, verbose=True, ) res_.append((scheme, sampler, end_time, cpu_init, cpu_iter)) print("#" * 42, flush=True) print( f"Finished {dim}D advection-diffusion equation with {nt} time steps.", flush=True, ) print("#" * 42, flush=True) res.append(res_) # %% string = "" for i_dim, dim in enumerate(dims): for i_nt, nt in enumerate(nts): scheme, sampler, end_time, cpu_init, cpu_iter = res[i_dim][i_nt] L2_error, Linf_error = compute_error(scheme, sampler, dim, end_time) if DIFFUSION_COEFFICIENT == 0: string += f"\nPure advection, dimension {dim}, {nt} time steps:" else: string += f"\nAdvection-diffusion, dimension {dim}, {nt} time steps:\n" string += f"Elapsed time (init.): {cpu_init:.2f} s\n" string += f"Elapsed time (iter.): {cpu_iter:.2f} s\n" string += f"L2 relative error: {L2_error:.3e}\n" string += f"Linf relative error: {Linf_error:.3e}\n\n" print(string) with open("advection_diffusion_nD_results.txt", "w") as file: file.write(string) # %% if plot and (dim == 1): x, mu = sampler.sample(1000) u = scheme.pde.space.evaluate(x, mu).w[:, 0].detach().cpu() u_exact = f_exact(end_time, x, mu, dim).detach().cpu() u_ini = f_ini(x, mu, dim).detach().cpu() x = x.x.detach().cpu() plt.scatter(x, u_ini, label="Initial condition", s=0.25) plt.scatter(x, u_exact, label="Exact solution", s=0.25) plt.scatter(x, u, label="Approximation", s=0.1) plt.legend() plt.show() elif True and (dim >= 2): x, mu = sampler.sample(100_000) if dim > 2: x_ = x.get_components() slice = 1 / (1 + dim) + torch.zeros(x.shape[0], dim - 2) x = LabelTensor(torch.cat([x_[0], x_[1], slice], dim=1)) u = scheme.pde.space.evaluate(x, mu).w[:, 0].detach().cpu() u_exact = f_exact(end_time, x, mu, dim)[:, 0].detach().cpu() u_ini = f_ini(x, mu, dim)[:, 0].detach().cpu() if dim == 2: x1, x2 = x.get_components() else: x_ = x.get_components() x1, x2 = x_[0], x_[1] x1, x2 = x1.detach().cpu(), x2.detach().cpu() fig, ax = plt.subplots(2, 2, figsize=(12, 12)) ax[0, 0].scatter(x1, x2, c=u_ini, cmap="turbo") plt.colorbar(ax[0, 0].collections[0], ax=ax[0, 0]) ax[0, 0].set_title("Initial condition") ax[0, 1].scatter(x1, x2, c=u_exact, cmap="turbo") plt.colorbar(ax[0, 1].collections[0], ax=ax[0, 1]) ax[0, 1].set_title("Exact solution") ax[1, 0].scatter(x1, x2, c=u, cmap="turbo") plt.colorbar(ax[1, 0].collections[0], ax=ax[1, 0]) ax[1, 0].set_title("Approximate solution") error = torch.abs(u - u_exact) / torch.abs(u_exact) ax[1, 1].scatter(x1, x2, c=error, cmap="gist_heat") plt.colorbar(ax[1, 1].collections[0], ax=ax[1, 1]) ax[1, 1].set_title("Relative error") plt.show() # %%