"""
Implementation of the wave field solution of 1D and 2D waves. For the Wave1D and Wave2D
class, a power spectral density distribution function :math:`S(k)` can be imposed on
the wave nodes such that we can simulate the propagation of waves obeying either a
Jonswap or Gauss spectrum.
The theoretical background has the following contents
* Wave1D_ : Description of the Wave1D class
- dft1d_ : Discrete Fourier Transform implementation for 1D wave spectra
- fft1d_ : Fast Fourier Transform implementation for 1D waves spectra
* Wave2D_ : Description of the Wave2D class
- dft2d_ : Discrete Fourier Transform implementation for 2D wave spectra
- fft2d_ : Fast Fourier Transform implementation for 2D wave spectra
* References_ : List of literature used for this implementation
.. _Wave1D:
=============================================
Wave1D: Theoretical background on one-D waves
=============================================
.. _dft1d:
Wave equation with DFT
----------------------
Assume a spectrum :math:`S(k_i)`, with :math:`i=0, n_k`. The wave field
:math:`\\eta(\\mathbf{x}, t)` at time :math:`t` along spatial positions :math:`x_p`
(where :math:`p=0, n_x`) is constructed using a Discrete Fourier Transform (DFT) as
follows
.. math ::
\\eta(x_p, t) = \\sum_i^{n_k} a_i \\exp(j (k_i x_p - \\omega_i t + \\phi_i))
where :math:`j^2\\equiv -1`, the amplitude :math:`a_i` follows from the power spectral
density :math:`S(\\mathbf{k})` as
.. math ::
a_i = \\sqrt{2 S(k_i) dk_i},
:math:`\\phi_i` is the random phase corresponding to the wave node :math:`i`, and the
angular frequency :math:`\\omega_i` is related to the wave vector via the deep water
dispersion relation according to
.. math ::
\\omega_i = \\sqrt{g |k_i|}
with :math:`g=9.81 m/s^2` being the gravity constant.
We can rewrite the wave equation in matrix form:
.. math ::
\\eta(x_p, t) = \\sum_i^{n_k} M_{ip}\\exp(-j \\omega_i t)
where the matrix :math:`M_{ip}` is
.. math ::
M_{ip} = \\hat{A}_i \\exp(j k_i x_p)
and the complex amplitude :math:`\\hat{A}_i` follows from
.. math ::
\\hat{A}_{i} = a_i\\exp(j\\phi_i)
From the matrix form is can be seen that the wave equation can be calculated by a dot
product multiplication of the :math:`n_x \\times n_k` matrix :math:`M_{ip}` and the
vector :math:`\\exp(-j\\omega_i t)` of size :math:`n_k \\times 1`.
The result is a :math:`1 \\times n_x` vector of the wave along the
x-axis :math:`\\eta(x_p, t)`. This is how the DFT is implemented in the code of Wave1D.
Note that we have described the wave equation in terms of the power spectral density
(PSD) equation in the wave vector domain :math:`k`, denoted here as :math:`S(k)`.
Normally the PSD is given in the angular frequency domain :math:`\\omega` as
:math:`S^\\prime(\\omega)`
We can obtain the version in wave vector domain using the equality
.. math ::
S(k) = S^\\prime(\\omega)\\frac{d\\omega}{dk} =
S^\\prime(\\sqrt{g k}) \\frac{g}{2 k}.
Using the PSD in the wave vector domain has the advantage that we only have to deal with
the periodicity of the discrete Fourier transform in the spatial domain x: a wave will
be period after a length :math:`2\\pi/\\Delta k`. To get rid of this periodicity is we
can simply use a spatial domain which larger than this repetition length.
.. _fft1d:
Wave equation with FFT
----------------------
In case the number of wave numbers nodes is equal to the number of spatial nodes, and we
have ensured that
.. math ::
\\hat{A}(-k_i)\\exp(-j\\omega t) = \\hat{A}^\\ast(k_i)\\exp(j\\omega t)
we can use the FFT algorithm for calculating the wave equation at time :math:`t`. This
is because the FFT uses the symmetry rule that for any *real* valued signal
:math:`F(x)`, the Fourier transform :math:`\\hat{F}(k)` by definition has the property
that :math:`\\hat{F}(-k) = \\hat{F}^\\ast(k)` (where the :math:`\\ast` indicates the
complex conjugate). The FFT is a much more efficient way to obtain a solution for the
wave equation: the calculation time for a DFT of a signal with :math:`N` nodes is
proportional to :math:`N^2`, while the calculation time of an FFT is proportional to
:math:`N\\log(N)`. The FFT is used for the Wave1D solution when the *wave_construction*
field is set to *FFT*.
.. _Wave2D:
=============================================
Wave2D: Theoretical background on two-D waves
=============================================
The Wave2D Class gives a description of a 2D propagating wave with a spectral density
distribution given by either a Gauss or a Jonswap spectrum. The class extents the
functionality of the Wave1D: it obtains all the information related to the 1D wave in
k-vector domain from the wave1D attribute field which stores a Wave1D object. For all
the information additionally required for the 2D description (like spreading function),
extra attributes are added to the class.
Again, three ways to solve the wave field from the spectral components have been
implemented:
1. *DFTpolar*: A 2D wave field is constructed from the multiplication of the Spectral
density S(k) and the directional distribution D(theta). A Discrete Fourier Transform
(DFT) is used to calculate the wave field. This is a straightforward implementation,
but again, DFT is `slow` to calculate, especially in 2D, so only use this for
testing. You can speed up the calculation by selecting wave node with an amplitude
larger than a certain threshold.
2. *FFT*: if symmetry in spectral k domain is imposed, we can again use the FFT to
calculate the wave field. Recommended
3. *DFTcartesian*: The exact same symmetric spectral amplitudes as the FFT is used, but
the wave is calculated with a DFT. This is so slow that it is not possible to
include all wave components, so only used for testing purposes.
.. _dft2d:
2D DFT implementation
---------------------
A 2D wave can be described by its spectral density distribution :math:`S(k)` and the
directional distribution :math:`D(\\theta)` as
.. math ::
E_{k, \\theta}(k, \\theta) = S(k)D(\\theta)
where for :math:`S(k)` we can either take a Jonswap or a Gauss distribution as
implemented in the *wave_spectra* module in *spectrum_jonswap* and *spectrum_gauss*.
Since we are working in the wave vector domain :math:`k`, we must first transform the
spectrum from :math:`\\omega` to :math:`k` space, just as described in the 1D wave
theory. For directional distribution, the *spreading_function* is recommended.
The *DFTpolar* implementation constructs the complex amplitudes by multiplying the
spectral density :math:`E_{k, \\theta}` with its bin width :math:`dk d\\theta`:
.. math ::
\\hat{A}_{k, \\theta} =
\\sqrt{2 E_{k, \\theta}(k, \\theta) dk d\\theta} \\exp({j\\phi})
where the phase :math:`\\phi` is a random number between :math:`0\\sim 2\\pi`.
The wave amplitude is then calculate by simply summing over all the wave components we
have added, similar to our 1D version
.. math ::
\\eta(\\mathbf{x}, t) =
\\sum_p^{n_{k}}\\sum_q^{n_{\\theta}} \\hat{A}_{k_p, \\theta_q}
\\exp(j (\\mathbf{k}_{p,q}\\cdot\\mathbf{x}- \\omega(k_p) t))
where :math:`\\mathbf{k} = (k_x, k_y)` is the 2D wave vector in cartesian coordinates
belonging to the sample wave vector in polar coordinates :math:`(k_p, \\theta_q)` with
:math:`k=|\\mathbf{k}|`. The wave vector magnitude, :math:`\\mathbf{x}` is the cartesian
vector in spatial domain. Also we are using the deep water dispersion relation again
to calculate the angular wave frequency :math:`\\omega` for a given wave
vector :math:`k`.
The DFT implementation can be sped up significantly by smartly selecting nodes, for
instance based on the components above a certain threshold, or we could make a
non-uniform distribution. Both methods have been implemented and can be selected by
settings the *wave_selection* attribute field to *Subrange* or *EqualEnergyBins*,
respectively.
.. _fft2d:
2D FFT implementation
---------------------
As already mentioned, the DFT implementation is very slow. We can speed up by selecting
wave nodes in a smart way, but this consequently also takes out information from our
resulting wave field. On top of that: it turns out that even after selecting only the
most important spectral wave components, the DFT still is not very fast.
The recommended way to construct a wave field is again by using the FFT setting for the
*wave_construction* argument. There is only one catch: the FFT algorithm requires the
spectral components described in the cartesian wave vector domain, whereas so far we
have been working in the polar wave vector domain. On top of that, we have to impose
the symmetry required for a FFT:
.. math ::
\\hat{A}(-\\mathbf{k})\\exp(-j\\omega t)=
\\hat{A}^\\ast(\\mathbf{k})\\exp(j\\omega t)
This symmetry rule is the 2D version from the one we have seen in the 1D wave section.
For the FFT method we need a description of the complex amplitudes in the
Cartesian domain, i.e. we need to derive :math:`E_{k}(\\mathbf{k})` from our starting
point `E_{k, \\theta}(k, \\theta)`. It can be shown that we need to do
.. math ::
E_{k}(\\mathbf{k}) = \\frac{E_{k, \\theta}(k, \\theta)}{k}
This equation is used in the function *spectrum2d_complex_amplitudes* as we divide the
directional spreading function with the wave vector magnitude. For a more detailed
derivation, please have a look at the References_.
.. _References:
References
----------
* Det Norske Veritas: Environmental conditions and environmental loads, April 2007,
DNV-RP-C205
* Jocely Fr\'echot: Realistic simulation of ocean surface using wave spectra.
Proceedings of the First International Conference on Computer Graphics Theory and
Applications (GRAPP, 2006), 2006, Portugal, p 76--83 <hal-00307938>
=========================================
Start of the *wave_fields* implementation
=========================================
"""
import logging
from os.path import splitext
import colorcet as cc
import h5py
import matplotlib.animation as animation
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy as sp
import seaborn as sns
from matplotlib.colors import LightSource
from numpy.fft import fftshift
from scipy.constants import g as g0 # gravity constant 9.81 m/s2
import pymarine.waves.wave_spectra as ms
from pymarine.utils.coordinate_transformations import polar_to_cartesian
from pymarine.utils.numerical import find_idx_nearest_val
from pymarine.utils.plotting import clean_up_artists, set_limits
sns.set(context="notebook")
logger = logging.getLogger(__name__)
[docs]
class PlotProperties:
"""
A class containing the properties of the line plot.
Parameters
----------
show: bool, optional,
Show the current wae
color: str, optional
Store the color of the line plotting the wave. Default = "w"
linewidth: int, optional
Width of the line plotting the wave. Default = 1
scattersize: int, optional
Size of the scatter points plotted at the wave. Default = 0, i.e., no scatter
points are plotted
Notes
-----
* Only used by WaveSimulator to connect plot properties to each wave via the *plot*
attribute field of the wave
* Not critical and perhaps this class should be moved out of the *wave_fields*
module later. For now, leave it as it is. It doesn't hurt me as well.
"""
def __init__(self, show=True, color="w", linewidth=1, scattersize=0):
self.show = show
self.color = color
self.linewidth = linewidth
self.scattersize = scattersize
self.save_image = False
self.save_data = False
def _energy_deficit(k, k0, E, Hs, Tp, gamma, spectral_version, spectrum_type, sigma):
"""
Helper function for fsolve to calculate the integral of the spectrum between k0 and
k1 and subtract E
Parameters
----------
k: ndarray
Array with the wave vector nodes
k0: float
Start integrating at wave vector k0
E:
Energy per bin
Hs:
Significant wave height
Tp:
Peak period
gamma:
Peakness of Jonswap
spectral_version:
Version of implementation used
spectrum_type: {"jonswap", "gauss"}
Type of wave spectrum. Default = jonswap
sigma:
Width of the Gauss
Returns
-------
float:
Energy deficit
Notes
-----
This function is used a helper function for fsolve and should not be called
directly. It is used to calculate the integrated energy of a bin between k and k+dk
"""
# Solve the integral of the spectrum between k0 and k1 and subtract E and returns
# the deficit energy.
# This function can be used to calculate the end border of an integration interval
# such that the area of the integral results in a given value E
(delta_e, err) = (
sp.integrate.quad(
ms.spectrum_wave_k_domain,
k0,
k,
args=(Hs, Tp, gamma, sigma, spectrum_type, spectral_version),
)
- E
)
return delta_e
[docs]
class Wave2D:
"""
A class for linearized solutions of the 2D wave (deep water, linear). The Wave1D is
used to describe the radial direction
Parameters
----------
wave1D: :obj:`Wave1D`
The Wave1D field to describe the spectral properties along the radial direction
in k-domain the 2D wave
Lx: float, optional
Length of the domain in x-direction in m, default = 200 m
Ly: float, optional
Length of the domain in y-direction in m, default = 200 m
xmin: float, optional
Starting coordinate of the x-domain. Default = 0 m
ymin: float, optional
Starting coordinate of the y-domain. Default = 0 m
nx_points: int, optional
Number of nodes in the x-domain. Default = 64
ny_points: int, optional
Number of nodes in the y-domain. Default = 64
kx_min: float, optional
Start of wave vector domain. Default = 0 rad/m
kx_max: float, optional
End of wave vector domain. Default = pi rad/m
delta_kx: float, optional
Bin spacing in wave vector domain. Default is pi/32
n_kx_nodes: int, optional
Number of bins in in x direction in k vector domain. Default = 64
ky_min: float, optional
Start of wave vector domain. Default = 0 rad/m
ky_max: float, optional
End of wave vector domain. Default = pi rad/m
delta_ky: float, optional
Bin spacing in wave vector domain. Default is pi/32
n_ky_nodes: int, optional
Number of bins in y direction in k vector domain. Default = 64
theta_min: float, optional
Minimum value of theta direction in polar domain. Default = 0 rad
theta_max: float, optional
Maximum value of theta direction in polar domain. Default = 2 pi rad
n_theta_nodes: int, optional
Number of nodes in theta direction. Default = 100
Theta_0: float, optional
Main wave direction in rad. Default = 0 rad. The valid range is: -pi/2 < T < pi
Theta_s_spreading_factor: float, optional
Spreading factor (s-definition) in theta domain. Default = 5 (Typical for wind
waves, for swell use 13.0)
"""
def __init__(
self,
wave1D,
name=None,
Lx=200,
Ly=200,
xmin=0,
ymin=0,
nx_points=64,
ny_points=64,
kx_min=0.0,
kx_max=3.1415,
delta_kx=0.0981,
n_kx_nodes=32,
ky_min=0.0,
ky_max=3.1415,
delta_ky=0.0981,
n_ky_nodes=32,
theta_min=0,
theta_max=2 * np.pi,
n_theta_nodes=100,
Theta_0=0,
Theta_s_spreading_factor=5,
):
logger.info("Initialise JonSwap 1D wave field")
if name is None:
self.name = "_".join(
[
"wave",
wave1D.spectrum_type,
wave1D.wave_construction,
wave1D.wave_selection,
"2d",
]
)
else:
self.name = name
self.wave1D = wave1D
self.Lx = Lx
self.Ly = Ly
self.nx_points = nx_points
self.ny_points = ny_points
self.xmin = xmin
self.Lx = Lx
self.xmax = self.xmin + self.Lx
self.xmid = self.xmin + self.Lx / 2.0
self.xpoints = np.linspace(self.xmin, self.xmax, self.nx_points)
self.delta_x = self.xpoints[1] - self.xpoints[0]
self.ymin = ymin
self.Ly = Ly
self.ymax = self.ymin + self.Ly
self.ymid = self.ymin + self.Ly / 2.0
self.ypoints = np.linspace(self.ymin, self.ymax, self.ny_points)
self.delta_y = self.ypoints[1] - self.ypoints[0]
self.n_theta_nodes = n_theta_nodes
self.Theta_0 = Theta_0
self.Theta_s_spreading_factor = Theta_s_spreading_factor
self.theta_points = None
self.kx_nyquist = None
self.kt = None
self.delta_theta = None
# Here, theta=0 corresponds to the north direction. Rotate clock wise
self.theta_min = theta_min
self.theta_max = theta_max
self.kx_min = kx_min
self.kx_max = kx_max
self.delta_kx = delta_kx
self.n_kx_nodes = n_kx_nodes
self.k_polar_mesh = None
self.k_polar_bin_area_over_kk = None
self.omega_dispersion = None
self.E_wave_density_polar = None
self.ky_min = ky_min
self.ky_max = ky_max
self.delta_ky = delta_ky
self.n_ky_nodes = n_ky_nodes
self.k_xy_mesh = None
self.kx_nodes = None
self.ky_nodes = None
self.k_cartesian_mesh = None
self.kk = None
self.E_wave_complex_amplitudes = None
self.omega_sign = None
self.xy_mesh = None
self.amplitude = None
self.phase = None
self.theta_area_fraction = 1
# use seed to update the random phase if required
self.seed = 1
self.update_phase = True
self.update_x_k_theta_sample_space()
self.D_spread = np.zeros(self.theta_points.shape)
self.calculate_spreading_function()
self.calculate_spectral_components()
self.calculate_wave_surface()
[docs]
def make_report(self):
"""Make report of settings for this wave"""
frm = "{:40s} : {}"
print(frm.format("Name", self.name))
print("----------- Domain settings --------")
print(frm.format("Start x [m]", self.xmin))
print(frm.format("Start y [m]", self.ymin))
print(frm.format("End x [m]", self.xmax))
print(frm.format("End y [m]", self.ymax))
print(frm.format("Length x [m]", self.Lx))
print(frm.format("Length y [m]", self.Ly))
print("----------- Time settings --------")
print(frm.format("Start t [s]", self.wave1D.t_start))
print(frm.format("End t [s]", self.wave1D.t_end))
print(frm.format("Length t [s]", self.wave1D.t_length))
print(frm.format("Delta t [s]", self.wave1D.delta_t))
n_x_points_total = self.xpoints.size * self.ypoints.size
n_k_points_total = self.k_cartesian_mesh[0].size
print("----------- Numerical resolutions --------")
print(frm.format("Number x - nodes", self.xpoints.size))
print(frm.format("Number y - nodes", self.ypoints.size))
print(frm.format("Total number of spatial points", n_x_points_total))
if self.kx_nodes is not None:
print("# Cartesian mesh specifications")
print(frm.format("Number kx - nodes", self.kx_nodes.size))
print(frm.format("Number ky - nodes", self.ky_nodes.size))
print(frm.format("Delta x [m]", self.delta_x))
print(frm.format("Delta y [m]", self.delta_y))
dkx = np.diff(self.kx_nodes)
dky = np.diff(self.ky_nodes)
print(frm.format("Delta kx min [rad/m]", dkx.min()))
print(frm.format("Delta ky min [rad/m]", dky.min()))
print(frm.format("Delta kx max [rad/m]", dkx.max()))
print(frm.format("Delta ky max [rad/m]", dky.max()))
print(frm.format("Delta kx first [rad/m]", self.delta_kx))
print(frm.format("Delta ky first [rad/m]", self.delta_ky))
print(frm.format("kx-min [rad/m]", self.kx_nodes.min()))
print(frm.format("ky-min [rad/m]", self.ky_nodes.min()))
print(frm.format("kx-max [rad/m]", self.kx_nodes.max()))
print(frm.format("ky-max [rad/m]", self.ky_nodes.max()))
print(frm.format("kx-nyq [rad/m]", self.kx_nyquist))
print(frm.format("ky-nyq [rad/m]", self.ky_nyquist))
print(frm.format("Lx_max [m]", 2 * np.pi / dkx[0]))
print(frm.format("Ly_max [m]", 2 * np.pi / dky[0]))
else:
print("# Polar mesh specifications")
print(frm.format("Number k_r - nodes", self.k_polar_mesh[1].shape[0]))
print(frm.format("Number theta - nodes", self.k_polar_mesh[0].shape[1]))
print(frm.format("Number total nodes", n_k_points_total))
print("----------- Numerical methods --------")
print(frm.format("Selection method", self.wave1D.wave_selection))
print(frm.format("Construction method", self.wave1D.wave_construction))
print(frm.format("DFT N x N", n_x_points_total * n_k_points_total))
print(frm.format("FFT N x log(N)", n_x_points_total * np.log(n_x_points_total)))
[docs]
def update_k_polar_mesh(self):
"""Update the polar mesh and its bin area divided by k belonging to the current
k/theta mesh
Notes
-----
The bin area follows from the theta and k linear array as k x dtheta x dk
However, to calculate the complex wave amplitude we need to divide D(omega, kk)
by kk to go the cartesian mesh.
Here, by not multiplying with kk to get the bin.
Therefore, we calculate darea / kk = kk dtheta * dkk / kk = dtheta x dkk
"""
# create polar mesh if direction x kk_magnitude and store in k_polar_mesh.
# k_polar_mesh[0] are the directions over the 2D mesh, k_polar_mesh[1] are the
# wave vector magnitudes over the 2D mesh
self.k_polar_mesh = np.meshgrid(self.theta_points, self.wave1D.kx_nodes)
# k_polar_mesh[0] is the theta over the 2D plane, so the delta theta is the
# gradient-y
delta_theta = np.gradient(self.k_polar_mesh[0])[1]
# k_polar_mesh[1] is the kr over the 2D plane, so the delta kr is the gradient-x
delta_k_r = np.gradient(self.k_polar_mesh[1])[0]
# the area of a polar mesh is k * dtheta * dk and dived by k, so the
# kmagnitude drops here
self.k_polar_bin_area_over_kk = delta_theta * delta_k_r
# turn the 2D polar coordinates into cartesian coordinates convert the polar
# angle from mathematical definition (theta=0 -> x-axis and counter-clock
# rotation) to naval definition (theta=0 is y-axis and clock-wise rotation)
# theta_naval=pi/2-theta
self.k_cartesian_mesh = np.array(
polar_to_cartesian(self.k_polar_mesh[1], np.pi / 2 - self.k_polar_mesh[0])
)
self.kk = np.sqrt(self.k_cartesian_mesh[0] ** 2 + self.k_cartesian_mesh[1] ** 2)
[docs]
def update_x_k_theta_sample_space(self):
self.theta_points = np.linspace(
self.theta_min, self.theta_max, self.n_theta_nodes
)
self.delta_theta = self.theta_points[1] - self.theta_points[0]
self.xmax = self.xmin + self.Lx
self.xmid = self.xmin + self.Lx / 2.0
self.xpoints = np.linspace(self.xmin, self.xmax, self.nx_points)
self.delta_x = self.xpoints[1] - self.xpoints[0]
self.ymax = self.ymin + self.Ly
self.ymid = self.ymin + self.Ly / 2.0
self.ypoints = np.linspace(self.ymin, self.ymax, self.ny_points)
self.delta_y = self.ypoints[1] - self.ypoints[0]
self.xy_mesh = np.meshgrid(self.xpoints, self.ypoints, indexing="ij")
self.amplitude = np.zeros(self.xy_mesh[0].shape)
self.kx_nyquist = np.pi / self.delta_x
self.ky_nyquist = np.pi / self.delta_y
if self.wave1D.wave_construction == "DFTpolar":
# take the (non)-uniform wave vectors from the 1D wave
self.update_k_polar_mesh()
else:
# For the FFT, the number wave vectors should be equal to the number of x
# points. For the DFT on the cartesian mesh, we use the same mesh as the
# FFT, so we can compare the speed of the algorithms
self.kx_nodes = 2 * np.pi * np.fft.fftfreq(self.nx_points, self.delta_x)
self.ky_nodes = 2 * np.pi * np.fft.fftfreq(self.ny_points, self.delta_y)
self.delta_kx = self.kx_nodes[1] - self.kx_nodes[0]
self.delta_ky = self.ky_nodes[1] - self.ky_nodes[0]
# create the mesh [KX, KY]
self.k_xy_mesh = np.meshgrid(self.kx_nodes, self.ky_nodes, indexing="ij")
self.k_cartesian_mesh = self.k_xy_mesh
self.kk = np.sqrt(
self.k_cartesian_mesh[0] ** 2 + self.k_cartesian_mesh[1] ** 2
)
[docs]
def calculate_spreading_function(self):
"""Calculate the spreading function"""
self.D_spread = ms.spreading_function(
theta=self.theta_points,
theta0=self.Theta_0,
s_spreading_factor=self.Theta_s_spreading_factor,
)
(
self.itheta_low,
self.itheta_high,
self.theta_low,
self.theta_high,
self.theta_peak,
self.D_peak,
self.D_spread_area,
self.theta_mask,
) = ms.thetaspreadspecs(
self.theta_points, self.D_spread, self.theta_area_fraction
)
# check if we need to mask the points
mask_points = False
if self.wave1D.wave_selection == "Subrange":
mask_points = True
if (
self.wave1D.wave_selection == "EqualEnergyBins"
and self.wave1D.use_subrange_energy_limits
):
mask_points = True
if mask_points:
# extra wave vectors based on the range k_low,k_high
self.theta_points = np.extract(self.theta_mask, self.theta_points)
self.D_spread = np.extract(self.theta_mask, self.D_spread)
if self.wave1D.sample_every > 1:
self.theta_points = self.theta_points[:: self.wave1D.sample_every]
self.D_spread = self.D_spread[:: self.wave1D.sample_every]
# we need to update the k_polar_mesh as well
self.update_k_polar_mesh()
if self.phase is not None:
if (
self.phase.shape[0] != self.wave1D.n_kx_nodes
or self.phase.shape[1] != self.theta_points.size
):
phase_shape_mismatch = True
else:
phase_shape_mismatch = False
if self.phase is None or phase_shape_mismatch or self.update_phase:
self.E_wave_density_polar = np.zeros(self.k_cartesian_mesh[0].shape)
self.phase = ms.initialize_phase(
self.E_wave_density_polar, self.wave1D.seed
)
self.update_phase = False
[docs]
def calculate_spectral_components(self):
"""
Calculate the 2D wave density function from the current k-array and spreading
angles
"""
if self.wave1D.wave_construction == "DFTpolar":
# the DFT based on a polar mesh is used. Calculate the polar coordinates
# and turn it in cartesian values
self.omega_dispersion = np.sqrt(
self.wave1D.gravity0 * abs(self.k_polar_mesh[1])
) * np.where(self.k_polar_mesh[1] >= 0, 1, -1)
# The wave density function follows from Sk * Dspread.
# To get a matrix out of it, multiply the transposed S^T with the D
n_k = self.wave1D.spectrumK.size
n_d = self.D_spread.size
self.E_wave_density_polar = self.wave1D.spectrumK.reshape(
n_k, 1
) * self.D_spread.reshape(1, n_d)
# In the polar domain, the integral multiplied with delta_theta*delta_kx
# give
# the complex amplitude a_k = sqrt(2 S(k, theta) dk * dtheta
self.E_wave_complex_amplitudes = np.sqrt(
2 * self.E_wave_density_polar * self.k_polar_bin_area_over_kk
) * np.exp(1j * self.phase)
else:
# Either a DFT (wave_construction==DFTcartesian) or a FFT
# (wave_construction==FFT) based on a cartesian mesh is used.
self.k_cartesian_mesh = self.k_xy_mesh
self.kk = np.sqrt(self.k_xy_mesh[0] ** 2 + self.k_xy_mesh[1] ** 2)
self.kk = np.where(
abs(self.kk) < ms.TINY, ms.TINY * np.ones(self.kk.shape), self.kk
)
(
self.E_wave_complex_amplitudes,
self.omega_sign,
) = ms.spectrum2d_complex_amplitudes(
kx_nodes=self.kx_nodes,
ky_nodes=self.ky_nodes,
Hs=self.wave1D.Hs,
Tp=self.wave1D.Tp,
gamma=self.wave1D.gamma,
Theta_0=self.Theta_0,
Theta_s_spread_kx=self.Theta_s_spreading_factor,
spectrum_type=self.wave1D.spectrum_type,
spectral_version=self.wave1D.spectral_version,
)
k_bin_area = self.delta_kx * self.delta_ky
# area_polar = self.wave1D.kx_nodes * self.k_polar_bin_area_over_kk
np_e_sq = np.square(abs(self.E_wave_complex_amplitudes))
self.E_wave_density_polar = np_e_sq / (k_bin_area / 2) / self.kk
# calculate the omega values belong to the wave vectors
self.calculate_omega_dispersion()
[docs]
def calculate_omega_dispersion(self):
# Calculate the omega frequency belonging to the wave vectors according to the
# deep water dispersion relation.
self.omega_dispersion = np.sqrt(g0 * abs(self.kk)) * self.omega_sign
self.delta_omega = np.diff(self.omega_dispersion)
[docs]
def calculate_wave_surface(self):
if not self.wave1D.wave_construction == "FFT":
# For the DFT directly calculate the wave field from the spectral components
self.amplitude = self.dft_complex_amplitudes(
self.E_wave_complex_amplitudes, self.omega_dispersion, self.wave1D.time
)
if self.wave1D.wave_construction == "DFTcartesian":
# Scale the amplitude with a factor 2 because we used the two-side
# k-space
self.amplitude *= 0.5
else:
# get the wave field using an FFT
self.amplitude = self.fft_amplitude(
self.E_wave_complex_amplitudes, self.omega_dispersion, self.wave1D.time
)
logger.debug(f"H_s of 2D surface {4 * np.std(self.amplitude)} ")
[docs]
def dft_complex_amplitudes(self, S_tilde, omega, time):
"""Calculate DFT of complex amplitudes at time 'time'
Parameters
----------
S_tilde: Complex amplitude obtained from
omega: angular frequency for each node
time: current time
Returns
-------
ndarray:
2D array with Discrete fourier transform
Notes
-----
* The trick with the exponential matrix exp (j*x*k) does not work in 2D because
you run out of memory too fast.
Therefore, calculate the wave field with a loop over the wave vectors.
* This algorithm is really slow, so you should use FFT for 2D waves!
"""
dft = np.full(self.xy_mesh[0].shape, 0 + 0 * 1j, dtype=complex)
KX = self.k_cartesian_mesh[0]
KY = self.k_cartesian_mesh[1]
for j in range(self.k_cartesian_mesh[0].shape[1]):
for i in range(self.k_cartesian_mesh[0].shape[0]):
s_theta = S_tilde[i, j]
dft += s_theta * np.exp(
1j
* (
self.xy_mesh[0] * KX[i, j]
+ self.xy_mesh[1] * KY[i, j]
- omega[i, j] * time
)
)
return np.real(dft)
[docs]
def fft_amplitude(self, S_tilde, omega, time):
"""
Calculate Fourier transform of S using the FFT
Parameters
----------
S_tilde: ndarray, complex
Complex array of the fouriern components
omega: ndarray
Angular freqyencies belong to the wave vectors
time: float
Current time
Returns
-------
ndarray
real array with the DFT of the complex amplitudes
"""
N = int(S_tilde.size / 2)
ampl = N * np.fft.ifft2(S_tilde * np.exp(1j * (-time * omega)))
# ampl should be real already because S_tilde should be symmetrical around
# k=0 S(k)=S^*(-k) to be sure, take the real value only
return np.real(ampl)
[docs]
def export_complex_amplitudes(self, filename, exportAsHD5=True):
"""Export the calculated complex amplitudes to HDF 5 file
Parameters
----------
filename: str
Name of the file
exportAsHD5: bool, optional
Export as HD5. Default = True. If false, the complex amplitudes are written
to Ascii
"""
if exportAsHD5:
filebase, ext = splitext(filename)
logger.debug(f"writing hd5 file to {filebase}")
with h5py.File(filebase + ".h5", "w") as hf:
hf.create_dataset("Kx", data=self.k_cartesian_mesh[0])
hf.create_dataset("Ky", data=self.k_cartesian_mesh[1])
hf.create_dataset("Amodulus", data=abs(self.E_wave_complex_amplitudes))
hf.create_dataset(
"Aphase", data=np.angle(self.E_wave_complex_amplitudes)
)
hf.create_dataset("omega", data=self.omega_dispersion)
else:
# write the complex amplitudes to the file filename in ascii format
logger.debug(f"writing numpy ascii file to {filename}")
nx = self.E_wave_complex_amplitudes.shape[0]
ny = self.E_wave_complex_amplitudes.shape[1]
KX = self.k_cartesian_mesh[0]
KY = self.k_cartesian_mesh[1]
with open(filename, "w") as f:
f.write(
"# complex amplitudes a at kx,ky mesh {}x{} kxtheta\n".format(
nx, ny
)
)
f.write(
"# {:>18s}{:>20s}{:>20s}{:>20s}{:>20s}\n".format(
"kx", "ky", "real(a)", "imag(a)", "omega"
)
)
for j in range(ny):
f.write(f"# j={j}\n")
for i in range(nx):
ampl = self.E_wave_complex_amplitudes[i][j]
f.write(
"{:20.8g}{:20.8g}{:20.8g}{:20.8g}{:20.8g}\n".format(
KX[i, j],
KY[i, j],
float(np.real(ampl)),
float(np.imag(ampl)),
float(self.omega_dispersion[i, j]),
)
)
f.write("\n")
[docs]
def propagate_wave(self):
"""Increase the time and recalculate the surface"""
# the time is stored in the wave1D object
self.wave1D.next_time()
self.calculate_wave_surface()
[docs]
def plot_wave(
self,
figsize=None,
r_axis_type="frequency",
plot_title=None,
x_plot_title=0.05,
y_plot_title=0.99,
min_data_value=None,
max_data_value=None,
number_of_contour_levels=10,
title_font_size=10,
x_time_label=0.05,
y_time_label=0.95,
x_hs_label=0.05,
y_hs_label=0.92,
add_hs_estimate=True,
color_map=cc.m_coolwarm,
zorder=0,
use_contourf=True,
):
"""Create a plot of the current wave
Parameters
----------
figsize: tuple
x and y size of the total figure. Default is None, so figure size is not
imposed
r_axis_type: {"frequency", "period"}, optional
quantity to use along the radial axis. Either frequency [Hz] or period [s].
Default = "frequency"
plot_title: str
Title to put in th figure. If None, use the titles found in the liftdyn file
x_plot_title: float, optional
x position (as a fraction of the sub plot index_title_axis). Default = 0.0
y_plot_title: float, optional
y position (as a fraction of the sub plot index_title_axis). Default = 1.1
delta_y_titles: float, optional
spacing between the lines of thet title lines. Default = 0.05
max_title_lines: int or None
Maxinmum number of title lines to plot. Default = None, ie. plot all
title_font_size: int
Size of the title font
r_axis_lim: tuple or None
The limits of the x axis. Default = None, so no limits are imposed
zorder: int, optional
Position of the contour plot. Default = 0, meaning that the contours are
placed at the bottom and the grid and labels are visible
use_contourf: bool, optional
If true, use contourf to make the contour plot. Slower. Default = False
Returns
-------
tuple (fig, axis)
Handle to the figure and the axis
"""
if figsize is not None:
size = figsize
else:
size = None
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=size)
ax.set_aspect("equal", "datalim")
x_label = "X [m]"
y_label = "Y [m]"
data_x_2d = self.xy_mesh[0]
data_y_2d = self.xy_mesh[1]
amplitude = self.amplitude
if min_data_value is None:
v_min = np.nanmin(amplitude)
else:
v_min = min_data_value
if max_data_value is None:
v_max = np.nanmax(amplitude)
else:
v_max = max_data_value
# set the contour levels belonging to this subplot
if min_data_value is None and max_data_value is None:
# If both limits where not given, assume that we want matplotlib decide on
# the limits so set levels=None
levels = None
else:
levels = np.linspace(v_min, v_max, number_of_contour_levels + 1)
# Finally, create the contour plot wit the RAO magnitude
if use_contourf:
cs = ax.contourf(
data_x_2d,
data_y_2d,
amplitude,
cmap=color_map,
levels=levels,
zorder=zorder,
)
else:
ls = LightSource(azdeg=315, altdeg=30)
vert_eg = 1
L_boundaries = (0, self.Lx, 0, self.Ly)
rgb = ls.shade(
amplitude.T, cmap=cm.ocean, vert_exag=vert_eg, blend_mode="hsv"
)
cs = plt.imshow(
rgb,
origin="lower",
extent=L_boundaries,
animated=True,
vmin=v_min,
vmax=v_max,
)
# if levels is not None:
# cs.cmap.set_under("k")
# cs.cmap.set_over("k")
# cs.set_clim(v_min, v_max)
# set the axis labels
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
if r_axis_type == "period":
label_values = ax.get_yticks()
label_strings = []
for label in label_values:
try:
label_strings.append(f"{1 / float(label):.1f}")
except (ValueError, ZeroDivisionError):
pass
ax.set_yticks(label_values)
ax.set_yticklabels(label_strings)
# create a contour bar
cbar = fig.colorbar(cs, ax=ax)
cbar.ax.set_ylabel("{} [{}]".format("Wave height", "m"))
if add_hs_estimate:
hs_estimate = 4 * amplitude.std()
plt.figtext(
x_hs_label,
y_hs_label,
f"Hs={hs_estimate:.2f} m",
fontsize=title_font_size,
verticalalignment="top",
)
if plot_title is not None:
plt.figtext(
x_plot_title,
y_plot_title,
plot_title,
fontsize=title_font_size,
verticalalignment="top",
)
window_title = plot_title
fig.canvas.set_window_title(window_title)
return fig, ax
@staticmethod
def _update_plot(
frame_index=-1,
self=None,
fig=None,
ax=None,
changed_list=list(),
min_data_value=0,
max_data_value=None,
number_of_contour_levels=10,
color_map=cc.m_coolwarm,
use_contourf=True,
add_hs_estimate=True,
title_font_size=10,
x_time_label=0.05,
y_time_label=0.95,
x_hs_label=0.05,
y_hs_label=0.92,
zorder=0,
):
if frame_index < 0:
changed_list = list()
else:
clean_up_artists(axis=ax, artist_list=changed_list)
self.wave1D.propagate_wave()
self.calculate_wave_surface()
amplitude = self.amplitude
data_x_2d = self.xy_mesh[0]
data_y_2d = self.xy_mesh[1]
if min_data_value is None:
v_min = np.nanmin(amplitude)
else:
v_min = min_data_value
if max_data_value is None:
v_max = np.nanmax(amplitude)
else:
v_max = max_data_value
if min_data_value is not None and max_data_value is not None:
# set the contour levels belonging to this subplot
levels = np.linspace(
v_min, v_max, number_of_contour_levels + 1, endpoint=True
)
else:
levels = None
# finally create the contour plot wit the RAO magnitude
if use_contourf:
cs = ax.contourf(
data_x_2d,
data_y_2d,
amplitude,
cmap=color_map,
levels=levels,
zorder=zorder,
)
else:
ls = LightSource(azdeg=315, altdeg=30)
vert_eg = 1
L_boundaries = (0, self.Lx, 0, self.Ly)
rgb = ls.shade(
amplitude.T, cmap=cm.ocean, vert_exag=vert_eg, blend_mode="hsv"
)
cs = plt.imshow(
rgb,
origin="lower",
extent=L_boundaries,
animated=True,
vmin=v_min,
vmax=v_max,
)
changed_list.append(cs)
# if levels is not None:
# cs.cmap.set_under("k")
# cs.cmap.set_over("k")
# cs.set_clim(v_min, v_max)
if add_hs_estimate:
hs_estimate = 4 * amplitude.std()
txt = plt.figtext(
x_hs_label,
y_hs_label,
f"Hs={hs_estimate:.6f} m",
fontsize=title_font_size,
verticalalignment="top",
)
changed_list.append(txt)
if frame_index < 0:
# create a contour bar
cbar = fig.colorbar(cs, ax=ax)
cbar.ax.set_ylabel("{} [{}]".format("Wave height", "m"))
time_label = f"{self.wave1D.time_delta}"
time_txt = plt.figtext(
x_time_label,
y_time_label,
time_label,
fontsize=title_font_size,
verticalalignment="top",
)
changed_list.append(time_txt)
return changed_list
[docs]
def animate_wave(
self,
figsize=None,
plot_title=None,
x_plot_title=0.05,
y_plot_title=0.99,
min_data_value=None,
max_data_value=None,
number_of_contour_levels=10,
title_font_size=10,
x_time_label=0.05,
y_time_label=0.95,
x_hs_label=0.05,
y_hs_label=0.92,
add_hs_estimate=True,
color_map=cc.m_coolwarm,
zorder=0,
use_contourf=True,
interval=1,
title_horizontal_algnment="center",
):
"""Create a plot of the current wave
Parameters
----------
figsize: tuple
x and y size of the total figure. Default is None, so figure size is not
imposed
plot_title: str
Title to put in th figure. If None, use the titles found in the liftdyn file
x_plot_title: float, optional
x position (as a fraction of the sub plot index_title_axis). Default = 0.0
y_plot_title: float, optional
y position (as a fraction of the sub plot index_title_axis). Default = 1.1
delta_y_titles: float, optional
spacing between the lines of thet title lines. Default = 0.05
max_title_lines: int or None
Maxinmum number of title lines to plot. Default = None, ie. plot all
title_font_size: int
Size of the title font
r_axis_lim: tuple or None
The limits of the x axis. Default = None, so no limits are imposed
zorder: int, optional
Position of the contour plot. Default = 0, meaning that the contours are
placed at the bottom and the grid and labels are visible
use_contourf: bool, optional
If true, use contourf to make the contour plot. Slower. Default = False
Returns
-------
tuple (fig, axis)
Handle to the figure and the axis
"""
if figsize is not None:
size = figsize
else:
size = None
fig, ax = plt.subplots(figsize=size)
ax.set_aspect("equal", "datalim")
x_label = "X [m]"
y_label = "Y [m]"
# data_x_2d = self.xy_mesh[0]
# data_y_2d = self.xy_mesh[1]
amplitude = self.amplitude
if min_data_value is None:
v_min = np.nanmin(amplitude)
else:
v_min = min_data_value
if max_data_value is None:
v_max = np.nanmax(amplitude)
else:
v_max = max_data_value
# set the axis labels
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
if plot_title is not None:
plt.figtext(
x_plot_title,
y_plot_title,
plot_title,
fontsize=title_font_size,
verticalalignment="top",
ha=title_horizontal_algnment,
)
window_title = plot_title
fig.canvas.set_window_title(window_title)
# set the contour levels belonging to this subplot
if min_data_value is None and max_data_value is None:
# if both limits where not given, assume that we want matplotlib to decide
# on the limits so set levels to None
levels = None
else:
levels = np.linspace(v_min, v_max, number_of_contour_levels + 1)
changed_artists = list()
# Initial call to _update_plot to put of the axis.
# Return changed_artists with a list of all items which need to be deleted
# every frame
changed_artists = self._update_plot(
self=self,
fig=fig,
ax=ax,
changed_list=changed_artists,
min_data_value=min_data_value,
max_data_value=max_data_value,
number_of_contour_levels=levels,
color_map=color_map,
use_contourf=use_contourf,
add_hs_estimate=add_hs_estimate,
title_font_size=title_font_size,
x_time_label=x_time_label,
y_time_label=y_time_label,
x_hs_label=x_hs_label,
y_hs_label=y_hs_label,
)
# the fagrs list below must exactly match the arguments of the _update_plot
# function, except # for the first argument which is the frame_index
ani = animation.FuncAnimation(
fig,
self._update_plot,
frames=int(self.wave1D.nt_samples),
fargs=(
self,
fig,
ax,
changed_artists,
min_data_value,
max_data_value,
number_of_contour_levels,
color_map,
use_contourf,
add_hs_estimate,
title_font_size,
x_time_label,
y_time_label,
x_hs_label,
y_hs_label,
zorder,
),
interval=interval,
blit=False,
repeat=True,
)
return ani
[docs]
def plot_spectrum(
self,
figsize=(12, 6),
r_axis_type="wave_number",
plot_title=None,
x_plot_title=0.05,
y_plot_title=0.99,
min_data_value=0,
max_data_value=None,
number_of_contour_levels=10,
title_font_size=10,
r_axis_lim=None,
polar_projection=False,
shift_origin=True,
color_map=cc.m_rainbow,
color_map_phase=cc.m_coolwarm,
zorder=0,
use_contourf=False,
r_label_position=180,
kx_min=None,
kx_max=None,
ky_min=None,
ky_max=None,
):
"""Create a polar plot of the current spectral data
Parameters
----------
figsize: tuple
x and y size of the total figure. Default is None, so figure size is not
imposed
r_axis_type: {"wave_number", "wave_length"}, optional
quantity to use along the radial axis. Either wave_number [rad/m] or
wave_length [m]. Default = "wave_number"
plot_title: str
Title to put in th figure. If None, don't add a title
x_plot_title: float, optional
x position (as a fraction of the sub plot index_title_axis). Default = 0.0
y_plot_title: float, optional
y position (as a fraction of the sub plot index_title_axis). Default = 1.1
title_font_size: int
Size of the title font
r_axis_lim: tuple or None
The limits of the x axis. Default = None, so no limits are imposed
zorder: int, optional
Position of the contour plot. Default = 0, meaning that the contours are
placed at the bottom and the grid and labels are visible
r_label_position: float, optional
Angle along which we position the labels of the r-axis
use_contourf: bool, optional
Use the slower contourf to create a contour plot. Default = False, i.e.,
imshow is used
kx_min: float, optional
Minimum wave vector node in x direction for cartesian plot. Default = None
kx_max: float, optional
Maximum wave vector node in x direction for cartesian plot. Default = None
ky_min: float, optional
Minimum wave vector node in y direction for cartesian plot. Default = None
ky_max: float, optional
Maximum wave vector node in y direction for cartesian plot. Default = None
Returns
-------
tuple (fig, axis)
Handle to the figure and the axis
"""
if figsize is not None:
size = figsize
else:
size = None
if polar_projection:
fig, axis = plt.subplots(
nrows=1, ncols=2, figsize=size, subplot_kw=dict(projection="polar")
)
for ax in axis:
ax.set_theta_zero_location("N")
ax.set_theta_direction(-1)
else:
fig, axis = plt.subplots(nrows=1, ncols=2, figsize=size)
for ax in axis:
ax.set_aspect("equal", "datalim")
if r_axis_lim is not None:
# the radial limits are but on the ylim
ax.set_ylim(r_axis_lim)
if r_axis_type == "wave_number":
x_label = "Wave number [rad/m]"
y_label = ""
elif r_axis_type == "wave_length":
x_label = "Wave length [m]"
y_label = ""
else:
raise AssertionError(
"r_axis_type can only be 'wave_number' or 'wave_length'. Found {}"
"".format(r_axis_type)
)
if polar_projection:
data_x_2d = self.k_polar_mesh[0]
data_y_2d = self.k_polar_mesh[1]
psd_2d = abs(self.E_wave_density_polar)
ang_2d = np.angle(self.E_wave_complex_amplitudes)
else:
x_label = "k_x [rad/m]"
y_label = "k_y [rad/m]"
if self.wave1D.mirror or shift_origin:
data_x_2d = fftshift(self.k_cartesian_mesh[0])
data_y_2d = fftshift(self.k_cartesian_mesh[1])
psd_2d = fftshift(abs(self.E_wave_density_polar))
ang_2d = fftshift(np.angle(self.E_wave_complex_amplitudes))
else:
data_x_2d = self.k_cartesian_mesh[0]
data_y_2d = self.k_cartesian_mesh[1]
psd_2d = abs(self.E_wave_density_polar)
ang_2d = np.angle(self.E_wave_complex_amplitudes)
if min_data_value is None:
v_min = np.nanmin(psd_2d)
else:
v_min = min_data_value
if max_data_value is None:
v_max = np.nanmax(psd_2d)
else:
v_max = max_data_value
# set the contour levels belonging to this subplot
if min_data_value is None and min_data_value is None:
# if both limits where not given, assume that we want matplotlib to decide
# on the limits so set levels=None
levels = None
else:
levels = np.linspace(v_min, v_max, number_of_contour_levels + 1)
# finally create the contour plot wit the RAO magnitude
if use_contourf:
cs = axis[0].contourf(
data_x_2d,
data_y_2d,
psd_2d,
cmap=color_map,
levels=levels,
zorder=zorder,
)
cs2 = axis[1].contourf(
data_x_2d,
data_y_2d,
ang_2d,
cmap=color_map_phase,
levels=None,
zorder=zorder,
)
else:
cs = axis[0].imshow(psd_2d.T, origin="lower", cmap=color_map)
cs2 = axis[1].imshow(ang_2d.T, origin="lower", cmap=color_map_phase)
# if levels is not None:
# if levels is not None:
# cs.cmap.set_under("k")
# cs.cmap.set_over("k")
# cs.set_clim(v_min, v_max)
# set the axis labels
if r_axis_type == "wave_length":
label_values = ax.get_yticks()
label_strings = []
for label in label_values:
try:
label_strings.append(f"{2 * np.pi / float(label):.1f}")
except (ValueError, ZeroDivisionError):
pass
ax.set_yticks(label_values)
ax.set_yticklabels(label_strings)
# set the axis labels
for ax in axis:
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
if polar_projection:
ax.set_rlabel_position(r_label_position)
if r_axis_lim is not None:
# the radial limits are but on the ylim
ax.set_ylim(r_axis_lim)
else:
set_limits(axis=ax, v_min=kx_min, v_max=kx_max, direction="x")
set_limits(axis=ax, v_min=ky_min, v_max=ky_max, direction="y")
ax.set_aspect("equal", "datalim")
# create a contour bar
cbar = fig.colorbar(cs, ax=axis[0])
cbar.ax.set_ylabel("{} [{}]".format("PSD", "m3"))
cbar2 = fig.colorbar(cs2, ax=axis[1])
cbar2.ax.set_ylabel("{} [{}]".format("Phase", "rad"))
if plot_title is not None:
plt.figtext(
x_plot_title,
y_plot_title,
plot_title,
fontsize=title_font_size,
verticalalignment="top",
)
window_title = plot_title
fig.canvas.set_window_title(window_title)
return fig, axis
[docs]
class Wave1D:
r"""
A class for linearized solutions of 1D wave vector equation
.. math ::
\eta(x_p, t) = \sum_i^{n_k} a_i \exp(j (k_i x_p - \omega_i t + \phi_i))
where the amplitudes :math:`a_i` follow from the Jonswap or Gauss power spectral
density function. See the top of the module for more information
Parameters
----------
xmin: float, optional
Start of spatial domain. Default = 0
Lx: float, optional
Length of spatial domain. Default = 100
t_start: float, optional
Start of temporal domain. Default = 0
t_length: float, optional
Length of temporal domain. Default = 100
nt_samples: int, optional
Number of time samples. Default = 256
nx_points: float, optional
Number of space domain samples in x direction. Default = 256
E_limit_low: float, optional
Fraction of energy to cut at the low end of the spectrum. Default = 0.01
E_limit_high: float, optional
Fraction of energy to keep at the high end of the spectrum. Default = 0.9
(so 0.1 is cut off at the high end)
kx_min: float, optional
Minimal value of the wave vector domain. Default = 0
kx_max: float, optional
Maximum value of the wave vector domain. Default = pi/2
delta_kx: float, optional
Spacing in wave vector domain.
Default = 0.06283 ((kx_max - kx_min) / n_kx_nodes)
n_kx_nodes: int, optional
Number of nodes in the wave vector domain. Default = 256
n_bins_equal_energy: int, optional
Number of nodes in the wave vector domain in case we are in the EqualEnergyBins
mode. See `wave_selection` below. Default value = 100
Hs: float, optional
Significant wave height [m]. Default = 3 m
Tp: float, optional
Peak period [s]. Default = 10 s
gamma: float, optional
Peakness factor in case Jonswap spectrum is used. Default = 3.3
sigma: float, optional
Width of spectrum in case Gauss spectrum with *spectral_version='dnv'* is used.
Default = 0.0625
random_phase: bool, optional
Use random phase with each run. Default = False
spectrum_type: {"jonswap", "gauss"}, optional
Type of spectrum used. Default = "jonswap"
spectral_version: {"sim", "dnv"}, optional
Version of wave spectrum used. Default = "sim". For the Jonswap spectrum there
is virtually no difference between the "sim" and "dnv" version. For the Gauss
spectrum the "sim" version has a spectral width related to Tp which is fixed,
wheras the "dnv" version has a width based on the *sigma* input argument.
gravity0: float, optional
Gravitation constant. Default = g0 = 9.81
wave_construction: {"FFT", "DFTpolar", "DFTcartesian"}
Method how the wave field is constructed from the spectrum. Default = "FFT". The
options are
* *DFTpolar*: A DFT (see top of module) is used to calculate the wave field.
Slow, however, the location of the wave vectors is not restricted.
Therefore, a smart selection of the wave nodes can be made. See
*wave_selection* for the possibilities.
* *FFT* : The wave is constructed from the wave vector nodes using a Fast
Fourier Transfor (FFT). This implies that the spectrum nodal poits are
restricted: the number of wave nodes equals the number of spatial points x,
and more over, the complex amplitude must obey the symmetry
rule :math:`a(k)=a^*(-k)`. The implies that we can not make a wave selection.
However, the FFT algorithm is so much faster than the DFT that we can still
obtain faster calculations time using the FFT based on all wave nodes.
The FFT wave construction is therefore recommended.
* *DFTcartesian*: This choice is for validation purpose only. It assumes the
same symmetric spectrum as used for the FFT option but then still the slow
DFT is used to calculate the wave field.
wave_selection: {"All", "EqualEnergyBins", "Subrange"}
For the DFTPolar wave construction mode we can make a selection of wave
components in order to speed up the wave calculation. Three choices are possible
* All: No selection is made so all the wave vectors as defined in the kx_nodes
domain are used
* Subrange: A Subrange of wave vectors based on the E_limit_low and E_limit_high
values is made
* EqualEnergyBins: The energy per bin is assumed equal
use_subrange_energy_limits : bool
If true the wave selection in the EqualEnergyBins mode is also limited by the
Subrange settings. Default = True
sample_every : int
Make a wave selection by taking every 'sample_every' point in the wave vector
domain. Only applicable when the wave_selection modes is *Subrange*
Attributes
----------
kx_nodes : ndarray
Wave vector nodes of size :math:`n_k` used to build the spectrum and the wave
field. The values depend on the wave_selection mode and wave_construction type.
In case FFT is used, the kx_nodes are symmetrical around 0 to be able to define
the symmetric spectrum.
xpoints : ndarray
Spatial position array of size :math:`n_x`
amplitude : ndarray
Wave height along x-direction at time t of size :math:`n_x`
phase : ndarray
Random phase array of size :math:`n_k`
omega_dispersion : ndarray
Angular frequency per wave node of size :math:`n_k` following from the deep water
dispersion relation
spectrumK : ndarray
Power spectral density of the wave spectrum along the wave vector nodes k of
size :math:`n_k`
spectrumW : ndarray
Power spectral density of the wave spectrum along the angular frequencies of size
:math:`n_k`. Note that since the angular frequency are coupled to the wave vector
nodes via the dispesion relation, the bin spacing of this vector is not
equidistant
complex_amplitudes : ndarray
The complex amplitudes following from the power spectral density spectrumK and
the phase vector as S_k * exp(j phase)
delta_x : float
Spatial resolution of the x-domain
kx_nyquist : float
Maximum wave vector following from the spatial resolution
*delta_x* as pi / delta_x
Examples
--------
A wave can be setup a wave with all the default settings and plot the spectrum and
wave at time 0 as follows
>>> wave_dft = Wave1D()
>>> fig, axis = wave_dft.plot_spectrum()
>>> fig, axis = wave_dft.plot_wave()
This will create the wave based on the DFT, hence it is slow. A wave construction
based on the FFT can be picked as
>>> wave_fft = Wave1D()
An animation of the wave can be made as follows
>>> movie = wave_fft.animate_wave()
This will infinitely simulate the wave. We can put and finit time by setting the
t_length to the wave
>>> wave_fft.reset_time(t_length=100)
For more examples, please have a look at the notebook linked via the Example section
example_wave_fields
"""
def __init__(
self,
name=None,
xmin=0,
Lx=None,
xmax=None,
t_start=0,
delta_t=1,
t_length=None,
nt_samples=1000000,
nx_points=512,
E_limit_low=0.01,
E_limit_high=0.90,
kx_min=0.0,
kx_max=1.5708,
delta_kx=0.06283,
n_kx_nodes=512,
n_bins_equal_energy=64,
use_subrange_energy_limits=False,
sample_every=1,
Hs=3.0,
Tp=10.0,
gamma=3.3,
sigma=0.0625,
random_phase=False,
wave_construction="FFT",
wave_selection="All",
lock_nodes_to_wave_one=False,
spectrum_type="jonswap",
spectral_version="sim",
gravity0=g0,
):
self.spectrum_type = spectrum_type
self.spectral_version = spectral_version
logger.info(f"Initialise {self.spectrum_type} 1D wave field")
if name is None:
self.name = "_".join(
["wave", spectrum_type, wave_construction, wave_selection, "1d"]
)
else:
self.name = name
# current time settings
self.time = 0
self.time_delta = pd.Timedelta(self.time, unit="s")
self.t_index = 0
self.repeat_movie = False
self.playing_movie = False
self.xmin = xmin
if xmax is not None:
self.Lx = xmax - self.xmin
if Lx is not None:
logger.warning(
"Both maximum x value xmax={} and length of domain Lx={} are "
"defined. Taking xmax leading so set Lx to {}"
"".format(xmax, Lx, self.Lx)
)
else:
if Lx is not None:
self.Lx = Lx
else:
self.Lx = 1000
self.xmax = self.xmin + self.Lx
self.t_start = None
self.delta_t = None
self.nt_samples = None
self.t_length = None
self.t_end = None
self.reset_time(
t_length=t_length, t_start=t_start, nt_samples=nt_samples, delta_t=delta_t
)
self.nx_points = nx_points
self.nt_samples = nt_samples
self.Hs = Hs
self.Tp = Tp
self.gravity0 = gravity0
# depending on the wave type we need to define gamma or sigma (for Jonswap or
# Gauss, resp.)
self.gamma = gamma
self.sigma = sigma
self.E_limit_low = E_limit_low
self.E_limit_high = E_limit_high
self.kx_min = kx_min
self.kx_max = kx_max
self.delta_kx = delta_kx
self.delta_kx_non_uni = None
self.n_kx_nodes = n_kx_nodes
self.random_phase = random_phase
# Select which wave component are selected. Choices are: Subrange,
# EqualEnergyBins, All
self.wave_selection = wave_selection
# set the wave construction
self.wave_construction = None
self.mirror = False
self.set_wave_construction(wave_construction)
if self.wave_construction == "FFT" and self.wave_selection != "All":
raise ValueError(
"Can not combine 'FFT' wave_construction method with a "
"wave_selecting other than 'All'"
)
self.lock_nodes_to_wave_one = lock_nodes_to_wave_one
# use seed to update the random phase if required
self.seed = 1
self.update_phase = True
self.pick_single_wave = False
self.picked_wave_index = 0
self.n_bins_equal_energy = n_bins_equal_energy
self.use_subrange_energy_limits = use_subrange_energy_limits
self.sample_every = sample_every
self.xpoints = None
self.phase = None
self.kx = None
self.amplitude = None
self.delta_x = None
self.kx_nyquist = None
self.eta = 0
self.ik_low = None
self.ik_high = None
self.ik_peak = None
self.k_low = None
self.k_high = None
self.k_peak = None
self.a_peakK = None
self.varianceK = None
self.iW_low = None
self.iW_high = None
self.iW_peak = None
self.W_low = None
self.W_high = None
self.W_peak = None
self.a_peakW = None
self.varianceW = None
self.spectrumK = None
self.omega_dispersion = None
self.delta_omega = None
self.complex_amplitudes = None
# the minimum and maximum wave
self.global_wave_extremes = [0, 0]
# add all the properties of the plot of the current wave
self.plot = PlotProperties()
self.update_x_k_t_sample_space()
self.calculate_spectra_modulus()
[docs]
def reset_time(self, t_length=None, t_start=0, nt_samples=10000000, delta_t=1):
"""Reset all time properties and allow to recalculate"""
self.t_start = t_start
self.delta_t = delta_t
self.nt_samples = nt_samples
if t_length is None:
self.t_length = self.nt_samples * self.delta_t
else:
self.t_length = t_length
self.nt_samples = int(self.t_length / self.delta_t)
self.t_end = self.t_start + self.t_length
self.time = self.t_start
self.t_index = 0
self.time_delta = pd.Timedelta(self.time, unit="s")
[docs]
def make_report(self):
"""Make report of settings for this wave"""
frm = "{:40s} : {}"
logger.info(frm.format("Name", self.name))
logger.info("----------- Domain settings --------")
logger.info(frm.format("Start x [m]", self.xmin))
logger.info(frm.format("End x [m]", self.xmax))
logger.info(frm.format("Length x [m]", self.Lx))
logger.info("----------- Time settings --------")
logger.info(frm.format("Start t [s]", self.t_start))
logger.info(frm.format("End t [s]", self.t_end))
logger.info(frm.format("Length t [s]", self.t_length))
logger.info(frm.format("Delta t [s]", self.delta_t))
logger.info("----------- Numerical resolutions --------")
logger.info(frm.format("Number x - nodes", self.xpoints.size))
logger.info(frm.format("Number kx - nodes", self.kx_nodes.size))
logger.info(frm.format("Delta x [m]", self.delta_x))
dkx = np.diff(self.kx_nodes)
logger.info(frm.format("Delta kx min [rad/m]", dkx.min()))
logger.info(frm.format("Delta kx max [rad/m]", dkx.max()))
logger.info(frm.format("Delta kx first [rad/m]", self.delta_kx))
logger.info(frm.format("k-min [rad/m]", self.kx_nodes.min()))
logger.info(frm.format("k-max [rad/m]", self.kx_nodes.max()))
logger.info(frm.format("k-nyq [rad/m]", self.kx_nyquist))
logger.info(frm.format("L_max [m]", 2 * np.pi / dkx[0]))
logger.info("----------- Numerical methodds --------")
logger.info(frm.format("Selection method", self.wave_selection))
logger.info(frm.format("Construction method", self.wave_construction))
[docs]
def next_time(self):
"""Increase the time"""
self.time += self.delta_t
self.time_delta = pd.Timedelta(self.time, unit="s")
self.t_index += 1
if self.time > self.t_end:
if self.repeat_movie:
self.time = self.t_start
self.t_index = 0
else:
self.playing_movie = False
[docs]
def propagate_wave(self):
"""Increase the time and recalculate the surface"""
self.next_time()
self.calculate_wave_surface()
[docs]
def animate_wave(
self,
x_min=None,
x_max=None,
y_min=None,
y_max=None,
figsize=(12, 4),
interval=25,
x_time_label=0.01,
y_time_label=0.93,
plot_title=None,
x_plot_title=0.5,
y_plot_title=0.97,
title_font_size=12,
title_horizontal_algnment="center",
):
"""Animate the 1D wave vs space and wave vs time
Parameters
----------
fig: :obj:`figure`
Figure returned by previous call can be update
ax: :obj:`Axes`
Axes returned by previous call can be updated to add more waves
x_min: float, optional
Start of the x-range. Default = None
x_max: float, optional
End of the x-range. Default = None
y_min: float, optional
Start of the y-range. Default = None
y_max: float, optional
End of the y-range. Default = None
figsize: tuple, optional
Tuple with (with, height) in inches. Default = None
interval: int, optional
Time in ms between each frame. Default = 25
x_time_label: float, optional
X position relative to the current graph where to put the time label
y_time_label: float, optional
Y Position relative to the current graph where to put the time label
Returns
-------
tuple
(fig, ax) reference to the figure and its axis
"""
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=figsize)
self.calculate_wave_surface()
(line,) = ax.plot(self.xpoints, self.amplitude)
ax.set_xlabel("x position [m]")
ax.set_ylabel("wave amplitude [m]")
time_label_string = "{:02d}:{:02d}:{:02d}"
time_text = ax.text(
x_time_label,
y_time_label,
time_label_string.format(
self.time_delta.components.hours,
self.time_delta.components.minutes,
self.time_delta.components.seconds,
),
transform=ax.transAxes,
)
if plot_title is not None:
plt.figtext(
x_plot_title,
y_plot_title,
plot_title,
fontsize=title_font_size,
verticalalignment="top",
ha=title_horizontal_algnment,
)
window_title = plot_title
fig.canvas.set_window_title(window_title)
set_limits(axis=ax, v_min=y_min, v_max=y_max, direction="y")
self.playing_movie = True
def next_time():
# An iterator to proceed to the next wave and yield the index only if we
# want to keep playing.
# Obsolete, I have now added the proceed wave to the animate function and
# past nt_samples
logger.debug(f"Updating wave for time {self.time}")
self.propagate_wave()
if self.playing_movie:
yield self.t_index
def animate(i):
logger.debug(
"Plotting wave for time {:10.1f} s until {:10.1f} s : {}".format(
self.time, self.t_end, self.time_delta
)
)
self.propagate_wave()
line.set_ydata(self.amplitude)
time_text.set_text(
time_label_string.format(
self.time_delta.components.hours,
self.time_delta.components.minutes,
self.time_delta.components.seconds,
)
)
return line, time_text
def init():
# Initialise the wave plot by first calculating the current wave height
# and than set the data
line.set_ydata(self.amplitude)
return (line,)
ani = animation.FuncAnimation(
fig,
animate,
self.nt_samples,
init_func=init,
interval=interval,
blit=False,
repeat=True,
)
return ani
[docs]
@staticmethod
def limit_markers(
ax, text, a_x, a_y, s_x=0, s_y=0, symbol="vy", arrow_color="black", fontsize=8
):
ax.plot([a_x], [a_y], symbol)
ax.annotate(
text,
xy=(a_x, a_y),
xytext=(s_x, s_y),
textcoords="offset points",
horizontalalignment="left",
va="top",
arrowprops=dict(width=2, headwidth=5, shrink=0.1, color=arrow_color),
fontsize=fontsize,
)
[docs]
def plot_spectrum(
self,
fig=None,
axis=None,
x_min=None,
x_max=None,
y_min=None,
y_max=None,
figsize=None,
add_limit_markers=False,
add_hs_estimate=False,
add_n_points=True,
y_n_points_label=0.8,
line_markers="o",
markersize=4,
linestyle="-",
linecolor=None,
plot_title=None,
x_plot_title=0.5,
y_plot_title=0.97,
title_font_size=12,
title_horizontal_algnment="center",
):
"""Plot the spectrum in k-spaco
Parameters
----------
fig: :obj:`Figure`, optional
Reference to the figure to add the plot to
axis: :obj:`Axis`, optional
Reference to the axis
x_min: float, optional
set minimum x value, Default = None i.e take the default
x_max: float, optional
set maximum x value, Default = None i.e take the default
y_min: float, optional
set minimum y value, Default = None i.e take the default
y_max: float, optional
set maximum y value, Default = None i.e take the default
figsize: tuple
(width, height) of Figure. Default = None ie. take the default
add_limit_markers: bool, optional
Add markers to indicate the low and high limits of the energy in the
spectrum
add_hs_estimate=False, optional
Add a label indicating the Hs belonging to the energy of the spectrum
add_n_points: bool, optional
Add a label with the number of points
y_n_points_label: bool, optional
Give the y coordinates the n_point label, Default = 0.8
line_markers: str, optional
Give the marker of the plot. Default = "o". Set to "" to remove the markers
linestyle: str, optional
Give the linestyle, Default = "-"
linecolor: str, optional
Give the color of the line. Default = None, ie take the default
markersize: int, optional
Give the size of the markers. Default = 4
Returns
-------
tuple
(fig, axis), reference to the create fig and axis
"""
if fig is None or axis is None:
fig, axis = plt.subplots(nrows=2, ncols=1, figsize=figsize)
plt.subplots_adjust(hspace=0.25)
axis[0].set_ylabel("spectral amplitude [m3]")
axis[0].set_xlabel("wave vector kx [rad/m]")
axis[1].set_ylabel("spectral amplitdude [m2s]")
axis[1].set_xlabel("omega [rad/s]")
if self.mirror:
kx_nodes = fftshift(self.kx_nodes)
omega_disp = fftshift(self.omega_dispersion)
spectrum_k = fftshift(self.spectrumK)
spectrum_w = fftshift(self.spectrumW)
else:
kx_nodes = self.kx_nodes
omega_disp = self.omega_dispersion
spectrum_k = self.spectrumK
spectrum_w = self.spectrumW
(line0,) = axis[0].plot(
kx_nodes,
spectrum_k,
linestyle=linestyle,
marker=line_markers,
markersize=markersize,
color=linecolor,
)
(line1,) = axis[1].plot(
omega_disp,
spectrum_w,
linestyle=linestyle,
marker=line_markers,
markersize=markersize,
color=linecolor,
)
if add_limit_markers:
self.limit_markers(
ax=axis[0],
a_x=self.k_low,
a_y=self.spectrumK[self.ik_low],
symbol="vy",
text=f"E(k<{self.k_low:.2g})/E\n={self.E_limit_low:.2g}",
s_x=-50,
s_y=40,
)
self.limit_markers(
ax=axis[0],
a_x=self.k_high,
a_y=self.spectrumK[self.ik_high],
symbol="^g",
text="E(k>{:.2g})/E\n={:.2g}".format(
self.k_high, 1 - self.E_limit_high
),
s_x=10,
s_y=40,
)
self.limit_markers(
ax=axis[0],
a_x=self.k_peak,
a_y=self.spectrumK[self.ik_peak],
symbol="or",
text="k={:.2g} rad/m\nL={:.1f} m".format(
self.k_peak, 2 * np.pi / self.k_peak
),
s_x=20,
s_y=-30,
)
self.limit_markers(
ax=axis[1],
a_x=self.W_low,
a_y=self.spectrumW[self.iW_low],
symbol="vy",
text=f"E(k<{self.W_low:.2g})/E\n={self.E_limit_low:.2g}",
s_x=-50,
s_y=40,
)
self.limit_markers(
ax=axis[1],
a_x=self.W_high,
a_y=self.spectrumW[self.iW_high],
symbol="^g",
text="E(k>{:.2g})/E\n={:.2g}".format(
self.W_high, 1 - self.E_limit_high
),
s_x=10,
s_y=40,
)
self.limit_markers(
ax=axis[1],
a_x=self.W_peak,
a_y=self.spectrumW[self.iW_peak],
symbol="or",
text="k={:.2g} rad/s\nT={:.1f} s".format(
self.W_peak, 2 * np.pi / self.W_peak
),
s_x=20,
s_y=-30,
)
if add_hs_estimate:
var_k = np.sum(self.spectrumK * self.delta_kx)
if self.wave_construction in ("FFT", "DFTcartesian"):
var_k /= 2.0
hs_estimate_k = 4 * np.sqrt(var_k)
axis[0].text(
0.85,
0.92,
f"Hs={hs_estimate_k:.1f} m",
transform=axis[0].transAxes,
va="top",
)
var_w = np.sum(self.spectrumW * self.delta_omega)
if self.wave_construction in ("FFT", "DFTcartesian"):
var_w /= 2.0
hs_estimate_w = 4 * np.sqrt(var_w)
axis[1].text(
0.85,
0.92,
f"Hs={hs_estimate_w:.1f} m",
transform=axis[1].transAxes,
)
if add_n_points:
axis[0].text(
0.85,
y_n_points_label,
f"N={self.spectrumK.size:d}",
transform=axis[0].transAxes,
va="top",
)
# for ax in axis:
# set_limits(ax, v_min=x_min, v_max=x_max, axis="x")
# set_limits(ax, v_min=y_min, v_max=y_max, axis="y")
if plot_title is not None:
plt.figtext(
x_plot_title,
y_plot_title,
plot_title,
fontsize=title_font_size,
verticalalignment="top",
ha=title_horizontal_algnment,
)
window_title = plot_title
fig.canvas.set_window_title(window_title)
return fig, axis
[docs]
def plot_wave(
self,
fig=None,
ax=None,
x_min=None,
x_max=None,
y_min=None,
y_max=None,
figsize=(12, 4),
linestyle="-",
line_markers=None,
linecolor=None,
markersize=5,
label=None,
plot_title=None,
x_plot_title=0.5,
y_plot_title=0.97,
title_font_size=12,
title_horizontal_algnment="center",
):
"""Plot the 1D wave vs space and wave vs time
Parameters
----------
fig: :obj:`figure`
Figure returned by previous call can be update
ax: :obj:`Axes`
Axes returned by previous call can be updated to add more waves
x_min: float, optional
Start of the x-range. Default = None
x_max: float, optional
End of the x-range. Default = None
y_min: float, optional
Start of the y-range. Default = None
y_max: float, optional
End of the y-range. Default = None
figsize: tuple, optional
Tuple with (with, height) in inches. Default = None
plot_title: str, optional
Add a title to the plot. Default = None, ie. don't add a title
x_plot_title: float, optional
X Location of the plot title. Default = 0.5 wro the figurek
y_plot_title: float, optional
Y Location of the plot title. Default = 0.97 wro the figurek
title_font_size: int, optional
Font size of the title
Returns
-------
tuple
(fig, ax) reference to the figure and its axis
"""
self.calculate_wave_surface()
if fig is None or ax is None:
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=figsize)
(line1,) = ax.plot(
self.xpoints,
self.amplitude,
linestyle=linestyle,
marker=line_markers,
markersize=markersize,
color=linecolor,
label=label,
)
ax.set_xlabel("x position [m]")
ax.set_ylabel("wave amplitude [m]")
set_limits(axis=ax, v_min=x_min, v_max=x_max, direction="x")
set_limits(axis=ax, v_min=y_min, v_max=y_max, direction="y")
if plot_title is not None:
plt.figtext(
x_plot_title,
y_plot_title,
plot_title,
fontsize=title_font_size,
verticalalignment="top",
horizontalalignment=title_horizontal_algnment,
)
window_title = plot_title
fig.canvas.set_window_title(window_title)
return fig, ax
[docs]
def set_wave_construction(self, mode):
"""
Set the wave how the wave field is constructed from the power spectral density:
via FFT or DFT
Parameters
----------
mode: {"FFT", "DFTpolar", "DFTcartesian"}
Construction type of the wave field. In case FFT or DFTcartesian is chosen,
the spectrum most be symmetric and therefore mirror must be True
"""
if self.wave_selection != "All" and mode != "DFTpolar":
logger.warning(
"You have set a wave selection but want to use an FFT. Selecting waves "
"is only possible for DFTpolar. Setting that now"
)
mode = "DFTpolar"
self.wave_construction = mode
if mode in ("FFT", "DFTcartesian"):
self.mirror = True
elif mode == "DFTpolar":
self.mirror = False
else:
raise AssertionError(
"Wave construction must be FFT, DFTcartesian, or DFTpolar. Found {}"
"".format(mode)
)
[docs]
def update_x_k_t_sample_space(self):
"""
After a change of number of x-points or wave vector nodes k has been made, call
this routine to update all the arrays
"""
self.xmax = self.xmin + self.Lx
self.xpoints = np.linspace(self.xmin, self.xmax, self.nx_points, endpoint=True)
self.amplitude = np.zeros(self.xpoints.shape)
self.delta_x = self.xpoints[1] - self.xpoints[0]
logger.debug(
"XMIN {} XMAX {} nx {} deltax {}".format(
self.xmin, self.xmax, self.nx_points, self.delta_x
)
)
self.kx_nyquist = np.pi / self.delta_x
if self.wave_construction == "DFTpolar":
# If a DDT is used for the wave field calculation, you can use any amount
# of wave vectors
if self.kx_max > self.kx_nyquist:
logger.warning(
"kx_max larger than nyquist belongin to dx={}: clipping kx to {}"
"".format(self.delta_x, self.kx_nyquist)
)
self.kx_max = self.kx_nyquist
self.kx_nodes = np.linspace(self.kx_min, self.kx_max, self.n_kx_nodes)
self.delta_kx = self.kx_nodes[1] - self.kx_nodes[0]
else:
# for the FFT the number wave vectors should be equal to the number of
# x-points the DFT based on cartesian values in this case take the same
# mesh as FFT but then uses the DFT algorith for comparison
self.kx_nodes = 2 * np.pi * np.fft.fftfreq(self.nx_points, self.delta_x)
self.delta_kx = self.kx_nodes[1] - self.kx_nodes[0]
logger.debug(
"kx_nodes minx/max {} {}".format(
np.amin(self.kx_nodes), np.amax(self.kx_nodes)
)
)
if (
self.phase is None
or self.kx_nodes.shape != self.phase.shape
or self.update_phase
):
self.phase = ms.initialize_phase(self.kx_nodes, self.seed)
self.update_phase = False
self.t_end = self.t_start + self.t_length
self.delta_t = self.t_length / self.nt_samples
[docs]
def calculate_omega_dispersion(self):
"""
Calculate the omega frequency belonging to the wave vectors according to the
deep water dispersion relation.
"""
self.omega_dispersion = np.sqrt(self.gravity0 * abs(self.kx_nodes)) * np.where(
self.kx_nodes >= 0,
np.ones(self.kx_nodes.shape),
-np.ones(self.kx_nodes.shape),
)
delta_omega = np.diff(self.omega_dispersion)
self.delta_omega = np.append(delta_omega, [delta_omega[-1]])
[docs]
def calculate_spectra_modulus(self):
"""
This routine calculates the Spectrum in omega and k domain. The spectrum can
be either a Jonswap or a Gauss spectrum depending on the wave_type argument
Based on the energy limits, the minimum and maximum frequency/wave vector is
obtained using the specspecs routine. Based on those limits, you may take a
selection of wave vectors if the wave_selection variable is set to 'subrange' or
'EqualEnergyBins'
"""
# self.phase = ms.initialize_phase(self.kx_nodes,self.seed)
self.spectrumK = ms.spectrum_wave_k_domain(
k_waves=self.kx_nodes,
Hs=self.Hs,
Tp=self.Tp,
gamma=self.gamma,
sigma=self.sigma,
spectrum_type=self.spectrum_type,
spectral_version=self.spectral_version,
)
(
self.ik_low,
self.ik_high,
self.ik_peak,
self.k_low,
self.k_high,
self.k_peak,
self.a_peakK,
self.varianceK,
) = ms.specspecs(
self.kx_nodes,
self.spectrumK,
lowlim=self.E_limit_low,
higlim=self.E_limit_high,
mirror=self.mirror,
)
# Select a sub range of the wave vectors. In case that fft is used, this is
# not possible.
if self.wave_construction == "DFTpolar" and self.wave_selection == "Subrange":
# extra wave vectors based on the range k_low,k_high
# Create a mask array to select the wave vectors within the subrange k_low
# k_high
mask = ms.mask_out_of_range(self.kx_nodes, self.k_low, self.k_high)
# make a selection of wave vectors: set value equal to zero outside range
self.kx_nodes = np.extract([mask], [self.kx_nodes])
self.spectrumK = np.extract([mask], [self.spectrumK])
self.phase = np.extract([mask], [self.phase])
# Subsample the wave vectors and frequencies bin in case sample_every is
# larger than one
if self.sample_every > 1:
self.kx_nodes = self.kx_nodes[:: self.sample_every]
self.spectrumK = self.spectrumK[:: self.sample_every]
self.phase = self.phase[:: self.sample_every]
elif (
self.wave_construction == "DFTpolar"
and self.wave_selection == "EqualEnergyBins"
):
# If EqualEnergy bins is selected, make a selection of frequency bins such
# that the energy per interval S*dk remains constant to Ebin (the mean
# energy per bin based on the total energy and number of bins
self.Ebin = self.varianceK / self.n_bins_equal_energy
kx_nodes = []
kk = self.kx_min
if self.use_subrange_energy_limits:
kk = self.k_low
kx_nodes.append(kk)
logger.debug("Start solving the euqal energy bins...")
n_trail_max = 10
while kk < self.kx_max:
logger.debug(f"Solving bin for kk = {kk}")
# Calculate the next wave vector such that S*dk (energy in this bin)
# equals Ebin by solving the integral int_k0^knew Sdk. kk+delta_kx is
# just a first guess
delta_kx = self.delta_kx
n_trail = n_trail_max
found_solution = False
while not found_solution and n_trail > 0:
ans = sp.optimize.fsolve(
_energy_deficit,
kk + delta_kx,
args=(
kk,
self.Ebin,
self.Hs,
self.Tp,
self.gamma,
self.spectral_version,
self.spectrum_type,
self.sigma,
),
)
if ans[0] > 0:
found_solution = True
kk = ans[0]
else:
delta_kx /= 2
n_trail -= 1
# we could not find a solution. Try again with a smaller offset
logger.debug(
"Failed to find a solution for kk. Try again for smaller"
" delta kx {} {}".format(delta_kx, n_trail)
)
if n_trail < 0:
logger.warning(
"Tried {} times without succes. Stop solving, "
"hope for the best".format(n_trail)
)
break
# check if kk is within subrange is requested and then add it to the
# list
if kk < self.kx_max and not (
self.use_subrange_energy_limits
and (kk < self.k_low or kk > self.k_high)
):
logger.debug(
"Storing bin kk = {} with k_low = {} k_high ={}"
"".format(kk, self.k_low, self.k_high)
)
kx_nodes.append(kk)
logger.debug("Done")
# turn the create kx_node list into a nparray
if self.lock_nodes_to_wave_one:
# If lock_nodes_to_wave_one is true, then the k values calculated above
# are all # locked to the wave vectors of the full wave domain by
# finding the nearest wave value
mask = np.full(self.kx_nodes.shape, False, dtype=np.bool)
for kk in kx_nodes:
kinx = find_idx_nearest_val(self.kx_nodes, kk)
mask[kinx] = True
# select the kx and phase at the nodes
self.kx_nodes = np.extract(mask, self.kx_nodes)
self.phase = np.extract(mask, self.phase)
else:
# in case the nodes do not have to be locked to the first wave,
# convert the created list of wave vectors into a numpy array, calculate
# a new phase and the corresponding modulus
self.kx_nodes = np.array(kx_nodes)
self.phase = ms.initialize_phase(self.kx_nodes, self.seed)
# Based on the locked or non-locked kx_nodes with fixed and non-fixed
# phases, calculate the spectrumK
self.spectrumK = ms.spectrum_wave_k_domain(
k_waves=self.kx_nodes,
Hs=self.Hs,
Tp=self.Tp,
gamma=self.gamma,
sigma=self.sigma,
spectrum_type=self.spectrum_type,
spectral_version=self.spectral_version,
)
elif self.wave_selection == "OneWave":
logger.debug(f"selecting wave {self.picked_wave_index}")
# pick one wave only
k = self.picked_wave_index
if not self.wave_construction == "FFT":
# only copy the single wave of the picked wave vector for the DFT
# algorithms
self.kx_nodes = np.array([self.kx_nodes[k]])
else:
# keep all the wave vectors except set the value not equal to the picked
# wave to zero
mask = np.full(self.spectrumK.shape, True, dtype=bool)
mask[self.picked_wave_index] = False
self.spectrumK[mask] = 0.0
self.phase[mask] = 0.0
else:
logger.debug("No wave selection has been made, so using all the k nodes")
# based on the new omega values, calculate the omega spectrum as well
self.calculate_omega_dispersion()
if self.spectrum_type == "jonswap":
self.spectrumW = ms.spectrum_jonswap(
abs(self.omega_dispersion),
Hs=self.Hs,
Tp=self.Tp,
gamma=self.gamma,
spectral_version=self.spectral_version,
)
elif self.spectrum_type == "gauss":
self.spectrumW = ms.spectrum_gauss(
abs(self.omega_dispersion),
Hs=self.Hs,
Tp=self.Tp,
sigma=self.sigma,
spectral_version=self.spectral_version,
)
else:
raise AssertionError(
"spectrum type must either be 'jonswap' or 'gauss'. Found {}".format(
self.spectrum_type
)
)
(
self.iW_low,
self.iW_high,
self.iW_peak,
self.W_low,
self.W_high,
self.W_peak,
self.a_peakW,
self.varianceW,
) = ms.specspecs(
self.omega_dispersion,
self.spectrumW,
lowlim=self.E_limit_low,
higlim=self.E_limit_high,
mirror=self.mirror,
)
# calculate the complex amplitudes for the wave nodes and random phase vector
self.complex_amplitudes = ms.spectrum_to_complex_amplitudes(
self.kx_nodes,
spectral_modulus=self.spectrumK,
phase=self.phase,
mirror=self.mirror,
)
if not self.wave_construction == "FFT":
# create the maxtrix exp (j * kx_nodes * x_nodes) where kx_nodes * x_nodes
# is the matrix following from the vector procuct of the vectos k^T and x
# only calculate this for DFT
self.exp_matrix_kx = np.exp(
1j
* self.kx_nodes.reshape(self.kx_nodes.size, 1)
* self.xpoints.reshape((1, self.xpoints.size))
)
[docs]
def calculate_wave_surface(self):
"""
Calculate the wave surface for current time using either DFT or FFT
Notes
-----
The *wave_construction* attribute determines which algorithm is used to
calculate the the wave surface. The following options are possible
* DFTpolar: a DFT (Discrete Fourier Transform) is used to calculate the wave
surface. There are no requirement to the number and shape of the fourier
nodes stored in the 'complex_amplitudes'.
* FFT: a FFT (Fast Fourier Transform) is used to calculate the wave surface.
This implies that the spectral nodes must be symmetrical around zero as
S(k)=S^*(-k)
* DFTcartesian: A DFT is used to calculate the wave surface, however, still the
same symmetric spectrum as used with the FFT is supplied. This allows to
compare the DFT and DFT in calculation time and outcome with the exact same
outcome (as the input nodes can be the same). The scaling with 0.5 due to the
double spectrum (as it is symmetric) is taken care of here.
"""
if self.wave_construction in ("DFTpolar", "DFTcartesian"):
self.amplitude = self.dft_complex_amplitudes(
self.complex_amplitudes,
self.exp_matrix_kx,
self.omega_dispersion,
self.time,
)
if self.wave_construction == "DFTcartesian":
# The DFTcartesian uses a symmetric spectrum, just as you do with the
# FFT. Therefore you have to scale the energy with a half
self.amplitude *= 0.5
elif self.wave_construction == "FFT":
# the fft is used
self.amplitude = self.fft_amplitude(
self.complex_amplitudes, self.omega_dispersion, self.time
)
else:
raise (
AssertionError(
"wave_construction should be either FFT, DFTpolar, or DFTcartesian."
" Found {}".format(self.wave_construction)
)
)
logger.debug(f"H_s of 1D surface {4 * np.std(self.amplitude)} ")
[docs]
@staticmethod
def dft_complex_amplitudes(S_tilde, exp_kx, omega, time):
"""
Calculate the wave height from the complex amplitudes at given time
Parameters
----------
S_tilde: complex ndarray
vector Nx1 with the complex amplitudes
exp_kx: ndarray array
NxM matrix with exp(k * x) where k is Nx1 k waves vectors and x is 1XM
spatial nodes
omega: ndarray
N ndvector
time: float
Current time
Returns
-------
ndarray array
Mx1 real array with the wave amplitude for given time
Notes
-----
Calculate the vector height(x0,x1,..xM) = real(sum_k A*exp(j(k*x-w*t))), where
the MxN matrix exp(k*x) and the Nx1 vector A (with the complex amplitudes) have
been pre-calculated and exp(j*w*t) is put here in a NxN diagonal matrix.
Return the Mx1 vector with height for each position x
"""
N = S_tilde.size
omega_matrix = np.eye(N) * np.exp(-1j * omega * time)
M = np.dot(S_tilde, omega_matrix)
dft = np.dot(M, exp_kx)
return np.real(dft)
[docs]
@staticmethod
def fft_amplitude(S_tilde, omega, time):
"""Calculate the amplitude at time using the FFT
Parameters
----------
S_tilde: ndarray
Nx1 array with the complex amplitudes of the wave spectrum
omega: ndarray
N x 1 array with the real angular frequency rad/s per wave vector k
time: float
Current time
Returns
-------
ndarray
real Nx1 array with the wave amplitude at time t
Notes
-----
Since the FFT is used, the S_tilde should by symmetrical around k=0 such that
S(k)=S^*(-k)
"""
N = S_tilde.size / 2
ampl = N * np.fft.ifft(S_tilde * np.exp(1j * (-time * omega)))
# ampl should be real already because S_tilde should by symmetrical around k=0
# S(k)=S^*(-k) to be sure, take the real value only
return np.real(ampl)