from .._core.tendencystepper import TendencyStepper
from .._core.dataarray import DataArray
from .._core.state import copy_untouched_quantities, add, multiply
class SSPRungeKutta(TendencyStepper):
"""
A TendencyStepper using the Strong Stability Preserving Runge-Kutta scheme,
as in Numerical Methods for Fluid Dynamics by Dale Durran (2nd ed) and
as proposed by Shu and Osher (1988).
"""
def __init__(self, *args, **kwargs):
"""
Initialize a strong stability preserving Runge-Kutta time stepper.
Args
----
*args : TendencyComponent or ImplicitTendencyComponent
Objects to call for tendencies when doing time stepping.
stages: int, optional
Number of stages to use. Should be 2 or 3. Default is 3.
"""
stages = kwargs.pop('stages', 3)
if stages not in (2, 3):
raise ValueError(
'stages must be one of 2 or 3, received {}'.format(stages))
self._stages = stages
self._euler_stepper = AdamsBashforth(*args, order=1)
super(SSPRungeKutta, self).__init__(*args, **kwargs)
def _call(self, state, timestep):
"""
Updates the input state dictionary and returns a new state corresponding
to the next timestep.
Args
----
state : dict
The current model state. Will be updated in-place by
the call with any diagnostics from the current timestep.
timestep : timedelta
The amount of time to step forward.
Returns
-------
diagnostics : dict
Diagnostics from the timestep of the input state.
new_state : dict
The model state at the next timestep.
"""
if self._stages == 2:
return self._step_2_stages(state, timestep)
elif self._stages == 3:
return self._step_3_stages(state, timestep)
def _step_3_stages(self, state, timestep):
diagnostics, state_1 = self._euler_stepper(state, timestep)
_, state_1_5 = self._euler_stepper(state_1, timestep)
state_2 = add(multiply(0.75, state), multiply(0.25, state_1_5))
_, state_2_5 = self._euler_stepper(state_2, timestep)
out_state = add(multiply(1./3, state), multiply(2./3, state_2_5))
return diagnostics, out_state
def _step_2_stages(self, state, timestep):
assert state is not None
diagnostics, state_1 = self._euler_stepper(state, timestep)
assert state_1 is not None
_, state_2 = self._euler_stepper(state_1, timestep)
out_state = multiply(0.5, add(state, state_2))
return diagnostics, out_state
[docs]class AdamsBashforth(TendencyStepper):
"""A TendencyStepper using the Adams-Bashforth scheme."""
[docs] def __init__(self, *args, **kwargs):
"""
Initialize an Adams-Bashforth time stepper.
Args
----
*args : TendencyComponent or ImplicitTendencyComponent
Objects to call for tendencies when doing time stepping.
order : int, optional
The order of accuracy to use. Must be between
1 and 4. 1 is the same as the Euler method. Default is 3.
"""
order = kwargs.pop('order', 3)
if isinstance(order, float) and order.is_integer():
order = int(order)
if not isinstance(order, int):
raise TypeError('order must be an integer')
if not 1 <= order <= 4:
raise ValueError('order must be between 1 and 4')
self._order = order
self._timestep = None
self._tendencies_list = []
super(AdamsBashforth, self).__init__(*args, **kwargs)
def _call(self, state, timestep):
"""
Updates the input state dictionary and returns a new state corresponding
to the next timestep.
Args
----
state : dict
The current model state. Will be updated in-place by
the call with any diagnostics from the current timestep.
timestep : timedelta
The amount of time to step forward.
Returns
-------
diagnostics : dict
Diagnostics from the timestep of the input state.
new_state : dict
The model state at the next timestep.
Raises
------
ValueError
If the timestep is not the same as the last time
step() was called on this instance of this object.
"""
self._ensure_constant_timestep(timestep)
state = state.copy()
tendencies, diagnostics = self.prognostic(state, timestep)
convert_tendencies_units_for_state(tendencies, state)
self._tendencies_list.append(tendencies)
new_state = self._perform_step(state, timestep)
copy_untouched_quantities(state, new_state)
if len(self._tendencies_list) == self._order:
self._tendencies_list.pop(0) # remove the oldest entry
return diagnostics, new_state
def _perform_step(self, state, timestep):
# if we don't have enough previous tendencies built up, use lower order
order = min(self._order, len(self._tendencies_list))
if order == 1:
new_state = step_forward_euler(
state, self._tendencies_list[-1], timestep)
elif order == 2:
new_state = second_bashforth(state, self._tendencies_list, timestep)
elif order == 3:
new_state = third_bashforth(state, self._tendencies_list, timestep)
elif order == 4:
new_state = fourth_bashforth(state, self._tendencies_list, timestep)
else:
# the following should never happen, if it is there's a bug
raise RuntimeError('order should be integer between 1 and 4')
return new_state
def _ensure_constant_timestep(self, timestep):
if self._timestep is None:
self._timestep = timestep
elif self._timestep != timestep:
raise ValueError(
'timestep must be constant for Adams-Bashforth time stepping')
def convert_tendencies_units_for_state(tendencies, state):
"""
Converts the units of any DataArrays with unit informaton in the
tendencies dictionary to have units of {value_units}/second where
{value_units} is the units of the value in the state dictionary.
This is done in-place.
"""
for quantity_name in tendencies.keys():
if isinstance(tendencies[quantity_name], DataArray) and ('units' in tendencies[quantity_name].attrs):
desired_units = '{} s^-1'.format(state[quantity_name].attrs['units'])
tendencies[quantity_name] = tendencies[quantity_name].to_units(desired_units)
[docs]class Leapfrog(TendencyStepper):
"""A TendencyStepper using the Leapfrog scheme.
This scheme calculates the
values at time $t_{n+1}$ using the derivatives at $t_{n}$ and values at
$t_{n-1}$. Following the step, an Asselin filter is applied to damp the
computational mode that results from the scheme and maintain stability. The
Asselin filter brings the values at $t_{n}$ (and optionally the values at
$t_{n+1}$, according to Williams (2009)) closer to the mean of the values
at $t_{n-1}$ and $t_{n+1}$."""
[docs] def __init__(self, *args, **kwargs):
"""
Initialize a Leapfrog time stepper.
Args
----
*args : TendencyComponent or ImplicitTendencyComponent
Objects to call for tendencies when doing time stepping.
asselin_strength : float, optional
The filter parameter used to determine the strength
of the Asselin filter. Default is 0.05.
alpha : float, optional
Constant from Williams (2009), where the midpoint is shifted
by alpha*influence, and the right point is shifted
by (1-alpha)*influence. If alpha is 1 then the behavior
is that of the classic Robert-Asselin time filter, while if it
is 0.5 the filter will conserve the three-point mean.
Default is 0.5.
References
----------
Williams, P., 2009: A Proposed Modification to the Robert-Asselin
Time Filter. Mon. Wea. Rev., 137, 2538--2546,
doi: 10.1175/2009MWR2724.1.
"""
self._old_state = None
self._asselin_strength = kwargs.pop('asselin_strength', 0.05)
self._timestep = None
self._alpha = kwargs.pop('alpha', 0.5)
super(Leapfrog, self).__init__(*args, **kwargs)
def _call(self, state, timestep):
"""
Updates the input state dictionary and returns a new state corresponding
to the next timestep.
Args
----
state : dict
The current model state. Will be updated in-place by
the call due to the Robert-Asselin-Williams filter.
timestep : timedelta
The amount of time to step forward.
Returns
-------
diagnostics : dict
Diagnostics from the timestep of the input state.
new_state : dict
The model state at the next timestep.
Raises
------
SharedKeyError
If a DiagnosticComponent object has an output that is
already in the state at the start of the timestep.
ValueError
If the timestep is not the same as the last time
step() was called on this instance of this object.
"""
original_state = state
state = state.copy()
self._ensure_constant_timestep(timestep)
tendencies, diagnostics = self.prognostic(state, timestep)
convert_tendencies_units_for_state(tendencies, state)
if self._old_state is None:
new_state = step_forward_euler(state, tendencies, timestep)
else:
state, new_state = step_leapfrog(
self._old_state, state, tendencies, timestep,
asselin_strength=self._asselin_strength, alpha=self._alpha)
copy_untouched_quantities(state, new_state)
self._old_state = state
for key in original_state.keys():
original_state[key] = state[key] # allow filtering to be applied
return diagnostics, new_state
def _ensure_constant_timestep(self, timestep):
if self._timestep is None:
self._timestep = timestep
elif self._timestep != timestep:
raise ValueError(
'timestep must be constant for Leapfrog time stepping')
def step_leapfrog(
old_state, state, tendencies, timestep, asselin_strength=0.05,
alpha=1.):
"""
Steps the model state forward in time using the given tendencies and the
leapfrog time scheme, with a Robert-Asselin time filter.
Args
----
old_state : dict
Model state at the last timestep.
state : dict
Model state at the current timestep. May be modified by
this function call, specifically by the Asselin filter.
tendencies : dict
Time derivatives at the current timestep in
units/second.
timestep : timedelta
The amount of time to step forward.
asselin_strength : float, optional
Asselin filter strength. Default is 0.05.
alpha : float, optional
Constant from Williams (2009), where the
midpoint is shifted by alpha*influence, and the right point is
shifted by (alpha-1)*influence. If alpha is 1 then the behavior
is that of the classic Robert-Asselin time filter, while if it
is 0.5 the filter will conserve the three-point mean.
Default is 1.
Returns
-------
state : dict
The input state, modified in place.
new_state : dict
Model state at the next timestep.
"""
new_state = {}
for key in tendencies.keys():
new_state[key] = (
old_state[key] + 2*tendencies[key]*timestep.total_seconds())
filter_influence = 0.5*asselin_strength*(
old_state[key] - 2*state[key] + new_state[key])
state[key] += alpha * filter_influence
if alpha != 1.:
new_state[key] += (alpha - 1.) * filter_influence
return state, new_state
def step_forward_euler(state, tendencies, timestep):
return_state = {}
for key in tendencies.keys():
return_state[key] = state[key] + tendencies[key]*timestep.total_seconds()
return return_state
def second_bashforth(state, tendencies_list, timestep):
"""Return the new state using second-order Adams-Bashforth. tendencies_list
should be a list of dictionaries whose values are tendencies in
units/second (from oldest to newest), and timestep should be a timedelta
object. The dictionaries in tendencies_list should all have the same keys.
"""
return_state = {}
for key in tendencies_list[0].keys():
return_state[key] = state[key] + timestep.total_seconds() * (
1.5*tendencies_list[-1][key] - 0.5*tendencies_list[-2][key]
)
return return_state
def third_bashforth(state, tendencies_list, timestep):
"""Return the new state using third-order Adams-Bashforth. tendencies_list
should be a list of dictionaries whose values are tendencies in
units/second (from oldest to newest), and timestep should be a timedelta
object."""
return_state = {}
for key in tendencies_list[0].keys():
return_state[key] = state[key] + timestep.total_seconds() * (
23./12*tendencies_list[-1][key] - 4./3*tendencies_list[-2][key] +
5./12*tendencies_list[-3][key]
)
return return_state
def fourth_bashforth(state, tendencies_list, timestep):
"""Return the new state using fourth-order Adams-Bashforth. tendencies_list
should be a list of dictionaries whose values are tendencies in
units/second (from oldest to newest), and timestep should be a timedelta
object."""
return_state = {}
for key in tendencies_list[0].keys():
return_state[key] = state[key] + timestep.total_seconds() * (
55./24*tendencies_list[-1][key] - 59./24*tendencies_list[-2][key] +
37./24*tendencies_list[-3][key] - 3./8*tendencies_list[-4][key]
)
return return_state