General helper functions used throughout the library
source
random_rand
def random_rand(
d:VAR_POSITIONAL, # int or tuple of ints, optional. The dimensions of the returned array, must be non-negative.
dtype:NoneType= None , # data type of the output.
out:NoneType= None , # ndarray, optional. Alternative output array in which to place the result.
seed:NoneType= None , # int or None, optional. Seed for the random number generator.
):
Same as np.random.rand but with a faster random generator, dtype and seed
source
random_randint
def random_randint(
low, # int, lower endpoint of interval (inclusive)
high:NoneType= None , # int, upper endpoint of interval (exclusive), or None for a single-argument form of low.
size:NoneType= None , # int or tuple of ints, optional. Output shape.
dtype:type = int , # data type of the output.
endpoint:bool = False , # bool, optional. If True, `high` is an inclusive endpoint. If False, the range is open on the right.
seed:NoneType= None , # int or None, optional. Seed for the random number generator.
):
Same as np.random.randint but with a faster random generator and seed
source
random_choice
def random_choice(
a, # 1-D array-like or int. The values from which to draw the samples.
size:NoneType= None , # int or tuple of ints, optional. The shape of the output.
replace:bool = True , # bool, optional. Whether or not to allow the same value to be drawn multiple times.
p:NoneType= None , # 1-D array-like, optional. The probabilities associated with each entry in a.
axis:int = 0 , # int, optional. The axis along which the samples are drawn.
shuffle:bool = True , # bool, optional. Whether or not to shuffle the samples before returning them.
dtype:NoneType= None , # data type of the output.
seed:NoneType= None , # int or None, optional. Seed for the random number generator.
):
Same as np.random.choice but with a faster random generator, dtype and seed
a = random_choice(10 , size= (2 ,3 ,4 ), replace= True , p= None , seed= 1 )
b = random_choice(10 , size= (2 ,3 ,4 ), replace= True , p= None , seed= 1 )
test_eq(a, b)
c = random_choice(10 , size= (2 ,3 ,4 ), replace= True , p= None , seed= 2 )
test_ne(a, c)
assert random_choice(10 , size= 3 , replace= True , p= None ).shape == (3 ,)
assert random_choice(10 , size= (2 ,3 ,4 ), replace= True , p= None ).shape == (2 ,3 ,4 )
print (random_choice(10 , size= 3 , replace= True , p= None ))
print (random_choice(10 , size= 3 , replace= False , p= None ))
a = [2 , 5 , 4 , 9 , 13 , 25 , 56 , 83 , 99 , 100 ]
print (random_choice(a, size= 3 , replace= False , p= None ))
[5 7 5]
[0 1 6]
[ 4 83 100]
a = random_randint(10 , 20 , 100 , seed= 1 )
b = random_randint(10 , 20 , 100 , seed= 1 )
test_eq(a, b)
c = random_randint(10 , 20 , 100 , seed= 2 )
test_ne(a, c)
assert (a >= 10 ).all () and (a < 20 ).all ()
a = random_rand(2 , 3 , 4 , seed= 123 )
b = random_rand(2 , 3 , 4 , seed= 123 )
test_eq(a, b)
c = random_rand(2 , 3 , 4 , seed= 124 )
test_ne(a, c)
assert (a >= 0 ).all () and (a < 1 ).all ()
a = random_rand(2 , 3 , 4 )
a_copy = a.copy()
random_rand(2 , 3 , 4 , out= a)
test_ne(a, a_copy)
source
is_slice
Call self as a function.
source
is_memmap
Call self as a function.
source
is_dask
Call self as a function.
source
is_zarr
Call self as a function.
source
is_tensor
Call self as a function.
source
is_nparray
Call self as a function.
# ensure these folders exist for testing purposes
fns = ['data' , 'export' , 'models' ]
for fn in fns:
path = Path('.' )/ fn
if not os.path.exists(path): os.makedirs(path)
source
todtype
Call self as a function.
source
to3dPlusArray
Call self as a function.
source
to3dPlusTensor
Call self as a function.
source
to2dPlusArray
Call self as a function.
source
to2dPlusTensor
Call self as a function.
source
to3dPlus
Call self as a function.
source
to2dPlus
Call self as a function.
source
to1d
Call self as a function.
source
to2d
Call self as a function.
source
to3d
Call self as a function.
source
to1darray
Call self as a function.
source
to2darray
Call self as a function.
source
to3darray
Call self as a function.
source
to1dtensor
Call self as a function.
source
to2dtensor
Call self as a function.
source
to3dtensor
Call self as a function.
source
toL
Call self as a function.
source
toarray
Call self as a function.
source
totensor
Call self as a function.
a = np.random.rand(100 ).astype(np.float32)
b = torch.from_numpy(a).float ()
test_eq(totensor(a), b)
test_eq(a, toarray(b))
test_eq(to3dtensor(a).ndim, 3 )
test_eq(to2dtensor(a).ndim, 2 )
test_eq(to1dtensor(a).ndim, 1 )
test_eq(to3darray(b).ndim, 3 )
test_eq(to2darray(b).ndim, 2 )
test_eq(to1darray(b).ndim, 1 )
data = np.random.rand(10 , 20 )
df = pd.DataFrame(data)
df['target' ] = np.random.randint(0 , 3 , len (df))
X = df[df.columns[:- 1 ]]
y = df['target' ]
test_eq(to3darray(X).shape, (10 , 1 , 20 ))
test_eq(toarray(y).shape, (10 ,))
source
get_file_size
def get_file_size(
file_path:str , # path to file
return_str:bool = True , # True returns size in human-readable format (KB, MB, GB, ...). False in bytes.
decimals:int = 2 , # Number of decimals in the output
):
Call self as a function.
source
get_dir_size
def get_dir_size(
dir_path:str , # path to directory
return_str:bool = True , # True returns size in human-readable format (KB, MB, GB, ...). False in bytes.
decimals:int = 2 , # Number of decimals in the output
verbose:bool = False , # Controls verbosity
):
Call self as a function.
source
get_size
def get_size(
o, # Any python object
return_str:bool = False , # True returns size in human-readable format (KB, MB, GB, ...). False in bytes.
decimals:int = 2 , # Number of decimals in the output
):
Call self as a function.
source
bytes2str
def bytes2str(
size_bytes:int , # Number of bytes
decimals:int = 2 , # Number of decimals in the output
)-> str :
Call self as a function.
a = np.random.rand(10 , 5 , 3 )
test_eq(get_size(a, True , 1 ), '1.2 KB' )
source
is_np_view
def is_np_view(
o, # a numpy array
):
Call self as a function.
a = np.array([1. , 2. , 3. ])
test_eq(is_np_view(a), False )
test_eq(is_np_view(a[1 :]), True )
source
is_dir
Call self as a function.
source
is_file
Call self as a function.
test_eq(is_file("002_utils.ipynb" ), True )
test_eq(is_file("utils.ipynb" ), False )
source
delete_all_in_dir
def delete_all_in_dir(
tgt_dir, exception:NoneType= None
):
Call self as a function.
source
reverse_dict
def reverse_dict(
dictionary
):
Call self as a function.
source
is_tuple
Call self as a function.
source
itemify
def itemify(
o:VAR_POSITIONAL, tup_id:NoneType= None
):
Call self as a function.
a = [1 , 2 , 3 ]
b = [4 , 5 , 6 ]
print (itemify(a, b))
test_eq(len (itemify(a, b)), len (a))
a = [1 , 2 , 3 ]
b = None
print (itemify(a, b))
test_eq(len (itemify(a, b)), len (a))
a = [1 , 2 , 3 ]
b = [4 , 5 , 6 ]
c = None
print (itemify(a, b, c))
test_eq(len (itemify(a, b, c)), len (a))
[(1, 4), (2, 5), (3, 6)]
[(1,), (2,), (3,)]
[(1, 4), (2, 5), (3, 6)]
source
ifelse
b if a is True else c
source
exists
Call self as a function.
source
isnone
Call self as a function.
a = np.array(3 )
test_eq(isnone(a), False )
test_eq(exists(a), True )
b = None
test_eq(isnone(b), True )
test_eq(exists(b), False )
source
test_eq_nan
test that a==b excluding nan values (valid for torch.Tensor and np.ndarray)
source
test_error
def test_error(
error, f, args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Call self as a function.
source
test_not_ok
def test_not_ok(
f, args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Call self as a function.
source
test_ok
def test_ok(
f, args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Call self as a function.
source
test_type
Call self as a function.
source
test_not_close
def test_not_close(
a, b, eps:float = 1e-05
):
test that a is not within eps of b
source
is_not_close
def is_not_close(
a, b, eps:float = 1e-05
):
Is a not within eps of b
source
assert_fn
def assert_fn(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Call self as a function.
source
test_gt
test that a>b
test_ok(test_gt, 5 , 4 )
test_not_ok(test_gt, 4 , 4 )
test_ok(test_ge, 4 , 4 )
test_not_ok(test_ge, 3 , 4 )
test_ok(test_lt, 3 , 4 )
test_not_ok(test_lt, 4 , 4 )
test_ok(test_le, 4 , 4 )
test_not_ok(test_le, 5 , 4 )
t = torch.rand(100 )
test_eq(t, t)
test_eq_nan(t, t)
source
stack_pad
def stack_pad(
o, padding_value:float = nan
):
Converts an iterable into a numpy array using padding if necessary
source
stack
def stack(
o, axis:int = 0 , retain:bool = True
):
Call self as a function.
o = [[0 ,1 ,2 ], [4 ,5 ,6 ,7 ]]
test_eq(stack_pad(o).shape, (1 , 2 , 4 ))
test_eq(type (stack_pad(o)), np.ndarray)
test_eq(np.isnan(stack_pad(o)).sum (), 1 )
o = 3
print (stack_pad(o))
test_eq(stack_pad(o), np.array([[3. ]]))
o = [4 ,5 ]
print (stack_pad(o))
test_eq(stack_pad(o), np.array([[4. , 5. ]]))
o = [[0 ,1 ,2 ], [4 ,5 ,6 ,7 ]]
print (stack_pad(o))
o = np.array([0 , [1 ,2 ]], dtype= object )
print (stack_pad(o))
o = np.array([[[0 ], [10 , 20 ], [100 , 200 , 300 ]], [[0 , 1 , 2 , 3 ], [10 , 20 ], [100 ]]], dtype= object )
print (stack_pad(o))
o = np.array([0 , [10 , 20 ]], dtype= object )
print (stack_pad(o))
[[3.]]
[[4. 5.]]
[[[ 0. 1. 2. nan]
[ 4. 5. 6. 7.]]]
[[ 0. nan]
[ 1. 2.]]
[[[ 0. nan nan nan]
[ 10. 20. nan nan]
[100. 200. 300. nan]]
[[ 0. 1. 2. 3.]
[ 10. 20. nan nan]
[100. nan nan nan]]]
[[ 0. nan]
[10. 20.]]
a = np.random.rand(2 , 3 , 4 )
t = torch.from_numpy(a)
test_eq_type(stack(itemify(a, tup_id= 0 )), a)
test_eq_type(stack(itemify(t, tup_id= 0 )), t)
source
pad_sequences
def pad_sequences(
o, # Iterable object
maxlen:int = None , # Optional max length of the output. If None, max length of the longest individual sequence.
dtype:(< class 'str' > , < class 'type' > )= float64, # Type of the output sequences. To pad sequences with variable length strings, you can use object.
padding:str = 'pre' , # 'pre' or 'post' pad either before or after each sequence.
truncating:str = 'pre' , # 'pre' or 'post' remove values from sequences larger than maxlen, either at the beginning or at the end of the sequences.
padding_value:float = nan, # Value used for padding.
):
Transforms an iterable with sequences into a 3d numpy array using padding or truncating sequences if necessary
This function transforms a list (of length n_samples) of sequences into a 3d numpy array of shape:
[n_samples x n_vars x seq_len]
seq_len is either the maxlen argument if provided, or the length of the longest sequence in the list.
Sequences that are shorter than seq_len are padded with padding_value until they are seq_len long.
Sequences longer than seq_len are truncated so that they fit the desired length.
The position where padding or truncation happens is determined by the arguments padding and truncating, respectively. Pre-padding or removing values from the beginning of the sequence is the default.
Input sequences to pad_sequences may have 1, 2 or 3 dimensions:
# 1 dim
a1 = np.arange(6 )
a2 = np.arange(3 ) * 10
a3 = np.arange(2 ) * 100
o = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen= 4 , dtype= np.float64, padding= 'post' , truncating= 'pre' , padding_value= np.nan)
test_eq(padded_o.shape, (3 , 1 , 4 ))
padded_o
array([[[ 2., 3., 4., 5.]],
[[ 0., 10., 20., nan]],
[[ 0., 100., nan, nan]]])
# 2 dim
a1 = np.arange(12 ).reshape(2 , 6 )
a2 = np.arange(6 ).reshape(2 , 3 ) * 10
a3 = np.arange(4 ).reshape(2 , 2 ) * 100
o = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen= 4 , dtype= np.float64, padding= 'post' , truncating= 'pre' , padding_value= np.nan)
test_eq(padded_o.shape, (3 , 2 , 4 ))
padded_o
array([[[ 2., 3., 4., 5.],
[ 8., 9., 10., 11.]],
[[ 0., 10., 20., nan],
[ 30., 40., 50., nan]],
[[ 0., 100., nan, nan],
[200., 300., nan, nan]]])
# 3 dim
a1 = np.arange(10 ).reshape(1 , 2 , 5 )
a2 = np.arange(6 ).reshape(1 , 2 , 3 ) * 10
a3 = np.arange(4 ).reshape(1 , 2 , 2 ) * 100
o = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen= None , dtype= np.float64, padding= 'pre' , truncating= 'pre' , padding_value= np.nan)
test_eq(padded_o.shape, (3 , 2 , 5 ))
padded_o
array([[[ 0., 1., 2., 3., 4.],
[ 5., 6., 7., 8., 9.]],
[[ nan, nan, 0., 10., 20.],
[ nan, nan, 30., 40., 50.]],
[[ nan, nan, nan, 0., 100.],
[ nan, nan, nan, 200., 300.]]])
# 3 dim
a1 = np.arange(10 ).reshape(1 , 2 , 5 )
a2 = np.arange(6 ).reshape(1 , 2 , 3 ) * 10
a3 = np.arange(4 ).reshape(1 , 2 , 2 ) * 100
o = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen= 4 , dtype= np.float64, padding= 'pre' , truncating= 'pre' , padding_value= np.nan)
test_eq(padded_o.shape, (3 , 2 , 4 ))
padded_o
array([[[ 1., 2., 3., 4.],
[ 6., 7., 8., 9.]],
[[ nan, 0., 10., 20.],
[ nan, 30., 40., 50.]],
[[ nan, nan, 0., 100.],
[ nan, nan, 200., 300.]]])
# 3 dim
a1 = np.arange(10 ).reshape(1 , 2 , 5 )
a2 = np.arange(6 ).reshape(1 , 2 , 3 ) * 10
a3 = np.arange(4 ).reshape(1 , 2 , 2 ) * 100
o = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen= 4 , dtype= np.float64, padding= 'post' , truncating= 'pre' , padding_value= np.nan)
test_eq(padded_o.shape, (3 , 2 , 4 ))
padded_o
array([[[ 1., 2., 3., 4.],
[ 6., 7., 8., 9.]],
[[ 0., 10., 20., nan],
[ 30., 40., 50., nan]],
[[ 0., 100., nan, nan],
[200., 300., nan, nan]]])
# 3 dim
a1 = np.arange(10 ).reshape(1 , 2 , 5 )
a2 = np.arange(6 ).reshape(1 , 2 , 3 ) * 10
a3 = np.arange(4 ).reshape(1 , 2 , 2 ) * 100
o = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen= 4 , dtype= np.float64, padding= 'post' , truncating= 'post' , padding_value= np.nan)
test_eq(padded_o.shape, (3 , 2 , 4 ))
padded_o
array([[[ 0., 1., 2., 3.],
[ 5., 6., 7., 8.]],
[[ 0., 10., 20., nan],
[ 30., 40., 50., nan]],
[[ 0., 100., nan, nan],
[200., 300., nan, nan]]])
# iterable is a list of lists
a1 = np.arange(12 ).reshape(1 , 2 , 6 ).tolist()
a2 = (np.arange(6 ).reshape(1 , 2 , 3 ) * 10 ).tolist()
a3 = (np.arange(4 ).reshape(1 , 2 , 2 ) * 100 ).tolist()
o = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen= None , dtype= np.float64, padding= 'post' , truncating= 'pre' , padding_value= np.nan)
test_eq(padded_o.shape, (3 , 2 , 6 ))
padded_o
array([[[ 0., 1., 2., 3., 4., 5.],
[ 6., 7., 8., 9., 10., 11.]],
[[ 0., 10., 20., nan, nan, nan],
[ 30., 40., 50., nan, nan, nan]],
[[ 0., 100., nan, nan, nan, nan],
[200., 300., nan, nan, nan, nan]]])
source
match_seq_len
def match_seq_len(
arrays:VAR_POSITIONAL
):
Call self as a function.
a = np.random.rand(10 , 5 , 8 )
b = np.random.rand(3 , 5 , 10 )
c, d = match_seq_len(a, b)
test_eq(c.shape[- 1 ], d.shape[- 1 ])
source
random_shuffle
def random_shuffle(
o, random_state:NoneType= None
):
Call self as a function.
a = np.arange(10 )
test_eq_type(random_shuffle(a, 1 ), np.array([2 , 9 , 6 , 4 , 0 , 3 , 1 , 7 , 8 , 5 ]))
t = torch.arange(10 )
test_eq_type(random_shuffle(t, 1 ), tensor([2 , 9 , 6 , 4 , 0 , 3 , 1 , 7 , 8 , 5 ]))
l = list (a)
test_eq(random_shuffle(l, 1 ), [2 , 9 , 6 , 4 , 0 , 3 , 1 , 7 , 8 , 5 ])
l2 = L(l)
test_eq_type(random_shuffle(l2, 1 ), L([2 , 9 , 6 , 4 , 0 , 3 , 1 , 7 , 8 , 5 ]))
source
cat2int
Call self as a function.
a = np.array(['b' , 'a' , 'a' , 'b' , 'a' , 'b' , 'a' ])
test_eq_type(cat2int(a), TensorCategory([1 , 0 , 0 , 1 , 0 , 1 , 0 ]))
source
cycle_dl_estimate
def cycle_dl_estimate(
dl, iters:int = 10
):
Call self as a function.
source
cycle_dl_to_device
def cycle_dl_to_device(
dl, show_progress_bar:bool = True
):
Call self as a function.
source
cycle_dl
def cycle_dl(
dl, show_progress_bar:bool = True
):
Call self as a function.
source
cache_data
def cache_data(
o, slice_len:int = 10000 , verbose:bool = False
):
Call self as a function.
source
get_func_defaults
def get_func_defaults(
f
):
Call self as a function.
source
get_idx_from_df_col_vals
def get_idx_from_df_col_vals(
df, col, val_list
):
Call self as a function.
source
get_sublist_idxs
def get_sublist_idxs(
aList, bList
):
Get idxs that when applied to aList will return bList. aList must contain all values in bList
x = np.array([3 , 5 , 7 , 1 , 9 , 8 , 6 , 2 ])
y = np.array([6 , 1 , 5 , 7 ])
idx = get_sublist_idxs(x, y)
test_eq(x[idx], y)
x = np.array([3 , 5 , 7 , 1 , 9 , 8 , 6 , 6 , 2 ])
y = np.array([6 , 1 , 5 , 7 , 5 ])
idx = get_sublist_idxs(x, y)
test_eq(x[idx], y)
source
flatten_list
Call self as a function.
source
display_pd_df
def display_pd_df(
df, max_rows:Union= False , max_columns:Union= False
):
Call self as a function.
old_max_rows, old_max_columns = pd.get_option('display.max_rows' ), pd.get_option('display.max_columns' )
df = pd.DataFrame(np.random.rand(70 , 25 ))
display_pd_df(df, max_rows= 2 , max_columns= 3 )
test_eq(old_max_rows, pd.get_option('display.max_rows' ))
test_eq(old_max_columns, pd.get_option('display.max_columns' ))
0
0.436034
...
0.231616
...
...
...
...
69
0.633051
...
0.051762
70 rows × 25 columns
source
tscore
Call self as a function.
source
kstest
def kstest(
data1, data2, alternative:str = 'two-sided' , mode:str = 'auto' , by_axis:NoneType= None
):
Performs the two-sample Kolmogorov-Smirnov test for goodness of fit.
Parameters: data1, data2: two arrays of sample observations assumed to be drawn from a continuous distribution. Sample sizes can be different. alternative: {‘two-sided’, ‘less’, ‘greater’}, optional. Defines the null and alternative hypotheses. Default is ‘two-sided’. mode: {‘auto’, ‘exact’, ‘asymp’}, optional. Defines the method used for calculating the p-value. by_axis (optional, int): for arrays with more than 1 dimension, the test will be run for each variable in that axis if by_axis is not None.
source
ttest
def ttest(
data1, data2, equal_var:bool = False
):
Calculates t-statistic and p-value based on 2 sample distributions
a = np.random.normal(0.5 , 1 , 100 )
b = np.random.normal(0.15 , .5 , 50 )
plt.hist(a, 50 )
plt.hist(b, 50 )
plt.show()
ttest(a,b)
a = np.random.normal(0.5 , 1 , (100 ,3 ))
b = np.random.normal(0.5 , 1 , (50 ,))
kstest(a,b)
(0.22333333333333333, 0.02452803315700394)
a = np.random.normal(0.5 , 1 , (100 ,3 ))
b = np.random.normal(0.15 , .5 , (50 ,))
kstest(a,b)
(0.31, 0.0004061333917852463)
data1 = np.random.normal(0 ,1 ,(100 , 5 , 3 ))
data2 = np.random.normal(0 ,2 ,(100 , 5 , 3 ))
kstest(data1, data2, by_axis= 1 )
([0.22,
0.16333333333333333,
0.16333333333333333,
0.18666666666666668,
0.21666666666666667],
[8.994053173844458e-07,
0.0006538374533623971,
0.0006538374533623971,
5.522790313356146e-05,
1.4007759411179028e-06])
a = np.random.normal(0.5 , 1 , 100 )
t = torch.normal(0.5 , 1 , (100 , ))
tscore(a), tscore(t)
(4.33309224863388, tensor(5.7798))
source
scc
Call self as a function.
source
pcc
Call self as a function.
source
remove_fn
def remove_fn(
fn, verbose:bool = False
):
Removes a file (fn) if exists
source
npsave
def npsave(
array_fn, array, verbose:bool = True
):
Call self as a function.
fn = 'data/remove_fn_test.npy'
a = np.zeros(1 )
npsave(fn, a)
del a
np.load(fn, mmap_mode= 'r+' )
remove_fn(fn, True )
remove_fn(fn, True )
data/remove_fn_test.npy does not exist
saving data/remove_fn_test.npy...
...data/remove_fn_test.npy saved
data/remove_fn_test.npy file removed
data/remove_fn_test.npy does not exist
source
permute_2D
def permute_2D(
array, axis:NoneType= None
):
Permute rows or columns in an array. This can be used, for example, in feature permutation
s = np.arange(100 * 50 ).reshape(100 , 50 )
test_eq(permute_2D(s, axis= 0 ).mean(0 ), s.mean(0 ))
test_ne(permute_2D(s, axis= 0 ), s)
test_eq(permute_2D(s, axis= 1 ).mean(1 ), s.mean(1 ))
test_ne(permute_2D(s, axis= 1 ), s)
test_ne(permute_2D(s), s)
source
random_half_normal_tensor
def random_half_normal_tensor(
shape:int = 1 , device:NoneType= None
):
Returns a tensor of a predefined shape between 0 and 1 with a half-normal distribution
source
random_normal_tensor
def random_normal_tensor(
shape:int = 1 , device:NoneType= None
):
Returns a tensor of a predefined shape between -1 and 1 with a normal distribution
source
random_half_normal
def random_half_normal(
):
Returns a number between 0 and 1 with a half-normal distribution
source
random_normal
Returns a number between -1 and 1 with a normal distribution
source
fig2buf
Call self as a function.
source
get_plot_fig
def get_plot_fig(
size:NoneType= None , dpi:int = 100
):
Call self as a function.
source
default_dpi
Call self as a function.
source
plot_scatter
def plot_scatter(
x, y, deg:int = 1
):
Call self as a function.
a = np.random.rand(100 )
b = np.random.rand(100 )** 2
plot_scatter(a, b)
source
get_idxs
def get_idxs(
o, aList
):
Call self as a function.
a = random_shuffle(np.arange(100 , 200 ))
b = np.random.choice(a, 10 , False )
idxs = get_idxs(a, b)
test_eq(a[idxs], b)
source
apply_cmap
def apply_cmap(
o, cmap
):
Call self as a function.
a = np.random.rand(16 , 1 , 40 , 50 )
s = L(a.shape)
s[1 ] = 3
test_eq(L(apply_cmap(a, 'viridis' ).shape), s)
s[0 ] = 1
a = np.random.rand(1 , 40 , 50 )
test_eq(L(apply_cmap(a, 'viridis' ).shape), s)
source
torch_tile
def torch_tile(
a, n_tile, dim:int = 0
):
Call self as a function.
test_eq(torch_tile(torch.arange(2 ), 3 ), tensor([0 , 1 , 0 , 1 , 0 , 1 ]))
source
to_tsfresh_df
Prepares a time series (Tensor/ np.ndarray) to be used as a tsfresh dataset to allow feature extraction
ts = torch.rand(16 , 3 , 20 )
a = to_tsfresh_df(ts)
ts = ts.numpy()
b = to_tsfresh_df(ts)
source
scorr
Call self as a function.
source
pcorr
Call self as a function.
source
torch_diff
def torch_diff(
t, lag:int = 1 , pad:bool = True , append:int = 0
):
Call self as a function.
t = torch.arange(24 ).reshape(2 ,3 ,4 )
test_eq(torch_diff(t, 1 )[..., 1 :].float ().mean(), 1. )
test_eq(torch_diff(t, 2 )[..., 2 :].float ().mean(), 2. )
source
torch_clamp
def torch_clamp(
o, min :NoneType= None , max :NoneType= None
):
Clamp torch.Tensor using 1 or multiple dimensions
source
get_percentile
def get_percentile(
o, percentile, axis:NoneType= None
):
Call self as a function.
source
clip_outliers
def clip_outliers(
o, axis:NoneType= None
):
Call self as a function.
source
get_outliers_IQR
def get_outliers_IQR(
o, axis:NoneType= None , quantile_range:tuple = (25.0 , 75.0 )
):
Call self as a function.
t = torch.randn(2 ,3 ,100 )
test_eq(type (get_outliers_IQR(t, - 1 )[0 ]), torch.Tensor)
a = t.numpy()
test_eq(type (get_outliers_IQR(a, - 1 )[0 ]), np.ndarray)
test_close(get_percentile(t, 25 ).numpy(), get_percentile(a, 25 ))
source
get_robustscale_params
def get_robustscale_params(
o, sel_vars:NoneType= None , not_sel_vars:NoneType= None , by_var:bool = True , percentiles:tuple = (25 , 75 ),
eps:float = 1e-06
):
Calculates the median and inter-quartile range required to robust-scale inputs
a = np.random.rand(16 , 3 , 100 )
a[a> .8 ] = np.nan
median, IQR = get_robustscale_params(a, by_var= True , percentiles= (25 , 75 ))
a_scaled = (a - median) / IQR
test_eq(a.shape, a_scaled.shape)
test_eq(np.isnan(median).sum (),0 )
test_eq(np.isnan(IQR).sum (),0 )
test_eq(np.isnan(a), np.isnan(a_scaled))
source
torch_slice_by_dim
def torch_slice_by_dim(
t, index, dim:int =- 1 , kwargs:VAR_KEYWORD
):
Call self as a function.
t = torch.rand(5 , 3 )
index = torch.randint(0 , 3 , (5 , 1 ))
# index = [[0, 2], [0, 1], [1, 2], [0, 2], [0, 1]]
torch_slice_by_dim(t, index)
tensor([[0.5341],
[0.4543],
[0.0942],
[0.9645],
[0.0405]])
source
torch_nanstd
def torch_nanstd(
o, dim:NoneType= None , keepdim:bool = False
):
There’s currently no torch.nanstd function
source
torch_nanmean
def torch_nanmean(
o, dim:NoneType= None , keepdim:bool = False
):
There’s currently no torch.nanmean function
t = torch.rand(1000 )
t[:100 ] = float ('nan' )
assert torch_nanmean(t).item() > 0
source
concat
def concat(
ls:VAR_POSITIONAL, dim:int = 0
):
Concatenate tensors, arrays, lists, or tuples by a dimension
source
reduce_memory_usage
def reduce_memory_usage(
df
):
Call self as a function.
source
cls_name
Call self as a function.
test_eq(cls_name(timer), 'Timer' )
source
rotate_axis2
def rotate_axis2(
o, steps:int = 1
):
Call self as a function.
source
rotate_axis1
def rotate_axis1(
o, steps:int = 1
):
Call self as a function.
source
rotate_axis0
def rotate_axis0(
o, steps:int = 1
):
Call self as a function.
source
random_roll3d
def random_roll3d(
o, axis:tuple = (), replace:bool = False
):
Randomly rolls a 3D object along the indicated axes. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently
source
random_roll2d
def random_roll2d(
o, axis:tuple = (), replace:bool = False
):
Rolls a 2D object on the indicated axis. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently
source
roll3d
def roll3d(
o, roll1:Union= None , roll2:Union= None , roll3:Union= None
):
Rolls a 3D object on the indicated axis. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently
source
roll2d
def roll2d(
o, roll1:Union= None , roll2:Union= None
):
Rolls a 2D object on the indicated axis. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently
a = np.tile(np.arange(10 ), 3 ).reshape(3 , 10 ) * np.array([1 , 10 , 100 ]).reshape(- 1 , 1 )
a
array([[ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[ 0, 10, 20, 30, 40, 50, 60, 70, 80, 90],
[ 0, 100, 200, 300, 400, 500, 600, 700, 800, 900]])
roll2d(a, roll1= [2 , 1 , 0 ])
array([[ 0, 100, 200, 300, 400, 500, 600, 700, 800, 900],
[ 0, 10, 20, 30, 40, 50, 60, 70, 80, 90],
[ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]])
array([[ 7, 8, 9, 0, 1, 2, 3, 4, 5, 6],
[ 70, 80, 90, 0, 10, 20, 30, 40, 50, 60],
[700, 800, 900, 0, 100, 200, 300, 400, 500, 600]])
o = torch.arange(24 ).reshape(2 ,3 ,4 )
test_eq(rotate_axis0(o)[1 ], o[0 ])
test_eq(rotate_axis1(o)[:,1 ], o[:,0 ])
test_eq(rotate_axis2(o)[...,1 ], o[...,0 ])
source
chunks_calculator
def chunks_calculator(
shape, dtype:str = 'float32' , n_bytes:int = 1073741824
):
Function to calculate chunks for a given size of n_bytes (default = 1024**3 == 1GB). It guarantees > 50% of the chunk will be filled.
shape = (1_000 , 10 , 1000 )
dtype = 'float32'
test_eq(chunks_calculator(shape, dtype), False )
shape = (54684 , 10 , 1000 )
dtype = 'float32'
test_eq(chunks_calculator(shape, dtype), (27342 , - 1 , - 1 ))
source
is_memory_shared
def is_memory_shared(
a, b
):
Check if 2 array-like objects share memory
a = np.random.rand(2 ,3 ,4 )
t1 = torch.from_numpy(a)
test_eq(is_memory_shared(a, t1), True )
a = np.random.rand(2 ,3 ,4 )
t2 = torch.as_tensor(a)
test_eq(is_memory_shared(a, t2), True )
a = np.random.rand(2 ,3 ,4 )
t3 = torch.tensor(a)
test_eq(is_memory_shared(a, t3), False )
source
assign_in_chunks
def assign_in_chunks(
a, b, chunksize:str = 'auto' , inplace:bool = True , verbose:bool = True
):
Assigns values in b to an array-like object a using chunks to avoid memory overload. The resulting a retains its dtype and shares its memory. a: array-like object. b: may be an integer, float, str, ‘rand’ (for random data), or another array-like object. chunksize: the size of the chunks. If ‘auto’, chunks will have around 1GB each.
a = np.random.rand(10 ,3 ,4 ).astype('float32' )
a_dtype = a.dtype
a_id = id (a)
b = np.random.rand(10 ,3 ,4 ).astype('float64' )
assign_in_chunks(a, b, chunksize= 2 , inplace= True , verbose= True )
test_close(a, b)
test_eq(a.dtype, a_dtype)
test_eq(id (a), a_id)
a = np.random.rand(10 ,3 ,4 ).astype('float32' )
a_dtype = a.dtype
a_id = id (a)
b = 1
assign_in_chunks(a, b, chunksize= 2 , inplace= True , verbose= True )
test_eq(a, np.ones_like(a).astype(a.dtype))
test_eq(a.dtype, a_dtype)
test_eq(id (a), a_id)
a = np.random.rand(10 ,3 ,4 ).astype('float32' )
a_dtype = a.dtype
a_id = id (a)
b = 0.5
assign_in_chunks(a, b, chunksize= 2 , inplace= True , verbose= True )
test_eq(a.dtype, a_dtype)
test_eq(id (a), a_id)
a = np.random.rand(10 ,3 ,4 ).astype('float32' )
a_dtype = a.dtype
a_id = id (a)
b = 'rand'
assign_in_chunks(a, b, chunksize= 2 , inplace= True , verbose= True )
test_eq(a.dtype, a_dtype)
test_eq(id (a), a_id)
a = np.random.rand(10 ,3 ,4 ).astype('float32' )
b = np.random.rand(10 ,3 ,4 ).astype('float64' )
c = assign_in_chunks(a, b, chunksize= 2 , inplace= False , verbose= True )
test_close(c, b)
test_eq(a.dtype, c.dtype)
test_eq(is_memory_shared(a, c), True )
a = np.random.rand(10 ,3 ,4 ).astype('float32' )
b = 1
c = assign_in_chunks(a, b, chunksize= 2 , inplace= False , verbose= True )
test_eq(a, np.ones_like(a).astype(a.dtype))
test_eq(a.dtype, c.dtype)
test_eq(is_memory_shared(a, c), True )
a = np.random.rand(10 ,3 ,4 ).astype('float32' )
b = 0.5
c = assign_in_chunks(a, b, chunksize= 2 , inplace= False , verbose= True )
test_eq(a.dtype, c.dtype)
test_eq(is_memory_shared(a, c), True )
a = np.random.rand(10 ,3 ,4 ).astype('float32' )
b = 'rand'
c = assign_in_chunks(a, b, chunksize= 2 , inplace= False , verbose= True )
test_eq(a.dtype, c.dtype)
test_eq(is_memory_shared(a, c), True )
source
create_array
def create_array(
shape, fname:NoneType= None , path:str = './data' , on_disk:bool = True , dtype:str = 'float32' , mode:str = 'r+' ,
fill_value:str = 'rand' , chunksize:str = 'auto' , verbose:bool = True , kwargs:VAR_KEYWORD
):
mode: ‘r’: Open existing file for reading only. ‘r+’: Open existing file for reading and writing. ‘w+’: Create or overwrite existing file for reading and writing. ‘c’: Copy-on-write: assignments affect data in memory, but changes are not saved to disk. The file on disk is read-only. fill_value: ‘rand’ (for random numbers), int or float. chunksize: ‘auto’ to calculate chunks of 1GB, or any integer (for a given number of samples).
fname = 'X_on_disk'
shape = (100 , 10 , 10 )
X = create_array(shape, fname, on_disk= True , mode= 'r+' , chunksize= 25 , verbose= False )
test_eq(isinstance (X, np.memmap), True )
test_eq(X.shape, shape)
test_eq(X.dtype, np.dtype('float32' ))
test_ne(abs (X).sum (), 0 )
filename = X.filename
del X
os.remove(filename)
fname = 'X_on_disk_empty'
shape = (100 , 10 , 10 )
X = create_empty_array(shape, fname, on_disk= True , mode= 'r+' , dtype= 'float32' )
test_eq(isinstance (X, np.memmap), True )
test_eq(abs (X).sum (), 0 )
test_eq(X.shape, shape)
test_eq(X.dtype, np.dtype('float32' ))
filename = X.filename
del X
os.remove(filename)
source
np_load_compressed
def np_load_compressed(
fname:NoneType= None , path:str = './data' , kwargs:VAR_KEYWORD
):
Call self as a function.
source
np_save_compressed
def np_save_compressed(
arr, fname:NoneType= None , path:str = './data' , verbose:bool = False , kwargs:VAR_KEYWORD
):
Call self as a function.
X1 = np.random.rand(10 )
np_save_compressed(X1, 'X_comp' , path= './data' )
X2 = np_load_compressed('X_comp' )
test_eq(X1, X2)
source
np2memmap
def np2memmap(
arr, fname:NoneType= None , path:str = './data' , dtype:str = 'float32' , mode:str = 'c' , kwargs:VAR_KEYWORD
):
Function that turns an ndarray into a memmap ndarray. mode: ‘r’: Open existing file for reading only. ‘r+’: Open existing file for reading and writing. ‘w+’: Create or overwrite existing file for reading and writing. ‘c’: Copy-on-write: assignments affect data in memory, but changes are not saved to disk. The file on disk is read-only.
X1 = np.random.rand(10 )
X2 = np2memmap(X1, 'X1_test' )
test_eq(X1, X2)
test_ne(type (X1), type (X2))
source
torch_mean_groupby
def torch_mean_groupby(
o, idxs
):
Computes torch mean along axis 0 grouped by the idxs. Need to ensure that idxs have the same order as o
o = torch.arange(6 * 2 * 3 ).reshape(6 , 2 , 3 ).float ()
idxs = np.array([[0 ,1 ,2 ,3 ], [2 ,3 ]], dtype= object )
output = torch_mean_groupby(o, idxs)
test_eq(o[:2 ], output[:2 ])
test_eq(o[2 :4 ].mean(0 ), output[2 ])
test_eq(o[4 :6 ].mean(0 ), output[3 ])
source
torch_flip
def torch_flip(
t, dims:int =- 1
):
Call self as a function.
t = torch.randn(2 , 3 , 4 )
test_eq(torch.flip(t, (2 ,)), torch_flip(t, dims=- 1 ))
source
torch_masked_to_num
def torch_masked_to_num(
o, mask, num:int = 0 , inplace:bool = False
):
Call self as a function.
source
torch_nan_to_num
def torch_nan_to_num(
o, num:int = 0 , inplace:bool = False
):
Call self as a function.
x = torch.rand(2 , 4 , 6 )
x[:, :3 ][x[:, :3 ] < .5 ] = np.nan
nan_values = torch.isnan(x).sum ()
y = torch_nan_to_num(x[:, :3 ], inplace= False )
test_eq(torch.isnan(y).sum (), 0 )
test_eq(torch.isnan(x).sum (), nan_values)
torch_nan_to_num(x[:, :3 ], inplace= True )
test_eq(torch.isnan(x).sum (), 0 )
x = torch.rand(2 , 4 , 6 )
mask = x[:, :3 ] > .5
x[:, :3 ] = torch_masked_to_num(x[:, :3 ], mask, num= 0 , inplace= False )
test_eq(x[:, :3 ][mask].sum (), 0 )
x = torch.rand(2 , 4 , 6 )
mask = x[:, :3 ] > .5
torch_masked_to_num(x[:, :3 ], mask, num= 0 , inplace= True )
test_eq(x[:, :3 ][mask].sum (), 0 )
source
mpl_trend
def mpl_trend(
x, y, deg:int = 1
):
Call self as a function.
x = np.sort(np.random.randint(0 , 100 , 100 )/ 10 )
y = np.random.rand(100 ) + np.linspace(0 , 10 , 100 )
trend = mpl_trend(x, y)
plt.scatter(x, y)
plt.plot(x, trend, 'r' )
plt.show()
source
array2digits
def array2digits(
o, n_digits:NoneType= None , normalize:bool = True
):
Call self as a function.
source
int2digits
def int2digits(
o, n_digits:NoneType= None , normalize:bool = True
):
Call self as a function.
o = - 9645
test_eq(int2digits(o, 6 ), np.array([ 0 , 0 , - .9 , - .6 , - .4 , - .5 ]))
a = np.random.randint(- 1000 , 1000 , 10 )
test_eq(array2digits(a,5 ).shape, (10 ,5 ))
source
sincos_encoding
def sincos_encoding(
seq_len, device:NoneType= None , to_np:bool = False
):
Call self as a function.
sin, cos = sincos_encoding(100 )
plt.plot(sin.cpu().numpy())
plt.plot(cos.cpu().numpy())
plt.show()
source
linear_encoding
def linear_encoding(
seq_len, device:NoneType= None , to_np:bool = False , lin_range:tuple = (- 1 , 1 )
):
Call self as a function.
lin = linear_encoding(100 )
plt.plot(lin.cpu().numpy())
plt.show()
source
encode_positions
def encode_positions(
pos_arr, min_val:NoneType= None , max_val:NoneType= None , linear:bool = False , lin_range:tuple = (- 1 , 1 )
):
Encodes an array of positions using either a linear or a sincos method.
n_samples = 10
length = 500
_a = []
for i in range (n_samples):
a = np.arange(- 4000 , 4000 , 10 )
mask = np.random.rand(len (a)) > .5
a = a[mask]
a = np.concatenate([a, np.array([np.nan] * (length - len (a)))])
_a.append(a.reshape(- 1 ,1 ))
a = np.concatenate(_a, - 1 ).transpose(1 ,0 )
sin, cos = encode_positions(a, linear= False )
test_eq(a.shape, (n_samples, length))
test_eq(sin.shape, (n_samples, length))
test_eq(cos.shape, (n_samples, length))
plt.plot(sin.T)
plt.plot(cos.T)
plt.xlim(0 , 500 )
plt.show()
n_samples = 10
length = 500
_a = []
for i in range (n_samples):
a = np.arange(- 4000 , 4000 , 10 )
mask = np.random.rand(len (a)) > .5
a = a[mask]
a = np.concatenate([a, np.array([np.nan] * (length - len (a)))])
_a.append(a.reshape(- 1 ,1 ))
a = np.concatenate(_a, - 1 ).transpose(1 ,0 )
lin = encode_positions(a, linear= True )
test_eq(a.shape, (n_samples, length))
test_eq(lin.shape, (n_samples, length))
plt.plot(lin.T)
plt.xlim(0 , 500 )
plt.show()
source
sort_generator
def sort_generator(
generator, bs
):
Call self as a function.
generator = (i for i in np.random.permutation(np.arange(1000000 )).tolist())
l = list (sort_generator(generator, 512 ))
test_eq(l[:512 ], sorted (l[:512 ]))
source
get_subset_dict
def get_subset_dict(
d, keys
):
Returns a dict containing only the entries of d whose keys are listed in keys.
keys = string.ascii_lowercase
values = np.arange(len (keys))
d = {k:v for k,v in zip (keys,values)}
test_eq(get_subset_dict(d, ['a' , 'k' , 'j' , 'e' ]), {'a' : 0 , 'k' : 10 , 'j' : 9 , 'e' : 4 })
source
remove_dir
def remove_dir(
directory, verbose:bool = True
):
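Removes a directory, or each directory in a list of directories, and reports whether each one was removed or did not exist (messages are shown with the default verbose=True).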
source
create_dir
def create_dir(
directory, verbose:bool = True
):
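Creates a directory, including any missing parent directories, and reports the result (messages are shown with the default verbose=True).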
path = "wandb3/wandb2/wandb"
create_dir(path)
assert Path(path).exists()
paths = ["wandb3/wandb2/wandb" , "wandb3/wandb2" , "wandb" ]
remove_dir(paths)
for p in paths:
assert not Path(p).exists()
path = "wandb3"
assert Path(path).exists()
remove_dir(path)
assert not Path(path).exists()
wandb3/wandb2/wandb directory created.
wandb3/wandb2/wandb directory removed.
wandb3/wandb2 directory removed.
wandb directory doesn't exist.
wandb3 directory removed.
a = 5
def fn(b): return a + b
Writing ./test/mod_dev.py
fname = "./test/mod_dev.py"
while True :
if fname[0 ] in "/ ." : fname = fname.split(fname[0 ], 1 )[1 ]
else : break
if '/' in fname and fname.rsplit('/' , 1 )[0 ] not in sys.path: sys.path.append(fname.rsplit('/' , 1 )[0 ])
mod = import_file_as_module(fname)
test_eq(mod.fn(3 ), 8 )
sys.path = sys.path[:- 1 ]
remove_dir('./test/' )
source
named_partial
def named_partial(
name, func, args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Create a partial function with a name
def add_1(x, add= 1 ): return x+ add
test_eq(add_1(1 ), 2 )
add_2 = partial(add_1, add= 2 )
test_eq(add_2(2 ), 4 )
test_ne(str (add_2), "add_2" )
add_2 = named_partial('add_2' , add_1, add= 2 )
test_eq(add_2(2 ), 4 )
test_eq(str (add_2), "add_2" )
class _A():
def __init__ (self , add= 1 ): self .add = add
def __call__ (self , x): return x + self .add
test_eq(_A()(1 ), 2 )
_A2 = partial(_A, add= 2 )
test_eq(_A2()(1 ), 3 )
test_ne(str (_A2), '_A2' )
_A2 = named_partial('_A2' , _A, add= 2 )
test_eq(_A2()(1 ), 3 )
test_eq(str (_A2), '_A2' )
source
dict2attrdict
def dict2attrdict(
d:dict , # a dict
):
Converts a (nested) dict to an AttrDict.
source
attrdict2dict
def attrdict2dict(
d:dict , # a dict
):
Converts a (nested) AttrDict dict to a dict.
# Test attrdict2dict
d = AttrDict({'a' : 1 , 'b' : AttrDict({'c' : 2 , 'd' : 3 })})
test_eq(attrdict2dict(d), {'a' : 1 , 'b' : {'c' : 2 , 'd' : 3 }})
# Test dict2attrdict
d = {'a' : 1 , 'b' : {'c' : 2 , 'd' : 3 }}
test_eq(dict2attrdict(d), AttrDict({'a' : 1 , 'b' : AttrDict({'c' : 2 , 'd' : 3 })}))
source
get_config
def get_config(
file_path
):
Gets a config from a yaml file.
source
yaml2dict
def yaml2dict(
file_path, # a path to a yaml file
attrdict:bool = True , # if True, convert output to AttrDict
):
Converts a yaml file to a dict (optionally AttrDict).
source
dict2yaml
def dict2yaml(
d, # a dict
file_path, # a path to a yaml file
sort_keys:bool = False , # if True, sort the keys
):
Converts a dict to a yaml file.
program: wandb_scripts/train_script.py # (required) Path to training script.
method: bayes # (required) Specify the search strategy: grid, random or bayes
parameters: # (required) Specify parameters bounds to search.
bs:
values: [32 , 64 , 128 ]
depth:
values: [3 , 6 , 9 , 12 ]
fc_dropout:
distribution: uniform
min : 0.
max : 0.5
lr_max:
values: [0.001 , 0.003 , 0.01 , 0.03 , 0.1 ]
n_epoch:
values: [10 , 15 , 20 ]
nb_filters:
values: [32 , 64 , 128 ]
name: LSST_sweep_01
metric:
name: accuracy # This must match one of the metrics in the training script
goal: maximize
early_terminate:
type : hyperband
min_iter: 3
project: LSST_wandb_hpo
Writing sweep_config.yaml
fname = "sweep_config.yaml"
sweep_config = yaml2dict(fname)
print (sweep_config)
test_eq(sweep_config.method, 'bayes' )
test_eq(sweep_config['metric' ], {'name' : 'accuracy' , 'goal' : 'maximize' })
os.remove(fname)
{'program': 'wandb_scripts/train_script.py', 'method': 'bayes', 'parameters': {'bs': {'values': [32, 64, 128]}, 'depth': {'values': [3, 6, 9, 12]}, 'fc_dropout': {'distribution': 'uniform', 'min': 0.0, 'max': 0.5}, 'lr_max': {'values': [0.001, 0.003, 0.01, 0.03, 0.1]}, 'n_epoch': {'values': [10, 15, 20]}, 'nb_filters': {'values': [32, 64, 128]}}, 'name': 'LSST_sweep_01', 'metric': {'name': 'accuracy', 'goal': 'maximize'}, 'early_terminate': {'type': 'hyperband', 'min_iter': 3}, 'project': 'LSST_wandb_hpo'}
source
get_cat_cols
Call self as a function.
source
get_cont_cols
Call self as a function.
source
str2index
Call self as a function.
source
str2list
Call self as a function.
source
map_array
def map_array(
arr, dim:int = 1
):
Call self as a function.
source
get_mapping
def get_mapping(
arr, dim:int = 1 , return_counts:bool = False
):
Call self as a function.
a = np.asarray(alphabet[np.random.randint(0 ,15 ,30 )]).reshape(10 ,3 )
b = np.asarray(ALPHABET[np.random.randint(6 ,10 ,30 )]).reshape(10 ,3 )
x = concat(a,b,dim= 1 )
maps, counts = get_mapping(x, dim= 1 , return_counts= True )
x, maps, counts
(array([['d', 'k', 'l', 'I', 'I', 'G'],
['g', 'i', 'l', 'I', 'J', 'I'],
['e', 'l', 'n', 'G', 'H', 'I'],
['e', 'l', 'a', 'I', 'H', 'G'],
['k', 'l', 'b', 'I', 'I', 'J'],
['c', 'f', 'k', 'I', 'H', 'I'],
['e', 'j', 'f', 'I', 'H', 'J'],
['n', 'd', 'g', 'G', 'J', 'J'],
['d', 'f', 'a', 'I', 'H', 'H'],
['i', 'c', 'm', 'J', 'G', 'G']], dtype='<U1'),
[(#7) ['c','d','e','g','i','k','n'],
(#7) ['c','d','f','i','j','k','l'],
(#8) ['a','b','f','g','k','l','m','n'],
(#3) ['G','I','J'],
(#4) ['G','H','I','J'],
(#4) ['G','H','I','J']],
[7, 7, 8, 3, 4, 4])
x = np.asarray(alphabet[np.random.randint(0 ,15 ,30 )]).reshape(10 ,3 )
x, map_array(x), map_array(x, 1 )
(array([['i', 'm', 'd'],
['h', 'm', 'g'],
['i', 'g', 'd'],
['k', 'm', 'n'],
['n', 'j', 'l'],
['n', 'l', 'i'],
['f', 'c', 'k'],
['i', 'm', 'a'],
['l', 'i', 'f'],
['k', 'o', 'g']], dtype='<U1'),
array([[2, 5, 1],
[1, 5, 3],
[2, 1, 1],
[3, 5, 7],
[5, 3, 6],
[5, 4, 4],
[0, 0, 5],
[2, 5, 0],
[4, 2, 2],
[3, 6, 3]]),
array([[2, 5, 1],
[1, 5, 3],
[2, 1, 1],
[3, 5, 7],
[5, 3, 6],
[5, 4, 4],
[0, 0, 5],
[2, 5, 0],
[4, 2, 2],
[3, 6, 3]]))
source
log_tfm
def log_tfm(
o, inplace:bool = False
):
Log transforms an array-like object with positive and/or negative values
arr = np.asarray([- 1000 , - 100 , - 10 , - 1 , 0 , 1 , 10 , 100 , 1000 ]).astype(float )
plt.plot(arr, log_tfm(arr, False ))
plt.show()
t = tensor([- 1000 , - 100 , - 10 , - 1 , 0 , 1 , 10 , 100 , 1000 ]).float ()
plt.plot(t, log_tfm(t, False ))
plt.show()
source
to_sincos_time
def to_sincos_time(
arr, max_value
):
Call self as a function.
arr = np.sort(np.random.rand(100 ) * 5 )
arr_sin, arr_cos = to_sincos_time(arr, 5 )
plt.scatter(arr, arr_sin)
plt.scatter(arr, arr_cos)
plt.show()
source
plot_feature_dist
def plot_feature_dist(
X, percentiles:list = [0 , 0.1 , 0.5 , 1 , 5 , 10 , 25 , 50 , 75 , 90 , 95 , 99 , 99.5 , 99.9 , 100 ]
):
Call self as a function.
arr = np.random.rand(10 , 3 , 100 )
plot_feature_dist(arr, percentiles= [0 ,0.1 ,0.5 ,1 ,5 ,10 ,25 ,50 ,75 ,90 ,95 ,99 ,99.5 ,99.9 ,100 ])
source
rolling_moving_average
def rolling_moving_average(
o, window:int = 2
):
Call self as a function.
a = np.arange(60 ).reshape(2 ,3 ,10 ).astype(float )
t = torch.arange(60 ).reshape(2 ,3 ,10 ).float ()
test_close(rolling_moving_average(a, window= 3 ), rolling_moving_average(t, window= 3 ).numpy())
print (t)
print (rolling_moving_average(t, window= 3 ))
tensor([[[ 0., 1., 2., 3., 4., 5., 6., 7., 8., 9.],
[10., 11., 12., 13., 14., 15., 16., 17., 18., 19.],
[20., 21., 22., 23., 24., 25., 26., 27., 28., 29.]],
[[30., 31., 32., 33., 34., 35., 36., 37., 38., 39.],
[40., 41., 42., 43., 44., 45., 46., 47., 48., 49.],
[50., 51., 52., 53., 54., 55., 56., 57., 58., 59.]]])
tensor([[[ 0.0000, 0.5000, 1.0000, 2.0000, 3.0000, 4.0000, 5.0000,
6.0000, 7.0000, 8.0000],
[10.0000, 10.5000, 11.0000, 12.0000, 13.0000, 14.0000, 15.0000,
16.0000, 17.0000, 18.0000],
[20.0000, 20.5000, 21.0000, 22.0000, 23.0000, 24.0000, 25.0000,
26.0000, 27.0000, 28.0000]],
[[30.0000, 30.5000, 31.0000, 32.0000, 33.0000, 34.0000, 35.0000,
36.0000, 37.0000, 38.0000],
[40.0000, 40.5000, 41.0000, 42.0000, 43.0000, 44.0000, 45.0000,
46.0000, 47.0000, 48.0000],
[50.0000, 50.5000, 51.0000, 52.0000, 53.0000, 54.0000, 55.0000,
56.0000, 57.0000, 58.0000]]])
source
fbfill_sequence
def fbfill_sequence(
o
):
Forward and backward fills an array-like object along the sequence dimension
source
bfill_sequence
Backward fills an array-like object along the sequence dimension
source
ffill_sequence
Forward fills an array-like object along the sequence dimension
a = np.arange(80 ).reshape(2 , 4 , 10 ).astype(float )
mask = np.random.rand(* a.shape)
a[mask > .8 ] = np.nan
t = torch.from_numpy(a)
t
tensor([[[ 0., 1., 2., 3., 4., 5., 6., 7., 8., nan],
[10., 11., nan, nan, 14., 15., nan, 17., nan, 19.],
[20., 21., 22., 23., nan, 25., 26., 27., 28., 29.],
[30., 31., 32., 33., nan, 35., 36., 37., 38., 39.]],
[[40., 41., 42., 43., 44., 45., 46., 47., nan, 49.],
[nan, 51., nan, 53., 54., 55., nan, 57., 58., 59.],
[60., 61., 62., 63., 64., nan, nan, 67., 68., 69.],
[70., nan, 72., 73., 74., 75., 76., nan, 78., 79.]]],
dtype=torch.float64)
# forward fill
filled_a = ffill_sequence(a)
print (filled_a)
m = np.isnan(filled_a)
test_eq(filled_a[~ m], ffill_sequence(t).numpy()[~ m])
[[[ 0. 1. 2. 3. 4. 5. 6. 7. 8. 8.]
[10. 11. 11. 11. 14. 15. 15. 17. 17. 19.]
[20. 21. 22. 23. 23. 25. 26. 27. 28. 29.]
[30. 31. 32. 33. 33. 35. 36. 37. 38. 39.]]
[[40. 41. 42. 43. 44. 45. 46. 47. 47. 49.]
[nan 51. 51. 53. 54. 55. 55. 57. 58. 59.]
[60. 61. 62. 63. 64. 64. 64. 67. 68. 69.]
[70. 70. 72. 73. 74. 75. 76. 76. 78. 79.]]]
# backward fill
filled_a = bfill_sequence(a)
print (filled_a)
m = np.isnan(filled_a)
test_eq(filled_a[~ m], bfill_sequence(t).numpy()[~ m])
[[[ 0. 1. 2. 3. 4. 5. 6. 7. 8. nan]
[10. 11. 14. 14. 14. 15. 17. 17. 19. 19.]
[20. 21. 22. 23. 25. 25. 26. 27. 28. 29.]
[30. 31. 32. 33. 35. 35. 36. 37. 38. 39.]]
[[40. 41. 42. 43. 44. 45. 46. 47. 49. 49.]
[51. 51. 53. 53. 54. 55. 57. 57. 58. 59.]
[60. 61. 62. 63. 64. 67. 67. 67. 68. 69.]
[70. 72. 72. 73. 74. 75. 76. 78. 78. 79.]]]
# forward & backward fill
filled_a = fbfill_sequence(a)
print (filled_a)
m = np.isnan(filled_a)
test_eq(filled_a[~ m], fbfill_sequence(t).numpy()[~ m])
[[[ 0. 1. 2. 3. 4. 5. 6. 7. 8. 8.]
[10. 11. 11. 11. 14. 15. 15. 17. 17. 19.]
[20. 21. 22. 23. 23. 25. 26. 27. 28. 29.]
[30. 31. 32. 33. 33. 35. 36. 37. 38. 39.]]
[[40. 41. 42. 43. 44. 45. 46. 47. 47. 49.]
[51. 51. 51. 53. 54. 55. 55. 57. 58. 59.]
[60. 61. 62. 63. 64. 64. 64. 67. 68. 69.]
[70. 70. 72. 73. 74. 75. 76. 76. 78. 79.]]]
source
dummify
def dummify(
o:Union, by_var:bool = True , inplace:bool = False , skip:Optional= None , random_state:NoneType= None
):
Shuffles an array-like object along all dimensions or dimension 1 (variables) if by_var is True.
arr = np.random.rand(2 ,3 ,10 )
arr_original = arr.copy()
dummy_arr = dummify(arr)
test_ne(arr_original, dummy_arr)
test_eq(arr_original, arr)
dummify(arr, inplace= True )
test_ne(arr_original, arr)
t = torch.rand(2 ,3 ,10 )
t_original = t.clone()
dummy_t = dummify(t)
test_ne(t_original, dummy_t)
test_eq(t_original, t)
dummify(t, inplace= True )
test_ne(t_original, t)
source
shuffle_along_axis
def shuffle_along_axis(
o, axis:int =- 1 , random_state:NoneType= None
):
Call self as a function.
X = np.arange(60 ).reshape(2 ,3 ,10 ) + 10
X_shuffled = shuffle_along_axis(X,(0 , - 1 ), random_state= 23 )
test_eq(X_shuffled, np.array([[[13 , 15 , 41 , 14 , 40 , 49 , 18 , 42 , 47 , 46 ],
[28 , 56 , 53 , 50 , 52 , 25 , 24 , 57 , 51 , 59 ],
[34 , 30 , 38 , 35 , 69 , 66 , 63 , 67 , 61 , 62 ]],
[[19 , 10 , 11 , 16 , 43 , 12 , 17 , 48 , 45 , 44 ],
[23 , 20 , 26 , 22 , 21 , 27 , 58 , 29 , 54 , 55 ],
[36 , 31 , 39 , 60 , 33 , 68 , 37 , 32 , 65 , 64 ]]]))
source
analyze_array
def analyze_array(
o, bins:int = 100 , density:bool = False , feature_names:NoneType= None , clip_outliers_plot:bool = False ,
quantile_range:tuple = (25.0 , 75.0 ), percentiles:list = [1 , 25 , 50 , 75 , 99 ], text_len:int = 12 , figsize:tuple = (10 , 6 )
):
Call self as a function.
source
analyze_feature
def analyze_feature(
feature, bins:int = 100 , density:bool = False , feature_name:NoneType= None , clip_outliers_plot:bool = False ,
quantile_range:tuple = (25.0 , 75.0 ), percentiles:list = [1 , 25 , 50 , 75 , 99 ], text_len:int = 12 , figsize:tuple = (10 , 6 )
):
Call self as a function.
x = np.random.normal(size= (1000 ))
analyze_array(x)
array shape: (1000,)
dtype: float64
nan values: 0.0%
max: 3.581094060980321
1: -2.1615590829115185
25: -0.5910961139851849
50: -0.002247946765973052
75: 0.6259274030927355
99: 2.3412961380708084
min: -2.9413736207935037
outlier min: -2.416631389602066
outlier max: 2.4514626787096163
outliers: 1.3%
mean: 0.0252125277963861
std: 0.946955486669799
normal dist: True
x1 = np.random.normal(size= (1000 ,2 ))
x2 = np.random.normal(3 , 5 , size= (1000 ,2 ))
x = x1 + x2
analyze_array(x)
array shape: (1000, 2)
0 feature: 0
dtype: float64
nan values: 0.0%
max: 20.323075761234193
1: -8.260661592413742
25: -0.6268118569038604
50: 2.7491159998190335
75: 6.1659732833324234
99: 15.387037197243288
min: -13.122296090020368
outlier min: -10.815989567258287
outlier max: 16.35515099368685
outliers: 0.9%
mean: 2.9347218553275445
std: 5.134940196769919
normal dist: True
1 feature: 1
dtype: float64
nan values: 0.0%
max: 19.86661808715871
1: -8.727124941895372
25: -0.45908489661153007
50: 2.875134866985423
75: 6.288434737224429
99: 14.424046274543118
min: -10.963913297285615
outlier min: -10.58036434736547
outlier max: 16.409714187978366
outliers: 0.6%
mean: 2.9552584127690014
std: 4.99683092772426
normal dist: True
source
get_relpath
Call self as a function.
source
to_root_path
def to_root_path(
path
):
Converts a path to an absolute path from the root directory of the repository.
source
get_root
Returns the root directory of the git repository.
source
split_in_chunks
def split_in_chunks(
o, chunksize, start:int = 0 , shuffle:bool = False , drop_last:bool = False
):
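Splits o into consecutive chunks of chunksize items, beginning at index start; the final, shorter chunk is dropped when drop_last is True.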
a = np.arange(5 , 15 )
test_eq(split_in_chunks(a, 3 , drop_last= False ), [array([5 , 6 , 7 ]), array([ 8 , 9 , 10 ]), array([11 , 12 , 13 ]), array([14 ])])
test_eq(split_in_chunks(a, 3 , drop_last= True ), [array([5 , 6 , 7 ]), array([ 8 , 9 , 10 ]), array([11 , 12 , 13 ])])
test_eq(split_in_chunks(a, 3 , start= 2 , drop_last= True ), [array([7 , 8 , 9 ]), array([10 , 11 , 12 ])])
source
load_object
def load_object(
file_path
):
Loads an object from file_path; the .pkl extension may be omitted from the path.
source
save_object
def save_object(
o, file_path, verbose:bool = True
):
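Saves object o to file_path, adding the .pkl extension when it is missing, and reports the saved file (messages are shown with the default verbose=True).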
split = np.arange(100 )
save_object(split, file_path= 'data/test' )
split2 = load_object('data/test.pkl' )
test_eq(split, split2)
data directory already exists.
ndarray saved as data/test.pkl
splits = L([[[0 ,1 ,2 ,3 ,4 ], [5 ,6 ,7 ,8 ,9 ]],[[10 ,11 ,12 ,13 ,14 ], [15 ,16 ,17 ,18 ,19 ]]])
save_object(splits, file_path= Path('data/test' ))
splits2 = load_object('data/test' )
test_eq(splits, splits2)
data directory already exists.
L saved as data/test.pkl
source
get_idxs_to_keep
def get_idxs_to_keep(
o, cond, crit:str = 'all' , invert:bool = False , axis:tuple = (1 , 2 ), keepdims:bool = False
):
Call self as a function.
a = np.random.rand(100 , 2 , 10 )
a[a > .95 ] = np.nan
idxs_to_keep = get_idxs_to_keep(a, np.isfinite)
if idxs_to_keep.size> 0 :
test_eq(np.isnan(a[idxs_to_keep]).sum (), 0 )
source
zerofy
def zerofy(
a, stride, keep:bool = False
):
Create copies of an array, setting individual values or groups of values to zero
stride = 3
a = np.arange(2 * 5 ).reshape(2 ,5 ) + 1
zerofy(a, stride, keep= False )
array([[[ 0., 0., 3., 4., 5.],
[ 6., 7., 8., 9., 10.]],
[[ 1., 2., 0., 0., 0.],
[ 6., 7., 8., 9., 10.]],
[[ 1., 2., 3., 4., 5.],
[ 0., 0., 8., 9., 10.]],
[[ 1., 2., 3., 4., 5.],
[ 6., 7., 0., 0., 0.]]])
source
feat2list
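Converts a feature specification to a list: a single string becomes a one-item list, a list is returned unchanged, and None becomes an empty list.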
a = 'a'
test_eq(feat2list(a), ['a' ])
a = ['a' , 'b' ]
test_eq(feat2list(a), ['a' , 'b' ])
a = None
test_eq(feat2list(a), [])
source
smallest_dtype
def smallest_dtype(
num, use_unsigned:bool = False
):
Find the smallest dtype that can safely hold num
test_eq(smallest_dtype(3654 ), 'int16' )
test_eq(smallest_dtype(2048. ), 'float16' )
test_eq(smallest_dtype(365454 ), 'int32' )
test_eq(smallest_dtype(365454. ), 'float32' )
test_eq(smallest_dtype(3654545134897 ), 'int64' )
source
plot_forecast
def plot_forecast(
X_true, y_true, y_pred, sel_vars:NoneType= None , idx:NoneType= None , figsize:tuple = (8 , 4 ), n_samples:int = 1
):
Call self as a function.
source
str2callable
def str2callable(
object_path:str = None , # The string representing the object path.
):
Transform a string into a callable object without importing it in the script.
# test showing you don't need to import the object in the script. The library needs to be installed though.
try :
pyts
except Exception as e:
print (0 , e)
try :
pyts.image
except Exception as e:
print (1 , e)
try :
gasf = eval ("pyts.image.GramianAngularField(method='summation')" )
print (f"2 success: { gasf} " )
except Exception as e:
print (2 , e)
try :
gasf = str2callable("pyts.image.GramianAngularField(method='summation')" )
print (f"3 success: { gasf} " )
except Exception as e:
print (3 , e)
0 name 'pyts' is not defined
1 name 'pyts' is not defined
2 name 'pyts' is not defined
3 success: GramianAngularField()