Changeset 299
- Timestamp:
- 11/06/08 17:36:08 (2 months ago)
- Files:
-
- trunk/src/stgen.py (modified) (5 diffs)
- trunk/test/test_stgen.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/src/stgen.py
r298 r299 1 # -*- coding: utf8 -*-2 1 # TODO : needs refactoring 3 2 # TODO: needs python gsl wrapper / convert to numpy … … 6 5 from NeuroTools import check_dependency 7 6 8 have_gsl = check_dependency('pygsl') 9 if have_gsl: 10 import pygsl 7 from signals import SpikeTrain 11 8 12 9 from numpy import array, log … … 14 11 15 12 16 17 def calc_h_single(x,a,b,dt=1e-4): 18 from pygsl.sf import gamma_inc_Q 19 Hpre = -log(pygsl.sf.gamma_inc_Q(a,(x-dt)/b)[0])[0] 20 Hpost = -log(gamma_inc_Q(a,(x+dt)/b)[0])[0] 21 return 0.5*(Hpost-Hpre)/dt 22 23 def calc_h(x,a,b,dt=1e-4): 24 Hpre = -log(array(map(gamma_inc_Q,a,(x-dt)/b))) 25 Hpost = -log(array(map(gamma_inc_Q,a,(x-dt)/b))) 26 return 0.5*(Hpost-Hpre)/dt 27 13 def gamma_hazard(x,a,b,dt=1e-4): 14 """Compute the hazard function for a gamma process with parameters a,b 15 16 where a and b are the parameters of the gamma PDF: 17 18 y(t) = x^(a-1) \exp(-x/b) / (\Gamma(a)*b^a) 19 20 x is in units of seconds 21 22 b is in units of seconds 23 24 a is dimensionless 25 26 """ 27 28 # TODO: this algorithm has numerical problems 29 # Try: 30 # plot(stgen.gamma_hazard(arange(0,1000.0,0.1),10.0,1.0/50.0)) 31 # and look for the kinks. 32 33 from scipy.special import gammaincc 34 Hpre = -log(gammaincc(a,(x-dt)/b)) 35 Hpost = -log(gammaincc(a,(x+dt)/b)) 36 val = 0.5*(Hpost-Hpre)/dt 37 38 if isinstance(val,numpy.ndarray): 39 val[numpy.isnan(val)] = 1.0/b 40 return val 41 elif numpy.isnan(val): 42 return 1.0/b 43 else: 44 return val 45 28 46 29 47 class StGen: 30 48 31 def __init__(self, gslrng=None, numpyrng=None):49 def __init__(self, rng=None, seed=None): 32 50 """ Spike Train Generator 33 51 34 gslrng if specified is the gsl rng to be used 35 Similarly for numpyrng. 36 If neither is specified, a gsl rng is created. 37 If both are specified, the gslrng is used. 38 39 Of course, if pygsl is not available, you can't 40 have a gsl rng and will get a numpy rng. 52 And object to generate spiking random processes with various statistics 53 (inhomogeneous poisson, inhomogeneous gamma, etc.) as SpikeTrain objects. 54 55 rng should be None, or a numpy.random.RandomState object, 56 or an object with the same interface. 57 58 If rng is not None, the provided rng will be used 59 to generate random numbers, otherwise StGen will create 60 its own random number generator. 61 62 If a seed is provided, it is passed to rng.seed(seed) 63 64 41 65 """ 42 66 43 if have_gsl: 44 if gslrng: 45 self.rng = gslrng 46 elif numpyrng: 47 self.rng = numpyrng 48 else: 49 from pygsl.rng import rng 50 self.rng = rng() 67 if rng==None: 68 self.rng = numpy.random.RandomState() 51 69 else: 52 self.rng = numpy.random.RandomState() 70 self.rng = rng 71 72 if seed != None: 73 self.rng.seed(seed) 53 74 54 75 def seed(self,seed): 55 76 """ seed the gsl rng with a given seed """ 56 self.rng.set(seed) 57 58 59 def poisson_generator(self,rate,tsim): 60 """Returns list of poisson spikes with rate and length (fast algorithm) 61 if rate is in spikes/second, tsim should be in seconds and the returned times are in seconds. 62 so if you want times in milliseconds, rate should be in spikes/ms""" 63 64 number = int(tsim*2.0*rate) 77 self.rng.seed(seed) 78 79 80 def poisson_generator(self,rate,t_start=0.0,t_stop=1000.0,array=False): 81 """Returns a SpikeList whose spikes are a realization of a Poisson process 82 with the given rate (Hz) and stopping time t_stop (milliseconds). 83 84 Note: t_start is always 0.0, thus all realizations are as if 85 they spiked at t=0.0, though this spike is not included in the SpikeList. 86 87 if array is True, as numpy array of sorted spikes is returned, 88 rather than a SpikeList object. 89 90 """ 91 92 number = int((t_stop-t_start)/1000.0*2.0*rate) 65 93 if number > 0: 66 isi = self.rng.exponential(1.0/rate, number) 94 isi = self.rng.exponential(1.0/rate, number)*1000.0 67 95 if number > 1: 68 96 spikes = numpy.add.accumulate(isi) 69 97 else: 70 spikes = numpy.array([isi])98 spikes = isi 71 99 else: 72 100 spikes = numpy.array([]) 73 i = numpy.searchsorted(spikes, tsim) 74 return numpy.resize(spikes,(i,)) 75 76 77 def poissondyn_generator(self,tbins,rate,tsim): 78 """ generate a dynamic poisson renewal process from a 79 poisson spike train via thinning method (See Devroye, Mueller 2006) 80 81 tbins: the time bins at which to specify the parameters (Numeric array) NOTE: tbins[0] should logicallly be time zero 82 rate: the rate at time bins (Numeric array) 83 rmax: the rate of the poisson spike train to be thinned, must be > max(rate) 84 tsim: length of time to simulate process 101 102 spikes+=t_start 103 i = numpy.searchsorted(spikes, t_stop) 104 105 if i==len(spikes): 106 raise RuntimeError("Internal ISI buffer overrun. Please file a bug report ticket at http://neuralensemble.org/NeuroTools.") 107 108 if array: 109 return numpy.resize(spikes,(i,)) 110 111 return SpikeTrain(numpy.resize(spikes,(i,)), t_start=t_start,t_stop=t_stop) 112 113 114 def inh_poisson_generator(self,rate,t,t_stop,array=False): 115 """Returns a SpikeList whose spikes are a realization of an 116 inhomogeneous poisson process (dynamic rate). 117 118 The implementation uses the thinning method, as presented in the 119 references. 120 121 t: an array specifying the time bins (in milliseconds) 122 at which to specify the rate 123 124 NOTE: t_start=t[0] 125 126 rate: the rate (Hz), where rate[i] is active on the 127 interval [t[i],t[i+1]) (numpy array) 128 t_stop: length of time to simulate process 129 130 131 References: 132 133 Eilif Muller, Lars Buesing, Johannes Schemmel, and Karlheinz Meier 134 Spike-Frequency Adapting Neural Ensembles: Beyond Mean Adaptation and Renewal Theories 135 Neural Comput. 2007 19: 2958-3010. 136 137 Devroye, L. (1986). Non-uniform random variate generation. New York: Springer-Verlag. 138 139 85 140 """ 86 141 142 if numpy.shape(t)!=numpy.shape(rate): 143 raise ValueError('shape mismatch: t,rate must be of the same shape') 144 145 # get max rate and generate poisson process to be thinned 87 146 rmax = numpy.max(rate) 88 ps = self.poisson_generator(rmax, tsim) 89 90 if len(ps) > 0: 91 # gen uniform rand on 0,1 for each spike 92 try: 93 rn = array(self.rng.uniform(0, 1, len(ps))) # numpy 94 except TypeError: # 95 rn = array(self.rng.uniform(len(ps))) # gsl 96 97 # instantaneous rate for each spike 98 99 #idx = len(tbins)-1-numpy.searchsorted(-tbins[::-1],-ps) 100 ## for all times before first tbin, use first tbin 101 #idx = numpy.where(idx>=0,idx,0) 102 103 # TODO : this simplifies the above code but "there must a reason why this was more complex..." 104 idx=numpy.searchsorted(tbins,ps)-1 105 spike_rate = numpy.choose(idx,rate) 106 107 # thin and return spikes 108 keep = numpy.nonzero(rn<spike_rate/rmax) 109 #TODO: solve why I got a 2D array... 110 spike_train = numpy.take(ps,keep)[0,:] 111 else: 112 spike_train = ps 113 114 return spike_train 115 116 117 # TODO: have a array generator with spatio-temporal correlations 118 119 def OU_generator_slow(self,dt,tau,sigma,y0,tsim): 120 # TODO: if this is slow and not used, we should remove it! 147 ps = self.poisson_generator(rmax, t_start=t[0], t_stop=t_stop, array=True) 148 149 # return empty if no spikes 150 if len(ps) == 0: 151 return SpikeTrain(numpy.array([]), t_start=t[0],t_stop=t_stop) 152 153 # gen uniform rand on 0,1 for each spike 154 rn = numpy.array(self.rng.uniform(0, 1, len(ps))) 155 156 # instantaneous rate for each spike 157 158 idx=numpy.searchsorted(t,ps)-1 159 spike_rate = rate[idx] 160 161 # thin and return spikes 162 spike_train = ps[rn<spike_rate/rmax] 163 164 if array: 165 return spike_train 166 167 return SpikeTrain(spike_train, t_start=t[0],t_stop=t_stop) 168 169 170 171 def _inh_gamma_generator_python(self,a,b,t,t_stop,array=False): 172 """Returns a SpikeList whose spikes are a realization of an 173 inhomogeneous gamma process (dynamic rate). 174 175 The implementation uses the thinning method, as presented in the 176 references. 177 178 t: an array specifying the time bins (in milliseconds) 179 at which to specify the rate 180 181 NOTE: t_start=t[0] 182 183 a and b are the parameters of the gamma PDF, 184 where a[i] and b[i] are active on the 185 interval [t[i],t[i+1]) (numpy arrays). 186 187 a is a dimensionless quantity >0, but typically on the order of 2-10. 188 a=1 results in a poisson process. 189 190 b is assumed to be in units of 1/Hz (seconds). 191 192 y(t) = x^(a-1) \exp(-x/b) / (\Gamma(a)*b^a) 193 194 t_stop: length of time to simulate process 195 196 197 References: 198 199 Eilif Muller, Lars Buesing, Johannes Schemmel, and Karlheinz Meier 200 Spike-Frequency Adapting Neural Ensembles: Beyond Mean Adaptation and Renewal Theories 201 Neural Comput. 2007 19: 2958-3010. 202 203 Devroye, L. (1986). Non-uniform random variate generation. New York: Springer-Verlag. 204 205 206 """ 207 208 from numpy import shape 209 210 if shape(t)!=shape(a) or shape(a)!=shape(b): 211 raise ValueError('shape mismatch: t,a,b must be of the same shape') 212 213 # get max rate and generate poisson process to be thinned 214 rmax = numpy.max(1.0/b) 215 ps = self.poisson_generator(rmax, t_start=t[0], t_stop=t_stop, array=True) 216 217 # return empty if no spikes 218 if len(ps) == 0: 219 return SpikeTrain(numpy.array([]), t_start=t[0],t_stop=t_stop) 220 221 222 # gen uniform rand on 0,1 for each spike 223 rn = numpy.array(self.rng.uniform(0, 1, len(ps))) 224 225 # instantaneous a,b for each spike 226 227 idx=numpy.searchsorted(t,ps)-1 228 spike_a = a[idx] 229 spike_b = b[idx] 230 231 keep = numpy.zeros(shape(ps),bool) 232 233 # thin spikes 234 235 i = 0 236 t_last = 0.0 237 t_i = 0 238 239 while(i<len(ps)): 240 # find index in "t" time 241 t_i = numpy.searchsorted(t[t_i:],ps[i],'right')-1+t_i 242 if rn[i]<gamma_hazard((ps[i]-t_last)/1000.0,a[t_i],b[t_i])/rmax: 243 # keep spike 244 t_last = ps[i] 245 keep[i] = True 246 i+=1 247 248 249 spike_train = ps[keep] 250 251 if array: 252 return spike_train 253 254 return SpikeTrain(spike_train, t_start=t[0],t_stop=t_stop) 255 256 257 # use slow python implementation for the time being 258 # TODO: provide optimized C/weave implementation if possible 259 260 inh_gamma_generator = _inh_gamma_generator_python 261 262 263 def _OU_generator_python(self,dt,tau,sigma,y0,t_start=0.0,t_stop=1000.0,array=False): 121 264 """ generates an OU process using forward euler 122 dt = resolution 123 tau = correlation time 265 dt = resolution in milliseconds 266 tau = correlation time in milliseconds 124 267 sigma = std dev of process 125 268 y0 = mean/initial value 126 tsim = how long to simulate 269 t_stop = start time in milliseconds 270 t_stop = end time in milliseconds 127 271 """ 128 272 129 273 130 t = arange(0,tsim,dt) 131 y = zeros(shape(t),Float) 274 t = numpy.arange(t_start,t_stop,dt) 275 y = numpy.zeros(numpy.shape(t),float) 276 gauss = self.rng.standard_normal(len(t)-1) 132 277 133 278 y[0] = y0 … … 135 280 # python loop... bad+slow! 136 281 for i in xrange(1,len(t)): 137 y[i] = y[i-1]+dt/tau*(y0-y[i-1])+sqrt(2*dt/tau)*sigma*self.rng.gaussian(1.0) 138 282 y[i] = y[i-1]+dt/tau*(y0-y[i-1])+numpy.sqrt(2*dt/tau)*sigma*gauss[i-1] 283 284 #if array: 285 139 286 return (y,t) 140 287 141 142 143 def poisson_generator_slow(self,rate,tsim): 144 # TODO: if this is slow and not used, we should remove it! 145 146 """ returns list of poisson spikes with rate and length (slow algorithm) """ 147 148 from Numeric import array 149 import Numeric 150 151 spikes = Numeric.zeros(tsim*rate*2,Numeric.Float) 152 i = 1 153 spikes[0]=0.0 154 t=0.0 155 156 while t<tsim: 157 t+=self.rng.exponential(1.0/rate) 158 spikes[i] = t 159 i+=1 160 if i>=len(spikes): 161 spikes = Numeric.resize(spikes,(len(spikes)*2,)) 162 163 return Numeric.resize(spikes,(i-1,)) 164 165 166 167 def gen_g_conv(self,rate,tau,q,num,t): 168 169 """ generate g for quantal-increase-"q"-exponential-decay-"tau" synapse 288 #return AnalogSignal(y,dt,t_start=t_start,t_stop=t_stop) 289 290 # use slow python implementation for the time being 291 # TODO: provide optimized C/weave implementation if possible 292 293 OU_generator = _OU_generator_python 294 295 # TODO: inhomogeneous OU generator 296 297 298 # TODO: have a array generator with spatio-temporal correlations 299 300 # TODO fix shotnoise stuff below ... and write tests 301 302 def shotnoise_generator(self,rate,tau,q,num,t): 303 304 """ generate shotnoise 305 306 quantal-increase-"q"-exponential-decay-"tau" synapse 170 307 and a poisson spike train of "rate" and "num" """ 171 308 172 g = zeros(shape(t),Float)309 g = numpy.zeros(numpy.shape(t),float) 173 310 174 311 … … 180 317 181 318 return g 319 320 182 321 183 322 trunk/test/test_stgen.py
r248 r299 5 5 import unittest 6 6 from NeuroTools import stgen 7 from NeuroTools import signals 7 8 import numpy 9 10 class StatisticalError(Exception): 11 pass 8 12 9 13 class StGenInitTest(unittest.TestCase): … … 17 21 def testCreateWithNoArgs(self): 18 22 """ 19 With no arguments for the constructor, we should get a GSL RNG if 20 pygsl is available, and a Numpy RNG otherwise. 23 With no arguments for the constructor, we should get a Numpy RNG. 21 24 """ 22 25 stg = stgen.StGen() 23 if stgen.have_gsl: 24 import pygsl 25 self.assertEqual(type(stg.rng), type(pygsl.rng.rng())) 26 else: 27 assert isinstance(stg.rng, numpy.random.RandomState) 26 assert isinstance(stg.rng, numpy.random.RandomState) 27 28 29 def testMethodsBasic(self): 30 31 stg = stgen.StGen() 32 33 rate = 100.0 #Hz 34 t_stop = 1000.0 # milliseconds 35 st = stg.poisson_generator(rate,0.0,t_stop) 36 37 assert isinstance(st,signals.SpikeTrain) 38 39 st = stg.poisson_generator(rate,0.0,t_stop,array=True) 40 41 assert isinstance(st, numpy.ndarray) 42 43 44 def testStatsPoisson(self): 45 46 # this is a statistical test with non-zero chance of failure 47 48 stg = stgen.StGen() 49 50 rate = 100.0 #Hz 51 t_start = 500.0 52 t_stop = 1500.0 # milliseconds 53 54 st = stg.poisson_generator(rate,t_start=t_start,t_stop=t_stop,array=True) 55 56 assert st[-1] < t_stop 57 assert st[0] > t_start 58 59 60 # last spike should not be more than 4 ISI away from t_stop 61 err = """ 62 Last spike should not be more than 4 ISI behind t_stop. 63 There is a non-zero chance for this to occur during normal operation. 64 Re-run the test to see if the error persists.""" 65 66 if st[-1] < t_stop-4.0*1.0/rate*1000.0: 67 raise StatisticalError(err) 68 69 70 # first spike should not be more than 4 ISI away from t_start 71 err = """ 72 First spike should not be more than 4 ISI in front of t_start. 73 There is a non-zero chance for this to occur during normal operation. 74 Re-run the test to see if the error persists.""" 75 76 if st[0] > t_start+4.0*1.0/rate*1000.0: 77 raise StatisticalError(err) 78 79 err = """ 80 Number of spikes should be within 3 standard deviations of mean. 81 There is a non-zero chance for this to occur during normal operation. 82 Re-run the test to see if the error persists.""" 83 84 # time interval is one second 85 86 if len(st) > rate+3.0*numpy.sqrt(rate) or len(st) < rate-3.0*numpy.sqrt(rate): 87 raise StatisticalError(err) 88 89 90 def testStatsInhPoisson(self): 91 92 # this is a statistical test with non-zero chance of failure 93 94 stg = stgen.StGen() 95 96 rate = 100.0 #Hz 97 t_start = 500.0 98 t_stop = 1500.0 # milliseconds 99 100 st = stg.inh_poisson_generator(numpy.array([rate]),numpy.array([t_start]),t_stop,array=True) 101 102 assert st[-1] < t_stop 103 assert st[0] > t_start 104 105 106 # last spike should not be more than 4 ISI away from t_stop 107 err = """ 108 Last spike should not be more than 4 ISI behind t_stop. 109 There is a non-zero chance for this to occur during normal operation. 110 Re-run the test to see if the error persists.""" 111 112 if st[-1] < t_stop-4.0*1.0/rate*1000.0: 113 raise StatisticalError(err) 114 115 116 # first spike should not be more than 4 ISI away from t_start 117 err = """ 118 First spike should not be more than 4 ISI in front of t_start. 119 There is a non-zero chance for this to occur during normal operation. 120 Re-run the test to see if the error persists.""" 121 122 if st[0] > t_start+4.0*1.0/rate*1000.0: 123 raise StatisticalError(err) 124 125 err = """ 126 Number of spikes should be within 3 standard deviations of mean. 127 There is a non-zero chance for this to occur during normal operation. 128 Re-run the test to see if the error persists.""" 129 130 # time interval is one second 131 132 if len(st) > rate+3.0*numpy.sqrt(rate) or len(st) < rate-3.0*numpy.sqrt(rate): 133 raise StatisticalError(err) 134 135 136 # step in the rate 137 138 st = stg.inh_poisson_generator(numpy.array([100.0,200.0]),numpy.array([500.0,1500.0]),2500.0,array=True) 139 140 n1 = len(st[st<1500.0]) 141 n2 = len(st[st>1500.0]) 142 143 err = """ 144 Number of spikes should be within 3 standard deviations of mean. 145 There is a non-zero chance for this to occur during normal operation. 146 Re-run the test to see if the error persists.""" 147 148 if n2 > 200.0+3.0*numpy.sqrt(200.0) or n2 < 200.0-3.0*numpy.sqrt(200.0): 149 raise StatisticalError(err) 150 151 if n1 > 100.0+3.0*numpy.sqrt(100.0) or n1 < 100.0-3.0*numpy.sqrt(100.0): 152 raise StatisticalError(err) 153 154 155 156 def testStatsInhGamma(self): 157 158 # this is a statistical test with non-zero chance of failure 159 160 from numpy import array 161 162 stg = stgen.StGen() 163 164 # gamma step rate= 100Hz -> 200Hz, a = 3.0 165 166 st = stg.inh_gamma_generator(array([3.0,3.0]),array([1.0/100.0/3.0,1.0/200.0/3.0]),array([500.0,1500.0]),2500.0,array=True) 167 168 169 n1 = len(st[st<1500.0]) 170 n2 = len(st[st>1500.0]) 171 172 err = """ 173 Number of spikes should be within 3 standard deviations of mean. 174 There is a non-zero chance for this to occur during normal operation. 175 Re-run the test to see if the error persists.""" 176 177 if n2 > 200.0+3.0*numpy.sqrt(200.0) or n2 < 200.0-3.0*numpy.sqrt(200.0): 178 raise StatisticalError(err) 179 180 if n1 > 100.0+3.0*numpy.sqrt(100.0) or n1 < 100.0-3.0*numpy.sqrt(100.0): 181 raise StatisticalError(err) 182 183 184 def testStatsOUGen(self): 185 186 # this is a statistical test with non-zero chance of failure 187 188 from numpy import array 189 190 stg = stgen.StGen() 191 192 (ou,t) = stg.OU_generator(0.1,10.0,2.0,10.0,500.0,1500.0,array=True) 193 194 195 196 197 198 28 199 29 200 # ==============================================================================

