Source code for mldas.explore.prepare

__copyright__ = """
Machine Learning for Distributed Acoustic Sensing data (MLDAS)
Copyright (c) 2020, The Regents of the University of California,
through Lawrence Berkeley National Laboratory (subject to receipt of
any required approvals from the U.S. Dept. of Energy). All rights reserved.

If you have questions about your rights to use or distribute this software,
please contact Berkeley Lab's Intellectual Property Office at
IPO@lbl.gov.

NOTICE.  This Software was developed under funding from the U.S. Department
of Energy and the U.S. Government consequently retains certain rights.  As
such, the U.S. Government has been granted for itself and others acting on
its behalf a paid-up, nonexclusive, irrevocable, worldwide license in the
Software to reproduce, distribute copies to the public, prepare derivative 
works, and perform publicly and display publicly, and to permit others to do so.
"""
__license__ = "Modified BSD license (see LICENSE.txt)"
__maintainer__ = "Vincent Dumont"
__email__ = "vincentdumont11@gmail.com"

# System
import os,glob,random,time

# Externals
import torch,h5py,numpy

def set_from_file(filename,img_size=100,stride=10,adjust=False,shuffle=True,select=None,nrand=None):
    '''
    Create a set of images out of a single DAS data file.

    Parameters
    ----------
    filename : :py:class:`str`
      Path to the data file
    img_size : :py:class:`int`
      Size of squared image
    stride : :py:class:`int`
      Sliding interval
    adjust : :py:class:`bool`
      Do brightness adjustment
    shuffle : :py:class:`bool`
      Shuffle output dataset
    select : :py:class:`int`, :py:class:`list`
      Index(es) of specific images to extract
    nrand : :py:class:`int`
      Number of images to be randomly selected

    Returns
    -------
    Xtrain : :py:class:`list`
      List of individual images
    '''
    assert os.path.exists(filename), '%s: Path not found, make sure the Google Drive is loaded.'%filename
    assert not(select!=None and nrand!=None), "Choose either random number or selected indexes, not both."
    # Load file
    f = h5py.File(filename,'r')
    data = numpy.array(f[f.get('variable/dat')[0,0]])
    f.close()
    # Get corner indexes of every image
    idxs = numpy.array([[[i,j] for j in range(0,data.shape[1]-img_size+1,stride)] for i in range(0,data.shape[0]-img_size+1,stride)])
    idxs = idxs.reshape(idxs.shape[0]*idxs.shape[1],2)
    # If select is defined, keep only the requested indexes
    if select!=None:
        idxs = idxs[select] if type(select)==list else idxs[[select]]
    # If nrand is defined and less than the actual size of idxs, randomly select indexes
    if nrand!=None and nrand<len(idxs):
        idxs = idxs[random.sample(range(0,idxs.shape[0]),nrand)]
    # Loop over images and extract data
    Xtrain = []
    for k,(i,j) in enumerate(idxs):
        img = data[i:i+img_size,j:j+img_size].copy()
        img = (img-img.min())/(img.max()-img.min())
        if adjust:
            img = mean_shift(img)
        Xtrain.append(img)
    if shuffle:
        random.shuffle(Xtrain)
    return Xtrain
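
# Usage sketch: the file path below is hypothetical and only illustrates how
# set_from_file is typically called on a single MAT/HDF5 file exposing the
# 'variable/dat' layout read above. The helper is defined but never called,
# so importing the module has no side effects.
def _example_set_from_file():
    # Extract 16 randomly placed 100x100 images from one (hypothetical) DAS file
    images = set_from_file('data/example_das_file.mat',img_size=100,stride=50,nrand=16)
    print(len(images),images[0].shape)  # expected: 16 (100, 100)
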
def mean_shift(data,loops=1):
    '''
    Shift all values of the input 2D array so that the mean value matches 0.5.

    Parameters
    ----------
    data : :py:class:`numpy.ndarray`
      Input 2D image
    loops : :py:class:`int`
      Number of iterations of the algorithm

    Returns
    -------
    data : :py:class:`numpy.ndarray`
      Modified 2D data
    '''
    for i in range(loops):
        ref_mean = data.mean()
        # Map values below the mean to [0,0.5] and values above the mean to (0.5,1]
        infs = numpy.where(data<=ref_mean)
        sups = numpy.where(data>ref_mean)
        data[infs] = data[infs] / ref_mean * 0.5
        data[sups] = 1 - (1-data[sups]) / (1-ref_mean) * 0.5
    return data
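
# Usage sketch: mean_shift assumes values already normalised to [0,1] (as done
# in set_from_file) and maps values below the current mean to [0,0.5] and values
# above it to (0.5,1], pulling the overall mean toward 0.5. The random array
# below is only an illustration.
def _example_mean_shift():
    img = numpy.random.rand(100,100)**3  # dim image whose mean is well below 0.5
    adjusted = mean_shift(img.copy())
    print(img.mean(),adjusted.mean())    # the adjusted mean should be closer to 0.5
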
def set_creation(datapath,img_size=100,stride=10,sample_size=1,adjust=False,shuffle=True,nrand=None,select=None,verbose=False):
    """
    Create a PyTorch tensor training set of single-channel data images.

    Parameters
    ----------
    datapath : :py:class:`str`
      Path to data repository
    img_size : :py:class:`int`
      Size of squared image
    stride : :py:class:`int`
      Sliding interval
    sample_size : :py:class:`int`
      Number of input MAT data files to use
    adjust : :py:class:`bool`
      Do brightness adjustment
    shuffle : :py:class:`bool`
      Shuffle images in set
    nrand : :py:class:`int`
      Total number of images in final set
    select : :py:class:`int`, :py:class:`list`
      Index(es) of specific images to extract
    verbose : :py:class:`bool`
      Print verbose output

    Returns
    -------
    Xtrain : :py:class:`torch.Tensor`
      Tensor of images
    """
    assert os.path.exists(datapath), '%s: Path not found, make sure the Google Drive is loaded.'%datapath
    # List all data files available in target repository
    sample = [datapath] if datapath.endswith('.mat') else glob.glob(datapath+'/*.mat')
    # Randomly select files
    idxs = random.sample(range(0,len(sample)),sample_size)
    # Initialize list to store training images
    Xtrain = []
    # Loop over randomly selected files
    for i,idx in enumerate(idxs):
        if verbose:
            print(sample[idx])
        # If multiple files are selected, wait until all images are loaded before random selection
        n = None if nrand!=None and sample_size>1 else nrand
        # Store training data in dataset list
        Xtrain.extend(set_from_file(sample[idx],img_size,stride,adjust,shuffle,select,n))
    # Convert list to numpy array
    Xtrain = numpy.array(Xtrain,dtype=float)
    # Check if requested number of random images is less than dataset size
    if nrand!=None and nrand<len(Xtrain):
        # Select random images from dataset
        Xtrain = Xtrain[random.sample(range(0,len(Xtrain)),nrand)]
    # Convert numpy array to PyTorch tensor
    Xtrain = torch.from_numpy(numpy.reshape(Xtrain,(len(Xtrain),1,img_size,img_size)))
    return Xtrain
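
# Usage sketch: build a small training tensor from a hypothetical directory of
# MAT files. The directory name and sizes are placeholders; the output shape is
# (n_images, 1, img_size, img_size), as produced by the reshape above.
def _example_set_creation():
    Xtrain = set_creation('data/das_files',img_size=100,stride=50,sample_size=2,nrand=64,verbose=True)
    print(Xtrain.shape,Xtrain.dtype)  # e.g. torch.Size([64, 1, 100, 100]) torch.float64
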
def prepare_loader(datapath,img_size,stride,sample_size,batch_size,nrand=None,adjust=False,shuffle=True,verbose=True):
    """
    Create a custom data loader with unlabeled, single-channel data images
    directly extracted from raw data files.

    Parameters
    ----------
    datapath : :py:class:`str`
      Path to raw data repository
    img_size : :py:class:`int`
      Size of squared image
    stride : :py:class:`int`
      Sliding interval
    sample_size : :py:class:`int`
      Number of input MAT data files to use
    batch_size : :py:class:`int`
      Batch size to use in :py:class:`~torch.utils.data.DataLoader`
    nrand : :py:class:`int`
      Total number of images in final set
    adjust : :py:class:`bool`
      Do brightness adjustment
    shuffle : :py:class:`bool`
      Shuffle images in set
    verbose : :py:class:`bool`
      Print verbose output

    Returns
    -------
    train_loader : :py:class:`torch.utils.data.DataLoader`
      Data loader
    """
    start_load = time.time()
    Xtrain = set_creation(datapath,img_size,stride,sample_size,adjust,shuffle,nrand)
    # Select random images if nrand is defined
    if nrand!=None and nrand<len(Xtrain):
        Xtrain = Xtrain[random.sample(range(0,len(Xtrain)),nrand)]
    # Check that the number of training images is larger than the batch size
    if len(Xtrain)<batch_size:
        if verbose:
            print('Input batch size too large (%i), reset to dataset size (%i).'%(batch_size,len(Xtrain)))
        batch_size = len(Xtrain)
    # Create DataLoader object
    train_loader = torch.utils.data.DataLoader(dataset=Xtrain,batch_size=batch_size,shuffle=True)
    # Print time spent and final data size
    if verbose:
        print('Train loader created in {0:.2f} seconds from {1:,} files with {2:,} images split into:'.format(time.time()-start_load,sample_size,len(Xtrain)))
        print('\t{0:,} batches of {1:,} images'.format(len(Xtrain)//batch_size,batch_size))
    if len(Xtrain)//batch_size!=len(train_loader) and verbose:
        print('\t1 batch of {:,} images'.format(len(Xtrain)-(len(Xtrain)//batch_size)*batch_size))
    return train_loader
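
# Usage sketch: create a data loader from a hypothetical data directory and
# iterate over the unlabeled image batches. Path, sizes, and batch size are
# placeholders chosen for illustration only.
def _example_prepare_loader():
    loader = prepare_loader('data/das_files',img_size=100,stride=50,sample_size=2,batch_size=16,nrand=128)
    for batch in loader:
        print(batch.shape)  # torch.Size([16, 1, 100, 100]) for full batches
        break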