"""Postprocess the results of the 3D cylindrical transport problem.""" # %% import time import matplotlib.pyplot as plt import torch from scimba_torch import PI from scimba_torch.approximation_space.abstract_space import AbstractApproxSpace from scimba_torch.approximation_space.nn_space import NNxSpace from scimba_torch.domain.meshless_domain.domain_3d import Cylinder3D from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP from scimba_torch.numerical_solvers.collocation_projector import ( AbstractNonlinearProjector, CollocationProjector, NaturalGradientProjector, ) from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import ( Characteristic, NeuralSemiLagrangian, ) from scimba_torch.optimizers.optimizers_data import OptimizerData from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import ( AdvectionReactionDiffusionDirichletStrongForm, ) from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor RADIUS = 1 LENGTH = 2 Z_VELOCITY = 1 DOMAIN = Cylinder3D(RADIUS, LENGTH, is_main_domain=True) PARAMETER_BOUNDS = [[0.3, 0.5], [0.05, 0.15]] MU_PLOT = torch.tensor([0.4, 0.1]) MU_SAMPLER = UniformParametricSampler(PARAMETER_BOUNDS) def f_ini(x, mu): return exact(LabelTensor(torch.zeros((x.shape[0], 1))), x, mu) def exact(t, x, mu): def bump(x1, x2, x3, v): return torch.exp(-(x1**2 + x2**2 + x3**2) / (2 * v**2)) x1, x2, x3 = x.get_components() c, v = mu.get_components() t_ = t.get_components() c_z = LENGTH / 2 x1_t = torch.cos(2 * PI * t_) x2_t = torch.sin(2 * PI * t_) x1_ = x1 - c * x1_t x2_ = x2 - c * x2_t x3_ = torch.remainder((x3 - Z_VELOCITY * t_), LENGTH) - c_z return 1 + bump(x1_, x2_, x3_, v) def a(t, x, mu): x1, x2, x3 = x.get_components() return torch.cat((-2 * PI * x2, 2 * PI * x1, Z_VELOCITY * x3), dim=1) def exact_foot( t: torch.Tensor, x: LabelTensor, mu: LabelTensor, dt: float ) -> torch.Tensor: x1, x2, x3 = x.get_components() arg = 2 * PI * t c_n = torch.cos(arg) s_n = torch.sin(arg) k1 = x1 * s_n + x2 * c_n k2 = x1 * c_n - x2 * s_n arg_np1 = 2 * PI * (t + dt) c_np1 = torch.cos(arg_np1) s_np1 = torch.sin(arg_np1) x1_np1 = k1 * s_np1 + k2 * c_np1 x2_np1 = -k2 * s_np1 + k1 * c_np1 x3_np1 = x3 - Z_VELOCITY * dt return torch.cat((x1_np1, x2_np1, x3_np1), dim=1) def plot_results(f, sampler, T, iter=None, n_visu=20_000, to_save=False): fig, axes = plt.subplots(2, 2, figsize=(12, 12), constrained_layout=True) # plot in z-plane x, mu = sampler(n_visu) x1, x2, _ = x.get_components() x3 = torch.ones_like(x1) * LENGTH / 2 + Z_VELOCITY * T x3 = torch.remainder(x3, 2) x = LabelTensor(torch.cat((x1, x2, x3), dim=1)) mu = LabelTensor(torch.ones((x.shape[0], 2)) * MU_PLOT) predictions = f(x, mu).w.detach().cpu() x1, x2 = x1.detach().cpu(), x2.detach().cpu() scatter1 = axes[0, 0].scatter(x1, x2, c=predictions, s=0.5, cmap="turbo") plane = (LENGTH / 2 + Z_VELOCITY * T) % 2 axes[0, 0].set_title(f"Predictions, plane z={plane}") axes[0, 0].set_aspect("equal") fig.colorbar(scatter1, ax=axes[0, 0], fraction=0.046, pad=0.04) if to_save: fig_s, ax_s = plt.subplots(1, 1, figsize=(5, 5), tight_layout=True) scatter = ax_s.scatter(x1, x2, c=predictions, s=0.5, cmap="turbo") fig_s.colorbar(scatter, ax=ax_s, fraction=0.046, pad=0.04) ax_s.set_aspect("equal") plt.gca().set_rasterization_zorder(-1) fig_s.savefig("3D_rotation_prediction_z.pdf", format="pdf") plt.close(fig_s) T_ = LabelTensor(torch.ones((x.shape[0], 1)) * T) expected = exact(T_, x, mu).detach().cpu() error = torch.abs(predictions - expected) / expected error = error.detach().cpu() L2_error = (error**2).mean().item() ** 0.5 scatter2 = axes[0, 1].scatter(x1, x2, c=error, s=0.5, cmap="turbo") axes[0, 1].set_title(f"Error = {L2_error:.2e}") axes[0, 1].set_aspect("equal") fig.colorbar(scatter2, ax=axes[0, 1], fraction=0.046, pad=0.04) if to_save: fig_s, ax_s = plt.subplots(1, 1, figsize=(5, 5), tight_layout=True) scatter = ax_s.scatter(x1, x2, c=error, s=0.5, cmap="jet") fig_s.colorbar(scatter, ax=ax_s, fraction=0.046, pad=0.04) ax_s.set_aspect("equal") plt.gca().set_rasterization_zorder(-1) fig_s.savefig("3D_rotation_error_z.pdf", format="pdf") plt.close(fig_s) # plot in x-plane x, mu = sampler(n_visu) x1, _, x3 = x.get_components() x = LabelTensor(torch.cat((x1, torch.zeros_like(x3), x3), dim=1)) mu = LabelTensor(torch.ones((x.shape[0], 2)) * MU_PLOT) predictions = f(x, mu).w.detach().cpu() x1, x3 = x1.detach().cpu(), x3.detach().cpu() scatter1 = axes[1, 0].scatter(x1, x3, c=predictions, s=0.5, cmap="turbo") axes[1, 0].set_title(f"Predictions, plane y={0}") axes[1, 0].set_aspect("equal") fig.colorbar(scatter1, ax=axes[1, 0], fraction=0.046, pad=0.04) if to_save: fig_s, ax_s = plt.subplots(1, 1, figsize=(5, 5), tight_layout=True) scatter = ax_s.scatter(x1, x3, c=predictions, s=0.5, cmap="turbo") fig_s.colorbar(scatter, ax=ax_s, fraction=0.046, pad=0.04) ax_s.set_aspect("equal") plt.gca().set_rasterization_zorder(-1) fig_s.savefig("3D_rotation_prediction_y.pdf", format="pdf") plt.close(fig_s) T_ = LabelTensor(torch.ones((x.shape[0], 1)) * T) expected = exact(T_, x, mu).detach().cpu() error = torch.abs(predictions - expected) / expected error = error.detach().cpu() L2_error = (error**2).mean().item() ** 0.5 scatter2 = axes[1, 1].scatter(x1, x3, c=error, s=0.5, cmap="turbo") axes[1, 1].set_title(f"Error = {L2_error:.2e}") axes[1, 1].set_aspect("equal") fig.colorbar(scatter2, ax=axes[1, 1], fraction=0.046, pad=0.04) if to_save: fig_s, ax_s = plt.subplots(1, 1, figsize=(5, 5), tight_layout=True) scatter = ax_s.scatter(x1, x3, c=error, s=0.5, cmap="jet") fig_s.colorbar(scatter, ax=ax_s, fraction=0.046, pad=0.04) ax_s.set_aspect("equal") plt.gca().set_rasterization_zorder(-1) fig_s.savefig("3D_rotation_error_y.pdf", format="pdf") plt.close(fig_s) plt.show() # %% class NonUniformSampler(DomainSampler): def __init__(self, f, domain, mu_sampler, **kwargs): super().__init__(domain, **kwargs) self.f = f self.mu_sampler = mu_sampler def sample_around_zeroes(self, n, sigma=0.05, keep=0.75): candidates = LabelTensor(self.vol_sampler.sample(10 * n)) c1, c2, c3 = candidates.get_components() parameters = self.mu_sampler.sample(10 * n) f_values = self.f(candidates, parameters).w[:, 0] weights = torch.exp(-(f_values**2) / (2 * sigma**2)) return candidates[weights > keep].x def sample_new_points(self, n: int): """Samples `n` new points from the volumetric domain and labels them by subdomains. Args: n: Number of points to sample. Returns: ScimbaTensor: A tensor of sampled points with their subdomain labels. """ pts_1 = self.sample_around_zeroes(n, sigma=0.5) if pts_1.shape[0] >= n // 4: pts_1 = pts_1[: n // 4] pts_2 = self.sample_around_zeroes(n, sigma=10) pts_2 = torch.cat((pts_1, pts_2), dim=0) if pts_2.shape[0] >= n // 2: pts_2 = pts_2[: n // 2] pts_3 = self.sample_around_zeroes(n, sigma=500) pts_3 = torch.cat((pts_2, pts_3), dim=0) if pts_3.shape[0] >= 3 * n // 4: pts_3 = pts_3[: 3 * n // 4] remaining = n - pts_3.shape[0] pts_unif = self.vol_sampler.sample(remaining) pts = torch.cat((pts_3, pts_unif), dim=0) labels = torch.zeros(n, dtype=int) # Label points based on subdomains for i, subdomain in enumerate(self.domain.list_subdomains): condition = subdomain.is_inside(pts) labels[condition] = i + 1 # Wrap the points and labels in a ScimbaTensor data = LabelTensor(pts, labels) data.x.requires_grad_() return data # %% def make_projector( use_natural_gradient: bool, space: AbstractApproxSpace, initial_condition: bool = False, ) -> AbstractNonlinearProjector: if use_natural_gradient: return NaturalGradientProjector(space, rhs=f_ini) else: opt_1 = { "name": "adam", "optimizer_args": {"lr": 2.5e-2, "betas": (0.9, 0.999)}, } opt_2 = { "name": "lbfgs", "switch_at_epoch_ratio": 0.9, "optimizer_args": { "history_size": 50, "max_iter": 50, "tolerance_grad": 1e-11, "tolerance_change": 1e-9, }, } if initial_condition: opt = OptimizerData(opt_1, opt_2) else: opt = OptimizerData(opt_2) return CollocationProjector(space, optimizers=opt, rhs=f_ini) def solve_with_neural_sl( T: float, dt: float, test_case: int, retrain_initial_condition: bool = False, verbose: bool = False, ) -> NeuralSemiLagrangian: torch.random.manual_seed(0) assert test_case in [0, 1, 2, 3], "test_case must be 0, 1, 2 or 3" file_name = f"transport_3D_cylinder_test_case_{test_case}" use_natural_gradient = test_case >= 1 use_adaptive_sampling = test_case >= 1 sampler_unif = TensorizedSampler([DomainSampler(DOMAIN), MU_SAMPLER]) layer_sizes = [20, 40, 20] if test_case == 1 else [60, 60, 60] if test_case == 0: epochs = 100 elif test_case == 1 or test_case == 2: epochs = 50 else: epochs = 150 epochs_init = 150 if use_natural_gradient else 1_000 n_collocation = 50_000 description = {} description["use_natural_gradient"] = use_natural_gradient description["use_adaptive_sampling"] = use_adaptive_sampling description["layer_sizes"] = layer_sizes description["epochs"] = epochs description["epochs_init"] = epochs_init description["n_collocation"] = n_collocation description["file_name"] = file_name space = NNxSpace(1, 2, GenericMLP, DOMAIN, sampler_unif, layer_sizes=layer_sizes) def grad_for_sample(x, mu): x.x.requires_grad_() w = space.evaluate(x, mu).w w_x, w_y, w_z = space.grad(w, x) grad_w2 = w_x**2 + w_y**2 + w_z**2 pm_one = torch.randint(0, 2, (grad_w2.shape[0], 1)) * 2 - 1 res = pm_one / (1e-2 + (w_x**2 + w_y**2 + w_z**2)) return MultiLabelTensor(res) if use_adaptive_sampling: sampler = TensorizedSampler( [NonUniformSampler(grad_for_sample, DOMAIN, MU_SAMPLER), MU_SAMPLER] ) space.integrator = sampler else: sampler = sampler_unif pde = AdvectionReactionDiffusionDirichletStrongForm( space, a=a, u0=f_ini, constant_advection=False, zero_diffusion=True ) characteristic = Characteristic(pde, exact_foot=exact_foot, periodic=True) CPU_times = {} projector = make_projector(use_natural_gradient, space) projector.load(f"4_{file_name}") projector.space.load_from_best_approx() scheme = NeuralSemiLagrangian(characteristic, projector) start = time.perf_counter() CPU_times["solve"] = time.perf_counter() - start return { "scheme": scheme, "sampler": sampler, "CPU_times": CPU_times, "description": description, } # %% T_end = LENGTH * Z_VELOCITY dt = T_end / 4 # %% results = [] for test_case in [0, 1, 2, 3]: results.append( solve_with_neural_sl(T_end, dt, test_case, retrain_initial_condition=True) ) # %% n_integration = 1_000_000 T = LabelTensor(torch.ones((n_integration, 1)) * T_end) uniform_sampler = results[0]["sampler"] torch.random.manual_seed(0) n_random = 2_000 L2_errors = torch.ones((n_random, 4)) Linf_errors = torch.ones((n_random, 4)) bounds = torch.tensor(PARAMETER_BOUNDS) for i in range(n_random): x, _ = uniform_sampler.sample(n_integration) mu = bounds[:, 0] + (bounds[:, 1] - bounds[:, 0]) * torch.rand(2) mu = LabelTensor(torch.ones((x.shape[0], 2)) * mu) print(f"Random sample {i + 1}/{n_random}") for i_res, res in enumerate(results): predicted = res["scheme"].pde.space.evaluate(x, mu).w expected = exact(T, x, mu) relative_error = torch.abs(predicted - expected) / expected L2_errors[i, i_res] = (relative_error**2).mean().item() ** 0.5 Linf_errors[i, i_res] = relative_error.max().item() torch.save(L2_errors, "transport_3D_cylinder_L2_error.pt") torch.save(Linf_errors, "transport_3D_cylinder_Linf_error.pt") # %% L2_errors = torch.load("transport_3D_cylinder_L2_error.pt") Linf_errors = torch.load("transport_3D_cylinder_Linf_error.pt") for test_case in [0, 1, 2, 3]: err = L2_errors[:, test_case] min, max = err.min().item(), err.max().item() avg, std = err.mean().item(), err.std().item() print(f"L2 errors, test {test_case}") print(f"{min:.2e}, {avg:.2e}, {max:.2e}, {std:.2e}") err = Linf_errors[:, test_case] min, max = err.min().item(), err.max().item() avg, std = err.mean().item(), err.std().item() print(f"Linf errors, test {test_case}") print(f"{min:.2e}, {avg:.2e}, {max:.2e}, {std:.2e}") # %% for res in results: space = res["scheme"].pde.space sampler = res["sampler"] plot_results(space.evaluate, sampler.sample, T=T_end) # %% space = results[-1]["scheme"].pde.space sampler = results[-1]["sampler"] plot_results(space.evaluate, sampler.sample, T=T_end, n_visu=5_000, to_save=True) # %%