关键词: 隐马尔科夫模型,HMM,维特比,前向算法,Viterbi
对于算法,果然是要多练啊,上午花了俩小时算是基本对隐马尔科夫模型中的前向算法和维特比算法有认识了。
后向算法也不难，后续我再补充进来；而 Baum-Welch 方法的参数估计，使用的是前向和后向算法的中间结果进行迭代计算。算法推导使用 EM，我还没完全搞清楚；不过如果只做最后的计算倒不算很难。先歇息一下，后续一定要补上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# Hidden Markov model: exhaustive / forward / backward / Viterbi algorithms.
import itertools
import math
import random

random.seed(1)

import numpy as np

states = ('Rainy', 'Sunny')

# Observation sequence used by every function below.
# O = ['shop', 'clean', 'walk']
O = ['shop', 'clean', 'walk', 'shop']
# O = ['shop', 'clean', 'walk', 'walk']
# O = ['clean', 'shop']

start_probability = {'Rainy': 0.6, 'Sunny': 0.4}

transition_probability = {
    'Rainy': {'Rainy': 0.7, 'Sunny': 0.3},
    'Sunny': {'Rainy': 0.4, 'Sunny': 0.6},
}

emission_probability = {
    'Rainy': {'walk': 0.1, 'shop': 0.4, 'clean': 0.5},
    'Sunny': {'walk': 0.6, 'shop': 0.3, 'clean': 0.1},
}


# Problem 1: given the model λ = (initial probabilities, transition matrix,
# emission matrix) and an observation sequence O, compute p(O|λ).

def get_start_state(start_probability):
    """Sample one initial hidden state from the start distribution.

    Raises:
        ValueError: if the probabilities do not sum (approximately) to 1.
    """
    # BUG FIX: the original `raise "..."` raised a plain string, which is a
    # TypeError in Python 3.  Also compare floats with a tolerance instead
    # of an exact `!= 1.0` check.
    if not math.isclose(sum(start_probability.values()), 1.0):
        raise ValueError("start_probability does not sum to 1")
    p = random.random()
    accumulate = 0.0
    for key, prob in start_probability.items():
        accumulate += prob
        # Return the first state whose cumulative probability exceeds p.
        if accumulate > p:
            return key


def get_all_combines(states=('Rainy', 'Sunny'), length=4):
    """Return every hidden-state sequence (as a list) of the given length.

    The output order matches the original hand-rolled counter, where the
    FIRST position varies fastest -- hence the reversal of each tuple from
    itertools.product (whose LAST position varies fastest).
    """
    return [list(combo[::-1])
            for combo in itertools.product(states, repeat=length)]


# 2. Given the observation sequence O, compute p(O|λ).
# 1) brute-force enumeration
def compute_by_exhaustion():
    """Compute p(O|λ) by summing over every possible hidden sequence."""
    p_total = 0.0
    for state_seq in get_all_combines(states, len(O)):
        # p(I|λ): probability of this hidden-state sequence.
        p_seq = start_probability[state_seq[0]]
        for prev, cur in zip(state_seq, state_seq[1:]):
            p_seq *= transition_probability[prev][cur]
        # p(O|I,λ): probability of the observations given the hidden states.
        p_obs = 1.0
        for state, observation in zip(state_seq, O):
            p_obs *= emission_probability[state][observation]
        p_total += p_seq * p_obs
    return p_total


def compute_by_forward():
    """Compute p(O|λ) with the forward algorithm.

    Raises:
        ValueError: if the observation sequence has fewer than two items.
    """
    if len(O) <= 1:
        # BUG FIX: raising a string is a TypeError in Python 3.
        raise ValueError("observation sequence too short")
    # Initialization: alpha_1(i) = pi_i * b_i(o_1)
    alpha_pre = {state: start_probability[state] *
                 emission_probability[state][O[0]]
                 for state in states}
    print(alpha_pre)
    # Recursion: alpha_t(i) = (sum_j alpha_{t-1}(j) * a_ji) * b_i(o_t)
    for observation in O[1:]:
        alpha = {}
        for cur in states:
            total = sum(alpha_pre[prev] * transition_probability[prev][cur]
                        for prev in states)
            alpha[cur] = total * emission_probability[cur][observation]
        alpha_pre = alpha
        print(alpha_pre)
    # Termination: p(O|λ) = sum_i alpha_T(i)
    return sum(alpha_pre.values())


def compute_by_backward():
    """Compute p(O|λ) with the backward algorithm.

    BUG FIX: the original only printed the result and returned None;
    the probability is now returned as well.
    """
    # Initialization: beta_T(i) = 1
    beta_pre = {state: 1.0 for state in states}
    # Consume observations o_2..o_T in reverse time order.
    for observation in reversed(O[1:]):
        print(observation)
        beta = {}
        for cur in states:
            # beta_t(i) = sum_j a_ij * b_j(o_{t+1}) * beta_{t+1}(j)
            # (renamed the accumulator: the original shadowed builtin `sum`)
            total = 0.0
            for nxt in states:
                total += (transition_probability[cur][nxt] *
                          emission_probability[nxt][observation] *
                          beta_pre[nxt])
            beta[cur] = total
        beta_pre = beta
    # Termination: p(O|λ) = sum_i pi_i * b_i(o_1) * beta_1(i)
    result = 0.0
    for state in beta_pre:
        result += (start_probability[state] *
                   emission_probability[state][O[0]] * beta_pre[state])
    print(beta_pre)
    print(result)
    return result


# Problem 2: given λ and the observation sequence O, find the most likely
# hidden-state sequence (Viterbi algorithm).
def viterbi():
    """Return (probability, path) of the most likely hidden sequence."""
    # V[t][state] = max probability of any path ending in `state` at time t.
    V = [{}]
    # P[state] = best path (list of states) that ends in `state`.
    P = {}
    # Initialization.
    for state in states:
        P[state] = [state]
        V[0][state] = (start_probability[state] *
                       emission_probability[state][O[0]])
    # Recursion.
    for observation in O[1:]:
        V.append({})
        # Build the new paths in a temporary dict so P is not mutated
        # while it is still being read.
        tmp_path = {}
        for cur_state in states:
            # Pick the predecessor that maximizes the path probability.
            # Ties are broken by the state name (tuple comparison), exactly
            # as in the original implementation.
            (prob, max_state) = max(
                (V[-2][pre_state] *
                 transition_probability[pre_state][cur_state] *
                 emission_probability[cur_state][observation], pre_state)
                for pre_state in states)
            V[-1][cur_state] = prob
            # Best path to cur_state = best path to max_state + cur_state.
            tmp_path[cur_state] = P[max_state] + [cur_state]
        P = tmp_path
    # Termination: best final state wins.
    (prob, state) = max((V[-1][y], y) for y in states)
    return prob, P[state]


# Problem 3: parameter estimation from observations (EM) -- not implemented
# here; see the matrix-based version further below.
if __name__ == "__main__":
    # print(get_all_combines())
    # print(compute_by_exhaustion())
    print(compute_by_forward())
    print(compute_by_backward())
    # print(viterbi())
2017.5.15 新增
今天又花了4个小时,把代码改成矩阵计算的方式。包括前向、后向。并实现了有标注的模型参数估计。
对于Baum-Welch算法的无标注估计,我也是能力有限,再找机会去实现吧。
有标注的参数估计,已经可以把hmm模型给学习出来了,在此基础上再进行分词什么的已经是可以做的工作了,后续有时间我会实现一下,难度已经不大了。我找到的网上分词语料 https://pan.baidu.com/s/1geZkMif 。
另外说一点：关于前向、后向的矩阵递归计算，我是对着公式写的，但中间波折很多，也比较难直观地从公式直接转换到矩阵计算代码——毕竟矩阵乘法把逐项求和隐含在了运算里。这块我很乱，虽然写出来了，但思路不够清晰。从理论公式到代码实现，我还需要多练习、多学习。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# Hidden Markov model, matrix-based implementation.
import numpy as np


class Hmm(object):
    """HMM with forward/backward evaluation and supervised parameter learning.

    Attributes:
        observe_seqs: observation sequence as indices, e.g. [1, 2, 0, 1].
        N: sequence length.
        M: number of hidden states.
        U: number of distinct observation symbols.
        alpha: N x M matrix of forward probabilities per time step
            (filled by compute_by_forward).
        beta: N x M matrix of backward probabilities per time step
            (filled by compute_by_backward).
    """

    observe_seqs = None
    N = None
    M = None
    U = None
    alpha = None
    beta = None

    # ['shop', 'clean', 'walk', 'shop'] == [1, 2, 0, 1]
    def __init__(self, observe_seqs=None):
        """Build the toy weather model.

        Args:
            observe_seqs: list of observation indices; defaults to
                [1, 2, 0, 1].  BUG FIX: the original used a mutable list
                as the default argument.

        Raises:
            ValueError: if the sequence has fewer than two observations.
        """
        self.start_prob = np.mat([.6, .4])
        self.trans_prob = np.mat([
            [.7, .3],
            [.4, .6],
        ])
        # NOTE: the attribute keeps the original spelling ("emmission")
        # so existing callers are unaffected.
        self.emmission_prob = np.mat([
            [.1, .4, .5],  # Rainy: walk, shop, clean
            [.6, .3, .1],  # Sunny: walk, shop, clean
        ])
        self.states = {'Rainy': 0, 'Sunny': 1}
        self.M = 2
        self.observes = {'walk': 0, 'shop': 1, 'clean': 2}
        self.U = 3
        self.observe_seqs = [1, 2, 0, 1] if observe_seqs is None else observe_seqs
        self.N = len(self.observe_seqs)
        if self.N <= 1:
            # BUG FIX: the original raised a plain string, which is a
            # TypeError in Python 3.
            raise ValueError("observation sequence too short")

    def compute_by_forward(self):
        """Forward algorithm: return p(O|λ) and fill self.alpha (N x M)."""
        self.alpha = np.mat(np.zeros((self.N, self.M)))
        for t, observe_seq in enumerate(self.observe_seqs):
            if t == 0:
                # alpha_1 = pi ⊙ b(o_1): (1xM) elementwise (1xM)
                self.alpha[t] = np.multiply(
                    self.start_prob, self.emmission_prob[:, observe_seq].T)
            else:
                # alpha_t = (alpha_{t-1} A) ⊙ b(o_t); the matrix product
                # performs the sum over all predecessor states.
                self.alpha[t] = np.multiply(
                    self.alpha[t - 1] * self.trans_prob,
                    self.emmission_prob[:, observe_seq].T)
        # p(O|λ) = sum of the last row of alpha.
        return np.sum(self.alpha[-1])

    def compute_by_backward(self):
        """Backward algorithm: return p(O|λ) and fill self.beta (N x M)."""
        self.beta = np.mat(np.zeros((self.N, self.M)))
        observe_seqs_rev = self.observe_seqs[:]
        observe_seqs_rev.reverse()
        # Initialization: beta_T = 1.
        self.beta[-1] = 1.
        for t, observe_seq in enumerate(observe_seqs_rev):
            # Convert the reversed index into the real time step t; the
            # observation consumed here is o_{t+1}.
            t = self.N - 2 - t
            if t < 0:
                # The first observation is not used by the recursion.
                break
            # beta_t = A (b(o_{t+1}) ⊙ beta_{t+1}); the matrix product
            # performs the sum over successor states.
            self.beta[t] = (self.trans_prob * np.multiply(
                self.emmission_prob[:, observe_seq],
                self.beta[t + 1].T)).T
        # Termination: p(O|λ) = sum_i pi_i * b_i(o_1) * beta_1(i).
        return np.sum(np.multiply(
            np.multiply(self.start_prob,
                        self.emmission_prob[:, self.observe_seqs[0]].T),
            self.beta[0]))

    def params_compute(self):
        """Supervised parameter learning from a labelled toy corpus.

        Counts initial states, transitions and emissions in the hard-coded
        (observation, hidden-state) corpus, then normalizes each count
        table row-wise into probabilities.

        Returns:
            (start_prob, trans_prob, emmission_prob) as numpy matrices.
        """
        # Labelled corpus: several sequences of (observation, hidden state).
        corpus = [
            [('shop', 'Rainy'), ('clean', 'Rainy'), ('shop', 'Sunny'), ('walk', 'Rainy'), ('shop', 'Sunny'), ('clean', 'Sunny')],
            [('walk', 'Sunny'), ('shop', 'Rainy'), ('shop', 'Rainy'), ('walk', 'Sunny'), ('clean', 'Sunny'), ('walk', 'Rainy')],
            [('clean', 'Rainy'), ('clean', 'Sunny'), ('walk', 'Sunny'), ('shop', 'Rainy'), ('shop', 'Rainy'), ('clean', 'Sunny')],
            [('clean', 'Rainy'), ('walk', 'Sunny'), ('clean', 'Sunny'), ('shop', 'Sunny'), ('walk', 'Sunny'), ('clean', 'Rainy')],
            [('shop', 'Rainy'), ('clean', 'Rainy'), ('walk', 'Rainy'), ('clean', 'Rainy'), ('shop', 'Rainy'), ('walk', 'Sunny')],
        ]
        # A: state->state transition counts; B: state->observation counts;
        # PI: initial-state counts.
        A = np.mat(np.zeros((self.M, self.M)))
        B = np.mat(np.zeros((self.M, self.U)))
        PI = np.mat(np.zeros(self.M))
        for seqs in corpus:
            pre_seq = None
            for seq in seqs:
                state = seq[1]
                observe = seq[0]
                B[self.states[state], self.observes[observe]] += 1
                if pre_seq is not None:
                    # A transition exists only from the second item on.
                    pre_state = pre_seq[1]
                    A[self.states[pre_state], self.states[state]] += 1
                else:
                    # First item of a sequence contributes to PI.
                    PI[0, self.states[state]] += 1
                pre_seq = seq
        # Normalize counts row-wise into probability distributions.
        trans_prob = A / np.sum(A, axis=1)
        emmission_prob = B / np.sum(B, axis=1)
        start_prob = PI / np.sum(PI)
        print(trans_prob)
        print(emmission_prob)
        print(start_prob)
        return start_prob, trans_prob, emmission_prob

    def Baum_Welch(self):
        """Unsupervised (EM) learning -- INCOMPLETE sketch.

        Requires self.alpha and self.beta, i.e. run compute_by_forward and
        compute_by_backward first.  Only gamma is computed; the xi tensor
        and the re-estimation step are still TODO.
        """
        # gamma_t(i) = alpha_t(i)*beta_t(i) / sum_j alpha_t(j)*beta_t(j)
        A = np.multiply(self.alpha, self.beta)
        b = np.sum(A, axis=1)
        gamma = A / b
        # BUG FIX: np.matrix is strictly 2-D, so the original
        # np.mat(np.zeros((N, M, M))) raised a ValueError.  Use a plain
        # ndarray for the 3-D xi tensor (still unfilled -- TODO).
        xi = np.zeros((self.N, self.M, self.M))
        print(gamma)


if __name__ == '__main__':
    hmm = Hmm()
    print(hmm.compute_by_forward())
    print(hmm.compute_by_backward())
    print(hmm.alpha)
    print(hmm.beta)
    hmm.params_compute()