pipt.misc_tools.wavelet_tools
Sparse representation of seismic data using wavelet compression.
Copyright (c) 2019-2022 NORCE, All Rights Reserved. 4DSEIS
1""" 2Sparse representation of seismic data using wavelet compression. 3 4Copyright (c) 2019-2022 NORCE, All Rights Reserved. 4DSEIS 5""" 6import pywt 7import numpy as np 8import sys 9from copy import deepcopy 10import warnings 11 12 13class SparseRepresentation: 14 15 # Initialize 16 def __init__(self, options): 17 # options: dim, actnum, level, wname, colored_noise, threshold_rule, th_mult, 18 # use_hard_th, keep_ca, inactive_value 19 # dim must be given as (nz,ny,nx) 20 self.options = options 21 self.num_grid = np.prod(self.options['dim']) 22 23 self.est_noise = np.array([]) 24 self.est_noise_level = [] # this is a scalar for each subband 25 self.mask = [] 26 self.threshold = [] 27 self.num_total_coeff = None 28 self.cd_leading_index = None 29 self.cd_leading_coeff = None 30 self.ca_leading_index = None 31 self.ca_leading_coeff = None 32 33 # Function to doing image compression. If the function is called without threshold, then the leading indices must 34 # be defined in the class. Typically, this is done by running the compression on true data with a given threshold. 35 def compress(self, data, th_mult=None): 36 if ('inactive_value' not in self.options) or (self.options['inactive_value'] is None): 37 self.options['inactive_value'] = np.mean(data) 38 signal = np.zeros(self.num_grid) 39 signal[~self.options['mask']] = self.options['inactive_value'] 40 signal[self.options['mask']] = data 41 if 'order' not in self.options: 42 self.options['order'] = 'C' 43 if 'min_noise' not in self.options: 44 self.options['min_noise'] = 1.0e-9 45 signal = signal.reshape(self.options['dim'], order=self.options['order']) 46 # get the signal back into its original shape (nx,ny,nz) 47 signal = signal.transpose((2, 1, 0)) 48 # pywt throws a warning in case of single-dimentional entries in the shape of the signal. 49 with warnings.catch_warnings(): 50 warnings.simplefilter("ignore") 51 wdec = pywt.wavedecn( 52 signal, self.options['wname'], 'symmetric', int(self.options['level'])) 53 wdec_rec = deepcopy(wdec) 54 55 # Perform thresholding if the threshold is given as input. 56 do_thresholding = False 57 est_noise_level = None 58 true_data = False 59 if th_mult is not None and th_mult >= 0: 60 do_thresholding = True 61 if not self.est_noise.size: # assume that the true data is input 62 true_data = True 63 else: 64 self.est_noise = np.array([]) # this will be rebuilt 65 66 # Initialize 67 # Note: the keys below are organized the same way as in Matlab. 68 keys = ['daa', 'ada', 'dda', 'aad', 'dad', 'add', 'ddd'] 69 if true_data: 70 for level in range(0, int(self.options['level'])+1): 71 num_subband = 1 if level == 0 else len(keys) 72 for subband in range(0, num_subband): 73 if level == 0: 74 self.mask.append(False) 75 self.est_noise_level.append(0) 76 self.threshold.append(0) 77 else: 78 if subband == 0: 79 self.mask.append({}) 80 self.est_noise_level.append({}) 81 self.threshold.append({}) 82 self.mask[level][keys[subband]] = False 83 self.est_noise_level[level][keys[subband]] = 0 84 self.threshold[level][keys[subband]] = 0 85 86 # In the white noise case estimated std is based on the high (hhh) subband only 87 if true_data and not self.options['colored_noise']: 88 subband_hhh = wdec[-1]['ddd'].flatten() 89 est_noise_level = np.median( 90 np.abs(subband_hhh - np.median(subband_hhh))) / 0.6745 # estimated noise std 91 est_noise_level = np.maximum(est_noise_level, self.options['min_noise']) 92 # Threshold based on universal rule 93 current_threshold = th_mult * \ 94 np.sqrt(2 * np.log(np.size(data))) * est_noise_level 95 self.threshold[0] = current_threshold 96 97 # Loop over all levels and subbands (including the lll subband) 98 ca_in_vec = np.array([]) 99 cd_in_vec = np.array([]) 100 for level in range(0, int(self.options['level'])+1): 101 num_subband = 1 if level == 0 else len(keys) 102 for subband in range(0, num_subband): 103 coeffs = wdec[level] if level == 0 else wdec[level][keys[subband]] 104 coeffs = coeffs.flatten() 105 106 # In the colored noise case estimated std is based on all subbands 107 if true_data and self.options['colored_noise']: 108 est_noise_level = np.median( 109 np.abs(coeffs - np.median(coeffs))) / 0.6745 110 est_noise_level = np.maximum( 111 est_noise_level, self.options['min_noise']) 112 # threshold based on universal rule 113 if self.options['threshold_rule'] == 'universal': 114 current_threshold = np.sqrt( 115 2 * np.log(np.size(coeffs))) * est_noise_level 116 # threshold based on bayesian rule 117 elif self.options['threshold_rule'] == 'bayesian': 118 std_data = np.linalg.norm(coeffs, 2) / len(coeffs) 119 current_threshold = est_noise_level**2 / \ 120 np.sqrt(np.abs(std_data**2 - est_noise_level**2)) 121 else: 122 print('Thresholding rule not implemented') 123 sys.exit(1) 124 current_threshold = th_mult * current_threshold 125 if level == 0: 126 self.threshold[level] = current_threshold 127 else: 128 self.threshold[level][keys[subband]] = current_threshold 129 # self.threshold = np.append(self.threshold, current_threshold) 130 131 # Perform thresholding 132 if do_thresholding: 133 if level == 0 or not self.options['colored_noise']: 134 current_threshold = self.threshold[0] 135 else: 136 current_threshold = self.threshold[level][keys[subband]] 137 if level > 0 or (level == 0 and not self.options['keep_ca']): 138 if self.options['use_hard_th']: # use hard thresholding 139 zero_index = np.abs(coeffs) < current_threshold 140 coeffs[zero_index] = 0 141 else: # use soft thresholding 142 coeffs = np.sign( 143 coeffs) * np.maximum(np.abs(coeffs) - current_threshold, 0) 144 zero_index = coeffs == 0 145 else: 146 zero_index = np.zeros(coeffs.size).astype(bool) 147 148 # Construct the mask for each subband and estimate the noise level 149 if level == 0: 150 self.mask[level] = np.invert(zero_index) + self.mask[level] 151 if true_data: 152 self.est_noise_level[level] = est_noise_level 153 else: 154 self.mask[level][keys[subband]] = np.invert( 155 zero_index) + self.mask[level][keys[subband]] 156 if true_data: 157 self.est_noise_level[level][keys[subband]] = est_noise_level 158 159 # Build the noise for the compressed signal 160 if level == 0: 161 num_el = np.sum(self.mask[level]) 162 current_noise_level = self.est_noise_level[level] 163 self.est_noise = np.append( 164 self.est_noise, current_noise_level * np.ones(num_el)) 165 else: 166 num_el = np.sum(self.mask[level][keys[subband]]) 167 current_noise_level = self.est_noise_level[level][keys[subband]] 168 self.est_noise = np.append( 169 self.est_noise, current_noise_level * np.ones(num_el)) 170 171 if level == 0: 172 ca_in_vec = coeffs 173 else: 174 cd_in_vec = np.append(cd_in_vec, coeffs) 175 176 # Compute the leading indices and compressed signal 177 if do_thresholding: 178 self.num_total_coeff = ca_in_vec.size + cd_in_vec.size 179 if self.cd_leading_index is not None: 180 self.cd_leading_index = np.union1d( 181 self.cd_leading_index, np.nonzero(cd_in_vec)[0]) 182 else: 183 self.cd_leading_index = np.nonzero(cd_in_vec)[0] 184 self.cd_leading_coeff = cd_in_vec[self.cd_leading_index] 185 if self.options['keep_ca']: 186 if self.ca_leading_index is None: 187 self.ca_leading_index = np.arange(wdec[0].size) 188 else: 189 if self.ca_leading_index is None: 190 self.ca_leading_index = np.nonzero(ca_in_vec)[0] 191 else: 192 self.ca_leading_index = np.union1d( 193 self.ca_leading_index, np.nonzero(ca_in_vec)[0]) 194 self.ca_leading_coeff = ca_in_vec[self.ca_leading_index] 195 compressed_data = np.append(self.ca_leading_coeff, self.cd_leading_coeff) 196 else: 197 if self.ca_leading_index is None or self.cd_leading_index is None: 198 print('Leading indices not defined') 199 sys.exit(1) 200 compressed_data = np.append( 201 ca_in_vec[self.ca_leading_index], cd_in_vec[self.cd_leading_index]) 202 203 # Construct wdec_rec 204 for level in range(0, int(self.options['level']) + 1): 205 if level == 0: 206 shape = wdec_rec[level].shape 207 coeff = wdec_rec[level].flatten() 208 coeff[~self.mask[level]] = 0 209 wdec_rec[level] = coeff.reshape(shape) 210 else: 211 num_subband = len(keys) 212 for subband in range(0, num_subband): 213 shape = wdec_rec[level][keys[subband]].shape 214 coeff = wdec_rec[level][keys[subband]].flatten() 215 coeff[~self.mask[level][keys[subband]]] = 0 216 wdec_rec[level][keys[subband]] = coeff.reshape(shape) 217 218 return compressed_data, wdec_rec 219 220 # Reconstruct the current compressed dataset. 221 def reconstruct(self, wdec_rec): 222 223 if wdec_rec is None: 224 print('No signal to reconstruct') 225 sys.exit(1) 226 227 data_rec = pywt.waverecn(wdec_rec, self.options['wname'], 'symmetric') 228 data_rec = data_rec.transpose((2, 1, 0)) # flip the axes 229 dim = self.options['dim'] 230 data_rec = data_rec[0:dim[0], 0:dim[1], 0:dim[2]] # severe issure here 231 data_rec = data_rec.flatten(order=self.options['order']) 232 data_rec = data_rec[self.options['mask']] 233 234 return data_rec
class
SparseRepresentation:
14class SparseRepresentation: 15 16 # Initialize 17 def __init__(self, options): 18 # options: dim, actnum, level, wname, colored_noise, threshold_rule, th_mult, 19 # use_hard_th, keep_ca, inactive_value 20 # dim must be given as (nz,ny,nx) 21 self.options = options 22 self.num_grid = np.prod(self.options['dim']) 23 24 self.est_noise = np.array([]) 25 self.est_noise_level = [] # this is a scalar for each subband 26 self.mask = [] 27 self.threshold = [] 28 self.num_total_coeff = None 29 self.cd_leading_index = None 30 self.cd_leading_coeff = None 31 self.ca_leading_index = None 32 self.ca_leading_coeff = None 33 34 # Function to doing image compression. If the function is called without threshold, then the leading indices must 35 # be defined in the class. Typically, this is done by running the compression on true data with a given threshold. 36 def compress(self, data, th_mult=None): 37 if ('inactive_value' not in self.options) or (self.options['inactive_value'] is None): 38 self.options['inactive_value'] = np.mean(data) 39 signal = np.zeros(self.num_grid) 40 signal[~self.options['mask']] = self.options['inactive_value'] 41 signal[self.options['mask']] = data 42 if 'order' not in self.options: 43 self.options['order'] = 'C' 44 if 'min_noise' not in self.options: 45 self.options['min_noise'] = 1.0e-9 46 signal = signal.reshape(self.options['dim'], order=self.options['order']) 47 # get the signal back into its original shape (nx,ny,nz) 48 signal = signal.transpose((2, 1, 0)) 49 # pywt throws a warning in case of single-dimentional entries in the shape of the signal. 50 with warnings.catch_warnings(): 51 warnings.simplefilter("ignore") 52 wdec = pywt.wavedecn( 53 signal, self.options['wname'], 'symmetric', int(self.options['level'])) 54 wdec_rec = deepcopy(wdec) 55 56 # Perform thresholding if the threshold is given as input. 57 do_thresholding = False 58 est_noise_level = None 59 true_data = False 60 if th_mult is not None and th_mult >= 0: 61 do_thresholding = True 62 if not self.est_noise.size: # assume that the true data is input 63 true_data = True 64 else: 65 self.est_noise = np.array([]) # this will be rebuilt 66 67 # Initialize 68 # Note: the keys below are organized the same way as in Matlab. 69 keys = ['daa', 'ada', 'dda', 'aad', 'dad', 'add', 'ddd'] 70 if true_data: 71 for level in range(0, int(self.options['level'])+1): 72 num_subband = 1 if level == 0 else len(keys) 73 for subband in range(0, num_subband): 74 if level == 0: 75 self.mask.append(False) 76 self.est_noise_level.append(0) 77 self.threshold.append(0) 78 else: 79 if subband == 0: 80 self.mask.append({}) 81 self.est_noise_level.append({}) 82 self.threshold.append({}) 83 self.mask[level][keys[subband]] = False 84 self.est_noise_level[level][keys[subband]] = 0 85 self.threshold[level][keys[subband]] = 0 86 87 # In the white noise case estimated std is based on the high (hhh) subband only 88 if true_data and not self.options['colored_noise']: 89 subband_hhh = wdec[-1]['ddd'].flatten() 90 est_noise_level = np.median( 91 np.abs(subband_hhh - np.median(subband_hhh))) / 0.6745 # estimated noise std 92 est_noise_level = np.maximum(est_noise_level, self.options['min_noise']) 93 # Threshold based on universal rule 94 current_threshold = th_mult * \ 95 np.sqrt(2 * np.log(np.size(data))) * est_noise_level 96 self.threshold[0] = current_threshold 97 98 # Loop over all levels and subbands (including the lll subband) 99 ca_in_vec = np.array([]) 100 cd_in_vec = np.array([]) 101 for level in range(0, int(self.options['level'])+1): 102 num_subband = 1 if level == 0 else len(keys) 103 for subband in range(0, num_subband): 104 coeffs = wdec[level] if level == 0 else wdec[level][keys[subband]] 105 coeffs = coeffs.flatten() 106 107 # In the colored noise case estimated std is based on all subbands 108 if true_data and self.options['colored_noise']: 109 est_noise_level = np.median( 110 np.abs(coeffs - np.median(coeffs))) / 0.6745 111 est_noise_level = np.maximum( 112 est_noise_level, self.options['min_noise']) 113 # threshold based on universal rule 114 if self.options['threshold_rule'] == 'universal': 115 current_threshold = np.sqrt( 116 2 * np.log(np.size(coeffs))) * est_noise_level 117 # threshold based on bayesian rule 118 elif self.options['threshold_rule'] == 'bayesian': 119 std_data = np.linalg.norm(coeffs, 2) / len(coeffs) 120 current_threshold = est_noise_level**2 / \ 121 np.sqrt(np.abs(std_data**2 - est_noise_level**2)) 122 else: 123 print('Thresholding rule not implemented') 124 sys.exit(1) 125 current_threshold = th_mult * current_threshold 126 if level == 0: 127 self.threshold[level] = current_threshold 128 else: 129 self.threshold[level][keys[subband]] = current_threshold 130 # self.threshold = np.append(self.threshold, current_threshold) 131 132 # Perform thresholding 133 if do_thresholding: 134 if level == 0 or not self.options['colored_noise']: 135 current_threshold = self.threshold[0] 136 else: 137 current_threshold = self.threshold[level][keys[subband]] 138 if level > 0 or (level == 0 and not self.options['keep_ca']): 139 if self.options['use_hard_th']: # use hard thresholding 140 zero_index = np.abs(coeffs) < current_threshold 141 coeffs[zero_index] = 0 142 else: # use soft thresholding 143 coeffs = np.sign( 144 coeffs) * np.maximum(np.abs(coeffs) - current_threshold, 0) 145 zero_index = coeffs == 0 146 else: 147 zero_index = np.zeros(coeffs.size).astype(bool) 148 149 # Construct the mask for each subband and estimate the noise level 150 if level == 0: 151 self.mask[level] = np.invert(zero_index) + self.mask[level] 152 if true_data: 153 self.est_noise_level[level] = est_noise_level 154 else: 155 self.mask[level][keys[subband]] = np.invert( 156 zero_index) + self.mask[level][keys[subband]] 157 if true_data: 158 self.est_noise_level[level][keys[subband]] = est_noise_level 159 160 # Build the noise for the compressed signal 161 if level == 0: 162 num_el = np.sum(self.mask[level]) 163 current_noise_level = self.est_noise_level[level] 164 self.est_noise = np.append( 165 self.est_noise, current_noise_level * np.ones(num_el)) 166 else: 167 num_el = np.sum(self.mask[level][keys[subband]]) 168 current_noise_level = self.est_noise_level[level][keys[subband]] 169 self.est_noise = np.append( 170 self.est_noise, current_noise_level * np.ones(num_el)) 171 172 if level == 0: 173 ca_in_vec = coeffs 174 else: 175 cd_in_vec = np.append(cd_in_vec, coeffs) 176 177 # Compute the leading indices and compressed signal 178 if do_thresholding: 179 self.num_total_coeff = ca_in_vec.size + cd_in_vec.size 180 if self.cd_leading_index is not None: 181 self.cd_leading_index = np.union1d( 182 self.cd_leading_index, np.nonzero(cd_in_vec)[0]) 183 else: 184 self.cd_leading_index = np.nonzero(cd_in_vec)[0] 185 self.cd_leading_coeff = cd_in_vec[self.cd_leading_index] 186 if self.options['keep_ca']: 187 if self.ca_leading_index is None: 188 self.ca_leading_index = np.arange(wdec[0].size) 189 else: 190 if self.ca_leading_index is None: 191 self.ca_leading_index = np.nonzero(ca_in_vec)[0] 192 else: 193 self.ca_leading_index = np.union1d( 194 self.ca_leading_index, np.nonzero(ca_in_vec)[0]) 195 self.ca_leading_coeff = ca_in_vec[self.ca_leading_index] 196 compressed_data = np.append(self.ca_leading_coeff, self.cd_leading_coeff) 197 else: 198 if self.ca_leading_index is None or self.cd_leading_index is None: 199 print('Leading indices not defined') 200 sys.exit(1) 201 compressed_data = np.append( 202 ca_in_vec[self.ca_leading_index], cd_in_vec[self.cd_leading_index]) 203 204 # Construct wdec_rec 205 for level in range(0, int(self.options['level']) + 1): 206 if level == 0: 207 shape = wdec_rec[level].shape 208 coeff = wdec_rec[level].flatten() 209 coeff[~self.mask[level]] = 0 210 wdec_rec[level] = coeff.reshape(shape) 211 else: 212 num_subband = len(keys) 213 for subband in range(0, num_subband): 214 shape = wdec_rec[level][keys[subband]].shape 215 coeff = wdec_rec[level][keys[subband]].flatten() 216 coeff[~self.mask[level][keys[subband]]] = 0 217 wdec_rec[level][keys[subband]] = coeff.reshape(shape) 218 219 return compressed_data, wdec_rec 220 221 # Reconstruct the current compressed dataset. 222 def reconstruct(self, wdec_rec): 223 224 if wdec_rec is None: 225 print('No signal to reconstruct') 226 sys.exit(1) 227 228 data_rec = pywt.waverecn(wdec_rec, self.options['wname'], 'symmetric') 229 data_rec = data_rec.transpose((2, 1, 0)) # flip the axes 230 dim = self.options['dim'] 231 data_rec = data_rec[0:dim[0], 0:dim[1], 0:dim[2]] # severe issure here 232 data_rec = data_rec.flatten(order=self.options['order']) 233 data_rec = data_rec[self.options['mask']] 234 235 return data_rec
SparseRepresentation(options)
17 def __init__(self, options): 18 # options: dim, actnum, level, wname, colored_noise, threshold_rule, th_mult, 19 # use_hard_th, keep_ca, inactive_value 20 # dim must be given as (nz,ny,nx) 21 self.options = options 22 self.num_grid = np.prod(self.options['dim']) 23 24 self.est_noise = np.array([]) 25 self.est_noise_level = [] # this is a scalar for each subband 26 self.mask = [] 27 self.threshold = [] 28 self.num_total_coeff = None 29 self.cd_leading_index = None 30 self.cd_leading_coeff = None 31 self.ca_leading_index = None 32 self.ca_leading_coeff = None
def
compress(self, data, th_mult=None):
36 def compress(self, data, th_mult=None): 37 if ('inactive_value' not in self.options) or (self.options['inactive_value'] is None): 38 self.options['inactive_value'] = np.mean(data) 39 signal = np.zeros(self.num_grid) 40 signal[~self.options['mask']] = self.options['inactive_value'] 41 signal[self.options['mask']] = data 42 if 'order' not in self.options: 43 self.options['order'] = 'C' 44 if 'min_noise' not in self.options: 45 self.options['min_noise'] = 1.0e-9 46 signal = signal.reshape(self.options['dim'], order=self.options['order']) 47 # get the signal back into its original shape (nx,ny,nz) 48 signal = signal.transpose((2, 1, 0)) 49 # pywt throws a warning in case of single-dimentional entries in the shape of the signal. 50 with warnings.catch_warnings(): 51 warnings.simplefilter("ignore") 52 wdec = pywt.wavedecn( 53 signal, self.options['wname'], 'symmetric', int(self.options['level'])) 54 wdec_rec = deepcopy(wdec) 55 56 # Perform thresholding if the threshold is given as input. 57 do_thresholding = False 58 est_noise_level = None 59 true_data = False 60 if th_mult is not None and th_mult >= 0: 61 do_thresholding = True 62 if not self.est_noise.size: # assume that the true data is input 63 true_data = True 64 else: 65 self.est_noise = np.array([]) # this will be rebuilt 66 67 # Initialize 68 # Note: the keys below are organized the same way as in Matlab. 69 keys = ['daa', 'ada', 'dda', 'aad', 'dad', 'add', 'ddd'] 70 if true_data: 71 for level in range(0, int(self.options['level'])+1): 72 num_subband = 1 if level == 0 else len(keys) 73 for subband in range(0, num_subband): 74 if level == 0: 75 self.mask.append(False) 76 self.est_noise_level.append(0) 77 self.threshold.append(0) 78 else: 79 if subband == 0: 80 self.mask.append({}) 81 self.est_noise_level.append({}) 82 self.threshold.append({}) 83 self.mask[level][keys[subband]] = False 84 self.est_noise_level[level][keys[subband]] = 0 85 self.threshold[level][keys[subband]] = 0 86 87 # In the white noise case estimated std is based on the high (hhh) subband only 88 if true_data and not self.options['colored_noise']: 89 subband_hhh = wdec[-1]['ddd'].flatten() 90 est_noise_level = np.median( 91 np.abs(subband_hhh - np.median(subband_hhh))) / 0.6745 # estimated noise std 92 est_noise_level = np.maximum(est_noise_level, self.options['min_noise']) 93 # Threshold based on universal rule 94 current_threshold = th_mult * \ 95 np.sqrt(2 * np.log(np.size(data))) * est_noise_level 96 self.threshold[0] = current_threshold 97 98 # Loop over all levels and subbands (including the lll subband) 99 ca_in_vec = np.array([]) 100 cd_in_vec = np.array([]) 101 for level in range(0, int(self.options['level'])+1): 102 num_subband = 1 if level == 0 else len(keys) 103 for subband in range(0, num_subband): 104 coeffs = wdec[level] if level == 0 else wdec[level][keys[subband]] 105 coeffs = coeffs.flatten() 106 107 # In the colored noise case estimated std is based on all subbands 108 if true_data and self.options['colored_noise']: 109 est_noise_level = np.median( 110 np.abs(coeffs - np.median(coeffs))) / 0.6745 111 est_noise_level = np.maximum( 112 est_noise_level, self.options['min_noise']) 113 # threshold based on universal rule 114 if self.options['threshold_rule'] == 'universal': 115 current_threshold = np.sqrt( 116 2 * np.log(np.size(coeffs))) * est_noise_level 117 # threshold based on bayesian rule 118 elif self.options['threshold_rule'] == 'bayesian': 119 std_data = np.linalg.norm(coeffs, 2) / len(coeffs) 120 current_threshold = est_noise_level**2 / \ 121 np.sqrt(np.abs(std_data**2 - est_noise_level**2)) 122 else: 123 print('Thresholding rule not implemented') 124 sys.exit(1) 125 current_threshold = th_mult * current_threshold 126 if level == 0: 127 self.threshold[level] = current_threshold 128 else: 129 self.threshold[level][keys[subband]] = current_threshold 130 # self.threshold = np.append(self.threshold, current_threshold) 131 132 # Perform thresholding 133 if do_thresholding: 134 if level == 0 or not self.options['colored_noise']: 135 current_threshold = self.threshold[0] 136 else: 137 current_threshold = self.threshold[level][keys[subband]] 138 if level > 0 or (level == 0 and not self.options['keep_ca']): 139 if self.options['use_hard_th']: # use hard thresholding 140 zero_index = np.abs(coeffs) < current_threshold 141 coeffs[zero_index] = 0 142 else: # use soft thresholding 143 coeffs = np.sign( 144 coeffs) * np.maximum(np.abs(coeffs) - current_threshold, 0) 145 zero_index = coeffs == 0 146 else: 147 zero_index = np.zeros(coeffs.size).astype(bool) 148 149 # Construct the mask for each subband and estimate the noise level 150 if level == 0: 151 self.mask[level] = np.invert(zero_index) + self.mask[level] 152 if true_data: 153 self.est_noise_level[level] = est_noise_level 154 else: 155 self.mask[level][keys[subband]] = np.invert( 156 zero_index) + self.mask[level][keys[subband]] 157 if true_data: 158 self.est_noise_level[level][keys[subband]] = est_noise_level 159 160 # Build the noise for the compressed signal 161 if level == 0: 162 num_el = np.sum(self.mask[level]) 163 current_noise_level = self.est_noise_level[level] 164 self.est_noise = np.append( 165 self.est_noise, current_noise_level * np.ones(num_el)) 166 else: 167 num_el = np.sum(self.mask[level][keys[subband]]) 168 current_noise_level = self.est_noise_level[level][keys[subband]] 169 self.est_noise = np.append( 170 self.est_noise, current_noise_level * np.ones(num_el)) 171 172 if level == 0: 173 ca_in_vec = coeffs 174 else: 175 cd_in_vec = np.append(cd_in_vec, coeffs) 176 177 # Compute the leading indices and compressed signal 178 if do_thresholding: 179 self.num_total_coeff = ca_in_vec.size + cd_in_vec.size 180 if self.cd_leading_index is not None: 181 self.cd_leading_index = np.union1d( 182 self.cd_leading_index, np.nonzero(cd_in_vec)[0]) 183 else: 184 self.cd_leading_index = np.nonzero(cd_in_vec)[0] 185 self.cd_leading_coeff = cd_in_vec[self.cd_leading_index] 186 if self.options['keep_ca']: 187 if self.ca_leading_index is None: 188 self.ca_leading_index = np.arange(wdec[0].size) 189 else: 190 if self.ca_leading_index is None: 191 self.ca_leading_index = np.nonzero(ca_in_vec)[0] 192 else: 193 self.ca_leading_index = np.union1d( 194 self.ca_leading_index, np.nonzero(ca_in_vec)[0]) 195 self.ca_leading_coeff = ca_in_vec[self.ca_leading_index] 196 compressed_data = np.append(self.ca_leading_coeff, self.cd_leading_coeff) 197 else: 198 if self.ca_leading_index is None or self.cd_leading_index is None: 199 print('Leading indices not defined') 200 sys.exit(1) 201 compressed_data = np.append( 202 ca_in_vec[self.ca_leading_index], cd_in_vec[self.cd_leading_index]) 203 204 # Construct wdec_rec 205 for level in range(0, int(self.options['level']) + 1): 206 if level == 0: 207 shape = wdec_rec[level].shape 208 coeff = wdec_rec[level].flatten() 209 coeff[~self.mask[level]] = 0 210 wdec_rec[level] = coeff.reshape(shape) 211 else: 212 num_subband = len(keys) 213 for subband in range(0, num_subband): 214 shape = wdec_rec[level][keys[subband]].shape 215 coeff = wdec_rec[level][keys[subband]].flatten() 216 coeff[~self.mask[level][keys[subband]]] = 0 217 wdec_rec[level][keys[subband]] = coeff.reshape(shape) 218 219 return compressed_data, wdec_rec
def
reconstruct(self, wdec_rec):
222 def reconstruct(self, wdec_rec): 223 224 if wdec_rec is None: 225 print('No signal to reconstruct') 226 sys.exit(1) 227 228 data_rec = pywt.waverecn(wdec_rec, self.options['wname'], 'symmetric') 229 data_rec = data_rec.transpose((2, 1, 0)) # flip the axes 230 dim = self.options['dim'] 231 data_rec = data_rec[0:dim[0], 0:dim[1], 0:dim[2]] # severe issure here 232 data_rec = data_rec.flatten(order=self.options['order']) 233 data_rec = data_rec[self.options['mask']] 234 235 return data_rec