Source code for regridding._weights._weights_transposed._weights_transposed

import numpy as np
import numba
from ... import _util
from .._weights_conservative_1d._grids import cell_length
from .._weights_conservative_2d._grids import grid_volume as cell_area

__all__ = [
    "transpose_weights",
    "transpose_weights_conservative",
]


[docs] def transpose_weights( weights: tuple[np.ndarray, tuple[int, ...], tuple[int, ...]], ) -> tuple[np.ndarray, tuple[int, ...], tuple[int, ...]]: r""" Transpose the sparse matrix of weights calculated by :func:`regridding.weights`. This function works by swapping the indices, :math:`(i, j, w) \rightarrow (j, i, w)`. Transposed weights can be used with :func:`regridding.regrid_from_weights` to perform a transform in the opposite direction. Parameters ---------- weights Ragged array of weights computed by :func:`regridding.weights`. """ weights, shape_input, shape_output = weights shape = weights.shape weights = numba.typed.List(weights.reshape(-1)) result = _transpose_weights_numba(weights) result = np.fromiter(result, dtype=object) result = result.reshape(shape) return result, shape_output, shape_input
@numba.njit( cache=True, fastmath=True, ) def _transpose_weights_numba( weights: numba.typed.List, ) -> numba.typed.List: result = numba.typed.List() for d in numba.prange(len(weights)): d = numba.types.int64(d) weights_d = weights[d] result_d = numba.typed.List() for w in range(len(weights_d)): i_input, i_output, weight = weights_d[w] result_d.append((i_output, i_input, weight)) result.append(result_d) return result
[docs] def transpose_weights_conservative( weights: tuple[np.ndarray, tuple[int, ...], tuple[int, ...]], coordinates_input: np.ndarray | tuple[np.ndarray, ...], coordinates_output: np.ndarray | tuple[np.ndarray, ...], axis_input: None | int | tuple[int, ...] = None, axis_output: None | int | tuple[int, ...] = None, weights_input: None | np.ndarray = None, ) -> tuple[np.ndarray, tuple[int, ...], tuple[int, ...]]: r""" Transpose matrix of weights and normalize to be conservative. Similar to :func:`transpose_weights`, this function transposes the matrix of weights calculated by :func:`regridding.weights`. However, this function also applies the appropriate normalization to the transposed weights such that they conserve flux when used with :func:`regridding.regrid_from_weights` to perform an inverse transform. Parameters ---------- weights Ragged array of weights computed by :func:`regridding.weights`. coordinates_input Vertices of each cell in the input grid provided to :func:`~regridding.weights`. Each transposed weight will be `multiplied` by the volume of the corresponding cell in the input grid. coordinates_output Vertices of each cell in the output grid. Each transposed weight will be `divided` by the volume of the corresponding cell in the output grid. axis_input Logical axes of the input grid to resample. If :obj:`None`, resample all the axes of the input grid. The number of axes should be equal to the number of coordinates in the input grid. axis_output Logical axes of the output grid corresponding to the resampled axes of the input grid. If :obj:`None`, all the axes of the output grid correspond to resampled axes in the input grid. The number of axes should be equal to the number of coordinates in the output grid. weights_input An optional array of weights that were applied to the input values by :func:`regridding.weights`. Each transposed weight will be `divided` by its corresponding input weight. Examples -------- Regrid array of values onto new grid with precalculated weights, and then transform back with transposed weights. .. jupyter-execute:: import numpy as np import matplotlib.pyplot as plt import regridding # Define input grid x_input = np.linspace(-4, 4, num=11) y_input = np.linspace(-4, 4, num=11) x_input, y_input = np.meshgrid(x_input, y_input, indexing="ij") # Define rotated output grid angle = 0.2 x_output = x_input * np.cos(angle) - y_input * np.sin(angle) y_output = x_input * np.sin(angle) + y_input * np.cos(angle) # Define arrays of values defined on the same grid values_input = np.zeros((10, 10)) values_input[4, 4] = 1 # Save regridding weights relating the input and output grids weights = regridding.weights( coordinates_input=(x_input, y_input), coordinates_output=(x_output, y_output), method="conservative", ) # Regrid the first array of values using the saved weights values_output = regridding.regrid_from_weights( *weights, values_input=values_input, ) # Transpose calculated weights weights_transposed = regridding.transpose_weights_conservative( weights, coordinates_input=(x_input, y_input), coordinates_output=(x_output, y_output), ) # Regrid the regridded values back onto original grid using transposed weights. values_transposed = regridding.regrid_from_weights( *weights_transposed, values_input=values_output, ) # Plot the original and regridded arrays of values fig, axs = plt.subplots( nrows=1, ncols=3, sharex=True, sharey=True, constrained_layout=True, ) axs[0].pcolormesh(x_input, y_input, values_input, vmin=0, vmax=1); axs[0].set_title(r"original"); axs[1].pcolormesh(x_output, y_output, values_output, vmin=0, vmax=1); axs[1].set_title(r"rotated"); axs[2].pcolormesh(x_input, y_input, values_transposed, vmin=0, vmax=1); axs[2].set_title(r"rotated and tranposed"); axs[0].set_aspect("equal"); axs[1].set_aspect("equal"); axs[2].set_aspect("equal"); """ weights, shape_input, shape_output = weights ( coordinates_input, coordinates_output, axis_input, axis_output, _, _, shape_orthogonal, ) = _util._normalize_input_output_coordinates( coordinates_input=coordinates_input, coordinates_output=coordinates_output, axis_input=axis_input, axis_output=axis_output, ) axis_numba_input = ~np.arange(len(axis_input))[::-1] axis_numba_output = ~np.arange(len(axis_output))[::-1] size_orthogonal = int(np.prod(shape_orthogonal)) if weights_input is not None: weights_input = np.broadcast_to(weights_input, shape_input) weights_input = np.moveaxis(weights_input, axis_input, axis_numba_input) weights_input = weights_input.reshape(size_orthogonal, -1) volume_input = _cell_volume(coordinates_input, axis_input) volume_input = np.moveaxis(volume_input, axis_input, axis_numba_input) volume_input = volume_input.reshape(size_orthogonal, -1) volume_output = _cell_volume(coordinates_output, axis_output) volume_output = np.moveaxis(volume_output, axis_output, axis_numba_output) volume_output = volume_output.reshape(size_orthogonal, -1) shape = weights.shape weights = numba.typed.List(weights.reshape(-1)) result = _transpose_weights_conservative_numba( weights, weights_input=weights_input, volume_input=volume_input, volume_output=volume_output, ) result = np.fromiter(result, dtype=object) result = result.reshape(shape) return result, shape_output, shape_input
@numba.njit( cache=True, fastmath=True, parallel=True, ) def _transpose_weights_conservative_numba( weights: numba.typed.List, volume_input: np.ndarray, volume_output: np.ndarray, weights_input: None | np.ndarray, ) -> numba.typed.List: result = numba.typed.List() for d in range(len(weights)): result_d = numba.typed.List() for _ in range(0): # pragma: nocover result_d.append((0, 0, 0.0)) result.append(result_d) for d in numba.prange(len(weights)): d = numba.types.int64(d) weights_d = weights[d] result_d = result[d] volume_input_d = volume_input[d] volume_output_d = volume_output[d] for w in range(len(weights_d)): i_input, i_output, weight = weights_d[w] if weights_input is not None: weight = weight / weights_input[d][i_input] weight = weight * volume_input_d[i_input] / volume_output_d[i_output] result_d.append((i_output, i_input, weight)) result[d] = result_d return result def _cell_volume( grid: tuple[np.ndarray, ...], axis: tuple[int, ...], ) -> np.ndarray: """ Compute the :math:`n`-dimensional volume of each cell in a grid. Parameters ---------- grid 1- or 2-dimensional grid. """ shape = grid[0].shape axis_numba = ~np.arange(len(axis))[::-1] shape_numba = tuple(shape[ax] for ax in axis) if len(grid) == 1: (x,) = grid x = np.moveaxis(x, axis, axis_numba) x_ = np.reshape(x, (-1,) + shape_numba) result = _cell_volume_1d(grid=(x_,)) result = np.reshape(result, x_.shape[:-1] + result.shape[-1:]) result = np.moveaxis(result, axis_numba, axis) elif len(grid) == 2: x, y = grid x = np.moveaxis(x, axis, axis_numba) y = np.moveaxis(y, axis, axis_numba) x_ = np.reshape(x, (-1,) + shape_numba) y_ = np.reshape(y, (-1,) + shape_numba) result = _cell_volume_2d(grid=(x_, y_)) result = np.reshape(result, x_.shape[:-2] + result.shape[-2:]) result = np.moveaxis(result, axis_numba, axis) else: # pragma: nocover raise ValueError("Grids greater than 2D not supported.") return result @numba.njit( cache=True, fastmath=True, ) def _cell_volume_1d( grid: tuple[np.ndarray], ) -> np.ndarray: (x,) = grid shape_t, shape_x = x.shape result = np.empty((shape_t, shape_x - 1)) for t in range(shape_t): result[t] = cell_length(x[t]) return result @numba.njit( cache=True, fastmath=True, ) def _cell_volume_2d( grid: tuple[np.ndarray, np.ndarray], ) -> np.ndarray: x, y = grid shape_t, shape_x, shape_y = x.shape result = np.empty((shape_t, shape_x - 1, shape_y - 1)) for t in range(shape_t): result[t] = cell_area(grid=(x[t], y[t])) return result