ensemble.ensemble
Package contains the basis for the PET ensemble based structure.
1""" 2Package contains the basis for the PET ensemble based structure. 3""" 4 5# External imports 6import csv # For reading Comma Separated Values files 7import os # OS level tools 8import sys # System-specific parameters and functions 9from copy import deepcopy, copy # Copy functions. (deepcopy let us copy mutable items) 10from shutil import rmtree # rmtree for removing folders 11import numpy as np # Misc. numerical tools 12import pickle # To save and load information 13from glob import glob 14import datetime as dt 15from tqdm.auto import tqdm 16from p_tqdm import p_map 17import logging 18from geostat.decomp import Cholesky # Making realizations 19 20# Internal imports 21from pipt.misc_tools import cov_regularization 22from pipt.misc_tools import wavelet_tools as wt 23from misc import read_input_csv as rcsv 24from misc.system_tools.environ_var import OpenBlasSingleThread # Single threaded OpenBLAS runs 25 26 27class Ensemble: 28 """ 29 Class for organizing misc. variables and simulator for an ensemble-based inversion run. Here, the forecast step 30 and prediction runs are performed. General methods that are useful in various ensemble loops have also been 31 implemented here. 32 """ 33 34 def __init__(self, keys_en, sim, redund_sim=None): 35 """ 36 Class extends the ReadInitFile class. First the PIPT init. file is passed to the parent class for reading and 37 parsing. Rest of the initialization uses the keywords parsed in ReadInitFile (parent) class to set up observed, 38 predicted data and data variance dictionaries. Also, the simulator to be used in forecast and/or predictions is 39 initialized with keywords parsed in ReadInitFile (parent) class. Lastly, the initial ensemble is generated (if 40 it has not been inputted), and some saving of variables can be done chosen in PIPT init. file. 41 42 Parameter 43 --------- 44 init_file : str 45 path to input file containing initiallization values 46 """ 47 # Internalize PET dictionary 48 self.keys_en = keys_en 49 self.sim = sim 50 self.sim.redund_sim = redund_sim 51 self.pred_data = None 52 53 # Auxilliary input to the simulator - can be used e.g., 54 # to allow for different models when optimizing. 55 self.aux_input = None 56 57 # Setup logger 58 logging.basicConfig(level=logging.INFO, 59 filename='pet_logger.log', 60 filemode='w', 61 format='%(asctime)s : %(levelname)s : %(name)s : %(message)s', 62 datefmt='%Y-%m-%d %H:%M:%S') 63 self.logger = logging.getLogger('PET') 64 65 # Check if folder contains any En_ files, and remove them! 66 for folder in glob('En_*'): 67 try: 68 if len(folder.split('_')) == 2: 69 int(folder.split('_')[1]) 70 rmtree(folder) 71 except: 72 pass 73 74 # Save name for (potential) pickle dump/load 75 self.pickle_restart_file = 'emergency_dump' 76 77 # initiallize the restart. Standard is no restart 78 self.restart = False 79 80 # If it is a restart run, we do not need to initialize anything, only load the self info. that exists in the 81 # pickle save file. If it is not a restart run, we initialize everything below. 82 if 'restart' in self.keys_en and self.keys_en['restart'] == 'yes': 83 # Initiate a restart run 84 self.logger.info('\033[92m--- Restart run initiated! ---\033[92m') 85 # Check if the pickle save file exists in folder 86 try: 87 assert (self.pickle_restart_file in [ 88 f for f in os.listdir('.') if os.path.isfile(f)]) 89 except AssertionError as err: 90 self.logger.exception('The restart file "{0}" does not exist in folder. Cannot restart!'.format( 91 self.pickle_restart_file)) 92 raise err 93 94 # Load restart file 95 self.load() 96 97 # Ensure that restart switch is ON since the error may not have happened during a restart run 98 self.restart = True 99 100 # Init. various variables/lists/dicts. needed in ensemble run 101 else: 102 # delete potential restart files to avoid any problems 103 if self.pickle_restart_file in [f for f in os.listdir('.') if os.path.isfile(f)]: 104 os.remove(self.pickle_restart_file) 105 106 # initialize sim limit 107 if 'sim_limit' in self.keys_en: 108 self.sim_limit = self.keys_en['sim_limit'] 109 else: 110 self.sim_limit = float('inf') 111 112 # bool that can be used to supress tqdm output (useful when testing code) 113 if 'disable_tqdm' in self.keys_en: 114 self.disable_tqdm = self.keys_en['disable_tqdm'] 115 else: 116 self.disable_tqdm = False 117 118 # extract information that is given for the prior model 119 self._ext_prior_info() 120 121 # Calculate initial ensemble if IMPORTSTATICVAR has not been given in init. file. 122 # Prior info. on state variables must be given by PRIOR_<STATICVAR-name> keyword. 123 if 'importstaticvar' not in self.keys_en: 124 self.ne = int(self.keys_en['ne']) 125 126 # Output = self.state, self.cov_prior 127 self.gen_init_ensemble() 128 129 else: 130 # State variable imported as a Numpy save file 131 tmp_load = np.load(self.keys_en['importstaticvar'], allow_pickle=True) 132 133 # We assume that the user has saved the state dict. as **state (effectively saved all keys in state 134 # individually). 135 self.state = {key: val for key, val in tmp_load.items()} 136 137 # Find the number of ensemble members from state variable 138 tmp_ne = [] 139 for tmp_state in self.state.keys(): 140 tmp_ne.extend([self.state[tmp_state].shape[1]]) 141 if max(tmp_ne) != min(tmp_ne): 142 print('\033[1;33mInput states have different ensemble size\033[1;m') 143 sys.exit(1) 144 self.ne = min(tmp_ne) 145 self._ext_ml_info() 146 147 def _ext_ml_info(self): 148 ''' 149 Extract the info needed for ML simulations. Note if the ML keyword is not in keys_en we initialize 150 such that we only have one level -- the high fidelity one 151 ''' 152 153 if 'multilevel' in self.keys_en: 154 # parse 155 self.multilevel = {} 156 for i, opt in enumerate(list(zip(*self.keys_en['multilevel']))[0]): 157 if opt == 'levels': 158 self.multilevel['levels'] = [elem for elem in range( 159 int(self.keys_en['multilevel'][i][1]))] 160 if opt == 'en_size': 161 self.multilevel['ne'] = [range(int(el)) 162 for el in self.keys_en['multilevel'][i][1]] 163 164 def _ext_prior_info(self): 165 """ 166 Extract prior information on STATE from keyword(s) PRIOR_<STATE entries>. 167 """ 168 # Parse prior info on each state entered in STATE. 169 # Store names given in STATE 170 if not isinstance(self.keys_en['state'], list): # Single string 171 state_names = [self.keys_en['state']] 172 else: # List 173 state_names = self.keys_en['state'] 174 175 # Check if PRIOR_<state names> exists for each entry in STATE 176 for name in state_names: 177 assert 'prior_' + name in self.keys_en, \ 178 'PRIOR_{0} is missing! This keyword is needed to make initial ensemble for {0} entered in ' \ 179 'STATE'.format(name.upper()) 180 181 # Init. prior info variable 182 self.prior_info = {keys: None for keys in state_names} 183 184 # Loop over each prior keyword and make an initial. ensemble for each state in STATE, 185 # which is subsequently stored in the state dictionary. If 3D grid dimensions are inputted, information for 186 # each layer must be inputted, else the single information will be copied to all layers. 187 grid_dim = np.array([0, 0]) 188 for name in state_names: 189 # initiallize an empty dictionary inside the dictionary. 190 self.prior_info[name] = {} 191 # List the option names inputted in prior keyword 192 # opt_list = list(zip(*self.keys_da['prior_'+name])) 193 mean = None 194 self.prior_info[name]['mean'] = mean 195 vario = [None] 196 self.prior_info[name]['vario'] = vario 197 aniso = [None] 198 self.prior_info[name]['aniso'] = aniso 199 angle = [None] 200 self.prior_info[name]['angle'] = angle 201 corr_length = [None] 202 self.prior_info[name]['corr_length'] = corr_length 203 self.prior_info[name]['nx'] = self.prior_info[name]['ny'] = self.prior_info[name]['nz'] = None 204 205 # Extract info. from the prior keyword 206 for i, opt in enumerate(list(zip(*self.keys_en['prior_' + name]))[0]): 207 if opt == 'vario': # Variogram model 208 if not isinstance(self.keys_en['prior_' + name][i][1], list): 209 vario = [self.keys_en['prior_' + name][i][1]] 210 else: 211 vario = self.keys_en['prior_' + name][i][1] 212 elif opt == 'mean': # Mean 213 mean = self.keys_en['prior_' + name][i][1] 214 elif opt == 'var': # Variance 215 if not isinstance(self.keys_en['prior_' + name][i][1], list): 216 variance = [self.keys_en['prior_' + name][i][1]] 217 else: 218 variance = self.keys_en['prior_' + name][i][1] 219 elif opt == 'aniso': # Anisotropy factor 220 if not isinstance(self.keys_en['prior_' + name][i][1], list): 221 aniso = [self.keys_en['prior_' + name][i][1]] 222 else: 223 aniso = self.keys_en['prior_' + name][i][1] 224 elif opt == 'angle': # Anisotropy angle 225 if not isinstance(self.keys_en['prior_' + name][i][1], list): 226 angle = [self.keys_en['prior_' + name][i][1]] 227 else: 228 angle = self.keys_en['prior_' + name][i][1] 229 elif opt == 'range': # Correlation length 230 if not isinstance(self.keys_en['prior_' + name][i][1], list): 231 corr_length = [self.keys_en['prior_' + name][i][1]] 232 else: 233 corr_length = self.keys_en['prior_' + name][i][1] 234 elif opt == 'grid': # Grid dimensions 235 grid_dim = self.keys_en['prior_' + name][i][1] 236 elif opt == 'limits': # Truncation values 237 limits = self.keys_en['prior_' + name][i][1:] 238 elif opt == 'active': # Number of active cells (single number) 239 active = self.keys_en['prior_' + name][i][1] 240 241 # Check if mean needs to be loaded, or if loaded 242 if type(mean) is str: 243 assert mean.endswith('.npz'), 'File name does not end with \'.npz\'!' 244 load_file = np.load(mean) 245 assert len(load_file.files) == 1, \ 246 'More than one variable located in {0}. Only the mean vector can be stored in the .npz file!' \ 247 .format(mean) 248 mean = load_file[load_file.files[0]] 249 else: # Single number inputted, make it a list if not already 250 if not isinstance(mean, list): 251 mean = [mean] 252 253 # Check if limits exists 254 try: 255 limits 256 except NameError: 257 limits = None 258 259 # check if active exists 260 try: 261 active 262 except NameError: 263 active = None 264 265 # Extract x- and y-dim 266 nx = int(grid_dim[0]) 267 ny = int(grid_dim[1]) 268 269 # Check if 3D grid inputted. If so, we check if info. has been given on all layers. In the case it has 270 # not been given, we just copy the info. given. 271 if len(grid_dim) == 3 and grid_dim[2] > 1: # 3D 272 nz = int(grid_dim[2]) 273 274 # Check mean when values have been inputted directly (not when mean has been loaded) 275 if isinstance(mean, list) and len(mean) < nz: 276 # Check if it is more than one entry and give error 277 assert len(mean) == 1, \ 278 'Information from MEAN has been given for {0} layers, whereas {1} is needed!' \ 279 .format(len(mean), nz) 280 281 # Only 1 entry; copy this to all layers 282 print( 283 '\033[1;33mSingle entry for MEAN will be copied to all {0} layers\033[1;m'.format(nz)) 284 self.prior_info[name]['mean'] = mean * nz 285 286 else: 287 self.prior_info[name]['mean'] = mean 288 289 # Check variogram model 290 if len(vario) < nz: 291 # Check if it is more than one entry and give error 292 assert len(vario) == 1, \ 293 'Information from VARIO has been given for {0} layers, whereas {1} is needed!' \ 294 .format(len(vario), nz) 295 296 # Only 1 entry; copy this to all layers 297 print( 298 '\033[1;33mSingle entry for VARIO will be copied to all {0} layers\033[1;m'.format(nz)) 299 self.prior_info[name]['vario'] = vario * nz 300 301 else: 302 self.prior_info[name]['vario'] = vario 303 304 # Variance 305 if len(variance) < nz: 306 # Check if it is more than one entry and give error 307 assert len(variance) == 1, \ 308 'Information from VAR has been given for {0} layers, whereas {1} is needed!' \ 309 .format(len(variance), nz) 310 311 # Only 1 entry; copy this to all layers 312 print( 313 '\033[1;33mSingle entry for VAR will be copied to all {0} layers\033[1;m'.format(nz)) 314 self.prior_info[name]['variance'] = variance * nz 315 316 else: 317 self.prior_info[name]['variance'] = variance 318 319 # Aniso factor 320 if len(aniso) < nz: 321 # Check if it is more than one entry and give error 322 assert len(aniso) == 1, \ 323 'Information from ANISO has been given for {0} layers, whereas {1} is needed!' \ 324 .format(len(aniso), nz) 325 326 # Only 1 entry; copy this to all layers 327 print( 328 '\033[1;33mSingle entry for ANISO will be copied to all {0} layers\033[1;m'.format(nz)) 329 self.prior_info[name]['aniso'] = aniso * nz 330 331 else: 332 self.prior_info[name]['aniso'] = aniso 333 334 # Aniso factor 335 if len(angle) < nz: 336 # Check if it is more than one entry and give error 337 assert len(angle) == 1, \ 338 'Information from ANGLE has been given for {0} layers, whereas {1} is needed!' \ 339 .format(len(angle), nz) 340 341 # Only 1 entry; copy this to all layers 342 print( 343 '\033[1;33mSingle entry for ANGLE will be copied to all {0} layers\033[1;m'.format(nz)) 344 self.prior_info[name]['angle'] = angle * nz 345 346 else: 347 self.prior_info[name]['angle'] = angle 348 349 # Corr. length 350 if len(corr_length) < nz: 351 # Check if it is more than one entry and give error 352 assert len(corr_length) == 1, \ 353 'Information from RANGE has been given for {0} layers, whereas {1} is needed!' \ 354 .format(len(corr_length), nz) 355 356 # Only 1 entry; copy this to all layers 357 print( 358 '\033[1;33mSingle entry for RANGE will be copied to all {0} layers\033[1;m'.format(nz)) 359 self.prior_info[name]['corr_length'] = corr_length * nz 360 361 else: 362 self.prior_info[name]['corr_length'] = corr_length 363 364 # Limits, if exists 365 if limits is not None: 366 if isinstance(limits[0], list) and len(limits) < nz or \ 367 not isinstance(limits[0], list) and len(limits) < 2 * nz: 368 # Check if it is more than one entry and give error 369 assert (isinstance(limits[0], list) and len(limits) == 1), \ 370 'Information from LIMITS has been given for {0} layers, whereas {1} is needed!' \ 371 .format(len(limits), nz) 372 assert (not isinstance(limits[0], list) and len(limits) == 2), \ 373 'Information from LIMITS has been given for {0} layers, whereas {1} is needed!' \ 374 .format(len(limits) / 2, nz) 375 376 # Only 1 entry; copy this to all layers 377 print( 378 '\033[1;33mSingle entry for RANGE will be copied to all {0} layers\033[1;m'.format(nz)) 379 self.prior_info[name]['limits'] = [limits] * nz 380 381 else: # 2D grid only, or optimization case 382 nz = 1 383 self.prior_info[name]['mean'] = mean 384 self.prior_info[name]['vario'] = vario 385 self.prior_info[name]['variance'] = variance 386 self.prior_info[name]['aniso'] = aniso 387 self.prior_info[name]['angle'] = angle 388 self.prior_info[name]['corr_length'] = corr_length 389 if limits is not None: 390 self.prior_info[name]['limits'] = limits 391 if active is not None: 392 self.prior_info[name]['active'] = active 393 394 self.prior_info[name]['nx'] = nx 395 self.prior_info[name]['ny'] = ny 396 self.prior_info[name]['nz'] = nz 397 398 # Loop over keys and input 399 400 def gen_init_ensemble(self): 401 """ 402 Generate the initial ensemble of (joint) state vectors using the GeoStat class in the "geostat" package. 403 TODO: Merge this function with the perturbation function _gen_state_ensemble in popt. 404 """ 405 # Initialize GeoStat class 406 init_en = Cholesky() 407 408 # (Re)initialize state variable as dictionary 409 self.state = {} 410 self.cov_prior = {} 411 412 for name in self.prior_info: 413 # Init. indices to pick out correct mean vector for each layer 414 ind_end = 0 415 416 # Extract info. 417 nz = self.prior_info[name]['nz'] 418 mean = self.prior_info[name]['mean'] 419 nx = self.prior_info[name]['nx'] 420 ny = self.prior_info[name]['ny'] 421 if nx == ny == 0: # assume ensemble will be generated elsewhere if dimensions are zero 422 break 423 variance = self.prior_info[name]['variance'] 424 corr_length = self.prior_info[name]['corr_length'] 425 aniso = self.prior_info[name]['aniso'] 426 vario = self.prior_info[name]['vario'] 427 angle = self.prior_info[name]['angle'] 428 if 'limits' in self.prior_info[name]: 429 limits = self.prior_info[name]['limits'] 430 else: 431 limits = None 432 433 # Loop over nz to make layers of 2D priors 434 for i in range(self.prior_info[name]['nz']): 435 # If mean is scalar, no covariance matrix is needed 436 if type(self.prior_info[name]['mean']).__module__ == 'numpy': 437 # Generate covariance matrix 438 cov = init_en.gen_cov2d( 439 nx, ny, variance[i], corr_length[i], aniso[i], angle[i], vario[i]) 440 else: 441 cov = np.array(variance[i]) 442 443 # Pick out the mean vector for the current layer 444 ind_start = ind_end 445 ind_end = int((i + 1) * (len(mean) / nz)) 446 mean_layer = mean[ind_start:ind_end] 447 448 # Generate realizations. If LIMITS have been entered, they must be taken account for here 449 if limits is None: 450 real = init_en.gen_real(mean_layer, cov, self.ne) 451 else: 452 real = init_en.gen_real(mean_layer, cov, self.ne, { 453 'upper': limits[i][1], 'lower': limits[i][0]}) 454 455 # Stack realizations for each layer 456 if i == 0: 457 real_out = real 458 else: 459 real_out = np.vstack((real_out, real)) 460 461 # Store realizations in dictionary with name given in STATICVAR 462 self.state[name] = real_out 463 464 # Store the covariance matrix 465 self.cov_prior[name] = cov 466 467 # Save the ensemble for later inspection 468 np.savez('prior.npz', **self.state) 469 470 def get_list_assim_steps(self): 471 """ 472 Returns list of assimilation steps. Useful in a 'loop'-script. 473 474 Returns 475 ------- 476 list_assim : list 477 List of total assimilation steps. 478 """ 479 # Get list of assim. steps. from ASSIMINDEX 480 list_assim = list(range(len(self.keys_da['assimindex']))) 481 482 # If it is a restart run, we only list the assimilation steps we have not done 483 if self.restart is True: 484 # List simulations we already have done. Do this by checking pred_data. 485 # OBS: Minus 1 here do to the aborted simulation is also not None. 486 sim_done = list( 487 range(len([ind for ind, p in enumerate(self.pred_data) if p is not None]) - 1)) 488 489 # Update list of assim. steps by removing simulations we have done 490 list_assim = [ind for ind in list_assim if ind not in sim_done] 491 492 # Return tot. assim. steps 493 return list_assim 494 495 def calc_prediction(self, input_state=None, save_prediction=None): 496 """ 497 Method for making predictions using the state variable. Will output the simulator response for all report steps 498 and all data values provided to the simulator. 499 500 Parameters 501 ---------- 502 input_state: 503 Use an input state instead of internal state (stored in self) to run predictions 504 save_prediction: 505 Save the predictions as a <save_prediction>.npz file (numpy compressed file) 506 507 Returns 508 ------- 509 prediction: 510 List of dictionaries with keys equal to data types (in DATATYPE), 511 containing the responses at each time step given in PREDICTION. 512 513 """ 514 515 if hasattr(self, 'multilevel'): 516 success = self.calc_ml_prediction(input_state) 517 else: 518 # Number of parallel runs 519 if 'parallel' in self.sim.input_dict: 520 no_tot_run = int(self.sim.input_dict['parallel']) 521 else: 522 no_tot_run = 1 523 self.pred_data = [] 524 525 # for level in self.multilevel['level']: # 526 # Setup forward simulator and redundant simulator at the correct fidelity 527 if self.sim.redund_sim is not None: 528 self.sim.redund_sim.setup_fwd_run() 529 self.sim.setup_fwd_run(redund_sim=self.sim.redund_sim) 530 531 # Ensure that we put all the states in a list 532 list_state = [deepcopy({}) for _ in range(self.ne)] 533 for i in range(self.ne): 534 if input_state is None: 535 for key in self.state.keys(): 536 if self.state[key].ndim == 1: 537 list_state[i][key] = deepcopy(self.state[key]) 538 elif self.state[key].ndim == 2: 539 list_state[i][key] = deepcopy(self.state[key][:, i]) 540 # elif self.state[key].ndim == 3: 541 # list_state[i][key] = deepcopy(self.state[key][level,:, i]) 542 else: 543 for key in self.state.keys(): 544 if input_state[key].ndim == 1: 545 list_state[i][key] = deepcopy(input_state[key]) 546 elif input_state[key].ndim == 2: 547 list_state[i][key] = deepcopy(input_state[key][:, i]) 548 # elif input_state[key].ndim == 3: 549 # list_state[i][key] = deepcopy(input_state[key][:,:, i]) 550 if self.aux_input is not None: # several models are used 551 list_state[i]['aux_input'] = self.aux_input[i] 552 553 # Index list of ensemble members 554 list_member_index = list(range(self.ne)) 555 556 if no_tot_run==1: # if not in parallel we use regular loop 557 en_pred = [self.sim.run_fwd_sim(state, member_index) for state, member_index in 558 tqdm(zip(list_state, list_member_index), total=len(list_state))] 559 else: # Run prediction in parallel using p_map 560 en_pred = p_map(self.sim.run_fwd_sim, list_state, 561 list_member_index, num_cpus=no_tot_run, disable=self.disable_tqdm) 562 563 # List successful runs and crashes 564 list_crash = [indx for indx, el in enumerate(en_pred) if el is False] 565 list_success = [indx for indx, el in enumerate(en_pred) if el is not False] 566 success = True 567 568 # Dump all information and print error if all runs have crashed 569 if not list_success: 570 self.save() 571 success = False 572 if len(list_crash) > 1: 573 print( 574 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 575 self.logger.info( 576 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 577 sys.exit(1) 578 return success 579 580 # Check crashed runs 581 if list_crash: 582 # Replace crashed runs with (random) successful runs. If there are more crashed runs than successful once, 583 # we draw with replacement. 584 if len(list_crash) < len(list_success): 585 copy_member = np.random.choice( 586 list_success, size=len(list_crash), replace=False) 587 else: 588 copy_member = np.random.choice( 589 list_success, size=len(list_crash), replace=True) 590 591 # Insert the replaced runs in prediction list 592 for indx, el in enumerate(copy_member): 593 print(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ensemble member ' 594 f'{el}! ---\033[92m') 595 self.logger.info(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ' 596 f'ensemble member {el}! ---\033[92m') 597 for key in self.state.keys(): 598 if self.state[key].ndim > 1: 599 self.state[key][:, list_crash[indx]] = deepcopy( 600 self.state[key][:, el]) 601 en_pred[list_crash[indx]] = deepcopy(en_pred[el]) 602 603 # Convert ensemble specific result into pred_data, and filter for NONE data 604 self.pred_data.extend([{typ: np.concatenate(tuple((el[ind][typ][:, np.newaxis]) for el in en_pred), axis=1) 605 if any(elem is not None for elem in tuple((el[ind][typ]) for el in en_pred)) 606 else None for typ in en_pred[0][0].keys()} for ind in range(len(en_pred[0]))]) 607 608 # some predicted data might need to be adjusted (e.g. scaled or compressed if it is 4D seis data). Do not 609 # include this here. 610 611 # Store results if needed 612 if save_prediction is not None: 613 np.savez(f'{save_prediction}.npz', **{'pred_data': self.pred_data}) 614 615 return success 616 617 def save(self): 618 """ 619 We use pickle to dump all the information we have in 'self'. Can be used, e.g., if some error has occurred. 620 621 Changelog 622 --------- 623 - ST 28/2-17 624 """ 625 # Open save file and dump all info. in self 626 with open(self.pickle_restart_file, 'wb') as f: 627 pickle.dump(self.__dict__, f, protocol=4) 628 629 def load(self): 630 """ 631 Load a pickled file and save all info. in self. 632 633 Changelog 634 --------- 635 - ST 28/2-17 636 """ 637 # Open file and read with pickle 638 with open(self.pickle_restart_file, 'rb') as f: 639 tmp_load = pickle.load(f) 640 641 # Save in 'self' 642 self.__dict__.update(tmp_load) 643 644 def calc_ml_prediction(self, input_state=None): 645 """ 646 Function for running the simulator over several levels. We assume that it is sufficient to provide the level 647 integer to the setup of the forward run. This will initiate the correct simulator fidelity. 648 The function then runs the set of state through the different simulator fidelities. 649 650 Parameters 651 ---------- 652 input_state: 653 If simulation is run stand-alone one can input any state. 654 """ 655 656 no_tot_run = int(self.sim.input_dict['parallel']) 657 ml_pred_data = [] 658 659 for level in tqdm(self.multilevel['levels'], desc='Fidelity level', position=1): 660 # Setup forward simulator and redundant simulator at the correct fidelity 661 if self.sim.redund_sim is not None: 662 self.sim.redund_sim.setup_fwd_run(level=level) 663 self.sim.setup_fwd_run(level=level) 664 ml_ne = self.multilevel['ne'][level] 665 # Ensure that we put all the states in a list 666 list_state = [deepcopy({}) for _ in ml_ne] 667 for i in ml_ne: 668 if input_state is None: 669 for key in self.state[level].keys(): 670 if self.state[level][key].ndim == 1: 671 list_state[i][key] = deepcopy(self.state[level][key]) 672 elif self.state[level][key].ndim == 2: 673 list_state[i][key] = deepcopy(self.state[level][key][:, i]) 674 else: 675 for key in self.state.keys(): 676 if input_state[level][key].ndim == 1: 677 list_state[i][key] = deepcopy(input_state[level][key]) 678 elif input_state[level][key].ndim == 2: 679 list_state[i][key] = deepcopy(input_state[level][key][:, i]) 680 if self.aux_input is not None: # several models are used 681 list_state[i]['aux_input'] = self.aux_input[i] 682 683 # Index list of ensemble members 684 list_member_index = list(ml_ne) 685 686 # Run prediction in parallel using p_map 687 en_pred = p_map(self.sim.run_fwd_sim, list_state, 688 list_member_index, num_cpus=no_tot_run, disable=self.disable_tqdm) 689 690 # List successful runs and crashes 691 list_crash = [indx for indx, el in enumerate(en_pred) if el is False] 692 list_success = [indx for indx, el in enumerate(en_pred) if el is not False] 693 success = True 694 695 # Dump all information and print error if all runs have crashed 696 if not list_success: 697 self.save() 698 success = False 699 if len(list_crash) > 1: 700 print( 701 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 702 self.logger.info( 703 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 704 sys.exit(1) 705 return success 706 707 # Check crashed runs 708 if list_crash: 709 # Replace crashed runs with (random) successful runs. If there are more crashed runs than successful once, 710 # we draw with replacement. 711 if len(list_crash) < len(list_success): 712 copy_member = np.random.choice( 713 list_success, size=len(list_crash), replace=False) 714 else: 715 copy_member = np.random.choice( 716 list_success, size=len(list_crash), replace=True) 717 718 # Insert the replaced runs in prediction list 719 for indx, el in enumerate(copy_member): 720 print(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ensemble member ' 721 f'{el}! ---\033[92m') 722 self.logger.info(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ' 723 f'ensemble member {el}! ---\033[92m') 724 for key in self.state[level].keys(): 725 self.state[level][key][:, list_crash[indx]] = deepcopy( 726 self.state[level][key][:, el]) 727 en_pred[list_crash[indx]] = deepcopy(en_pred[el]) 728 729 # Convert ensemble specific result into pred_data, and filter for NONE data 730 ml_pred_data.append([{typ: np.concatenate(tuple((el[ind][typ][:, np.newaxis]) for el in en_pred), axis=1) 731 if any(elem is not None for elem in tuple((el[ind][typ]) for el in en_pred)) 732 else None for typ in en_pred[0][0].keys()} for ind in range(len(en_pred[0]))]) 733 734 # loop over time instance first, and the level instance. 735 self.pred_data = np.array(ml_pred_data).T.tolist() 736 737 self.treat_modeling_error(self.iteration) 738 739 return success
28class Ensemble: 29 """ 30 Class for organizing misc. variables and simulator for an ensemble-based inversion run. Here, the forecast step 31 and prediction runs are performed. General methods that are useful in various ensemble loops have also been 32 implemented here. 33 """ 34 35 def __init__(self, keys_en, sim, redund_sim=None): 36 """ 37 Class extends the ReadInitFile class. First the PIPT init. file is passed to the parent class for reading and 38 parsing. Rest of the initialization uses the keywords parsed in ReadInitFile (parent) class to set up observed, 39 predicted data and data variance dictionaries. Also, the simulator to be used in forecast and/or predictions is 40 initialized with keywords parsed in ReadInitFile (parent) class. Lastly, the initial ensemble is generated (if 41 it has not been inputted), and some saving of variables can be done chosen in PIPT init. file. 42 43 Parameter 44 --------- 45 init_file : str 46 path to input file containing initiallization values 47 """ 48 # Internalize PET dictionary 49 self.keys_en = keys_en 50 self.sim = sim 51 self.sim.redund_sim = redund_sim 52 self.pred_data = None 53 54 # Auxilliary input to the simulator - can be used e.g., 55 # to allow for different models when optimizing. 56 self.aux_input = None 57 58 # Setup logger 59 logging.basicConfig(level=logging.INFO, 60 filename='pet_logger.log', 61 filemode='w', 62 format='%(asctime)s : %(levelname)s : %(name)s : %(message)s', 63 datefmt='%Y-%m-%d %H:%M:%S') 64 self.logger = logging.getLogger('PET') 65 66 # Check if folder contains any En_ files, and remove them! 67 for folder in glob('En_*'): 68 try: 69 if len(folder.split('_')) == 2: 70 int(folder.split('_')[1]) 71 rmtree(folder) 72 except: 73 pass 74 75 # Save name for (potential) pickle dump/load 76 self.pickle_restart_file = 'emergency_dump' 77 78 # initiallize the restart. Standard is no restart 79 self.restart = False 80 81 # If it is a restart run, we do not need to initialize anything, only load the self info. that exists in the 82 # pickle save file. If it is not a restart run, we initialize everything below. 83 if 'restart' in self.keys_en and self.keys_en['restart'] == 'yes': 84 # Initiate a restart run 85 self.logger.info('\033[92m--- Restart run initiated! ---\033[92m') 86 # Check if the pickle save file exists in folder 87 try: 88 assert (self.pickle_restart_file in [ 89 f for f in os.listdir('.') if os.path.isfile(f)]) 90 except AssertionError as err: 91 self.logger.exception('The restart file "{0}" does not exist in folder. Cannot restart!'.format( 92 self.pickle_restart_file)) 93 raise err 94 95 # Load restart file 96 self.load() 97 98 # Ensure that restart switch is ON since the error may not have happened during a restart run 99 self.restart = True 100 101 # Init. various variables/lists/dicts. needed in ensemble run 102 else: 103 # delete potential restart files to avoid any problems 104 if self.pickle_restart_file in [f for f in os.listdir('.') if os.path.isfile(f)]: 105 os.remove(self.pickle_restart_file) 106 107 # initialize sim limit 108 if 'sim_limit' in self.keys_en: 109 self.sim_limit = self.keys_en['sim_limit'] 110 else: 111 self.sim_limit = float('inf') 112 113 # bool that can be used to supress tqdm output (useful when testing code) 114 if 'disable_tqdm' in self.keys_en: 115 self.disable_tqdm = self.keys_en['disable_tqdm'] 116 else: 117 self.disable_tqdm = False 118 119 # extract information that is given for the prior model 120 self._ext_prior_info() 121 122 # Calculate initial ensemble if IMPORTSTATICVAR has not been given in init. file. 123 # Prior info. on state variables must be given by PRIOR_<STATICVAR-name> keyword. 124 if 'importstaticvar' not in self.keys_en: 125 self.ne = int(self.keys_en['ne']) 126 127 # Output = self.state, self.cov_prior 128 self.gen_init_ensemble() 129 130 else: 131 # State variable imported as a Numpy save file 132 tmp_load = np.load(self.keys_en['importstaticvar'], allow_pickle=True) 133 134 # We assume that the user has saved the state dict. as **state (effectively saved all keys in state 135 # individually). 136 self.state = {key: val for key, val in tmp_load.items()} 137 138 # Find the number of ensemble members from state variable 139 tmp_ne = [] 140 for tmp_state in self.state.keys(): 141 tmp_ne.extend([self.state[tmp_state].shape[1]]) 142 if max(tmp_ne) != min(tmp_ne): 143 print('\033[1;33mInput states have different ensemble size\033[1;m') 144 sys.exit(1) 145 self.ne = min(tmp_ne) 146 self._ext_ml_info() 147 148 def _ext_ml_info(self): 149 ''' 150 Extract the info needed for ML simulations. Note if the ML keyword is not in keys_en we initialize 151 such that we only have one level -- the high fidelity one 152 ''' 153 154 if 'multilevel' in self.keys_en: 155 # parse 156 self.multilevel = {} 157 for i, opt in enumerate(list(zip(*self.keys_en['multilevel']))[0]): 158 if opt == 'levels': 159 self.multilevel['levels'] = [elem for elem in range( 160 int(self.keys_en['multilevel'][i][1]))] 161 if opt == 'en_size': 162 self.multilevel['ne'] = [range(int(el)) 163 for el in self.keys_en['multilevel'][i][1]] 164 165 def _ext_prior_info(self): 166 """ 167 Extract prior information on STATE from keyword(s) PRIOR_<STATE entries>. 168 """ 169 # Parse prior info on each state entered in STATE. 170 # Store names given in STATE 171 if not isinstance(self.keys_en['state'], list): # Single string 172 state_names = [self.keys_en['state']] 173 else: # List 174 state_names = self.keys_en['state'] 175 176 # Check if PRIOR_<state names> exists for each entry in STATE 177 for name in state_names: 178 assert 'prior_' + name in self.keys_en, \ 179 'PRIOR_{0} is missing! This keyword is needed to make initial ensemble for {0} entered in ' \ 180 'STATE'.format(name.upper()) 181 182 # Init. prior info variable 183 self.prior_info = {keys: None for keys in state_names} 184 185 # Loop over each prior keyword and make an initial. ensemble for each state in STATE, 186 # which is subsequently stored in the state dictionary. If 3D grid dimensions are inputted, information for 187 # each layer must be inputted, else the single information will be copied to all layers. 188 grid_dim = np.array([0, 0]) 189 for name in state_names: 190 # initiallize an empty dictionary inside the dictionary. 191 self.prior_info[name] = {} 192 # List the option names inputted in prior keyword 193 # opt_list = list(zip(*self.keys_da['prior_'+name])) 194 mean = None 195 self.prior_info[name]['mean'] = mean 196 vario = [None] 197 self.prior_info[name]['vario'] = vario 198 aniso = [None] 199 self.prior_info[name]['aniso'] = aniso 200 angle = [None] 201 self.prior_info[name]['angle'] = angle 202 corr_length = [None] 203 self.prior_info[name]['corr_length'] = corr_length 204 self.prior_info[name]['nx'] = self.prior_info[name]['ny'] = self.prior_info[name]['nz'] = None 205 206 # Extract info. from the prior keyword 207 for i, opt in enumerate(list(zip(*self.keys_en['prior_' + name]))[0]): 208 if opt == 'vario': # Variogram model 209 if not isinstance(self.keys_en['prior_' + name][i][1], list): 210 vario = [self.keys_en['prior_' + name][i][1]] 211 else: 212 vario = self.keys_en['prior_' + name][i][1] 213 elif opt == 'mean': # Mean 214 mean = self.keys_en['prior_' + name][i][1] 215 elif opt == 'var': # Variance 216 if not isinstance(self.keys_en['prior_' + name][i][1], list): 217 variance = [self.keys_en['prior_' + name][i][1]] 218 else: 219 variance = self.keys_en['prior_' + name][i][1] 220 elif opt == 'aniso': # Anisotropy factor 221 if not isinstance(self.keys_en['prior_' + name][i][1], list): 222 aniso = [self.keys_en['prior_' + name][i][1]] 223 else: 224 aniso = self.keys_en['prior_' + name][i][1] 225 elif opt == 'angle': # Anisotropy angle 226 if not isinstance(self.keys_en['prior_' + name][i][1], list): 227 angle = [self.keys_en['prior_' + name][i][1]] 228 else: 229 angle = self.keys_en['prior_' + name][i][1] 230 elif opt == 'range': # Correlation length 231 if not isinstance(self.keys_en['prior_' + name][i][1], list): 232 corr_length = [self.keys_en['prior_' + name][i][1]] 233 else: 234 corr_length = self.keys_en['prior_' + name][i][1] 235 elif opt == 'grid': # Grid dimensions 236 grid_dim = self.keys_en['prior_' + name][i][1] 237 elif opt == 'limits': # Truncation values 238 limits = self.keys_en['prior_' + name][i][1:] 239 elif opt == 'active': # Number of active cells (single number) 240 active = self.keys_en['prior_' + name][i][1] 241 242 # Check if mean needs to be loaded, or if loaded 243 if type(mean) is str: 244 assert mean.endswith('.npz'), 'File name does not end with \'.npz\'!' 245 load_file = np.load(mean) 246 assert len(load_file.files) == 1, \ 247 'More than one variable located in {0}. Only the mean vector can be stored in the .npz file!' \ 248 .format(mean) 249 mean = load_file[load_file.files[0]] 250 else: # Single number inputted, make it a list if not already 251 if not isinstance(mean, list): 252 mean = [mean] 253 254 # Check if limits exists 255 try: 256 limits 257 except NameError: 258 limits = None 259 260 # check if active exists 261 try: 262 active 263 except NameError: 264 active = None 265 266 # Extract x- and y-dim 267 nx = int(grid_dim[0]) 268 ny = int(grid_dim[1]) 269 270 # Check if 3D grid inputted. If so, we check if info. has been given on all layers. In the case it has 271 # not been given, we just copy the info. given. 272 if len(grid_dim) == 3 and grid_dim[2] > 1: # 3D 273 nz = int(grid_dim[2]) 274 275 # Check mean when values have been inputted directly (not when mean has been loaded) 276 if isinstance(mean, list) and len(mean) < nz: 277 # Check if it is more than one entry and give error 278 assert len(mean) == 1, \ 279 'Information from MEAN has been given for {0} layers, whereas {1} is needed!' \ 280 .format(len(mean), nz) 281 282 # Only 1 entry; copy this to all layers 283 print( 284 '\033[1;33mSingle entry for MEAN will be copied to all {0} layers\033[1;m'.format(nz)) 285 self.prior_info[name]['mean'] = mean * nz 286 287 else: 288 self.prior_info[name]['mean'] = mean 289 290 # Check variogram model 291 if len(vario) < nz: 292 # Check if it is more than one entry and give error 293 assert len(vario) == 1, \ 294 'Information from VARIO has been given for {0} layers, whereas {1} is needed!' \ 295 .format(len(vario), nz) 296 297 # Only 1 entry; copy this to all layers 298 print( 299 '\033[1;33mSingle entry for VARIO will be copied to all {0} layers\033[1;m'.format(nz)) 300 self.prior_info[name]['vario'] = vario * nz 301 302 else: 303 self.prior_info[name]['vario'] = vario 304 305 # Variance 306 if len(variance) < nz: 307 # Check if it is more than one entry and give error 308 assert len(variance) == 1, \ 309 'Information from VAR has been given for {0} layers, whereas {1} is needed!' \ 310 .format(len(variance), nz) 311 312 # Only 1 entry; copy this to all layers 313 print( 314 '\033[1;33mSingle entry for VAR will be copied to all {0} layers\033[1;m'.format(nz)) 315 self.prior_info[name]['variance'] = variance * nz 316 317 else: 318 self.prior_info[name]['variance'] = variance 319 320 # Aniso factor 321 if len(aniso) < nz: 322 # Check if it is more than one entry and give error 323 assert len(aniso) == 1, \ 324 'Information from ANISO has been given for {0} layers, whereas {1} is needed!' \ 325 .format(len(aniso), nz) 326 327 # Only 1 entry; copy this to all layers 328 print( 329 '\033[1;33mSingle entry for ANISO will be copied to all {0} layers\033[1;m'.format(nz)) 330 self.prior_info[name]['aniso'] = aniso * nz 331 332 else: 333 self.prior_info[name]['aniso'] = aniso 334 335 # Aniso factor 336 if len(angle) < nz: 337 # Check if it is more than one entry and give error 338 assert len(angle) == 1, \ 339 'Information from ANGLE has been given for {0} layers, whereas {1} is needed!' \ 340 .format(len(angle), nz) 341 342 # Only 1 entry; copy this to all layers 343 print( 344 '\033[1;33mSingle entry for ANGLE will be copied to all {0} layers\033[1;m'.format(nz)) 345 self.prior_info[name]['angle'] = angle * nz 346 347 else: 348 self.prior_info[name]['angle'] = angle 349 350 # Corr. length 351 if len(corr_length) < nz: 352 # Check if it is more than one entry and give error 353 assert len(corr_length) == 1, \ 354 'Information from RANGE has been given for {0} layers, whereas {1} is needed!' \ 355 .format(len(corr_length), nz) 356 357 # Only 1 entry; copy this to all layers 358 print( 359 '\033[1;33mSingle entry for RANGE will be copied to all {0} layers\033[1;m'.format(nz)) 360 self.prior_info[name]['corr_length'] = corr_length * nz 361 362 else: 363 self.prior_info[name]['corr_length'] = corr_length 364 365 # Limits, if exists 366 if limits is not None: 367 if isinstance(limits[0], list) and len(limits) < nz or \ 368 not isinstance(limits[0], list) and len(limits) < 2 * nz: 369 # Check if it is more than one entry and give error 370 assert (isinstance(limits[0], list) and len(limits) == 1), \ 371 'Information from LIMITS has been given for {0} layers, whereas {1} is needed!' \ 372 .format(len(limits), nz) 373 assert (not isinstance(limits[0], list) and len(limits) == 2), \ 374 'Information from LIMITS has been given for {0} layers, whereas {1} is needed!' \ 375 .format(len(limits) / 2, nz) 376 377 # Only 1 entry; copy this to all layers 378 print( 379 '\033[1;33mSingle entry for RANGE will be copied to all {0} layers\033[1;m'.format(nz)) 380 self.prior_info[name]['limits'] = [limits] * nz 381 382 else: # 2D grid only, or optimization case 383 nz = 1 384 self.prior_info[name]['mean'] = mean 385 self.prior_info[name]['vario'] = vario 386 self.prior_info[name]['variance'] = variance 387 self.prior_info[name]['aniso'] = aniso 388 self.prior_info[name]['angle'] = angle 389 self.prior_info[name]['corr_length'] = corr_length 390 if limits is not None: 391 self.prior_info[name]['limits'] = limits 392 if active is not None: 393 self.prior_info[name]['active'] = active 394 395 self.prior_info[name]['nx'] = nx 396 self.prior_info[name]['ny'] = ny 397 self.prior_info[name]['nz'] = nz 398 399 # Loop over keys and input 400 401 def gen_init_ensemble(self): 402 """ 403 Generate the initial ensemble of (joint) state vectors using the GeoStat class in the "geostat" package. 404 TODO: Merge this function with the perturbation function _gen_state_ensemble in popt. 405 """ 406 # Initialize GeoStat class 407 init_en = Cholesky() 408 409 # (Re)initialize state variable as dictionary 410 self.state = {} 411 self.cov_prior = {} 412 413 for name in self.prior_info: 414 # Init. indices to pick out correct mean vector for each layer 415 ind_end = 0 416 417 # Extract info. 418 nz = self.prior_info[name]['nz'] 419 mean = self.prior_info[name]['mean'] 420 nx = self.prior_info[name]['nx'] 421 ny = self.prior_info[name]['ny'] 422 if nx == ny == 0: # assume ensemble will be generated elsewhere if dimensions are zero 423 break 424 variance = self.prior_info[name]['variance'] 425 corr_length = self.prior_info[name]['corr_length'] 426 aniso = self.prior_info[name]['aniso'] 427 vario = self.prior_info[name]['vario'] 428 angle = self.prior_info[name]['angle'] 429 if 'limits' in self.prior_info[name]: 430 limits = self.prior_info[name]['limits'] 431 else: 432 limits = None 433 434 # Loop over nz to make layers of 2D priors 435 for i in range(self.prior_info[name]['nz']): 436 # If mean is scalar, no covariance matrix is needed 437 if type(self.prior_info[name]['mean']).__module__ == 'numpy': 438 # Generate covariance matrix 439 cov = init_en.gen_cov2d( 440 nx, ny, variance[i], corr_length[i], aniso[i], angle[i], vario[i]) 441 else: 442 cov = np.array(variance[i]) 443 444 # Pick out the mean vector for the current layer 445 ind_start = ind_end 446 ind_end = int((i + 1) * (len(mean) / nz)) 447 mean_layer = mean[ind_start:ind_end] 448 449 # Generate realizations. If LIMITS have been entered, they must be taken account for here 450 if limits is None: 451 real = init_en.gen_real(mean_layer, cov, self.ne) 452 else: 453 real = init_en.gen_real(mean_layer, cov, self.ne, { 454 'upper': limits[i][1], 'lower': limits[i][0]}) 455 456 # Stack realizations for each layer 457 if i == 0: 458 real_out = real 459 else: 460 real_out = np.vstack((real_out, real)) 461 462 # Store realizations in dictionary with name given in STATICVAR 463 self.state[name] = real_out 464 465 # Store the covariance matrix 466 self.cov_prior[name] = cov 467 468 # Save the ensemble for later inspection 469 np.savez('prior.npz', **self.state) 470 471 def get_list_assim_steps(self): 472 """ 473 Returns list of assimilation steps. Useful in a 'loop'-script. 474 475 Returns 476 ------- 477 list_assim : list 478 List of total assimilation steps. 479 """ 480 # Get list of assim. steps. from ASSIMINDEX 481 list_assim = list(range(len(self.keys_da['assimindex']))) 482 483 # If it is a restart run, we only list the assimilation steps we have not done 484 if self.restart is True: 485 # List simulations we already have done. Do this by checking pred_data. 486 # OBS: Minus 1 here do to the aborted simulation is also not None. 487 sim_done = list( 488 range(len([ind for ind, p in enumerate(self.pred_data) if p is not None]) - 1)) 489 490 # Update list of assim. steps by removing simulations we have done 491 list_assim = [ind for ind in list_assim if ind not in sim_done] 492 493 # Return tot. assim. steps 494 return list_assim 495 496 def calc_prediction(self, input_state=None, save_prediction=None): 497 """ 498 Method for making predictions using the state variable. Will output the simulator response for all report steps 499 and all data values provided to the simulator. 500 501 Parameters 502 ---------- 503 input_state: 504 Use an input state instead of internal state (stored in self) to run predictions 505 save_prediction: 506 Save the predictions as a <save_prediction>.npz file (numpy compressed file) 507 508 Returns 509 ------- 510 prediction: 511 List of dictionaries with keys equal to data types (in DATATYPE), 512 containing the responses at each time step given in PREDICTION. 513 514 """ 515 516 if hasattr(self, 'multilevel'): 517 success = self.calc_ml_prediction(input_state) 518 else: 519 # Number of parallel runs 520 if 'parallel' in self.sim.input_dict: 521 no_tot_run = int(self.sim.input_dict['parallel']) 522 else: 523 no_tot_run = 1 524 self.pred_data = [] 525 526 # for level in self.multilevel['level']: # 527 # Setup forward simulator and redundant simulator at the correct fidelity 528 if self.sim.redund_sim is not None: 529 self.sim.redund_sim.setup_fwd_run() 530 self.sim.setup_fwd_run(redund_sim=self.sim.redund_sim) 531 532 # Ensure that we put all the states in a list 533 list_state = [deepcopy({}) for _ in range(self.ne)] 534 for i in range(self.ne): 535 if input_state is None: 536 for key in self.state.keys(): 537 if self.state[key].ndim == 1: 538 list_state[i][key] = deepcopy(self.state[key]) 539 elif self.state[key].ndim == 2: 540 list_state[i][key] = deepcopy(self.state[key][:, i]) 541 # elif self.state[key].ndim == 3: 542 # list_state[i][key] = deepcopy(self.state[key][level,:, i]) 543 else: 544 for key in self.state.keys(): 545 if input_state[key].ndim == 1: 546 list_state[i][key] = deepcopy(input_state[key]) 547 elif input_state[key].ndim == 2: 548 list_state[i][key] = deepcopy(input_state[key][:, i]) 549 # elif input_state[key].ndim == 3: 550 # list_state[i][key] = deepcopy(input_state[key][:,:, i]) 551 if self.aux_input is not None: # several models are used 552 list_state[i]['aux_input'] = self.aux_input[i] 553 554 # Index list of ensemble members 555 list_member_index = list(range(self.ne)) 556 557 if no_tot_run==1: # if not in parallel we use regular loop 558 en_pred = [self.sim.run_fwd_sim(state, member_index) for state, member_index in 559 tqdm(zip(list_state, list_member_index), total=len(list_state))] 560 else: # Run prediction in parallel using p_map 561 en_pred = p_map(self.sim.run_fwd_sim, list_state, 562 list_member_index, num_cpus=no_tot_run, disable=self.disable_tqdm) 563 564 # List successful runs and crashes 565 list_crash = [indx for indx, el in enumerate(en_pred) if el is False] 566 list_success = [indx for indx, el in enumerate(en_pred) if el is not False] 567 success = True 568 569 # Dump all information and print error if all runs have crashed 570 if not list_success: 571 self.save() 572 success = False 573 if len(list_crash) > 1: 574 print( 575 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 576 self.logger.info( 577 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 578 sys.exit(1) 579 return success 580 581 # Check crashed runs 582 if list_crash: 583 # Replace crashed runs with (random) successful runs. If there are more crashed runs than successful once, 584 # we draw with replacement. 585 if len(list_crash) < len(list_success): 586 copy_member = np.random.choice( 587 list_success, size=len(list_crash), replace=False) 588 else: 589 copy_member = np.random.choice( 590 list_success, size=len(list_crash), replace=True) 591 592 # Insert the replaced runs in prediction list 593 for indx, el in enumerate(copy_member): 594 print(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ensemble member ' 595 f'{el}! ---\033[92m') 596 self.logger.info(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ' 597 f'ensemble member {el}! ---\033[92m') 598 for key in self.state.keys(): 599 if self.state[key].ndim > 1: 600 self.state[key][:, list_crash[indx]] = deepcopy( 601 self.state[key][:, el]) 602 en_pred[list_crash[indx]] = deepcopy(en_pred[el]) 603 604 # Convert ensemble specific result into pred_data, and filter for NONE data 605 self.pred_data.extend([{typ: np.concatenate(tuple((el[ind][typ][:, np.newaxis]) for el in en_pred), axis=1) 606 if any(elem is not None for elem in tuple((el[ind][typ]) for el in en_pred)) 607 else None for typ in en_pred[0][0].keys()} for ind in range(len(en_pred[0]))]) 608 609 # some predicted data might need to be adjusted (e.g. scaled or compressed if it is 4D seis data). Do not 610 # include this here. 611 612 # Store results if needed 613 if save_prediction is not None: 614 np.savez(f'{save_prediction}.npz', **{'pred_data': self.pred_data}) 615 616 return success 617 618 def save(self): 619 """ 620 We use pickle to dump all the information we have in 'self'. Can be used, e.g., if some error has occurred. 621 622 Changelog 623 --------- 624 - ST 28/2-17 625 """ 626 # Open save file and dump all info. in self 627 with open(self.pickle_restart_file, 'wb') as f: 628 pickle.dump(self.__dict__, f, protocol=4) 629 630 def load(self): 631 """ 632 Load a pickled file and save all info. in self. 633 634 Changelog 635 --------- 636 - ST 28/2-17 637 """ 638 # Open file and read with pickle 639 with open(self.pickle_restart_file, 'rb') as f: 640 tmp_load = pickle.load(f) 641 642 # Save in 'self' 643 self.__dict__.update(tmp_load) 644 645 def calc_ml_prediction(self, input_state=None): 646 """ 647 Function for running the simulator over several levels. We assume that it is sufficient to provide the level 648 integer to the setup of the forward run. This will initiate the correct simulator fidelity. 649 The function then runs the set of state through the different simulator fidelities. 650 651 Parameters 652 ---------- 653 input_state: 654 If simulation is run stand-alone one can input any state. 655 """ 656 657 no_tot_run = int(self.sim.input_dict['parallel']) 658 ml_pred_data = [] 659 660 for level in tqdm(self.multilevel['levels'], desc='Fidelity level', position=1): 661 # Setup forward simulator and redundant simulator at the correct fidelity 662 if self.sim.redund_sim is not None: 663 self.sim.redund_sim.setup_fwd_run(level=level) 664 self.sim.setup_fwd_run(level=level) 665 ml_ne = self.multilevel['ne'][level] 666 # Ensure that we put all the states in a list 667 list_state = [deepcopy({}) for _ in ml_ne] 668 for i in ml_ne: 669 if input_state is None: 670 for key in self.state[level].keys(): 671 if self.state[level][key].ndim == 1: 672 list_state[i][key] = deepcopy(self.state[level][key]) 673 elif self.state[level][key].ndim == 2: 674 list_state[i][key] = deepcopy(self.state[level][key][:, i]) 675 else: 676 for key in self.state.keys(): 677 if input_state[level][key].ndim == 1: 678 list_state[i][key] = deepcopy(input_state[level][key]) 679 elif input_state[level][key].ndim == 2: 680 list_state[i][key] = deepcopy(input_state[level][key][:, i]) 681 if self.aux_input is not None: # several models are used 682 list_state[i]['aux_input'] = self.aux_input[i] 683 684 # Index list of ensemble members 685 list_member_index = list(ml_ne) 686 687 # Run prediction in parallel using p_map 688 en_pred = p_map(self.sim.run_fwd_sim, list_state, 689 list_member_index, num_cpus=no_tot_run, disable=self.disable_tqdm) 690 691 # List successful runs and crashes 692 list_crash = [indx for indx, el in enumerate(en_pred) if el is False] 693 list_success = [indx for indx, el in enumerate(en_pred) if el is not False] 694 success = True 695 696 # Dump all information and print error if all runs have crashed 697 if not list_success: 698 self.save() 699 success = False 700 if len(list_crash) > 1: 701 print( 702 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 703 self.logger.info( 704 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 705 sys.exit(1) 706 return success 707 708 # Check crashed runs 709 if list_crash: 710 # Replace crashed runs with (random) successful runs. If there are more crashed runs than successful once, 711 # we draw with replacement. 712 if len(list_crash) < len(list_success): 713 copy_member = np.random.choice( 714 list_success, size=len(list_crash), replace=False) 715 else: 716 copy_member = np.random.choice( 717 list_success, size=len(list_crash), replace=True) 718 719 # Insert the replaced runs in prediction list 720 for indx, el in enumerate(copy_member): 721 print(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ensemble member ' 722 f'{el}! ---\033[92m') 723 self.logger.info(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ' 724 f'ensemble member {el}! ---\033[92m') 725 for key in self.state[level].keys(): 726 self.state[level][key][:, list_crash[indx]] = deepcopy( 727 self.state[level][key][:, el]) 728 en_pred[list_crash[indx]] = deepcopy(en_pred[el]) 729 730 # Convert ensemble specific result into pred_data, and filter for NONE data 731 ml_pred_data.append([{typ: np.concatenate(tuple((el[ind][typ][:, np.newaxis]) for el in en_pred), axis=1) 732 if any(elem is not None for elem in tuple((el[ind][typ]) for el in en_pred)) 733 else None for typ in en_pred[0][0].keys()} for ind in range(len(en_pred[0]))]) 734 735 # loop over time instance first, and the level instance. 736 self.pred_data = np.array(ml_pred_data).T.tolist() 737 738 self.treat_modeling_error(self.iteration) 739 740 return success
Class for organizing misc. variables and simulator for an ensemble-based inversion run. Here, the forecast step and prediction runs are performed. General methods that are useful in various ensemble loops have also been implemented here.
35 def __init__(self, keys_en, sim, redund_sim=None): 36 """ 37 Class extends the ReadInitFile class. First the PIPT init. file is passed to the parent class for reading and 38 parsing. Rest of the initialization uses the keywords parsed in ReadInitFile (parent) class to set up observed, 39 predicted data and data variance dictionaries. Also, the simulator to be used in forecast and/or predictions is 40 initialized with keywords parsed in ReadInitFile (parent) class. Lastly, the initial ensemble is generated (if 41 it has not been inputted), and some saving of variables can be done chosen in PIPT init. file. 42 43 Parameter 44 --------- 45 init_file : str 46 path to input file containing initiallization values 47 """ 48 # Internalize PET dictionary 49 self.keys_en = keys_en 50 self.sim = sim 51 self.sim.redund_sim = redund_sim 52 self.pred_data = None 53 54 # Auxilliary input to the simulator - can be used e.g., 55 # to allow for different models when optimizing. 56 self.aux_input = None 57 58 # Setup logger 59 logging.basicConfig(level=logging.INFO, 60 filename='pet_logger.log', 61 filemode='w', 62 format='%(asctime)s : %(levelname)s : %(name)s : %(message)s', 63 datefmt='%Y-%m-%d %H:%M:%S') 64 self.logger = logging.getLogger('PET') 65 66 # Check if folder contains any En_ files, and remove them! 67 for folder in glob('En_*'): 68 try: 69 if len(folder.split('_')) == 2: 70 int(folder.split('_')[1]) 71 rmtree(folder) 72 except: 73 pass 74 75 # Save name for (potential) pickle dump/load 76 self.pickle_restart_file = 'emergency_dump' 77 78 # initiallize the restart. Standard is no restart 79 self.restart = False 80 81 # If it is a restart run, we do not need to initialize anything, only load the self info. that exists in the 82 # pickle save file. If it is not a restart run, we initialize everything below. 83 if 'restart' in self.keys_en and self.keys_en['restart'] == 'yes': 84 # Initiate a restart run 85 self.logger.info('\033[92m--- Restart run initiated! ---\033[92m') 86 # Check if the pickle save file exists in folder 87 try: 88 assert (self.pickle_restart_file in [ 89 f for f in os.listdir('.') if os.path.isfile(f)]) 90 except AssertionError as err: 91 self.logger.exception('The restart file "{0}" does not exist in folder. Cannot restart!'.format( 92 self.pickle_restart_file)) 93 raise err 94 95 # Load restart file 96 self.load() 97 98 # Ensure that restart switch is ON since the error may not have happened during a restart run 99 self.restart = True 100 101 # Init. various variables/lists/dicts. needed in ensemble run 102 else: 103 # delete potential restart files to avoid any problems 104 if self.pickle_restart_file in [f for f in os.listdir('.') if os.path.isfile(f)]: 105 os.remove(self.pickle_restart_file) 106 107 # initialize sim limit 108 if 'sim_limit' in self.keys_en: 109 self.sim_limit = self.keys_en['sim_limit'] 110 else: 111 self.sim_limit = float('inf') 112 113 # bool that can be used to supress tqdm output (useful when testing code) 114 if 'disable_tqdm' in self.keys_en: 115 self.disable_tqdm = self.keys_en['disable_tqdm'] 116 else: 117 self.disable_tqdm = False 118 119 # extract information that is given for the prior model 120 self._ext_prior_info() 121 122 # Calculate initial ensemble if IMPORTSTATICVAR has not been given in init. file. 123 # Prior info. on state variables must be given by PRIOR_<STATICVAR-name> keyword. 124 if 'importstaticvar' not in self.keys_en: 125 self.ne = int(self.keys_en['ne']) 126 127 # Output = self.state, self.cov_prior 128 self.gen_init_ensemble() 129 130 else: 131 # State variable imported as a Numpy save file 132 tmp_load = np.load(self.keys_en['importstaticvar'], allow_pickle=True) 133 134 # We assume that the user has saved the state dict. as **state (effectively saved all keys in state 135 # individually). 136 self.state = {key: val for key, val in tmp_load.items()} 137 138 # Find the number of ensemble members from state variable 139 tmp_ne = [] 140 for tmp_state in self.state.keys(): 141 tmp_ne.extend([self.state[tmp_state].shape[1]]) 142 if max(tmp_ne) != min(tmp_ne): 143 print('\033[1;33mInput states have different ensemble size\033[1;m') 144 sys.exit(1) 145 self.ne = min(tmp_ne) 146 self._ext_ml_info()
Class extends the ReadInitFile class. First the PIPT init. file is passed to the parent class for reading and parsing. Rest of the initialization uses the keywords parsed in ReadInitFile (parent) class to set up observed, predicted data and data variance dictionaries. Also, the simulator to be used in forecast and/or predictions is initialized with keywords parsed in ReadInitFile (parent) class. Lastly, the initial ensemble is generated (if it has not been inputted), and some saving of variables can be done chosen in PIPT init. file.
Parameter
init_file : str path to input file containing initiallization values
401 def gen_init_ensemble(self): 402 """ 403 Generate the initial ensemble of (joint) state vectors using the GeoStat class in the "geostat" package. 404 TODO: Merge this function with the perturbation function _gen_state_ensemble in popt. 405 """ 406 # Initialize GeoStat class 407 init_en = Cholesky() 408 409 # (Re)initialize state variable as dictionary 410 self.state = {} 411 self.cov_prior = {} 412 413 for name in self.prior_info: 414 # Init. indices to pick out correct mean vector for each layer 415 ind_end = 0 416 417 # Extract info. 418 nz = self.prior_info[name]['nz'] 419 mean = self.prior_info[name]['mean'] 420 nx = self.prior_info[name]['nx'] 421 ny = self.prior_info[name]['ny'] 422 if nx == ny == 0: # assume ensemble will be generated elsewhere if dimensions are zero 423 break 424 variance = self.prior_info[name]['variance'] 425 corr_length = self.prior_info[name]['corr_length'] 426 aniso = self.prior_info[name]['aniso'] 427 vario = self.prior_info[name]['vario'] 428 angle = self.prior_info[name]['angle'] 429 if 'limits' in self.prior_info[name]: 430 limits = self.prior_info[name]['limits'] 431 else: 432 limits = None 433 434 # Loop over nz to make layers of 2D priors 435 for i in range(self.prior_info[name]['nz']): 436 # If mean is scalar, no covariance matrix is needed 437 if type(self.prior_info[name]['mean']).__module__ == 'numpy': 438 # Generate covariance matrix 439 cov = init_en.gen_cov2d( 440 nx, ny, variance[i], corr_length[i], aniso[i], angle[i], vario[i]) 441 else: 442 cov = np.array(variance[i]) 443 444 # Pick out the mean vector for the current layer 445 ind_start = ind_end 446 ind_end = int((i + 1) * (len(mean) / nz)) 447 mean_layer = mean[ind_start:ind_end] 448 449 # Generate realizations. If LIMITS have been entered, they must be taken account for here 450 if limits is None: 451 real = init_en.gen_real(mean_layer, cov, self.ne) 452 else: 453 real = init_en.gen_real(mean_layer, cov, self.ne, { 454 'upper': limits[i][1], 'lower': limits[i][0]}) 455 456 # Stack realizations for each layer 457 if i == 0: 458 real_out = real 459 else: 460 real_out = np.vstack((real_out, real)) 461 462 # Store realizations in dictionary with name given in STATICVAR 463 self.state[name] = real_out 464 465 # Store the covariance matrix 466 self.cov_prior[name] = cov 467 468 # Save the ensemble for later inspection 469 np.savez('prior.npz', **self.state)
Generate the initial ensemble of (joint) state vectors using the GeoStat class in the "geostat" package. TODO: Merge this function with the perturbation function _gen_state_ensemble in popt.
471 def get_list_assim_steps(self): 472 """ 473 Returns list of assimilation steps. Useful in a 'loop'-script. 474 475 Returns 476 ------- 477 list_assim : list 478 List of total assimilation steps. 479 """ 480 # Get list of assim. steps. from ASSIMINDEX 481 list_assim = list(range(len(self.keys_da['assimindex']))) 482 483 # If it is a restart run, we only list the assimilation steps we have not done 484 if self.restart is True: 485 # List simulations we already have done. Do this by checking pred_data. 486 # OBS: Minus 1 here do to the aborted simulation is also not None. 487 sim_done = list( 488 range(len([ind for ind, p in enumerate(self.pred_data) if p is not None]) - 1)) 489 490 # Update list of assim. steps by removing simulations we have done 491 list_assim = [ind for ind in list_assim if ind not in sim_done] 492 493 # Return tot. assim. steps 494 return list_assim
Returns list of assimilation steps. Useful in a 'loop'-script.
Returns
- list_assim (list): List of total assimilation steps.
496 def calc_prediction(self, input_state=None, save_prediction=None): 497 """ 498 Method for making predictions using the state variable. Will output the simulator response for all report steps 499 and all data values provided to the simulator. 500 501 Parameters 502 ---------- 503 input_state: 504 Use an input state instead of internal state (stored in self) to run predictions 505 save_prediction: 506 Save the predictions as a <save_prediction>.npz file (numpy compressed file) 507 508 Returns 509 ------- 510 prediction: 511 List of dictionaries with keys equal to data types (in DATATYPE), 512 containing the responses at each time step given in PREDICTION. 513 514 """ 515 516 if hasattr(self, 'multilevel'): 517 success = self.calc_ml_prediction(input_state) 518 else: 519 # Number of parallel runs 520 if 'parallel' in self.sim.input_dict: 521 no_tot_run = int(self.sim.input_dict['parallel']) 522 else: 523 no_tot_run = 1 524 self.pred_data = [] 525 526 # for level in self.multilevel['level']: # 527 # Setup forward simulator and redundant simulator at the correct fidelity 528 if self.sim.redund_sim is not None: 529 self.sim.redund_sim.setup_fwd_run() 530 self.sim.setup_fwd_run(redund_sim=self.sim.redund_sim) 531 532 # Ensure that we put all the states in a list 533 list_state = [deepcopy({}) for _ in range(self.ne)] 534 for i in range(self.ne): 535 if input_state is None: 536 for key in self.state.keys(): 537 if self.state[key].ndim == 1: 538 list_state[i][key] = deepcopy(self.state[key]) 539 elif self.state[key].ndim == 2: 540 list_state[i][key] = deepcopy(self.state[key][:, i]) 541 # elif self.state[key].ndim == 3: 542 # list_state[i][key] = deepcopy(self.state[key][level,:, i]) 543 else: 544 for key in self.state.keys(): 545 if input_state[key].ndim == 1: 546 list_state[i][key] = deepcopy(input_state[key]) 547 elif input_state[key].ndim == 2: 548 list_state[i][key] = deepcopy(input_state[key][:, i]) 549 # elif input_state[key].ndim == 3: 550 # list_state[i][key] = deepcopy(input_state[key][:,:, i]) 551 if self.aux_input is not None: # several models are used 552 list_state[i]['aux_input'] = self.aux_input[i] 553 554 # Index list of ensemble members 555 list_member_index = list(range(self.ne)) 556 557 if no_tot_run==1: # if not in parallel we use regular loop 558 en_pred = [self.sim.run_fwd_sim(state, member_index) for state, member_index in 559 tqdm(zip(list_state, list_member_index), total=len(list_state))] 560 else: # Run prediction in parallel using p_map 561 en_pred = p_map(self.sim.run_fwd_sim, list_state, 562 list_member_index, num_cpus=no_tot_run, disable=self.disable_tqdm) 563 564 # List successful runs and crashes 565 list_crash = [indx for indx, el in enumerate(en_pred) if el is False] 566 list_success = [indx for indx, el in enumerate(en_pred) if el is not False] 567 success = True 568 569 # Dump all information and print error if all runs have crashed 570 if not list_success: 571 self.save() 572 success = False 573 if len(list_crash) > 1: 574 print( 575 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 576 self.logger.info( 577 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 578 sys.exit(1) 579 return success 580 581 # Check crashed runs 582 if list_crash: 583 # Replace crashed runs with (random) successful runs. If there are more crashed runs than successful once, 584 # we draw with replacement. 585 if len(list_crash) < len(list_success): 586 copy_member = np.random.choice( 587 list_success, size=len(list_crash), replace=False) 588 else: 589 copy_member = np.random.choice( 590 list_success, size=len(list_crash), replace=True) 591 592 # Insert the replaced runs in prediction list 593 for indx, el in enumerate(copy_member): 594 print(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ensemble member ' 595 f'{el}! ---\033[92m') 596 self.logger.info(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ' 597 f'ensemble member {el}! ---\033[92m') 598 for key in self.state.keys(): 599 if self.state[key].ndim > 1: 600 self.state[key][:, list_crash[indx]] = deepcopy( 601 self.state[key][:, el]) 602 en_pred[list_crash[indx]] = deepcopy(en_pred[el]) 603 604 # Convert ensemble specific result into pred_data, and filter for NONE data 605 self.pred_data.extend([{typ: np.concatenate(tuple((el[ind][typ][:, np.newaxis]) for el in en_pred), axis=1) 606 if any(elem is not None for elem in tuple((el[ind][typ]) for el in en_pred)) 607 else None for typ in en_pred[0][0].keys()} for ind in range(len(en_pred[0]))]) 608 609 # some predicted data might need to be adjusted (e.g. scaled or compressed if it is 4D seis data). Do not 610 # include this here. 611 612 # Store results if needed 613 if save_prediction is not None: 614 np.savez(f'{save_prediction}.npz', **{'pred_data': self.pred_data}) 615 616 return success
Method for making predictions using the state variable. Will output the simulator response for all report steps and all data values provided to the simulator.
Parameters
- input_state:: Use an input state instead of internal state (stored in self) to run predictions
- save_prediction:: Save the predictions as a
.npz file (numpy compressed file)
Returns
- prediction:: List of dictionaries with keys equal to data types (in DATATYPE), containing the responses at each time step given in PREDICTION.
618 def save(self): 619 """ 620 We use pickle to dump all the information we have in 'self'. Can be used, e.g., if some error has occurred. 621 622 Changelog 623 --------- 624 - ST 28/2-17 625 """ 626 # Open save file and dump all info. in self 627 with open(self.pickle_restart_file, 'wb') as f: 628 pickle.dump(self.__dict__, f, protocol=4)
We use pickle to dump all the information we have in 'self'. Can be used, e.g., if some error has occurred.
Changelog
- ST 28/2-17
630 def load(self): 631 """ 632 Load a pickled file and save all info. in self. 633 634 Changelog 635 --------- 636 - ST 28/2-17 637 """ 638 # Open file and read with pickle 639 with open(self.pickle_restart_file, 'rb') as f: 640 tmp_load = pickle.load(f) 641 642 # Save in 'self' 643 self.__dict__.update(tmp_load)
Load a pickled file and save all info. in self.
Changelog
- ST 28/2-17
645 def calc_ml_prediction(self, input_state=None): 646 """ 647 Function for running the simulator over several levels. We assume that it is sufficient to provide the level 648 integer to the setup of the forward run. This will initiate the correct simulator fidelity. 649 The function then runs the set of state through the different simulator fidelities. 650 651 Parameters 652 ---------- 653 input_state: 654 If simulation is run stand-alone one can input any state. 655 """ 656 657 no_tot_run = int(self.sim.input_dict['parallel']) 658 ml_pred_data = [] 659 660 for level in tqdm(self.multilevel['levels'], desc='Fidelity level', position=1): 661 # Setup forward simulator and redundant simulator at the correct fidelity 662 if self.sim.redund_sim is not None: 663 self.sim.redund_sim.setup_fwd_run(level=level) 664 self.sim.setup_fwd_run(level=level) 665 ml_ne = self.multilevel['ne'][level] 666 # Ensure that we put all the states in a list 667 list_state = [deepcopy({}) for _ in ml_ne] 668 for i in ml_ne: 669 if input_state is None: 670 for key in self.state[level].keys(): 671 if self.state[level][key].ndim == 1: 672 list_state[i][key] = deepcopy(self.state[level][key]) 673 elif self.state[level][key].ndim == 2: 674 list_state[i][key] = deepcopy(self.state[level][key][:, i]) 675 else: 676 for key in self.state.keys(): 677 if input_state[level][key].ndim == 1: 678 list_state[i][key] = deepcopy(input_state[level][key]) 679 elif input_state[level][key].ndim == 2: 680 list_state[i][key] = deepcopy(input_state[level][key][:, i]) 681 if self.aux_input is not None: # several models are used 682 list_state[i]['aux_input'] = self.aux_input[i] 683 684 # Index list of ensemble members 685 list_member_index = list(ml_ne) 686 687 # Run prediction in parallel using p_map 688 en_pred = p_map(self.sim.run_fwd_sim, list_state, 689 list_member_index, num_cpus=no_tot_run, disable=self.disable_tqdm) 690 691 # List successful runs and crashes 692 list_crash = [indx for indx, el in enumerate(en_pred) if el is False] 693 list_success = [indx for indx, el in enumerate(en_pred) if el is not False] 694 success = True 695 696 # Dump all information and print error if all runs have crashed 697 if not list_success: 698 self.save() 699 success = False 700 if len(list_crash) > 1: 701 print( 702 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 703 self.logger.info( 704 '\n\033[1;31mERROR: All started simulations has failed! We dump all information and exit!\033[1;m') 705 sys.exit(1) 706 return success 707 708 # Check crashed runs 709 if list_crash: 710 # Replace crashed runs with (random) successful runs. If there are more crashed runs than successful once, 711 # we draw with replacement. 712 if len(list_crash) < len(list_success): 713 copy_member = np.random.choice( 714 list_success, size=len(list_crash), replace=False) 715 else: 716 copy_member = np.random.choice( 717 list_success, size=len(list_crash), replace=True) 718 719 # Insert the replaced runs in prediction list 720 for indx, el in enumerate(copy_member): 721 print(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ensemble member ' 722 f'{el}! ---\033[92m') 723 self.logger.info(f'\033[92m--- Ensemble member {list_crash[indx]} failed, has been replaced by ' 724 f'ensemble member {el}! ---\033[92m') 725 for key in self.state[level].keys(): 726 self.state[level][key][:, list_crash[indx]] = deepcopy( 727 self.state[level][key][:, el]) 728 en_pred[list_crash[indx]] = deepcopy(en_pred[el]) 729 730 # Convert ensemble specific result into pred_data, and filter for NONE data 731 ml_pred_data.append([{typ: np.concatenate(tuple((el[ind][typ][:, np.newaxis]) for el in en_pred), axis=1) 732 if any(elem is not None for elem in tuple((el[ind][typ]) for el in en_pred)) 733 else None for typ in en_pred[0][0].keys()} for ind in range(len(en_pred[0]))]) 734 735 # loop over time instance first, and the level instance. 736 self.pred_data = np.array(ml_pred_data).T.tolist() 737 738 self.treat_modeling_error(self.iteration) 739 740 return success
Function for running the simulator over several levels. We assume that it is sufficient to provide the level integer to the setup of the forward run. This will initiate the correct simulator fidelity. The function then runs the set of state through the different simulator fidelities.
Parameters
- input_state:: If simulation is run stand-alone one can input any state.