Time discrete: transport in a cylinder
Solves the advection of a 3D parametric bump function in a cylinder.
\[\partial_t u + a \cdot \nabla u = 0\]
where \(u: \mathbb{R}^3 \times (0, T) \times \mathbb{R}^2 \to \mathbb{R}\) is the unknown function, depending on space, time, and two parameters: the position and the width of the initial bump.
The equation is solved using the neural semi-Lagrangian scheme, with either a classical Adam optimizer or natural gradient preconditioning.
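At each time step \(t_n \to t_{n+1} = t_n + \Delta t\), the semi-Lagrangian scheme transports the previous approximation along the characteristics of the velocity field \(a\): writing \(X(t_n; t_{n+1}, x)\) for the foot of the characteristic that reaches \(x\) at time \(t_{n+1}\), the update reads
\[u^{n+1}(x) = u^n\big(X(t_n; t_{n+1}, x)\big)\]
and the neural version fits the network representing \(u^{n+1}\) by projecting this right-hand side at collocation points. For the velocity field used here the feet are known in closed form (a rotation about the cylinder axis combined with a translation along it) and are passed to the scheme as exact_foot below.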
[1]:
# %%
import time
import matplotlib.pyplot as plt
import torch
from scimba_torch import PI
from scimba_torch.approximation_space.nn_space import NNxSpace
from scimba_torch.domain.meshless_domain.domain_3d import Cylinder3D
from scimba_torch.integration.monte_carlo import DomainSampler, TensorizedSampler
from scimba_torch.integration.monte_carlo_parameters import UniformParametricSampler
from scimba_torch.neural_nets.coordinates_based_nets.mlp import GenericMLP
from scimba_torch.numerical_solvers.collocation_projector import (
AbstractNonlinearProjector,
)
from scimba_torch.numerical_solvers.temporal_pde.neural_semilagrangian import (
Characteristic,
NeuralSemiLagrangian,
)
from scimba_torch.numerical_solvers.temporal_pde.time_discrete import (
TimeDiscreteCollocationProjector,
TimeDiscreteNaturalGradientProjector,
)
from scimba_torch.optimizers.optimizers_data import OptimizerData
from scimba_torch.physical_models.temporal_pde.advection_diffusion_equation import (
AdvectionReactionDiffusionDirichletStrongForm,
)
from scimba_torch.utils.scimba_tensors import LabelTensor, MultiLabelTensor
RADIUS = 1
LENGTH = 2
Z_VELOCITY = 1
DOMAIN = Cylinder3D(RADIUS, LENGTH, is_main_domain=True)
PARAMETER_BOUNDS = [[0.3, 0.5], [0.05, 0.15]]
MU_PLOT = torch.tensor([0.4, 0.1])
MU_SAMPLER = UniformParametricSampler(PARAMETER_BOUNDS)
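# Exact solution: 1 plus a Gaussian bump of width v whose center sits at
# distance c from the cylinder axis, rotates about it with angular speed
# 2 * PI, and translates along z at speed Z_VELOCITY (wrapped periodically
# onto [0, LENGTH]); f_ini is this solution at t = 0.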
def f_ini(x, mu):
return exact(LabelTensor(torch.zeros((x.shape[0], 1))), x, mu)
def exact(t, x, mu):
def bump(x1, x2, x3, v):
return torch.exp(-(x1**2 + x2**2 + x3**2) / (2 * v**2))
x1, x2, x3 = x.get_components()
c, v = mu.get_components()
t_ = t.get_components()
c_z = LENGTH / 2
x1_t = torch.cos(2 * PI * t_)
x2_t = torch.sin(2 * PI * t_)
x1_ = x1 - c * x1_t
x2_ = x2 - c * x2_t
x3_ = torch.remainder((x3 - Z_VELOCITY * t_), LENGTH) - c_z
return 1 + bump(x1_, x2_, x3_, v)
def a(t, x, mu):
    # Advection field: rigid rotation about the z-axis with angular speed
    # 2 * PI, plus a constant velocity Z_VELOCITY along z, consistent with
    # the exact solution above and with exact_foot below.
    x1, x2, x3 = x.get_components()
    return torch.cat(
        (-2 * PI * x2, 2 * PI * x1, Z_VELOCITY * torch.ones_like(x3)), dim=1
    )
def exact_foot(
    t: torch.Tensor, x: LabelTensor, mu: LabelTensor, dt: float
) -> torch.Tensor:
    """Exact characteristic foot: rotates x backward by an angle 2 * PI * dt
    about the cylinder axis and shifts it by -Z_VELOCITY * dt along z."""
x1, x2, x3 = x.get_components()
arg = 2 * PI * t
c_n = torch.cos(arg)
s_n = torch.sin(arg)
k1 = x1 * s_n + x2 * c_n
k2 = x1 * c_n - x2 * s_n
arg_np1 = 2 * PI * (t + dt)
c_np1 = torch.cos(arg_np1)
s_np1 = torch.sin(arg_np1)
x1_np1 = k1 * s_np1 + k2 * c_np1
x2_np1 = -k2 * s_np1 + k1 * c_np1
x3_np1 = x3 - Z_VELOCITY * dt
return torch.cat((x1_np1, x2_np1, x3_np1), dim=1)
def plot_results(f, sampler, T, iter=None, n_visu=20_000):
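    # `iter` is unused but accepted so that this function can also serve as
    # the `plot` callback passed to `scheme.solve` below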
fig, axes = plt.subplots(2, 2, figsize=(12, 12), constrained_layout=True)
# plot in z-plane
x, mu = sampler(n_visu)
x1, x2, _ = x.get_components()
    x3 = torch.ones_like(x1) * LENGTH / 2 + Z_VELOCITY * T
    x3 = torch.remainder(x3, LENGTH)
x = LabelTensor(torch.cat((x1, x2, x3), dim=1))
mu = LabelTensor(torch.ones((x.shape[0], 2)) * MU_PLOT)
predictions = f(x, mu).w.detach().cpu()
x1, x2 = x1.detach().cpu(), x2.detach().cpu()
scatter1 = axes[0, 0].scatter(x1, x2, c=predictions, s=0.5, cmap="turbo")
    plane = (LENGTH / 2 + Z_VELOCITY * T) % LENGTH
axes[0, 0].set_title(f"Predictions, plane z={plane}")
axes[0, 0].set_aspect("equal")
fig.colorbar(scatter1, ax=axes[0, 0], fraction=0.046, pad=0.04)
T_ = LabelTensor(torch.ones((x.shape[0], 1)) * T)
expected = exact(T_, x, mu).detach().cpu()
error = torch.abs(predictions - expected) / expected
error = error.detach().cpu()
scatter2 = axes[0, 1].scatter(x1, x2, c=error, s=0.5, cmap="turbo")
    axes[0, 1].set_title("Relative error")
axes[0, 1].set_aspect("equal")
fig.colorbar(scatter2, ax=axes[0, 1], fraction=0.046, pad=0.04)
# plot in x-plane
x, mu = sampler(n_visu)
x1, _, x3 = x.get_components()
x = LabelTensor(torch.cat((x1, torch.zeros_like(x3), x3), dim=1))
mu = LabelTensor(torch.ones((x.shape[0], 2)) * MU_PLOT)
predictions = f(x, mu).w.detach().cpu()
x1, x3 = x1.detach().cpu(), x3.detach().cpu()
scatter1 = axes[1, 0].scatter(x1, x3, c=predictions, s=0.5, cmap="turbo")
    axes[1, 0].set_title("Predictions, plane y=0")
axes[1, 0].set_aspect("equal")
fig.colorbar(scatter1, ax=axes[1, 0], fraction=0.046, pad=0.04)
T_ = LabelTensor(torch.ones((x.shape[0], 1)) * T)
expected = exact(T_, x, mu).detach().cpu()
error = torch.abs(predictions - expected) / expected
error = error.detach().cpu()
scatter2 = axes[1, 1].scatter(x1, x3, c=error, s=0.5, cmap="turbo")
    axes[1, 1].set_title("Relative error")
axes[1, 1].set_aspect("equal")
fig.colorbar(scatter2, ax=axes[1, 1], fraction=0.046, pad=0.04)
plt.show()
# %%
class NonUniformSampler(DomainSampler):
def __init__(self, f, domain, mu_sampler, **kwargs):
super().__init__(domain, **kwargs)
self.f = f
self.mu_sampler = mu_sampler
    def sample_around_zeroes(self, n, sigma=0.05, keep=0.75):
        # Draw 10*n candidate points, weight them with a Gaussian of the
        # function values, and keep those above the threshold: candidates
        # near the zero level set of `f` survive.
        candidates = LabelTensor(self.vol_sampler.sample(10 * n))
        parameters = self.mu_sampler.sample(10 * n)
        f_values = self.f(candidates, parameters).w[:, 0]
        weights = torch.exp(-(f_values**2) / (2 * sigma**2))
        return candidates[weights > keep].x
    def sample_new_points(self, n: int):
        """Samples `n` new points from the volumetric domain and labels them by subdomain.

        Args:
            n (int): Number of points to sample.

        Returns:
            LabelTensor: A tensor of sampled points with their subdomain labels.
        """
pts_1 = self.sample_around_zeroes(n, sigma=0.5)
if pts_1.shape[0] >= n // 4:
pts_1 = pts_1[: n // 4]
pts_2 = self.sample_around_zeroes(n, sigma=10)
pts_2 = torch.cat((pts_1, pts_2), dim=0)
if pts_2.shape[0] >= n // 2:
pts_2 = pts_2[: n // 2]
pts_3 = self.sample_around_zeroes(n, sigma=500)
pts_3 = torch.cat((pts_2, pts_3), dim=0)
if pts_3.shape[0] >= 3 * n // 4:
pts_3 = pts_3[: 3 * n // 4]
remaining = n - pts_3.shape[0]
pts_unif = self.vol_sampler.sample(remaining)
pts = torch.cat((pts_3, pts_unif), dim=0)
        labels = torch.zeros(n, dtype=torch.long)
# Label points based on subdomains
for i, subdomain in enumerate(self.domain.list_subdomains):
condition = subdomain.is_inside(pts)
labels[condition] = i + 1
        # Wrap the points and labels in a LabelTensor
data = LabelTensor(pts, labels)
data.x.requires_grad_()
return data
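# %%
# Minimal sketch of the filtering step used by `sample_around_zeroes`, on a
# made-up function (signed distance to a sphere of radius 0.5); the names
# below are illustrative only and not part of scimba_torch.
def filter_near_zeroes(pts, f_values, sigma=0.1, keep=0.75):
    # Gaussian weight of the values: close to 1 near the zero level set
    weights = torch.exp(-(f_values**2) / (2 * sigma**2))
    return pts[weights > keep]

demo_pts = torch.rand(1000, 3)
demo_kept = filter_near_zeroes(demo_pts, demo_pts.norm(dim=1) - 0.5)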
# %%
def make_projector(
use_natural_gradient: bool,
pde,
initial_condition: bool = False,
) -> AbstractNonlinearProjector:
if use_natural_gradient:
return TimeDiscreteNaturalGradientProjector(pde, rhs=f_ini)
else:
opt_1 = {
"name": "adam",
"optimizer_args": {"lr": 2.5e-2, "betas": (0.9, 0.999)},
}
opt_2 = {
"name": "lbfgs",
"switch_at_epoch_ratio": 0.9,
"optimizer_args": {
"history_size": 50,
"max_iter": 50,
"tolerance_grad": 1e-11,
"tolerance_change": 1e-9,
},
}
if initial_condition:
opt = OptimizerData(opt_1, opt_2)
else:
opt = OptimizerData(opt_2)
return TimeDiscreteCollocationProjector(pde, rhs=f_ini, optimizers=opt)
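# The collocation projector switches from Adam to L-BFGS at 90% of the
# epochs when (re)training the initial condition; the time steps use the
# L-BFGS configuration only.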
def solve_with_neural_sl(
T: float,
dt: float,
test_case: int,
retrain_initial_condition: bool = False,
verbose: bool = False,
) -> NeuralSemiLagrangian:
torch.random.manual_seed(0)
assert test_case in [0, 1, 2, 3], "test_case must be 0, 1, 2 or 3"
file_name = f"transport_3D_cylinder_test_case_{test_case}"
use_natural_gradient = test_case >= 1
use_adaptive_sampling = test_case >= 1
sampler_unif = TensorizedSampler([DomainSampler(DOMAIN), MU_SAMPLER])
layer_sizes = [20, 40, 20] if test_case == 1 else [60, 60, 60]
if test_case == 0:
epochs = 100
elif test_case == 1 or test_case == 2:
epochs = 50
else:
epochs = 200
if test_case == 0:
epochs_init = 1_000
elif test_case == 1 or test_case == 2:
epochs_init = 150
else:
epochs_init = 500
if test_case <= 2:
n_collocation = 50_000
else:
n_collocation = 150_000
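    # smaller settings for a quick smoke test (uncomment to try):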
# layer_sizes = [2, 4, 2] if test_case == 1 else [6, 6, 6]
# epochs = 5 if test_case <= 2 else 10
# epochs_init = 10 if use_natural_gradient else 15
# n_collocation = 50
description = {}
description["use_natural_gradient"] = use_natural_gradient
description["use_adaptive_sampling"] = use_adaptive_sampling
description["layer_sizes"] = layer_sizes
description["epochs"] = epochs
description["epochs_init"] = epochs_init
description["n_collocation"] = n_collocation
description["file_name"] = file_name
space = NNxSpace(1, 2, GenericMLP, DOMAIN, sampler_unif, layer_sizes=layer_sizes)
    def grad_for_sample(x, mu):
        # Surrogate function whose values are small where the gradient of
        # the current approximation is large, so that sampling around its
        # zeroes concentrates collocation points in steep regions.
        x.x.requires_grad_()
        w = space.evaluate(x, mu).w
        w_x, w_y, w_z = space.grad(w, x)
        grad_w2 = w_x**2 + w_y**2 + w_z**2
        # random sign, irrelevant once squared in the sampler weights
        pm_one = torch.randint(0, 2, (grad_w2.shape[0], 1)) * 2 - 1
        res = pm_one / (1e-2 + grad_w2)
        return MultiLabelTensor(res)
if use_adaptive_sampling:
sampler = TensorizedSampler(
[NonUniformSampler(grad_for_sample, DOMAIN, MU_SAMPLER), MU_SAMPLER]
)
space.integrator = sampler
else:
sampler = sampler_unif
pde = AdvectionReactionDiffusionDirichletStrongForm(
space, a=a, u0=f_ini, constant_advection=False, zero_diffusion=True
)
characteristic = Characteristic(pde, exact_foot=exact_foot, periodic=True)
CPU_times = {}
if retrain_initial_condition:
projector = make_projector(use_natural_gradient, pde, initial_condition=True)
scheme = NeuralSemiLagrangian(characteristic, projector)
start = time.perf_counter()
scheme.initialization(
epochs=epochs_init, n_collocation=n_collocation, verbose=verbose
)
CPU_times["ini"] = time.perf_counter() - start
scheme.projector.save(f"ini_{file_name}")
if verbose:
plot_results(space.evaluate, sampler.sample, T=0)
projector = make_projector(use_natural_gradient, pde)
projector.load(f"ini_{file_name}")
projector.space.load_from_best_approx()
scheme = NeuralSemiLagrangian(characteristic, projector)
start = time.perf_counter()
scheme.solve(
dt=dt,
final_time=T,
epochs=epochs,
n_collocation=n_collocation,
verbose=verbose,
plot=plot_results if verbose else None,
save=file_name,
sol_exact=exact,
)
CPU_times["solve"] = time.perf_counter() - start
return {
"scheme": scheme,
"sampler": sampler,
"CPU_times": CPU_times,
"description": description,
}
# %%
T = LENGTH / Z_VELOCITY  # one full traversal of the cylinder along z
dt = T / 4
# %%
results = []
Uncomment the following to run the example (it can take a long time).
[2]:
# for test_case in [0, 1, 2, 3]:
# results.append(
# solve_with_neural_sl(T, dt, test_case, retrain_initial_condition=True)
# )
# %%
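# After running, the returned dictionaries can be inspected, e.g.:
# for res in results:
#     print(res["description"]["file_name"], res["CPU_times"])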