r"""Solves the advection of a 3D parametric bump function in a cylinder. ..math:: \partial_t u + a \cdot \nabla u = 0 where :math:`u: \mathbb{R}^3 \times (0, T) \times \mathbb{R}^2 \to \mathbb{R}` is the unknown function, depending on space, time, and two parameters (the position and the variance of the initial bump). The equation is solved using the neural semi-Lagrangian scheme, with either a classical Adam optimizer, or a natural gradient preconditioning. """ # %% import time import matplotlib.pyplot as plt import torch from scimba_torch import PI from scimba_torch.approximation_space.nn_space import NNxSpace from scimba_torch.domain.meshless_domain.domain_3d import Cylinder3D from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.collocation_projector import ( AbstractNonlinearProjector, ) from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import ( Characteristic, NeuralSemiLagrangian, ) from scimba_torch.numerical_solvers.temporal_pde.time_discrete import ( TimeDiscreteCollocationProjector, TimeDiscreteNaturalGradientProjector, ) from scimba_torch.optimizers.optimizers_data import OptimizerData from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import ( AdvectionReactionDiffusionDirichletStrongForm, ) from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor RADIUS = 1 LENGTH = 2 Z_VELOCITY = 1 DOMAIN = Cylinder3D(RADIUS, LENGTH, is_main_domain=True) PARAMETER_BOUNDS = [[0.3, 0.5], [0.05, 0.15]] MU_PLOT = torch.tensor([0.4, 0.1]) MU_SAMPLER = UniformParametricSampler(PARAMETER_BOUNDS) def f_ini(x, mu): return exact(LabelTensor(torch.zeros((x.shape[0], 1))), x, mu) def exact(t, x, mu): def bump(x1, x2, x3, v): return torch.exp(-(x1**2 + x2**2 + x3**2) / (2 * v**2)) x1, x2, x3 = x.get_components() c, v = mu.get_components() t_ = t.get_components() c_z = LENGTH / 2 x1_t = torch.cos(2 * PI * t_) x2_t = torch.sin(2 * PI * t_) x1_ = x1 - c * x1_t x2_ = x2 - c * x2_t x3_ = torch.remainder((x3 - Z_VELOCITY * t_), LENGTH) - c_z return 1 + bump(x1_, x2_, x3_, v) def a(t, x, mu): x1, x2, x3 = x.get_components() return torch.cat((-2 * PI * x2, 2 * PI * x1, Z_VELOCITY * x3), dim=1) def exact_foot( t: torch.Tensor, x: LabelTensor, mu: LabelTensor, dt: float ) -> torch.Tensor: x1, x2, x3 = x.get_components() arg = 2 * PI * t c_n = torch.cos(arg) s_n = torch.sin(arg) k1 = x1 * s_n + x2 * c_n k2 = x1 * c_n - x2 * s_n arg_np1 = 2 * PI * (t + dt) c_np1 = torch.cos(arg_np1) s_np1 = torch.sin(arg_np1) x1_np1 = k1 * s_np1 + k2 * c_np1 x2_np1 = -k2 * s_np1 + k1 * c_np1 x3_np1 = x3 - Z_VELOCITY * dt return torch.cat((x1_np1, x2_np1, x3_np1), dim=1) def plot_results(f, sampler, T, iter=None, n_visu=20_000): fig, axes = plt.subplots(2, 2, figsize=(12, 12), constrained_layout=True) # plot in z-plane x, mu = sampler(n_visu) x1, x2, _ = x.get_components() x3 = torch.ones_like(x1) * LENGTH / 2 + Z_VELOCITY * T x3 = torch.remainder(x3, 2) x = LabelTensor(torch.cat((x1, x2, x3), dim=1)) mu = LabelTensor(torch.ones((x.shape[0], 2)) * MU_PLOT) predictions = f(x, mu).w.detach().cpu() x1, x2 = x1.detach().cpu(), x2.detach().cpu() scatter1 = axes[0, 0].scatter(x1, x2, c=predictions, s=0.5, cmap="turbo") plane = (LENGTH / 2 + Z_VELOCITY * T) % 2 axes[0, 0].set_title(f"Predictions, plane z={plane}") axes[0, 0].set_aspect("equal") fig.colorbar(scatter1, ax=axes[0, 0], fraction=0.046, pad=0.04) T_ = LabelTensor(torch.ones((x.shape[0], 1)) * T) expected = exact(T_, x, mu).detach().cpu() error = torch.abs(predictions - expected) / expected error = error.detach().cpu() scatter2 = axes[0, 1].scatter(x1, x2, c=error, s=0.5, cmap="turbo") axes[0, 1].set_title("Error") axes[0, 1].set_aspect("equal") fig.colorbar(scatter2, ax=axes[0, 1], fraction=0.046, pad=0.04) # plot in x-plane x, mu = sampler(n_visu) x1, _, x3 = x.get_components() x = LabelTensor(torch.cat((x1, torch.zeros_like(x3), x3), dim=1)) mu = LabelTensor(torch.ones((x.shape[0], 2)) * MU_PLOT) predictions = f(x, mu).w.detach().cpu() x1, x3 = x1.detach().cpu(), x3.detach().cpu() scatter1 = axes[1, 0].scatter(x1, x3, c=predictions, s=0.5, cmap="turbo") axes[1, 0].set_title(f"Predictions, plane y={0}") axes[1, 0].set_aspect("equal") fig.colorbar(scatter1, ax=axes[1, 0], fraction=0.046, pad=0.04) T_ = LabelTensor(torch.ones((x.shape[0], 1)) * T) expected = exact(T_, x, mu).detach().cpu() error = torch.abs(predictions - expected) / expected error = error.detach().cpu() scatter2 = axes[1, 1].scatter(x1, x3, c=error, s=0.5, cmap="turbo") axes[1, 1].set_title("Error") axes[1, 1].set_aspect("equal") fig.colorbar(scatter2, ax=axes[1, 1], fraction=0.046, pad=0.04) plt.show() # %% class NonUniformSampler(DomainSampler): def __init__(self, f, domain, mu_sampler, **kwargs): super().__init__(domain, **kwargs) self.f = f self.mu_sampler = mu_sampler def sample_around_zeroes(self, n, sigma=0.05, keep=0.75): candidates = LabelTensor(self.vol_sampler.sample(10 * n)) c1, c2, c3 = candidates.get_components() parameters = self.mu_sampler.sample(10 * n) f_values = self.f(candidates, parameters).w[:, 0] weights = torch.exp(-(f_values**2) / (2 * sigma**2)) return candidates[weights > keep].x def sample_new_points(self, n: int): """Samples `n` new points from the volumetric domain and labels them by subdomains. Args: n (int): Number of points to sample. Returns: ScimbaTensor: A tensor of sampled points with their subdomain labels. """ pts_1 = self.sample_around_zeroes(n, sigma=0.5) if pts_1.shape[0] >= n // 4: pts_1 = pts_1[: n // 4] pts_2 = self.sample_around_zeroes(n, sigma=10) pts_2 = torch.cat((pts_1, pts_2), dim=0) if pts_2.shape[0] >= n // 2: pts_2 = pts_2[: n // 2] pts_3 = self.sample_around_zeroes(n, sigma=500) pts_3 = torch.cat((pts_2, pts_3), dim=0) if pts_3.shape[0] >= 3 * n // 4: pts_3 = pts_3[: 3 * n // 4] remaining = n - pts_3.shape[0] pts_unif = self.vol_sampler.sample(remaining) pts = torch.cat((pts_3, pts_unif), dim=0) labels = torch.zeros(n, dtype=int) # Label points based on subdomains for i, subdomain in enumerate(self.domain.list_subdomains): condition = subdomain.is_inside(pts) labels[condition] = i + 1 # Wrap the points and labels in a ScimbaTensor data = LabelTensor(pts, labels) data.x.requires_grad_() return data # %% def make_projector( use_natural_gradient: bool, pde, initial_condition: bool = False, ) -> AbstractNonlinearProjector: if use_natural_gradient: return TimeDiscreteNaturalGradientProjector(pde, rhs=f_ini) else: opt_1 = { "name": "adam", "optimizer_args": {"lr": 2.5e-2, "betas": (0.9, 0.999)}, } opt_2 = { "name": "lbfgs", "switch_at_epoch_ratio": 0.9, "optimizer_args": { "history_size": 50, "max_iter": 50, "tolerance_grad": 1e-11, "tolerance_change": 1e-9, }, } if initial_condition: opt = OptimizerData(opt_1, opt_2) else: opt = OptimizerData(opt_2) return TimeDiscreteCollocationProjector(pde, rhs=f_ini, optimizers=opt) def solve_with_neural_sl( T: float, dt: float, test_case: int, retrain_initial_condition: bool = False, verbose: bool = False, ) -> NeuralSemiLagrangian: torch.random.manual_seed(0) assert test_case in [0, 1, 2, 3], "test_case must be 0, 1, 2 or 3" file_name = f"transport_3D_cylinder_test_case_{test_case}" use_natural_gradient = test_case >= 1 use_adaptive_sampling = test_case >= 1 sampler_unif = TensorizedSampler([DomainSampler(DOMAIN), MU_SAMPLER]) layer_sizes = [20, 40, 20] if test_case == 1 else [60, 60, 60] if test_case == 0: epochs = 100 elif test_case == 1 or test_case == 2: epochs = 50 else: epochs = 200 if test_case == 0: epochs_init = 1_000 elif test_case == 1 or test_case == 2: epochs_init = 150 else: epochs_init = 500 if test_case <= 2: n_collocation = 50_000 else: n_collocation = 150_000 # layer_sizes = [2, 4, 2] if test_case == 1 else [6, 6, 6] # epochs = 5 if test_case <= 2 else 10 # epochs_init = 10 if use_natural_gradient else 15 # n_collocation = 50 description = {} description["use_natural_gradient"] = use_natural_gradient description["use_adaptive_sampling"] = use_adaptive_sampling description["layer_sizes"] = layer_sizes description["epochs"] = epochs description["epochs_init"] = epochs_init description["n_collocation"] = n_collocation description["file_name"] = file_name space = NNxSpace(1, 2, GenericMLP, DOMAIN, sampler_unif, layer_sizes=layer_sizes) def grad_for_sample(x, mu): x.x.requires_grad_() w = space.evaluate(x, mu).w w_x, w_y, w_z = space.grad(w, x) grad_w2 = w_x**2 + w_y**2 + w_z**2 pm_one = torch.randint(0, 2, (grad_w2.shape[0], 1)) * 2 - 1 res = pm_one / (1e-2 + (w_x**2 + w_y**2 + w_z**2)) return MultiLabelTensor(res) if use_adaptive_sampling: sampler = TensorizedSampler( [NonUniformSampler(grad_for_sample, DOMAIN, MU_SAMPLER), MU_SAMPLER] ) space.integrator = sampler else: sampler = sampler_unif pde = AdvectionReactionDiffusionDirichletStrongForm( space, a=a, u0=f_ini, constant_advection=False, zero_diffusion=True ) characteristic = Characteristic(pde, exact_foot=exact_foot, periodic=True) CPU_times = {} if retrain_initial_condition: projector = make_projector(use_natural_gradient, pde, initial_condition=True) scheme = NeuralSemiLagrangian(characteristic, projector) start = time.perf_counter() scheme.initialization( epochs=epochs_init, n_collocation=n_collocation, verbose=verbose ) CPU_times["ini"] = time.perf_counter() - start scheme.projector.save(f"ini_{file_name}") if verbose: plot_results(space.evaluate, sampler.sample, T=0) projector = make_projector(use_natural_gradient, pde) projector.load(f"ini_{file_name}") projector.space.load_from_best_approx() scheme = NeuralSemiLagrangian(characteristic, projector) start = time.perf_counter() scheme.solve( dt=dt, final_time=T, epochs=epochs, n_collocation=n_collocation, verbose=verbose, plot=plot_results if verbose else None, save=file_name, sol_exact=exact, ) CPU_times["solve"] = time.perf_counter() - start return { "scheme": scheme, "sampler": sampler, "CPU_times": CPU_times, "description": description, } # %% T = LENGTH * Z_VELOCITY dt = T / 4 # %% results = [] # for test_case in [0, 1, 2, 3]: for test_case in [3]: results.append( solve_with_neural_sl( T, dt, test_case, retrain_initial_condition=True, verbose=True ) ) # %%