Time discrete: transport in a cylinderΒΆ

Solves the advection of a 3D parametric bump function in a cylinder.

\[\partial_t u + a \cdot \nabla u = 0\]

where \(u: \mathbb{R}^3 \times (0, T) \times \mathbb{R}^2 \to \mathbb{R}\) is the unknown function, depending on space, time, and two parameters (the position and the variance of the initial bump).

The equation is solved using the neural semi-Lagrangian scheme, with either a classical Adam optimizer, or a natural gradient preconditioning.

[1]:
# %%

import time

import matplotlib.pyplot as plt
import torch

from scimba_torch import PI
from scimba_torch.approximation_space.nn_space import NNxSpace
from scimba_torch.domain.meshless_domain.domain_3d import Cylinder3D
from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler
from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler
from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP
from scimba_torch.numerical_solvers.collocation_projector import (
    AbstractNonlinearProjector,
)
from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import (
    Characteristic,
    NeuralSemiLagrangian,
)
from scimba_torch.numerical_solvers.temporal_pde.time_discrete import (
    TimeDiscreteCollocationProjector,
    TimeDiscreteNaturalGradientProjector,
)
from scimba_torch.optimizers.optimizers_data import OptimizerData
from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import (
    AdvectionReactionDiffusionDirichletStrongForm,
)
from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor

RADIUS = 1
LENGTH = 2
Z_VELOCITY = 1
DOMAIN = Cylinder3D(RADIUS, LENGTH, is_main_domain=True)

PARAMETER_BOUNDS = [[0.3, 0.5], [0.05, 0.15]]
MU_PLOT = torch.tensor([0.4, 0.1])
MU_SAMPLER = UniformParametricSampler(PARAMETER_BOUNDS)


def f_ini(x, mu):
    return exact(LabelTensor(torch.zeros((x.shape[0], 1))), x, mu)


def exact(t, x, mu):
    def bump(x1, x2, x3, v):
        return torch.exp(-(x1**2 + x2**2 + x3**2) / (2 * v**2))

    x1, x2, x3 = x.get_components()
    c, v = mu.get_components()
    t_ = t.get_components()

    c_z = LENGTH / 2

    x1_t = torch.cos(2 * PI * t_)
    x2_t = torch.sin(2 * PI * t_)

    x1_ = x1 - c * x1_t
    x2_ = x2 - c * x2_t
    x3_ = torch.remainder((x3 - Z_VELOCITY * t_), LENGTH) - c_z
    return 1 + bump(x1_, x2_, x3_, v)


def a(t, x, mu):
    x1, x2, x3 = x.get_components()
    return torch.cat((-2 * PI * x2, 2 * PI * x1, Z_VELOCITY * x3), dim=1)


def exact_foot(
    t: torch.Tensor, x: LabelTensor, mu: LabelTensor, dt: float
) -> torch.Tensor:
    x1, x2, x3 = x.get_components()

    arg = 2 * PI * t
    c_n = torch.cos(arg)
    s_n = torch.sin(arg)

    k1 = x1 * s_n + x2 * c_n
    k2 = x1 * c_n - x2 * s_n

    arg_np1 = 2 * PI * (t + dt)
    c_np1 = torch.cos(arg_np1)
    s_np1 = torch.sin(arg_np1)

    x1_np1 = k1 * s_np1 + k2 * c_np1
    x2_np1 = -k2 * s_np1 + k1 * c_np1

    x3_np1 = x3 - Z_VELOCITY * dt

    return torch.cat((x1_np1, x2_np1, x3_np1), dim=1)


def plot_results(f, sampler, T, iter=None, n_visu=20_000):
    fig, axes = plt.subplots(2, 2, figsize=(12, 12), constrained_layout=True)

    # plot in z-plane

    x, mu = sampler(n_visu)
    x1, x2, _ = x.get_components()

    x3 = torch.ones_like(x1) * LENGTH / 2 + Z_VELOCITY * T
    x3 = torch.remainder(x3, 2)

    x = LabelTensor(torch.cat((x1, x2, x3), dim=1))
    mu = LabelTensor(torch.ones((x.shape[0], 2)) * MU_PLOT)

    predictions = f(x, mu).w.detach().cpu()

    x1, x2 = x1.detach().cpu(), x2.detach().cpu()

    scatter1 = axes[0, 0].scatter(x1, x2, c=predictions, s=0.5, cmap="turbo")
    plane = (LENGTH / 2 + Z_VELOCITY * T) % 2
    axes[0, 0].set_title(f"Predictions, plane z={plane}")
    axes[0, 0].set_aspect("equal")
    fig.colorbar(scatter1, ax=axes[0, 0], fraction=0.046, pad=0.04)

    T_ = LabelTensor(torch.ones((x.shape[0], 1)) * T)
    expected = exact(T_, x, mu).detach().cpu()
    error = torch.abs(predictions - expected) / expected
    error = error.detach().cpu()

    scatter2 = axes[0, 1].scatter(x1, x2, c=error, s=0.5, cmap="turbo")
    axes[0, 1].set_title("Error")
    axes[0, 1].set_aspect("equal")
    fig.colorbar(scatter2, ax=axes[0, 1], fraction=0.046, pad=0.04)

    # plot in x-plane

    x, mu = sampler(n_visu)
    x1, _, x3 = x.get_components()
    x = LabelTensor(torch.cat((x1, torch.zeros_like(x3), x3), dim=1))
    mu = LabelTensor(torch.ones((x.shape[0], 2)) * MU_PLOT)

    predictions = f(x, mu).w.detach().cpu()

    x1, x3 = x1.detach().cpu(), x3.detach().cpu()

    scatter1 = axes[1, 0].scatter(x1, x3, c=predictions, s=0.5, cmap="turbo")
    axes[1, 0].set_title(f"Predictions, plane y={0}")
    axes[1, 0].set_aspect("equal")
    fig.colorbar(scatter1, ax=axes[1, 0], fraction=0.046, pad=0.04)

    T_ = LabelTensor(torch.ones((x.shape[0], 1)) * T)
    expected = exact(T_, x, mu).detach().cpu()
    error = torch.abs(predictions - expected) / expected
    error = error.detach().cpu()

    scatter2 = axes[1, 1].scatter(x1, x3, c=error, s=0.5, cmap="turbo")
    axes[1, 1].set_title("Error")
    axes[1, 1].set_aspect("equal")
    fig.colorbar(scatter2, ax=axes[1, 1], fraction=0.046, pad=0.04)

    plt.show()


# %%


class NonUniformSampler(DomainSampler):
    def __init__(self, f, domain, mu_sampler, **kwargs):
        super().__init__(domain, **kwargs)
        self.f = f
        self.mu_sampler = mu_sampler

    def sample_around_zeroes(self, n, sigma=0.05, keep=0.75):
        candidates = LabelTensor(self.vol_sampler.sample(10 * n))
        c1, c2, c3 = candidates.get_components()
        parameters = self.mu_sampler.sample(10 * n)
        f_values = self.f(candidates, parameters).w[:, 0]
        weights = torch.exp(-(f_values**2) / (2 * sigma**2))
        return candidates[weights > keep].x

    def sample_new_points(self, n: int):
        """Samples `n` new points from the volumetric domain and labels them by subdomains.

        Args:
            n (int): Number of points to sample.

        Returns:
            ScimbaTensor: A tensor of sampled points with their subdomain labels.
        """
        pts_1 = self.sample_around_zeroes(n, sigma=0.5)

        if pts_1.shape[0] >= n // 4:
            pts_1 = pts_1[: n // 4]

        pts_2 = self.sample_around_zeroes(n, sigma=10)
        pts_2 = torch.cat((pts_1, pts_2), dim=0)

        if pts_2.shape[0] >= n // 2:
            pts_2 = pts_2[: n // 2]

        pts_3 = self.sample_around_zeroes(n, sigma=500)
        pts_3 = torch.cat((pts_2, pts_3), dim=0)

        if pts_3.shape[0] >= 3 * n // 4:
            pts_3 = pts_3[: 3 * n // 4]

        remaining = n - pts_3.shape[0]
        pts_unif = self.vol_sampler.sample(remaining)

        pts = torch.cat((pts_3, pts_unif), dim=0)
        labels = torch.zeros(n, dtype=int)

        # Label points based on subdomains
        for i, subdomain in enumerate(self.domain.list_subdomains):
            condition = subdomain.is_inside(pts)
            labels[condition] = i + 1

        # Wrap the points and labels in a ScimbaTensor
        data = LabelTensor(pts, labels)
        data.x.requires_grad_()
        return data


# %%


def make_projector(
    use_natural_gradient: bool,
    pde,
    initial_condition: bool = False,
) -> AbstractNonlinearProjector:
    if use_natural_gradient:
        return TimeDiscreteNaturalGradientProjector(pde, rhs=f_ini)

    else:
        opt_1 = {
            "name": "adam",
            "optimizer_args": {"lr": 2.5e-2, "betas": (0.9, 0.999)},
        }
        opt_2 = {
            "name": "lbfgs",
            "switch_at_epoch_ratio": 0.9,
            "optimizer_args": {
                "history_size": 50,
                "max_iter": 50,
                "tolerance_grad": 1e-11,
                "tolerance_change": 1e-9,
            },
        }
        if initial_condition:
            opt = OptimizerData(opt_1, opt_2)
        else:
            opt = OptimizerData(opt_2)

        return TimeDiscreteCollocationProjector(pde, rhs=f_ini, optimizers=opt)


def solve_with_neural_sl(
    T: float,
    dt: float,
    test_case: int,
    retrain_initial_condition: bool = False,
    verbose: bool = False,
) -> NeuralSemiLagrangian:
    torch.random.manual_seed(0)

    assert test_case in [0, 1, 2, 3], "test_case must be 0, 1, 2 or 3"

    file_name = f"transport_3D_cylinder_test_case_{test_case}"

    use_natural_gradient = test_case >= 1
    use_adaptive_sampling = test_case >= 1

    sampler_unif = TensorizedSampler([DomainSampler(DOMAIN), MU_SAMPLER])

    layer_sizes = [20, 40, 20] if test_case == 1 else [60, 60, 60]

    if test_case == 0:
        epochs = 100
    elif test_case == 1 or test_case == 2:
        epochs = 50
    else:
        epochs = 200

    if test_case == 0:
        epochs_init = 1_000
    elif test_case == 1 or test_case == 2:
        epochs_init = 150
    else:
        epochs_init = 500

    if test_case <= 2:
        n_collocation = 50_000
    else:
        n_collocation = 150_000

    # layer_sizes = [2, 4, 2] if test_case == 1 else [6, 6, 6]
    # epochs = 5 if test_case <= 2 else 10
    # epochs_init = 10 if use_natural_gradient else 15
    # n_collocation = 50

    description = {}
    description["use_natural_gradient"] = use_natural_gradient
    description["use_adaptive_sampling"] = use_adaptive_sampling
    description["layer_sizes"] = layer_sizes
    description["epochs"] = epochs
    description["epochs_init"] = epochs_init
    description["n_collocation"] = n_collocation
    description["file_name"] = file_name

    space = NNxSpace(1, 2, GenericMLP, DOMAIN, sampler_unif, layer_sizes=layer_sizes)

    def grad_for_sample(x, mu):
        x.x.requires_grad_()
        w = space.evaluate(x, mu).w
        w_x, w_y, w_z = space.grad(w, x)
        grad_w2 = w_x**2 + w_y**2 + w_z**2
        pm_one = torch.randint(0, 2, (grad_w2.shape[0], 1)) * 2 - 1
        res = pm_one / (1e-2 + (w_x**2 + w_y**2 + w_z**2))
        return MultiLabelTensor(res)

    if use_adaptive_sampling:
        sampler = TensorizedSampler(
            [NonUniformSampler(grad_for_sample, DOMAIN, MU_SAMPLER), MU_SAMPLER]
        )
        space.integrator = sampler
    else:
        sampler = sampler_unif

    pde = AdvectionReactionDiffusionDirichletStrongForm(
        space, a=a, u0=f_ini, constant_advection=False, zero_diffusion=True
    )
    characteristic = Characteristic(pde, exact_foot=exact_foot, periodic=True)

    CPU_times = {}

    if retrain_initial_condition:
        projector = make_projector(use_natural_gradient, pde, initial_condition=True)
        scheme = NeuralSemiLagrangian(characteristic, projector)

        start = time.perf_counter()
        scheme.initialization(
            epochs=epochs_init, n_collocation=n_collocation, verbose=verbose
        )
        CPU_times["ini"] = time.perf_counter() - start

        scheme.projector.save(f"ini_{file_name}")
        if verbose:
            plot_results(space.evaluate, sampler.sample, T=0)

    projector = make_projector(use_natural_gradient, pde)
    projector.load(f"ini_{file_name}")
    projector.space.load_from_best_approx()
    scheme = NeuralSemiLagrangian(characteristic, projector)

    start = time.perf_counter()
    scheme.solve(
        dt=dt,
        final_time=T,
        epochs=epochs,
        n_collocation=n_collocation,
        verbose=verbose,
        plot=plot_results if verbose else None,
        save=file_name,
        sol_exact=exact,
    )
    CPU_times["solve"] = time.perf_counter() - start

    return {
        "scheme": scheme,
        "sampler": sampler,
        "CPU_times": CPU_times,
        "description": description,
    }


# %%

T = LENGTH * Z_VELOCITY
dt = T / 4

# %%

results = []

Uncomment the following to run the example (it can be long).

[2]:
# for test_case in [0, 1, 2, 3]:
#    results.append(
#        solve_with_neural_sl(T, dt, test_case, retrain_initial_condition=True)
#    )

# %%