Does NumPy have a function equivalent to MATLAB's buffer?

I see there are array_split and split functions, but these are not very handy when you have to split an array whose length is not an integer multiple of the chunk size. Moreover, these functions take the number of slices as input rather than the slice size. I need something more like MATLAB's buffer function, which is better suited to signal processing.

For example, if I want to buffer a signal into chunks of size 60, I need to do: np.vstack(np.hsplit(x.iloc[0:((len(x)//60)*60)], len(x)//60)), which is cumbersome.
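
For the non-overlapping case, the trim-and-split expression above can be written more compactly with a reshape. This is only a sketch: it assumes x is a 1-D NumPy array (a pandas Series would need x.to_numpy() first), and the helper name chunk_rows is hypothetical, not part of NumPy.

```python
import numpy as np

def chunk_rows(x, size):
    """Split 1-D array x into rows of length `size`, discarding the remainder.

    Hypothetical helper, not a NumPy function.
    """
    x = np.asarray(x)
    n_chunks = len(x) // size                  # number of complete chunks
    return x[:n_chunks * size].reshape(n_chunks, size)

x = np.arange(130)
chunks = chunk_rows(x, 60)
print(chunks.shape)  # (2, 60): the last 10 samples are dropped
```

Unlike MATLAB's buffer, this puts one chunk per row and does not zero-pad the final partial chunk.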



Solution 1:[1]

I wrote the following routine to handle the use cases I needed, but I have not implemented/tested for "underlap".

Please feel free to make suggestions for improvement.

def buffer(X, n, p=0, opt=None):
    '''Mimic MATLAB routine to generate buffer array

    MATLAB docs here: https://se.mathworks.com/help/signal/ref/buffer.html

    Parameters
    ----------
    X : ndarray
        Signal array
    n : int
        Length of each data segment (frame)
    p : int
        Number of values to overlap
    opt : str
        Initial condition options. The default sets the first `p` values to
        zero, while 'nodelay' begins filling the buffer immediately.

    Returns
    -------
    result : (n, m) ndarray
        Buffer array created from X, one segment per column
    '''
    import numpy as np

    if opt not in [None, 'nodelay']:
        raise ValueError('{} not implemented'.format(opt))

    i = 0
    first_iter = True
    while i < len(X):
        if first_iter:
            if opt == 'nodelay':
                # No zeros at array start
                result = X[:n]
                i = n
            else:
                # Start with `p` zeros
                result = np.hstack([np.zeros(p), X[:n-p]])
                i = n-p
            # Make 2D array and pivot
            result = np.expand_dims(result, axis=0).T
            first_iter = False
            continue

        # Create next column, add `p` results from last col if given
        col = X[i:i+(n-p)]
        if p != 0:
            col = np.hstack([result[:,-1][-p:], col])
        i += n-p

        # Append zeros if this is the last column and it is shorter than `n`
        if len(col) < n:
            col = np.hstack([col, np.zeros(n-len(col))])

        # Append the new column to the result
        result = np.hstack([result, np.expand_dims(col, axis=0).T])

    return result
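
The column-by-column loop above can also be expressed with index arithmetic, which avoids growing the array inside a loop. The following is a minimal sketch under stated assumptions: the name buffer_indexed and the boolean nodelay flag are hypothetical, it assumes 0 <= p < n and a 1-D input, and like the routine above it does not handle underlap. It is intended to produce the same columns as that routine, not to be a verified reimplementation of MATLAB's buffer.

```python
import numpy as np

def buffer_indexed(x, n, p=0, nodelay=False):
    """Sketch of an overlap buffer built with index arithmetic (hypothetical name).

    Assumes 0 <= p < n and a 1-D input.
    """
    x = np.asarray(x, dtype=float)
    hop = n - p                                   # new samples per column
    if not nodelay:
        x = np.concatenate([np.zeros(p), x])      # default: start with p zeros
    # Number of columns needed to cover every sample (ceiling division)
    m = 1 + max(0, -(-(len(x) - n) // hop))
    padded = np.zeros((m - 1) * hop + n)          # zero-pad the tail
    padded[:len(x)] = x
    # idx[r, c] is the position in `padded` of row r, column c
    idx = np.arange(n)[:, None] + hop * np.arange(m)[None, :]
    return padded[idx]

y = buffer_indexed(np.arange(1, 11), 4, 2)
print(y.shape)  # (4, 5)
# First column holds 0, 0, 1, 2; last column holds 7, 8, 9, 10
```

With nodelay=True the same input gives four columns, starting with 1, 2, 3, 4.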

Solution 2:[2]

import numpy as np

def buffer(X = np.array([]), n = 1, p = 0):
    #buffers data vector X into length n column vectors with overlap p
    #excess data at the end of X is discarded
    n = int(n) #length of each data vector
    p = int(p) #overlap of data vectors, 0 <= p < n
    L = len(X) #length of data to be buffered
    m = int(np.floor((L-n)/(n-p)) + 1) #number of sample vectors (no padding)
    data = np.zeros([n,m]) #initialize data matrix
    #stop at L-n+1 so that a final frame starting exactly at L-n is not skipped
    for startIndex,column in zip(range(0,L-n+1,n-p),range(0,m)):
        data[:,column] = X[startIndex:startIndex + n] #fill in by column
    return data
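
The same discard-the-excess framing can be sketched without an explicit loop using numpy.lib.stride_tricks.sliding_window_view, which is available in NumPy 1.20 and later. The helper name buffer_windows is hypothetical, and the sketch assumes 0 <= p < n and len(x) >= n.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def buffer_windows(x, n, p=0):
    """Frames of length n with overlap p as columns; trailing excess is dropped.

    Hypothetical helper; assumes 0 <= p < n and len(x) >= n.
    """
    x = np.asarray(x)
    windows = sliding_window_view(x, n)   # every length-n window at hop 1 (a view)
    return windows[::n - p].T             # keep every (n-p)-th window

frames = buffer_windows(np.arange(10), 4, 2)
print(frames.shape)  # (4, 4)
```

The result is a read-only view into x, so call .copy() on it if the frames need to be modified.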

Solution 3:[3]

Same as the other answer, but faster.

def buffer(X, n, p=0):

    '''
    Parameters
    ----------
    X : ndarray
        Signal array
    n : int
        Length of each data segment (frame)
    p : int
        Number of values to overlap

    Returns
    -------
    result : (n,m) ndarray
        Buffer array created from X
    '''
    import numpy as np

    d = n - p
    m = len(X)//d

    if m * d != len(X):
        m = m + 1

    Xn = np.zeros(d*m)
    Xn[:len(X)] = X

    Xn = np.reshape(Xn,(m,d))
    Xne = np.concatenate((Xn,np.zeros((1,d))))
    Xn = np.concatenate((Xn,Xne[1:,0:p]), axis = 1)

    return np.transpose(Xn[:-1])

Solution 4:[4]

ryanjdillon's answer rewritten for a significant performance improvement: it appends to a list instead of concatenating arrays. Concatenation copies the whole array on every iteration and is much slower.

import numpy as np

def buffer(x, n, p=0, opt=None):
    if opt not in ('nodelay', None):
        raise ValueError('{} not implemented'.format(opt))

    i = 0
    if opt == 'nodelay':
        # No zeros at array start
        result = x[:n]
        i = n
    else:
        # Start with `p` zeros
        result = np.hstack([np.zeros(p), x[:n-p]])
        i = n-p
    # Make 2D array, cast to list for .append()
    result = list(np.expand_dims(result, axis=0))

    while i < len(x):
        # Create next column, add `p` results from last col if given
        col = x[i:i+(n-p)]
        if p != 0:
            col = np.hstack([result[-1][-p:], col])

        # Append zeros if last row and not length `n`
        if len(col) < n:
            col = np.hstack([col, np.zeros(n - len(col))])

        # Combine result with next row
        result.append(np.array(col))
        i += (n - p)

    return np.vstack(result).T

Solution 5:[5]

def buffer(X, n, p=0):
    '''
    Parameters:
    X: ndarray, signal array; input a long vector such as raw speech from a WAV file
    n: int, frame length
    p: int, number of values to overlap
    -----------
    Returns:
    result : (n,m) ndarray, buffer array created from X
    '''
    import numpy as np

    d = n - p        # hop size between consecutive frames
    m = len(X)//d    # number of frames
    c = n//d         # number of whole hops that fit in one frame
    if m * d != len(X):
        m = m + 1

    # Zero-pad X to a whole number of hops and put one hop in each row
    Xn = np.zeros(d*m)
    Xn[:len(X)] = X
    Xn = np.reshape(Xn,(m,d))

    # Append the following c-1 hops to each row
    Xn_out = Xn
    for i in range(c-1):
        Xne = np.concatenate((Xn,np.zeros((i+1,d))))
        Xn_out = np.concatenate((Xn_out, Xne[i+1:,:]),axis=1)

    # Complete each frame with the first n-d*c samples of the next hop
    if n-d*c>0:
        Xne = np.concatenate((Xn, np.zeros((c,d))))
        Xn_out = np.concatenate((Xn_out,Xne[c:,:n-d*c]),axis=1)

    return np.transpose(Xn_out)

Here is an improved version of Ali Khodabakhsh's sample code, which did not work in my cases. Feel free to comment and use it.

Solution 6:[6]

Comparing the execution time of the proposed answers, by running

import numpy as np
from timeit import default_timer as timer

x = np.arange(1,200000)
start = timer()
y = buffer(x,60,20)
end = timer()
print(end-start)

the results (in seconds) are:

Andrzej May, 0.005595300000095449

OverLordGoldDragon, 0.06954789999986133

ryanjdillon, 2.427092700000003
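
A single start/stop measurement like the one above can be noisy. As a hedged sketch, the standard-library timeit.repeat can run the call several times and report the fastest run. The buffer function defined here is only a stand-in (an assumption) so that the snippet is self-contained; swap in whichever implementation is being measured.

```python
import timeit
import numpy as np

def buffer(x, n, p=0):
    # Stand-in for the implementation under test (assumption):
    # non-padded frames gathered with fancy indexing
    hop = n - p
    starts = np.arange(0, len(x) - n + 1, hop)
    return x[starts[None, :] + np.arange(n)[:, None]]

x = np.arange(1, 200000)

calls = 5
# Three measurements of `calls` runs each; keep the fastest measurement
times = timeit.repeat(lambda: buffer(x, 60, 20), number=calls, repeat=3)
best = min(times) / calls
print(f"best of 3: {best:.6f} s per call")
```

Taking the minimum rather than the mean reduces the influence of other processes running on the machine.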

Solution 7:[7]

The Keras function timeseries_dataset_from_array may be considered a Python equivalent of MATLAB's buffer().

Sample code:

import numpy as np
S = np.arange(1,99) #A Demo Array

import tensorflow.keras.preprocessing as kp
list(kp.timeseries_dataset_from_array(S, targets = None,sequence_length=7,sequence_stride=7,batch_size=5))


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2
Solution 3 Ali Khodabakhsh
Solution 4 OverLordGoldDragon
Solution 5
Solution 6 Filipe Pinto
Solution 7 Ambarish Chandurkar