Source code for sp.multirate

#!/usr/bin/env python
"""Module providing Multirate signal processing functionality.
Largely based on MATLAB's Multirate signal processing toolbox with consultation 
of Octave m-file source code.
"""

import sys
import fractions
import numpy
from scipy import signal


[docs]def downsample(s, n, phase=0): """Decrease sampling rate by integer factor n with included offset phase. """ return s[phase::n]
[docs]def upsample(s, n, phase=0): """Increase sampling rate by integer factor n with included offset phase. """ return numpy.roll(numpy.kron(s, numpy.r_[1, numpy.zeros(n-1)]), phase)
[docs]def decimate(s, r, n=None, fir=False): """Decimation - decrease sampling rate by r. The decimation process filters the input data s with an order n lowpass filter and then resamples the resulting smoothed signal at a lower rate. By default, decimate employs an eighth-order lowpass Chebyshev Type I filter with a cutoff frequency of 0.8/r. It filters the input sequence in both the forward and reverse directions to remove all phase distortion, effectively doubling the filter order. If 'fir' is set to True decimate uses an order 30 FIR filter (by default otherwise n), instead of the Chebyshev IIR filter. Here decimate filters the input sequence in only one direction. This technique conserves memory and is useful for working with long sequences. """ if fir: if n is None: n = 30 b = signal.firwin(n, 1.0/r) a = 1 f = signal.lfilter(b, a, s) else: #iir if n is None: n = 8 b, a = signal.cheby1(n, 0.05, 0.8/r) f = signal.filtfilt(b, a, s) return downsample(f, r)
[docs]def interp(s, r, l=4, alpha=0.5): """Interpolation - increase sampling rate by integer factor r. Interpolation increases the original sampling rate for a sequence to a higher rate. interp performs lowpass interpolation by inserting zeros into the original sequence and then applying a special lowpass filter. l specifies the filter length and alpha the cut-off frequency. The length of the FIR lowpass interpolating filter is 2*l*r+1. The number of original sample values used for interpolation is 2*l. Ordinarily, l should be less than or equal to 10. The original signal is assumed to be band limited with normalized cutoff frequency 0=alpha=1, where 1 is half the original sampling frequency (the Nyquist frequency). The default value for l is 4 and the default value for alpha is 0.5. """ b = signal.firwin(2*l*r+1, alpha/r); a = 1 return r*signal.lfilter(b, a, upsample(s, r))[r*l+1:-1]
[docs]def resample(s, p, q, h=None): """Change sampling rate by rational factor. This implementation is based on the Octave implementation of the resample function. It designs the anti-aliasing filter using the window approach applying a Kaiser window with the beta term calculated as specified by [2]. Ref [1] J. G. Proakis and D. G. Manolakis, Digital Signal Processing: Principles, Algorithms, and Applications, 4th ed., Prentice Hall, 2007. Chap. 6 Ref [2] A. V. Oppenheim, R. W. Schafer and J. R. Buck, Discrete-time signal processing, Signal processing series, Prentice-Hall, 1999 """ gcd = fractions.gcd(p,q) if gcd>1: p=p/gcd q=q/gcd if h is None: #design filter #properties of the antialiasing filter log10_rejection = -3.0 stopband_cutoff_f = 1.0/(2.0 * max(p,q)) roll_off_width = stopband_cutoff_f / 10.0 #determine filter length #use empirical formula from [2] Chap 7, Eq. (7.63) p 476 rejection_db = -20.0*log10_rejection; l = numpy.ceil((rejection_db-8.0) / (28.714 * roll_off_width)) #ideal sinc filter t = numpy.arange(-l, l + 1) ideal_filter=2*p*stopband_cutoff_f*numpy.sinc(2*stopband_cutoff_f*t) #determine parameter of Kaiser window #use empirical formula from [2] Chap 7, Eq. (7.62) p 474 beta = signal.kaiser_beta(rejection_db) #apodize ideal filter response h = numpy.kaiser(2*l+1, beta)*ideal_filter ls = len(s) lh = len(h) l = (lh - 1)/2.0 ly = numpy.ceil(ls*p/float(q)) #pre and postpad filter response nz_pre = numpy.floor(q - numpy.mod(l,q)) hpad = h[-lh+nz_pre:] offset = numpy.floor((l+nz_pre)/q) nz_post = 0; while numpy.ceil(((ls-1)*p + nz_pre + lh + nz_post )/q ) - offset < ly: nz_post += 1 hpad = hpad[:lh + nz_pre + nz_post] #filtering xfilt = upfirdn(s, hpad, p, q) return xfilt[offset-1:offset-1+ly]
[docs]def upfirdn(s, h, p, q): """Upsample signal s by p, apply FIR filter as specified by h, and downsample by q. Using fftconvolve as opposed to lfilter as it does not seem to do a full convolution operation (and its much faster than convolve). """ return downsample(signal.fftconvolve(h, upsample(s, p)), q)
def main(): """Show simple use cases for functionality provided by this module. Each example below attempts to mimic the examples provided by mathworks MATLAB documentation, http://www.mathworks.com/help/toolbox/signal/ """ import pylab argv = sys.argv if len(argv) != 1: print >>sys.stderr, 'usage: python -m pim.sp.multirate' sys.exit(2) #Downsample x = numpy.arange(1, 11) print 'Down Sampling %s by 3' % x print downsample(x, 3) print 'Down Sampling %s by 3 with phase offset 2' % x print downsample(x, 3, phase=2) #Upsample x = numpy.arange(1, 5) print 'Up Sampling %s by 3' % x print upsample(x, 3) print 'Up Sampling %s by 3 with phase offset 2' % x print upsample(x, 3, 2) #Decimate t = numpy.arange(0, 1, 0.00025) x = numpy.sin(2*numpy.pi*30*t) + numpy.sin(2*numpy.pi*60*t) y = decimate(x,4) pylab.figure() pylab.subplot(2, 1, 1) pylab.title('Original Signal') pylab.stem(numpy.arange(len(x[0:120])), x[0:120]) pylab.subplot(2, 1, 2) pylab.title('Decimated Signal') pylab.stem(numpy.arange(len(y[0:30])), y[0:30]) #Interp t = numpy.arange(0, 1, 0.001) x = numpy.sin(2*numpy.pi*30*t) + numpy.sin(2*numpy.pi*60*t) y = interp(x,4) pylab.figure() pylab.subplot(2, 1, 1) pylab.title('Original Signal') pylab.stem(numpy.arange(len(x[0:30])), x[0:30]) pylab.subplot(2, 1, 2) pylab.title('Interpolated Signal') pylab.stem(numpy.arange(len(y[0:120])), y[0:120]) #upfirdn L = 147.0 M = 160.0 N = 24.0*L h = signal.firwin(N-1, 1/M, window=('kaiser', 7.8562)) h = L*h Fs = 48000.0 n = numpy.arange(0, 10239) x = numpy.sin(2*numpy.pi*1000/Fs*n) y = upfirdn(x, h, L, M) pylab.figure() pylab.stem(n[1:49]/Fs, x[1:49]) pylab.stem(n[1:45]/(Fs*L/M), y[13:57], 'r', markerfmt='ro',) pylab.xlabel('Time (sec)') pylab.ylabel('Signal value') #resample fs1 = 10.0 t1 = numpy.arange(0, 1 + 1.0/fs1, 1.0/fs1) x = t1 y = resample(x, 3, 2) t2 = numpy.arange(0,(len(y)))*2.0/(3.0*fs1) pylab.figure() pylab.plot(t1, x, '*') pylab.plot(t2, y, 'o') pylab.plot(numpy.arange(-0.5,1.5, 0.01), numpy.arange(-0.5,1.5, 0.01), ':') pylab.legend(('original','resampled')) pylab.xlabel('Time') x = numpy.hstack([numpy.arange(1,11), numpy.arange(9,0,-1)]) y = resample(x,3,2) pylab.figure() pylab.subplot(2, 1, 1) pylab.title('Edge Effects Not Noticeable') pylab.plot(numpy.arange(19)+1, x, '*') pylab.plot(numpy.arange(29)*2/3.0 + 1, y, 'o') pylab.legend(('original', 'resampled')) x = numpy.hstack([numpy.arange(10, 0, -1), numpy.arange(2,11)]) y = resample(x,3,2) pylab.subplot(2, 1, 2) pylab.plot(numpy.arange(19)+1, x, '*') pylab.plot(numpy.arange(29)*2/3.0 + 1, y, 'o') pylab.title('Edge Effects Very Noticeable') pylab.legend(('original', 'resampled')) pylab.show() return 0 if __name__ == '__main__': sys.exit(main())