Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions pySDC/playgrounds/time_dep_BCs/heat_eq_time_dep_BCs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import numpy as np

from pySDC.core.problem import Problem
from pySDC.implementations.datatype_classes.mesh import mesh
from pySDC.implementations.problem_classes.generic_spectral import GenericSpectralLinear


class GenericSpectralLinearTimeDepBCs(GenericSpectralLinear):
def solve_system(self, rhs, dt, u0=None, t=0, *args, **kwargs):
"""
Do an implicit Euler step to solve M u_t + Lu = rhs, with M the mass matrix and L the linear operator as setup by
``GenericSpectralLinear.setup_L`` and ``GenericSpectralLinear.setup_M``.

The implicit Euler step is (M - dt L) u = M rhs. Note that M need not be invertible as long as (M + dt*L) is.
This means solving with dt=0 to mimic explicit methods does not work for all problems, in particular simple DAEs.

Note that by putting M rhs on the right hand side, this function can only solve algebraic conditions equal to
zero. If you want something else, it should be easy to overload this function.
"""

self.heterogeneous_setup()

if self.spectral_space:
rhs_hat = rhs.copy()
else:
rhs_hat = self.spectral.transform(rhs)

rhs_hat = (self.M @ rhs_hat.flatten()).reshape(rhs_hat.shape)
rhs_hat = self.spectral.put_BCs_in_rhs_hat(rhs_hat)
self.put_time_dep_BCs_in_rhs(
rhs_hat, t
) # this line is the difference between this and the generic implementation
rhs_hat = self.Pl @ rhs_hat.flatten()

if dt not in self.cached_factorizations.keys():
A = self.M + dt * self.L
A = self.Pl @ self.spectral.put_BCs_in_matrix(A) @ self.Pr

if dt not in self.cached_factorizations.keys():
if len(self.cached_factorizations) >= self.max_cached_factorizations:
self.cached_factorizations.pop(list(self.cached_factorizations.keys())[0])
self.logger.debug(f'Evicted matrix factorization for {dt=:.6f} from cache')

solver = self.spectral.linalg.factorized(A)

self.cached_factorizations[dt] = solver
self.logger.debug(f'Cached matrix factorization for {dt=:.6f}')
self.work_counters['factorizations']()

_sol_hat = self.cached_factorizations[dt](rhs_hat)
self.work_counters[self.solver_type]()
self.logger.debug(f'Used cached matrix factorization for {dt=:.6f}')

sol_hat = self.spectral.u_init_forward
sol_hat[...] = (self.Pr @ _sol_hat).reshape(sol_hat.shape)

if self.spectral_space:
return sol_hat
else:
sol = self.spectral.u_init
sol[:] = self.spectral.itransform(sol_hat).real
return sol


class Heat1DTimeDependentBCs(GenericSpectralLinearTimeDepBCs):
"""
1D Heat equation with time-dependent Dirichlet Boundary conditions discretized on (-1, 1) using an ultraspherical spectral method.
"""

dtype_u = mesh
dtype_f = mesh

def __init__(self, nvars=128, a=1, b=2, f=1, nu=1e-2, ft=np.pi, **kwargs):
"""
Constructor. `kwargs` are forwarded to parent class constructor.

Args:
nvars (int): Resolution
a (float): Left BC value at t=0
b (float): Right BC value at t=0
f (int): Frequency of the solution
nu (float): Diffusion parameter
ft (int): frequency of the BCs in time
"""
self._makeAttributeAndRegister('nvars', 'a', 'b', 'f', 'nu', 'ft', localVars=locals(), readOnly=True)

bases = [{'base': 'ultraspherical', 'N': nvars}]
components = ['u']

GenericSpectralLinear.__init__(self, bases, components, real_spectral_coefficients=True, **kwargs)

self.x = self.get_grid()[0]

I = self.get_Id()
Dxx = self.get_differentiation_matrix(axes=(0,), p=2)

S2 = self.get_basis_change_matrix(p_in=2, p_out=0)
U2 = self.get_basis_change_matrix(p_in=0, p_out=2)

self.Dxx = S2 @ Dxx

L_lhs = {
'u': {'u': -nu * Dxx},
}
self.setup_L(L_lhs)

M_lhs = {'u': {'u': U2 @ I}}
self.setup_M(M_lhs)

self.add_BC(component='u', equation='u', axis=0, x=-1, v=a, kind="Dirichlet", line=-1)
self.add_BC(component='u', equation='u', axis=0, x=1, v=b, kind="Dirichlet", line=-2)
self.setup_BCs()

def eval_f(self, u, *args, **kwargs):
f = self.f_init
iu = self.index('u')

if self.spectral_space:
u_hat = u.copy()
else:
u_hat = self.transform(u)

u_hat[iu] = (self.nu * (self.Dxx @ u_hat[iu].flatten())).reshape(u_hat[iu].shape)

if self.spectral_space:
me = u_hat
else:
me = self.itransform(u_hat).real

f[iu][...] = me[iu]
return f

def u_exact(self, t=0):
"""
Get initial conditions

Args:
t (float): When you want the exact solution

Returns:
Heat1DUltraspherical.dtype_u: Exact solution
"""
assert t == 0

xp = self.xp
iu = self.index('u')
u = self.spectral.u_init_physical

u[iu] = (
xp.sin(np.pi * self.x) * xp.exp(-self.nu * (self.f * np.pi) ** 2 * t)
+ (self.b - self.a) / 2 * self.x
+ (self.b + self.a) / 2
)

if self.spectral_space:
u_hat = self.spectral.u_init_forward
u_hat[...] = self.transform(u)
u = u_hat

# apply BCs
u = self.solve_system(u, 1e-9, u, t)

return u

def put_time_dep_BCs_in_rhs(self, rhs_hat, t):
"""
Put the time dependent BCs in the right hand side.

See Section 2.5.10 in https://doi.org/10.15480/882.16360 for details on the implementation of the boundary conditions.

In this simple 1D case the BCs are simply in the last two lines of the problem, so we can put there whatever we want.
Note that in 2D you essentially do the same, but you need to unflatten the RHS, put the BCs in the last lines, and then reflatten.
"""
rhs_hat[0, -1] = self.a * self.xp.cos(t * self.ft)
rhs_hat[0, -2] = self.b * self.xp.cos(t * self.ft)
return rhs_hat

def get_fig(self):
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
return fig

def plot(self, u, t, fig):
if self.spectral_space:
u = self.itransform(u)
ax = fig.get_axes()[0]
ax.cla()
ax.plot(self.x, u[0])
100 changes: 100 additions & 0 deletions pySDC/playgrounds/time_dep_BCs/run_time_dep_heat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import numpy as np
import matplotlib.pyplot as plt
from pySDC.helpers.stats_helper import get_sorted, get_list_of_types

from pySDC.playgrounds.time_dep_BCs.heat_eq_time_dep_BCs import Heat1DTimeDependentBCs
from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit
from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI
from pySDC.implementations.hooks.plotting import PlotPostStep


def run_heat(dt=1e-1, Tend=4, kmax=5, ft=np.pi, plotting=False):
level_params = {}
level_params['dt'] = dt

sweeper_params = {}
sweeper_params['quad_type'] = 'RADAU-RIGHT'
sweeper_params['num_nodes'] = 3
sweeper_params['QI'] = 'LU'

problem_params = {'ft': ft, 'spectral_space': False}

step_params = {}
step_params['maxiter'] = kmax

controller_params = {}
controller_params['logger_level'] = 30
if plotting:
controller_params['hook_class'] = PlotPostStep

description = {}
description['problem_class'] = Heat1DTimeDependentBCs
description['problem_params'] = problem_params
description['sweeper_class'] = generic_implicit
description['sweeper_params'] = sweeper_params
description['level_params'] = level_params
description['step_params'] = step_params

t0 = 0.0

controller = controller_nonMPI(num_procs=1, controller_params=controller_params, description=description)

P = controller.MS[0].levels[0].prob
uinit = P.u_exact(t0)

uend, stats = controller.run(u0=uinit, t0=t0, Tend=Tend)

return uend, stats, controller


def compute_errors(Tend, N, ft, kmax):
solutions = []
dts = []

for n in range(N):
dt = Tend / (2**n)
u, _, _ = run_heat(dt=dt, Tend=Tend, ft=ft, kmax=kmax, plotting=False)

solutions.append(u)
dts.append(dt)

errors = [abs(solutions[i] - solutions[-1]) / abs(solutions[-1]) for i in range(N - 1)]
_dts = [dts[i] for i in range(N - 1)]

return _dts, errors


def compute_order(dts, errors):
orders = []
for i in range(len(errors) - 1):
if errors[i + 1] < 1e-12:
break
orders.append(np.log(errors[i] / errors[i + 1]) / np.log(dts[i] / dts[i + 1]))
return np.median(orders)


def plot_order(): # pragma: no cover
fig, axs = plt.subplots(1, 2, sharex=True, sharey=True, figsize=(9, 4))

for k in range(1, 10):
dts, errors = compute_errors(4, 8, 0, k)
order = compute_order(dts, errors)
axs[0].loglog(dts, errors, label=f'{k=}, {order=:.1f}')

dts, errors = compute_errors(4, 8, 1, k)
order = compute_order(dts, errors)
axs[1].loglog(dts, errors, label=f'{k=}, {order=:.1f}')

axs[0].set_ylabel(r'relative global error')
axs[0].set_xlabel(r'$\Delta t$')
axs[1].set_xlabel(r'$\Delta t$')
axs[0].legend(frameon=False)
axs[1].legend(frameon=False)
axs[0].set_title('Constant in time BCs')
axs[1].set_title('Time-dependent BCs')
fig.tight_layout()


if __name__ == '__main__':
plot_order()
plt.show()
34 changes: 34 additions & 0 deletions pySDC/playgrounds/time_dep_BCs/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest


@pytest.mark.parametrize('a', [1, 3.14])
@pytest.mark.parametrize('b', [-1, -3.14])
@pytest.mark.parametrize('t', [1, 3.14])
def test_time_dep_heat_eq(a, b, t):
from pySDC.playgrounds.time_dep_BCs.heat_eq_time_dep_BCs import Heat1DTimeDependentBCs
import numpy as np

problem = Heat1DTimeDependentBCs(a=a, b=b, ft=1)
u0 = problem.u_exact(0)

u = problem.solve_system(u0, t, u0, t)

if not problem.spectral_space:
u = problem.transform(u)

expect_boundary = np.empty((2, 2))
expect_boundary = problem.put_time_dep_BCs_in_rhs(expect_boundary, t)

# we use T_n(1) = 1 and T_n(-1) = (-1)^n, to compute the values at the boundaries from the spectral representation
# see Wikipedia for more details: https://en.wikipedia.org/wiki/Chebyshev_polynomials#Roots_and_extrema
right_boundary = u.sum()
expect_right_boundary = expect_boundary[0, -2]
assert np.isclose(
right_boundary, expect_right_boundary
), f'Got {right_boundary} at right boundary but expected {expect_right_boundary} at {t=}'

left_boundary = u[0, ::2].sum() - u[0, 1::2].sum()
expect_left_boundary = expect_boundary[0, -1]
assert np.isclose(
left_boundary, expect_left_boundary
), f'Got {left_boundary} at left boundary but expected {expect_left_boundary} at {t=}'
Loading