From 1a79a01983aa60fe886b6bbf9218d67b84cc9a8b Mon Sep 17 00:00:00 2001 From: yo mama Date: Wed, 16 Mar 2016 21:29:42 -0700 Subject: ok --- hmmestimate.py | 169 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 hmmestimate.py diff --git a/hmmestimate.py b/hmmestimate.py new file mode 100644 index 0000000..16d3e20 --- /dev/null +++ b/hmmestimate.py @@ -0,0 +1,169 @@ +import numpy as np +from copy import copy +import matplotlib.pyplot as plt + +class HMM: + def __init__(self): + pass + + def simulate(self,nSteps): + + def drawFrom(probs): + return np.where(np.random.multinomial(1,probs) == 1)[0][0] + + observations = np.zeros(nSteps) + states = np.zeros(nSteps) + states[0] = drawFrom(self.pi) + observations[0] = drawFrom(self.B[states[0],:]) + for t in range(1,nSteps): + states[t] = drawFrom(self.A[states[t-1],:]) + observations[t] = drawFrom(self.B[states[t],:]) + return observations,states + + + def train(self,observations,criterion,graphics=False): + if graphics: + plt.ion() + + nStates = self.A.shape[0] + nSamples = len(observations) + + A = self.A + B = self.B + pi = copy(self.pi) + + done = False + while not done: + + # alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm) + # Initialize alpha + alpha = np.zeros((nStates,nSamples)) + c = np.zeros(nSamples) #scale factors + alpha[:,0] = pi.T * self.B[:,observations[0]] + c[0] = 1.0/np.sum(alpha[:,0]) + alpha[:,0] = c[0] * alpha[:,0] + # Update alpha for each observation step + for t in range(1,nSamples): + alpha[:,t] = np.dot(alpha[:,t-1].T, self.A).T * self.B[:,observations[t]] + c[t] = 1.0/np.sum(alpha[:,t]) + alpha[:,t] = c[t] * alpha[:,t] + + # beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i , hmm) + # Initialize beta + beta = np.zeros((nStates,nSamples)) + beta[:,nSamples-1] = 1 + beta[:,nSamples-1] = c[nSamples-1] * beta[:,nSamples-1] + # Update beta backwards from end of sequence + for t in range(len(observations)-1,0,-1): + beta[:,t-1] = np.dot(self.A, (self.B[:,observations[t]] * beta[:,t])) + beta[:,t-1] = c[t-1] * beta[:,t-1] + + xi = np.zeros((nStates,nStates,nSamples-1)); + for t in range(nSamples-1): + denom = np.dot(np.dot(alpha[:,t].T, self.A) * self.B[:,observations[t+1]].T, + beta[:,t+1]) + for i in range(nStates): + numer = alpha[i,t] * self.A[i,:] * self.B[:,observations[t+1]].T * \ + beta[:,t+1].T + xi[i,:,t] = numer / denom + + # gamma_t(i) = P(q_t = S_i | O, hmm) + gamma = np.squeeze(np.sum(xi,axis=1)) + # Need final gamma element for new B + prod = (alpha[:,nSamples-1] * beta[:,nSamples-1]).reshape((-1,1)) + gamma = np.hstack((gamma, prod / np.sum(prod))) #append one more to gamma!!! + + newpi = gamma[:,0] + newA = np.sum(xi,2) / np.sum(gamma[:,:-1],axis=1).reshape((-1,1)) + newB = copy(B) + + if graphics: + plt.subplot(2,1,1) + plt.cla() + #plt.plot(gamma.T) + plt.plot(gamma[1]) + plt.ylim(-0.1,1.1) + plt.legend(('Probability State=1')) + plt.xlabel('Time') + plt.draw() + + numLevels = self.B.shape[1] + sumgamma = np.sum(gamma,axis=1) + for lev in range(numLevels): + mask = observations == lev + newB[:,lev] = np.sum(gamma[:,mask],axis=1) / sumgamma + + if np.max(abs(pi - newpi)) < criterion and \ + np.max(abs(A - newA)) < criterion and \ + np.max(abs(B - newB)) < criterion: + done = 1; + + A[:],B[:],pi[:] = newA,newB,newpi + + self.A[:] = newA + self.B[:] = newB + self.pi[:] = newpi + self.gamma = gamma + + +if __name__ == '__main__': + np.set_printoptions(precision=3,suppress=True) + if True: + #'Two states, three possible observations in a state' + + hmm = HMM() + hmm.pi = np.array([0.5, 0.5]) + hmm.A = np.array([[0.85, 0.15], + [0.12, 0.88]]) + hmm.B = np.array([[0.8, 0.1, 0.1], + [0.0, 0.0, 1]]) + + hmmguess = HMM() + hmmguess.pi = np.array([0.5, 0.5]) + hmmguess.A = np.array([[0.5, 0.5], + [0.5, 0.5]]) + hmmguess.B = np.array([[0.3, 0.3, 0.4], + [0.2, 0.5, 0.3]]) + else: + #three states + print "Error....this example with three states is not working correctly." + hmm = HMM() + hmm.pi = np.array([0.1, 0.4, 0.5]) + hmm.A = np.array([[0.7, 0.2, 0.1], + [0.1, 0.6, 0.3], + [0.4, 0.2, 0.4]]) + hmm.B = np.array([[0.5, 0.3, 0.2], + [0.1, 0.6, 0.3], + [0.0, 0.3, 0.7]]) + hmmguess = HMM() + hmmguess.pi = np.array([0.333, 0.333, 0.333]) + hmmguess.A = np.array([[0.3333, 0.3333, 0.3333], + [0.3333, 0.3333, 0.3333], + [0.3333, 0.3333, 0.3333]]) + hmmguess.B = np.array([[0.3, 0.3, 0.4], + [0.2, 0.5, 0.3], + [0.3, 0.3, 0.4]]) + + o,s = hmm.simulate(1000) + hmmguess.train(o,0.0001,graphics=True) + + print 'Actual probabilities\n',hmm.pi + print 'Estimated initial probabilities\n',hmmguess.pi + + print 'Actual state transition probabililities\n',hmm.A + print 'Estimated state transition probabililities\n',hmmguess.A + + print 'Actual observation probabililities\n',hmm.B + print 'Estimated observation probabililities\n',hmmguess.B + + plt.subplot(2,1,2) + plt.cla() + plt.plot(np.vstack((s*0.9+0.05,hmmguess.gamma[1,:])).T,'-o',alpha=0.7) + plt.legend(('True State','Guessed Probability of State=1')) + plt.ylim(-0.1,1.1) + plt.xlabel('Time') + plt.draw() + + + + -- cgit v1.2.3-70-g09d2