Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 229 additions & 12 deletions python/packages/isce3/signal/compute_evd_cpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import numpy as np
from numpy import linalg as la
from collections.abc import Iterator
import copy
import warnings

def slice_gen(total_size: int, batch_size: int, combine_rem: bool=True) -> Iterator[slice]:
"""Generate slices with size defined by batch_size.
Expand Down Expand Up @@ -74,7 +76,6 @@ def eigen_decomp_sort(cov_matrix):

return eig_val_sort, eig_vec_sort


def compute_evd(
raw_data: np.ndarray,
):
Expand Down Expand Up @@ -111,25 +112,51 @@ def compute_evd(

def compute_evd_tb(
raw_data: np.ndarray,
cpi_len=32,
cpi_len: int=16,
prf_dither_mode: bool=False,
mask_valid: np.ndarray=None,
off_diag_overlap_ratio: float=0.1,
diag_valid_ratio: float=0.05,
noise_ev_idx: int=10,
):
"""Divide input raw data equivalent to a threshold block into data blocks
or Coherent Processing Intervals (CPI) with resepct to axis=0 and perform
Eigenvalue Decomposition for all CPIs.
"""Divide input raw data equivalent to a threshold block into Coherent
Processing Intervals (CPI) with respect to axis=0 and perform Eigenvalue
Decomposition for all CPIs.

For constant-PRF data, standard EVD is used. For dithered-PRF data, gap-exclusion
EVD is used. CPI validity is always checked.

Parameters
------------
raw_data: array-like complex [num_pulses x num_rng_samples]
raw data to be processed
Raw data to be processed
cpi_len: int, optional
Number of slow-time pulses within a CPI, default=32
Number of slow-time pulses within a CPI, default=16
prf_dither_mode: bool, optional
If True, use gap-aware covariance estimation
mask_valid : np.ndarray bool, [num_pulses x num_rng_samples], optional
Valid-sample mask with same shape as raw_data. Required if
prf_dither_mode=True.
off_diag_overlap_ratio : float, optional
Minimum overlap ratio used by gap exclusion covariance estimation
diag_valid_ratio : float, optional
Minimum fraction of valid samples required to compute a diagonal term
in the sample covariance matrix entry R_ii.
noise_ev_idx : int, optional
Eigenvalue index used by threshold estimation to estimate the slow-time
minimum Eigenvalue slope.

Returns
--------
eig_val_sort_array: 2D array of float with dimension [num_cpi x cpi_len]
Eigenvalues of all CPIs sorted in descending order
eig_vec_sort_array: 3D array of complex with dimension [num_cpi x cpi_len x cpi_len]
Sorted column vector Eigenvectors of all CPIs based on index of sorted Eigenvalues
eig_vec_sort_array: 3D array of complex with dimension
[num_cpi x cpi_len x cpi_len]
Sorted column vector Eigenvectors of all CPIs based on index of sorted
Eigenvalues
tb_is_valid : bool
False if any CPI in the threshold block does not have enough usable
eigenvalues for noise_ev_idx.
"""

# compute number of CPIs
Expand All @@ -156,15 +183,205 @@ def compute_evd_tb(
f"Coherent Processing Interval length exceeds total number of pulses {num_pulses}!"
)

if noise_ev_idx >= cpi_len:
raise ValueError(
f"noise_ev_idx ({noise_ev_idx}) must be less than cpi_len ({cpi_len}). "
"Since Python uses 0-based indexing, the maximum valid index is cpi_len - 1."
)

if prf_dither_mode and mask_valid is None:
raise ValueError("mask_valid must be provided when prf_dither_mode=True")

# Output Eigenvalues and Eigenvectors
eig_val_sort_array = np.zeros([num_cpi, cpi_len], dtype="f4")
eig_vec_sort_array = np.zeros((num_cpi, cpi_len, cpi_len), dtype="complex64")

for idx_cpi, cpi_slow_time in enumerate(slice_gen(num_pulses, cpi_len, combine_rem=False)):
tb_is_valid = True

# Compute Eigenvalue and Eigenvector pairs for each CPI
for idx_cpi, cpi_slow_time in enumerate(
slice_gen(num_pulses, cpi_len, combine_rem=False)
):
data_cpi = raw_data[cpi_slow_time]

eig_val_sort, eig_vec_sort = compute_evd(data_cpi)
if not prf_dither_mode:
eig_val_sort, eig_vec_sort = compute_evd(data_cpi)
else:
mask_valid_cpi = mask_valid[cpi_slow_time]
eig_val_sort, eig_vec_sort = compute_evd_gap(
data_cpi,
mask_valid_cpi=mask_valid_cpi,
off_diag_overlap_ratio=off_diag_overlap_ratio,
diag_valid_ratio=diag_valid_ratio,
)

# Verify if the eigenvalue of CPI at index defind by noise_ev_idx is meaningful
eig_val_abs = np.maximum(np.abs(eig_val_sort), 1e-30)
noise_ev_rel_db = 10 * np.log10(eig_val_abs[noise_ev_idx] / eig_val_abs[0])
rel_ev_thresh_db = -30

if noise_ev_rel_db < rel_ev_thresh_db:
tb_is_valid = False
break

eig_val_sort_array[idx_cpi] = eig_val_sort
eig_vec_sort_array[idx_cpi] = eig_vec_sort

return eig_val_sort_array, eig_vec_sort_array
return eig_val_sort_array, eig_vec_sort_array, tb_is_valid

def compute_evd_gap(
raw_data: np.ndarray,
*,
mask_valid_cpi: np.ndarray = None,
off_diag_overlap_ratio: float = 0.1,
diag_valid_ratio: float = 0.05,
Comment on lines +236 to +237
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the runconfig, it is assumed that the num_range_sample_block >= 250, then here are the suggested default percentage.

Suggested change
off_diag_overlap_ratio: float = 0.1,
diag_valid_ratio: float = 0.05,
off_diag_overlap_ratio: float = 0.25,
diag_valid_ratio: float = 0.2,

):
"""Perform Eigenvalue Decomposition along axis 0.

Parameters
------------
raw_data: array-like complex [num_pulses x num_rng_samples]
raw data to be processed
mask_valid_cpi: (num_pulses, num_rng_samples) bool array, optional
True indicates valid samples. False indicates invalid samples or gaps.
If None, an all true boolean mask is created. All samples are assumed to be valid.
off_diag_overlap_ratio: float
Minimum fraction of overlapping valid range samples required to compute
an off-diagonal term in the sample covariance matrix entry R_ij.
diag_valid_ratio : float, optional
Minimum fraction of valid samples required to compute a diagonal term in the
sample covariance matrix entry R_ii.

Returns
--------
eig_val_sort: 1D array of float, same length as the number of rows of input matrix
Eigenvalues sorted in descending order
eig_vec_sort: 2D array of complex, same shape as input matrix
column vector Eigenvectors sorted based on index of sorted Eigenvalues
"""

# The raw_data is not necessarily zero-mean when it is corrupted by RFI.
# If so, estimated sample covariance matrix cov_cpi should be called
# sample correlation matrix instead. The reference below demonstrates
# this concept and notation.

# F. Zhou, R. Wu, M. Xing, and Z. Bao, “Eigensubspace-Based Filtering With
# Application in Narrow-Band Interference Suppression for SAR”, IEEE Geoscience
# and Remote Sensing Letters, vol. 4, no. 1, pp. 76,2007.

if mask_valid_cpi is None:
mask_valid_cpi = np.ones(raw_data.shape, dtype=bool)

if mask_valid_cpi.shape != raw_data.shape:
raise ValueError(f"mask shape {mask_valid_cpi.shape} != data shape {raw_data.shape}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ValueError(f"mask shape {mask_valid_cpi.shape} != data shape {raw_data.shape}")
raise ValueError(f"Valid CPI mask shape {mask_valid_cpi.shape} != data shape {raw_data.shape}")


cov_cpi = compute_gap_exclusion_cov(
raw_data,
mask_valid_cpi=mask_valid_cpi,
off_diag_overlap_ratio=off_diag_overlap_ratio,
diag_valid_ratio=diag_valid_ratio,
)

eig_val_sort, eig_vec_sort = eigen_decomp_sort(cov_cpi)

return eig_val_sort, eig_vec_sort


def compute_gap_exclusion_cov(
data: np.ndarray,
*,
mask_valid_cpi: np.ndarray = None,
off_diag_overlap_ratio: float = 0.1,
diag_valid_ratio: float = 0.05,
):
"""
Compute a gap-excluded slow-time sample covariance matrix.

Parameters
----------
data: (num_pulses, num_rng_samples) complex array
Slow-time block: K pulses x M range samples.
Pulses should be contiguous in slow time for ST-EVD.
mask_valid_cpi: (num_pulses, num_rng_samples) bool array, optional
True indicates valid samples. False indicates invalid samples or gaps.
If None, an all true boolean mask is created. All samples are assumed to be valid.
off_diag_overlap_ratio: float
Minimum fraction of overlapping valid range samples required to compute
an off-diagonal term in the sample covariance matrix entry R_ij.
diag_valid_ratio : float, optional
Minimum fraction of valid samples required to compute a diagonal term in the
sample covariance matrix entry R_ii.

Returns
-------
cov : (num_pulses, num_pulses) complex64
Gap-excluded sample covariance matrix.
"""

num_pulses, num_rng_samples = data.shape
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a check to see if x percent of total number of samples is above min required CPI length per theory and if not issue a warning.

FYI, for fixed PRF, make sure to check the number of valid samples (fixed over all CPI pulses) meets the min requirement and they are not all zeros.


if mask_valid_cpi is None:
mask_valid_cpi = np.ones(data.shape, dtype=bool)

if mask_valid_cpi.shape != data.shape:
raise ValueError(f"mask shape {mask_valid_cpi.shape} != data shape {data.shape}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ValueError(f"mask shape {mask_valid_cpi.shape} != data shape {data.shape}")
raise ValueError(f"Valid CPI mask shape {mask_valid_cpi.shape} != data shape {data.shape}")


if not (0.0 <= off_diag_overlap_ratio <= 1.0):
raise ValueError("off_diag_overlap_ratio must be between 0 and 1.")

if not (0.0 <= diag_valid_ratio <= 1.0):
raise ValueError("diag_valid_ratio must be between 0 and 1.")

# Minimum Samples required to compute diagonal and off-diagonal terms of
# Sample Covariance Matrix
min_valid_off_diag = max(1, int(np.ceil(off_diag_overlap_ratio * num_rng_samples)))
min_valid_diag = max(1, int(np.ceil(diag_valid_ratio * num_rng_samples)))

# Zero-out invalid samples
x_valid = data * mask_valid_cpi

# Count valid sample overalp count for each element of the
# sample covariance matrix
mask_int = mask_valid_cpi.astype(np.int32)
overlap_counts = mask_int @ mask_int.T # shape (pulse x pulse)

# Sum of conjugate products over overlapping valid samples
# without proper normalization
cov_sum = x_valid @ x_valid.conj().T

# Initialize gap-excluded sample covariance matrix
cov = np.zeros((num_pulses, num_pulses), dtype=np.complex64)

#Count number of valid in diagonal and off-diagonal for normalization

# Diagonal terms: Generate Diagonal indices
diag_idx = np.diag_indices(num_pulses)

# Extract the number of valid samples of diagonal terms
diag_counts = overlap_counts[diag_idx]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve the runtime by vectoring the sample covariance matrix construction. Use as special type with numpy or scipy function for optimization.

# Extract the uncorrected diagonal values
diag_cov_sum = cov_sum[diag_idx]

# Check if there are enough valid samples
diag_valid_idx = diag_counts >= min_valid_diag

diag_vals = np.zeros(num_pulses, dtype=np.complex64)

# Normalize the diagonal terms
diag_vals[diag_valid_idx] = diag_cov_sum[diag_valid_idx] / diag_counts[diag_valid_idx]
cov[diag_idx] = diag_vals

# Off-diagonal: Verify if there are enough valid samples for off-diagonal terms
off_diag_valid = overlap_counts >= min_valid_off_diag

# Mask out diagonal terms of the off_diagonal_valid matrix
np.fill_diagonal(off_diag_valid, False)

# Normalize the off-diagonal terms
cov[off_diag_valid] = cov_sum[off_diag_valid] / overlap_counts[off_diag_valid]

# Ensure Hermitian numerically
cov = (0.5 * (cov + cov.conj().T)).astype(np.complex64)

return cov
Loading
Loading