Mercurial > hg > python
comparison hmm/hmm.py @ 3:26d9c0308fcf
updated/added from ecclerig version
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Mon, 09 Mar 2020 17:35:28 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
2:e07789816ca5 | 3:26d9c0308fcf |
---|---|
1 # Natural Language Toolkit: Hidden Markov Model | |
2 # | |
3 # Copyright (C) 2001-2015 NLTK Project | |
4 # Author: Trevor Cohn <tacohn@csse.unimelb.edu.au> | |
5 # Philip Blunsom <pcbl@csse.unimelb.edu.au> | |
6 # Tiago Tresoldi <tiago@tresoldi.pro.br> (fixes) | |
7 # Steven Bird <stevenbird1@gmail.com> (fixes) | |
8 # Joseph Frazee <jfrazee@mail.utexas.edu> (fixes) | |
9 # Steven Xu <xxu@student.unimelb.edu.au> (fixes) | |
10 # URL: <http://nltk.org/> | |
11 # For license information, see LICENSE.TXT | |
12 | |
13 """ | |
14 Hidden Markov Models (HMMs) largely used to assign the correct label sequence | |
15 to sequential data or assess the probability of a given label and data | |
16 sequence. These models are finite state machines characterised by a number of | |
17 states, transitions between these states, and output symbols emitted while in | |
18 each state. The HMM is an extension to the Markov chain, where each state | |
19 corresponds deterministically to a given event. In the HMM the observation is | |
20 a probabilistic function of the state. HMMs share the Markov chain's | |
21 assumption, being that the probability of transition from one state to another | |
22 only depends on the current state - i.e. the series of states that led to the | |
23 current state are not used. They are also time invariant. | |
24 | |
25 The HMM is a directed graph, with probability weighted edges (representing the | |
26 probability of a transition between the source and sink states) where each | |
27 vertex emits an output symbol when entered. The symbol (or observation) is | |
28 non-deterministically generated. For this reason, knowing that a sequence of | |
29 output observations was generated by a given HMM does not mean that the | |
30 corresponding sequence of states (and what the current state is) is known. | |
31 This is the 'hidden' in the hidden markov model. | |
32 | |
33 Formally, a HMM can be characterised by: | |
34 | |
35 - the output observation alphabet. This is the set of symbols which may be | |
36 observed as output of the system. | |
37 - the set of states. | |
38 - the transition probabilities *a_{ij} = P(s_t = j | s_{t-1} = i)*. These | |
39 represent the probability of transition to each state from a given state. | |
40 - the output probability matrix *b_i(k) = P(X_t = o_k | s_t = i)*. These | |
41 represent the probability of observing each symbol in a given state. | |
42 - the initial state distribution. This gives the probability of starting | |
43 in each state. | |
44 | |
45 To ground this discussion, take a common NLP application, part-of-speech (POS) | |
46 tagging. An HMM is desirable for this task as the highest probability tag | |
47 sequence can be calculated for a given sequence of word forms. This differs | |
48 from other tagging techniques which often tag each word individually, seeking | |
49 to optimise each individual tagging greedily without regard to the optimal | |
50 combination of tags for a larger unit, such as a sentence. The HMM does this | |
51 with the Viterbi algorithm, which efficiently computes the optimal path | |
52 through the graph given the sequence of words forms. | |
53 | |
54 In POS tagging the states usually have a 1:1 correspondence with the tag | |
55 alphabet - i.e. each state represents a single tag. The output observation | |
56 alphabet is the set of word forms (the lexicon), and the remaining three | |
57 parameters are derived by a training regime. With this information the | |
58 probability of a given sentence can be easily derived, by simply summing the | |
59 probability of each distinct path through the model. Similarly, the highest | |
60 probability tagging sequence can be derived with the Viterbi algorithm, | |
61 yielding a state sequence which can be mapped into a tag sequence. | |
62 | |
63 This discussion assumes that the HMM has been trained. This is probably the | |
64 most difficult task with the model, and requires either MLE estimates of the | |
65 parameters or unsupervised learning using the Baum-Welch algorithm, a variant | |
66 of EM. | |
67 | |
68 For more information, please consult the source code for this module, | |
69 which includes extensive demonstration code. | |
70 """ | |
71 from __future__ import print_function, unicode_literals, division | |
72 | |
73 import re | |
74 import itertools | |
75 | |
76 try: | |
77 import numpy as np | |
78 except ImportError: | |
79 pass | |
80 | |
81 from nltk.probability import (FreqDist, ConditionalFreqDist, | |
82 ConditionalProbDist, DictionaryProbDist, | |
83 DictionaryConditionalProbDist, | |
84 LidstoneProbDist, MutableProbDist, | |
85 MLEProbDist, RandomProbDist) | |
86 from nltk.metrics import accuracy | |
87 from nltk.util import LazyMap, unique_list | |
88 from nltk.compat import python_2_unicode_compatible, izip, imap | |
89 from nltk.tag.api import TaggerI | |
90 | |
91 | |
92 _TEXT = 0 # index of text in a tuple | |
93 _TAG = 1 # index of tag in a tuple | |
94 | |
95 def _identity(labeled_symbols): | |
96 return labeled_symbols | |
97 | |
98 @python_2_unicode_compatible | |
99 class HiddenMarkovModelTagger(TaggerI): | |
100 """ | |
101 Hidden Markov model class, a generative model for labelling sequence data. | |
102 These models define the joint probability of a sequence of symbols and | |
103 their labels (state transitions) as the product of the starting state | |
104 probability, the probability of each state transition, and the probability | |
105 of each observation being generated from each state. This is described in | |
106 more detail in the module documentation. | |
107 | |
108 This implementation is based on the HMM description in Chapter 8, Huang, | |
109 Acero and Hon, Spoken Language Processing and includes an extension for | |
110 training shallow HMM parsers or specialized HMMs as in Molina et. | |
111 al, 2002. A specialized HMM modifies training data by applying a | |
112 specialization function to create a new training set that is more | |
113 appropriate for sequential tagging with an HMM. A typical use case is | |
114 chunking. | |
115 | |
116 :param symbols: the set of output symbols (alphabet) | |
117 :type symbols: seq of any | |
118 :param states: a set of states representing state space | |
119 :type states: seq of any | |
120 :param transitions: transition probabilities; Pr(s_i | s_j) is the | |
121 probability of transition from state i given the model is in | |
122 state_j | |
123 :type transitions: ConditionalProbDistI | |
124 :param outputs: output probabilities; Pr(o_k | s_i) is the probability | |
125 of emitting symbol k when entering state i | |
126 :type outputs: ConditionalProbDistI | |
127 :param priors: initial state distribution; Pr(s_i) is the probability | |
128 of starting in state i | |
129 :type priors: ProbDistI | |
130 :param transform: an optional function for transforming training | |
131 instances, defaults to the identity function. | |
132 :type transform: callable | |
133 """ | |
134 def __init__(self, symbols, states, transitions, outputs, priors, | |
135 transform=_identity): | |
136 self._symbols = unique_list(symbols) | |
137 self._states = unique_list(states) | |
138 self._transitions = transitions | |
139 self._outputs = outputs | |
140 self._priors = priors | |
141 self._cache = None | |
142 self._transform = transform | |
143 | |
144 @classmethod | |
145 def _train(cls, labeled_sequence, test_sequence=None, | |
146 unlabeled_sequence=None, transform=_identity, | |
147 estimator=None, **kwargs): | |
148 | |
149 if estimator is None: | |
150 def estimator(fd, bins): | |
151 return LidstoneProbDist(fd, 0.1, bins) | |
152 | |
153 labeled_sequence = LazyMap(transform, labeled_sequence) | |
154 symbols = unique_list(word for sent in labeled_sequence | |
155 for word, tag in sent) | |
156 tag_set = unique_list(tag for sent in labeled_sequence | |
157 for word, tag in sent) | |
158 | |
159 trainer = HiddenMarkovModelTrainer(tag_set, symbols) | |
160 hmm = trainer.train_supervised(labeled_sequence, estimator=estimator) | |
161 hmm = cls(hmm._symbols, hmm._states, hmm._transitions, hmm._outputs, | |
162 hmm._priors, transform=transform) | |
163 | |
164 if test_sequence: | |
165 hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) | |
166 | |
167 if unlabeled_sequence: | |
168 max_iterations = kwargs.get('max_iterations', 5) | |
169 hmm = trainer.train_unsupervised(unlabeled_sequence, model=hmm, | |
170 max_iterations=max_iterations) | |
171 if test_sequence: | |
172 hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) | |
173 | |
174 return hmm | |
175 | |
176 @classmethod | |
177 def train(cls, labeled_sequence, test_sequence=None, | |
178 unlabeled_sequence=None, **kwargs): | |
179 """ | |
180 Train a new HiddenMarkovModelTagger using the given labeled and | |
181 unlabeled training instances. Testing will be performed if test | |
182 instances are provided. | |
183 | |
184 :return: a hidden markov model tagger | |
185 :rtype: HiddenMarkovModelTagger | |
186 :param labeled_sequence: a sequence of labeled training instances, | |
187 i.e. a list of sentences represented as tuples | |
188 :type labeled_sequence: list(list) | |
189 :param test_sequence: a sequence of labeled test instances | |
190 :type test_sequence: list(list) | |
191 :param unlabeled_sequence: a sequence of unlabeled training instances, | |
192 i.e. a list of sentences represented as words | |
193 :type unlabeled_sequence: list(list) | |
194 :param transform: an optional function for transforming training | |
195 instances, defaults to the identity function, see ``transform()`` | |
196 :type transform: function | |
197 :param estimator: an optional function or class that maps a | |
198 condition's frequency distribution to its probability | |
199 distribution, defaults to a Lidstone distribution with gamma = 0.1 | |
200 :type estimator: class or function | |
201 :param verbose: boolean flag indicating whether training should be | |
202 verbose or include printed output | |
203 :type verbose: bool | |
204 :param max_iterations: number of Baum-Welch interations to perform | |
205 :type max_iterations: int | |
206 """ | |
207 return cls._train(labeled_sequence, test_sequence, | |
208 unlabeled_sequence, **kwargs) | |
209 | |
210 def probability(self, sequence): | |
211 """ | |
212 Returns the probability of the given symbol sequence. If the sequence | |
213 is labelled, then returns the joint probability of the symbol, state | |
214 sequence. Otherwise, uses the forward algorithm to find the | |
215 probability over all label sequences. | |
216 | |
217 :return: the probability of the sequence | |
218 :rtype: float | |
219 :param sequence: the sequence of symbols which must contain the TEXT | |
220 property, and optionally the TAG property | |
221 :type sequence: Token | |
222 """ | |
223 return 2**(self.log_probability(self._transform(sequence))) | |
224 | |
225 def log_probability(self, sequence): | |
226 """ | |
227 Returns the log-probability of the given symbol sequence. If the | |
228 sequence is labelled, then returns the joint log-probability of the | |
229 symbol, state sequence. Otherwise, uses the forward algorithm to find | |
230 the log-probability over all label sequences. | |
231 | |
232 :return: the log-probability of the sequence | |
233 :rtype: float | |
234 :param sequence: the sequence of symbols which must contain the TEXT | |
235 property, and optionally the TAG property | |
236 :type sequence: Token | |
237 """ | |
238 sequence = self._transform(sequence) | |
239 | |
240 T = len(sequence) | |
241 | |
242 if T > 0 and sequence[0][_TAG]: | |
243 last_state = sequence[0][_TAG] | |
244 p = self._priors.logprob(last_state) + \ | |
245 self._output_logprob(last_state, sequence[0][_TEXT]) | |
246 for t in range(1, T): | |
247 state = sequence[t][_TAG] | |
248 p += self._transitions[last_state].logprob(state) + \ | |
249 self._output_logprob(state, sequence[t][_TEXT]) | |
250 last_state = state | |
251 return p | |
252 else: | |
253 alpha = self._forward_probability(sequence) | |
254 p = logsumexp2(alpha[T-1]) | |
255 return p | |
256 | |
257 def tag(self, unlabeled_sequence): | |
258 """ | |
259 Tags the sequence with the highest probability state sequence. This | |
260 uses the best_path method to find the Viterbi path. | |
261 | |
262 :return: a labelled sequence of symbols | |
263 :rtype: list | |
264 :param unlabeled_sequence: the sequence of unlabeled symbols | |
265 :type unlabeled_sequence: list | |
266 """ | |
267 unlabeled_sequence = self._transform(unlabeled_sequence) | |
268 return self._tag(unlabeled_sequence) | |
269 | |
270 def _tag(self, unlabeled_sequence): | |
271 path = self._best_path(unlabeled_sequence) | |
272 return list(izip(unlabeled_sequence, path)) | |
273 | |
274 def _output_logprob(self, state, symbol): | |
275 """ | |
276 :return: the log probability of the symbol being observed in the given | |
277 state | |
278 :rtype: float | |
279 """ | |
280 return self._outputs[state].logprob(symbol) | |
281 | |
282 def _create_cache(self): | |
283 """ | |
284 The cache is a tuple (P, O, X, S) where: | |
285 | |
286 - S maps symbols to integers. I.e., it is the inverse | |
287 mapping from self._symbols; for each symbol s in | |
288 self._symbols, the following is true:: | |
289 | |
290 self._symbols[S[s]] == s | |
291 | |
292 - O is the log output probabilities:: | |
293 | |
294 O[i,k] = log( P(token[t]=sym[k]|tag[t]=state[i]) ) | |
295 | |
296 - X is the log transition probabilities:: | |
297 | |
298 X[i,j] = log( P(tag[t]=state[j]|tag[t-1]=state[i]) ) | |
299 | |
300 - P is the log prior probabilities:: | |
301 | |
302 P[i] = log( P(tag[0]=state[i]) ) | |
303 """ | |
304 if not self._cache: | |
305 N = len(self._states) | |
306 M = len(self._symbols) | |
307 P = np.zeros(N, np.float32) | |
308 X = np.zeros((N, N), np.float32) | |
309 O = np.zeros((N, M), np.float32) | |
310 for i in range(N): | |
311 si = self._states[i] | |
312 P[i] = self._priors.logprob(si) | |
313 for j in range(N): | |
314 X[i, j] = self._transitions[si].logprob(self._states[j]) | |
315 for k in range(M): | |
316 O[i, k] = self._output_logprob(si, self._symbols[k]) | |
317 S = {} | |
318 for k in range(M): | |
319 S[self._symbols[k]] = k | |
320 self._cache = (P, O, X, S) | |
321 | |
322 def _update_cache(self, symbols): | |
323 # add new symbols to the symbol table and repopulate the output | |
324 # probabilities and symbol table mapping | |
325 if symbols: | |
326 self._create_cache() | |
327 P, O, X, S = self._cache | |
328 for symbol in symbols: | |
329 if symbol not in self._symbols: | |
330 self._cache = None | |
331 self._symbols.append(symbol) | |
332 # don't bother with the work if there aren't any new symbols | |
333 if not self._cache: | |
334 N = len(self._states) | |
335 M = len(self._symbols) | |
336 Q = O.shape[1] | |
337 # add new columns to the output probability table without | |
338 # destroying the old probabilities | |
339 O = np.hstack([O, np.zeros((N, M - Q), np.float32)]) | |
340 for i in range(N): | |
341 si = self._states[i] | |
342 # only calculate probabilities for new symbols | |
343 for k in range(Q, M): | |
344 O[i, k] = self._output_logprob(si, self._symbols[k]) | |
345 # only create symbol mappings for new symbols | |
346 for k in range(Q, M): | |
347 S[self._symbols[k]] = k | |
348 self._cache = (P, O, X, S) | |
349 | |
350 def reset_cache(self): | |
351 self._cache = None | |
352 | |
353 def best_path(self, unlabeled_sequence): | |
354 """ | |
355 Returns the state sequence of the optimal (most probable) path through | |
356 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic | |
357 programming. | |
358 | |
359 :return: the state sequence | |
360 :rtype: sequence of any | |
361 :param unlabeled_sequence: the sequence of unlabeled symbols | |
362 :type unlabeled_sequence: list | |
363 """ | |
364 unlabeled_sequence = self._transform(unlabeled_sequence) | |
365 return self._best_path(unlabeled_sequence) | |
366 | |
367 def _best_path(self, unlabeled_sequence): | |
368 T = len(unlabeled_sequence) | |
369 N = len(self._states) | |
370 self._create_cache() | |
371 self._update_cache(unlabeled_sequence) | |
372 P, O, X, S = self._cache | |
373 | |
374 V = np.zeros((T, N), np.float32) | |
375 B = -np.ones((T, N), np.int) | |
376 | |
377 V[0] = P + O[:, S[unlabeled_sequence[0]]] | |
378 for t in range(1, T): | |
379 for j in range(N): | |
380 vs = V[t-1, :] + X[:, j] | |
381 best = np.argmax(vs) | |
382 V[t, j] = vs[best] + O[j, S[unlabeled_sequence[t]]] | |
383 B[t, j] = best | |
384 | |
385 current = np.argmax(V[T-1,:]) | |
386 sequence = [current] | |
387 for t in range(T-1, 0, -1): | |
388 last = B[t, current] | |
389 sequence.append(last) | |
390 current = last | |
391 | |
392 sequence.reverse() | |
393 return list(map(self._states.__getitem__, sequence)) | |
394 | |
395 def best_path_simple(self, unlabeled_sequence): | |
396 """ | |
397 Returns the state sequence of the optimal (most probable) path through | |
398 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic | |
399 programming. This uses a simple, direct method, and is included for | |
400 teaching purposes. | |
401 | |
402 :return: the state sequence | |
403 :rtype: sequence of any | |
404 :param unlabeled_sequence: the sequence of unlabeled symbols | |
405 :type unlabeled_sequence: list | |
406 """ | |
407 unlabeled_sequence = self._transform(unlabeled_sequence) | |
408 return self._best_path_simple(unlabeled_sequence) | |
409 | |
410 def _best_path_simple(self, unlabeled_sequence): | |
411 T = len(unlabeled_sequence) | |
412 N = len(self._states) | |
413 V = np.zeros((T, N), np.float64) | |
414 B = {} | |
415 | |
416 # find the starting log probabilities for each state | |
417 symbol = unlabeled_sequence[0] | |
418 for i, state in enumerate(self._states): | |
419 V[0, i] = self._priors.logprob(state) + \ | |
420 self._output_logprob(state, symbol) | |
421 B[0, state] = None | |
422 | |
423 # find the maximum log probabilities for reaching each state at time t | |
424 for t in range(1, T): | |
425 symbol = unlabeled_sequence[t] | |
426 for j in range(N): | |
427 sj = self._states[j] | |
428 best = None | |
429 for i in range(N): | |
430 si = self._states[i] | |
431 va = V[t-1, i] + self._transitions[si].logprob(sj) | |
432 if not best or va > best[0]: | |
433 best = (va, si) | |
434 V[t, j] = best[0] + self._output_logprob(sj, symbol) | |
435 B[t, sj] = best[1] | |
436 | |
437 # find the highest probability final state | |
438 best = None | |
439 for i in range(N): | |
440 val = V[T-1, i] | |
441 if not best or val > best[0]: | |
442 best = (val, self._states[i]) | |
443 | |
444 # traverse the back-pointers B to find the state sequence | |
445 current = best[1] | |
446 sequence = [current] | |
447 for t in range(T-1, 0, -1): | |
448 last = B[t, current] | |
449 sequence.append(last) | |
450 current = last | |
451 | |
452 sequence.reverse() | |
453 return sequence | |
454 | |
455 def random_sample(self, rng, length): | |
456 """ | |
457 Randomly sample the HMM to generate a sentence of a given length. This | |
458 samples the prior distribution then the observation distribution and | |
459 transition distribution for each subsequent observation and state. | |
460 This will mostly generate unintelligible garbage, but can provide some | |
461 amusement. | |
462 | |
463 :return: the randomly created state/observation sequence, | |
464 generated according to the HMM's probability | |
465 distributions. The SUBTOKENS have TEXT and TAG | |
466 properties containing the observation and state | |
467 respectively. | |
468 :rtype: list | |
469 :param rng: random number generator | |
470 :type rng: Random (or any object with a random() method) | |
471 :param length: desired output length | |
472 :type length: int | |
473 """ | |
474 | |
475 # sample the starting state and symbol prob dists | |
476 tokens = [] | |
477 state = self._sample_probdist(self._priors, rng.random(), self._states) | |
478 symbol = self._sample_probdist(self._outputs[state], | |
479 rng.random(), self._symbols) | |
480 tokens.append((symbol, state)) | |
481 | |
482 for i in range(1, length): | |
483 # sample the state transition and symbol prob dists | |
484 state = self._sample_probdist(self._transitions[state], | |
485 rng.random(), self._states) | |
486 symbol = self._sample_probdist(self._outputs[state], | |
487 rng.random(), self._symbols) | |
488 tokens.append((symbol, state)) | |
489 | |
490 return tokens | |
491 | |
492 def _sample_probdist(self, probdist, p, samples): | |
493 cum_p = 0 | |
494 for sample in samples: | |
495 add_p = probdist.prob(sample) | |
496 if cum_p <= p <= cum_p + add_p: | |
497 return sample | |
498 cum_p += add_p | |
499 raise Exception('Invalid probability distribution - ' | |
500 'does not sum to one') | |
501 | |
502 def entropy(self, unlabeled_sequence): | |
503 """ | |
504 Returns the entropy over labellings of the given sequence. This is | |
505 given by:: | |
506 | |
507 H(O) = - sum_S Pr(S | O) log Pr(S | O) | |
508 | |
509 where the summation ranges over all state sequences, S. Let | |
510 *Z = Pr(O) = sum_S Pr(S, O)}* where the summation ranges over all state | |
511 sequences and O is the observation sequence. As such the entropy can | |
512 be re-expressed as:: | |
513 | |
514 H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ] | |
515 = log Z - sum_S Pr(S | O) log Pr(S, 0) | |
516 = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t Pr(S_t | S_{t-1}) + sum_t Pr(O_t | S_t) ] | |
517 | |
518 The order of summation for the log terms can be flipped, allowing | |
519 dynamic programming to be used to calculate the entropy. Specifically, | |
520 we use the forward and backward probabilities (alpha, beta) giving:: | |
521 | |
522 H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0) | |
523 + sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t(sj) / Z * log Pr(sj | si) | |
524 + sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st) | |
525 | |
526 This simply uses alpha and beta to find the probabilities of partial | |
527 sequences, constrained to include the given state(s) at some point in | |
528 time. | |
529 """ | |
530 unlabeled_sequence = self._transform(unlabeled_sequence) | |
531 | |
532 T = len(unlabeled_sequence) | |
533 N = len(self._states) | |
534 | |
535 alpha = self._forward_probability(unlabeled_sequence) | |
536 beta = self._backward_probability(unlabeled_sequence) | |
537 normalisation = logsumexp2(alpha[T-1]) | |
538 | |
539 entropy = normalisation | |
540 | |
541 # starting state, t = 0 | |
542 for i, state in enumerate(self._states): | |
543 p = 2**(alpha[0, i] + beta[0, i] - normalisation) | |
544 entropy -= p * self._priors.logprob(state) | |
545 #print 'p(s_0 = %s) =' % state, p | |
546 | |
547 # state transitions | |
548 for t0 in range(T - 1): | |
549 t1 = t0 + 1 | |
550 for i0, s0 in enumerate(self._states): | |
551 for i1, s1 in enumerate(self._states): | |
552 p = 2**(alpha[t0, i0] + self._transitions[s0].logprob(s1) + | |
553 self._outputs[s1].logprob( | |
554 unlabeled_sequence[t1][_TEXT]) + | |
555 beta[t1, i1] - normalisation) | |
556 entropy -= p * self._transitions[s0].logprob(s1) | |
557 #print 'p(s_%d = %s, s_%d = %s) =' % (t0, s0, t1, s1), p | |
558 | |
559 # symbol emissions | |
560 for t in range(T): | |
561 for i, state in enumerate(self._states): | |
562 p = 2**(alpha[t, i] + beta[t, i] - normalisation) | |
563 entropy -= p * self._outputs[state].logprob( | |
564 unlabeled_sequence[t][_TEXT]) | |
565 #print 'p(s_%d = %s) =' % (t, state), p | |
566 | |
567 return entropy | |
568 | |
569 def point_entropy(self, unlabeled_sequence): | |
570 """ | |
571 Returns the pointwise entropy over the possible states at each | |
572 position in the chain, given the observation sequence. | |
573 """ | |
574 unlabeled_sequence = self._transform(unlabeled_sequence) | |
575 | |
576 T = len(unlabeled_sequence) | |
577 N = len(self._states) | |
578 | |
579 alpha = self._forward_probability(unlabeled_sequence) | |
580 beta = self._backward_probability(unlabeled_sequence) | |
581 normalisation = logsumexp2(alpha[T-1]) | |
582 | |
583 entropies = np.zeros(T, np.float64) | |
584 probs = np.zeros(N, np.float64) | |
585 for t in range(T): | |
586 for s in range(N): | |
587 probs[s] = alpha[t, s] + beta[t, s] - normalisation | |
588 | |
589 for s in range(N): | |
590 entropies[t] -= 2**(probs[s]) * probs[s] | |
591 | |
592 return entropies | |
593 | |
594 def _exhaustive_entropy(self, unlabeled_sequence): | |
595 unlabeled_sequence = self._transform(unlabeled_sequence) | |
596 | |
597 T = len(unlabeled_sequence) | |
598 N = len(self._states) | |
599 | |
600 labellings = [[state] for state in self._states] | |
601 for t in range(T - 1): | |
602 current = labellings | |
603 labellings = [] | |
604 for labelling in current: | |
605 for state in self._states: | |
606 labellings.append(labelling + [state]) | |
607 | |
608 log_probs = [] | |
609 for labelling in labellings: | |
610 labeled_sequence = unlabeled_sequence[:] | |
611 for t, label in enumerate(labelling): | |
612 labeled_sequence[t] = (labeled_sequence[t][_TEXT], label) | |
613 lp = self.log_probability(labeled_sequence) | |
614 log_probs.append(lp) | |
615 normalisation = _log_add(*log_probs) | |
616 | |
617 #ps = zeros((T, N), float64) | |
618 #for labelling, lp in zip(labellings, log_probs): | |
619 #for t in range(T): | |
620 #ps[t, self._states.index(labelling[t])] += \ | |
621 # 2**(lp - normalisation) | |
622 | |
623 #for t in range(T): | |
624 #print 'prob[%d] =' % t, ps[t] | |
625 | |
626 entropy = 0 | |
627 for lp in log_probs: | |
628 lp -= normalisation | |
629 entropy -= 2**(lp) * lp | |
630 | |
631 return entropy | |
632 | |
633 def _exhaustive_point_entropy(self, unlabeled_sequence): | |
634 unlabeled_sequence = self._transform(unlabeled_sequence) | |
635 | |
636 T = len(unlabeled_sequence) | |
637 N = len(self._states) | |
638 | |
639 labellings = [[state] for state in self._states] | |
640 for t in range(T - 1): | |
641 current = labellings | |
642 labellings = [] | |
643 for labelling in current: | |
644 for state in self._states: | |
645 labellings.append(labelling + [state]) | |
646 | |
647 log_probs = [] | |
648 for labelling in labellings: | |
649 labelled_sequence = unlabeled_sequence[:] | |
650 for t, label in enumerate(labelling): | |
651 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) | |
652 lp = self.log_probability(labelled_sequence) | |
653 log_probs.append(lp) | |
654 | |
655 normalisation = _log_add(*log_probs) | |
656 | |
657 probabilities = _ninf_array((T,N)) | |
658 | |
659 for labelling, lp in zip(labellings, log_probs): | |
660 lp -= normalisation | |
661 for t, label in enumerate(labelling): | |
662 index = self._states.index(label) | |
663 probabilities[t, index] = _log_add(probabilities[t, index], lp) | |
664 | |
665 entropies = np.zeros(T, np.float64) | |
666 for t in range(T): | |
667 for s in range(N): | |
668 entropies[t] -= 2**(probabilities[t, s]) * probabilities[t, s] | |
669 | |
670 return entropies | |
671 | |
672 def _transitions_matrix(self): | |
673 """ Return a matrix of transition log probabilities. """ | |
674 trans_iter = (self._transitions[sj].logprob(si) | |
675 for sj in self._states | |
676 for si in self._states) | |
677 | |
678 transitions_logprob = np.fromiter(trans_iter, dtype=np.float64) | |
679 N = len(self._states) | |
680 return transitions_logprob.reshape((N, N)).T | |
681 | |
682 def _outputs_vector(self, symbol): | |
683 """ | |
684 Return a vector with log probabilities of emitting a symbol | |
685 when entering states. | |
686 """ | |
687 out_iter = (self._output_logprob(sj, symbol) for sj in self._states) | |
688 return np.fromiter(out_iter, dtype=np.float64) | |
689 | |
690 def _forward_probability(self, unlabeled_sequence): | |
691 """ | |
692 Return the forward probability matrix, a T by N array of | |
693 log-probabilities, where T is the length of the sequence and N is the | |
694 number of states. Each entry (t, s) gives the probability of being in | |
695 state s at time t after observing the partial symbol sequence up to | |
696 and including t. | |
697 | |
698 :param unlabeled_sequence: the sequence of unlabeled symbols | |
699 :type unlabeled_sequence: list | |
700 :return: the forward log probability matrix | |
701 :rtype: array | |
702 """ | |
703 T = len(unlabeled_sequence) | |
704 N = len(self._states) | |
705 alpha = _ninf_array((T, N)) | |
706 | |
707 transitions_logprob = self._transitions_matrix() | |
708 | |
709 # Initialization | |
710 symbol = unlabeled_sequence[0][_TEXT] | |
711 for i, state in enumerate(self._states): | |
712 alpha[0, i] = self._priors.logprob(state) + \ | |
713 self._output_logprob(state, symbol) | |
714 | |
715 # Induction | |
716 for t in range(1, T): | |
717 symbol = unlabeled_sequence[t][_TEXT] | |
718 output_logprob = self._outputs_vector(symbol) | |
719 | |
720 for i in range(N): | |
721 summand = alpha[t-1] + transitions_logprob[i] | |
722 alpha[t, i] = logsumexp2(summand) + output_logprob[i] | |
723 | |
724 return alpha | |
725 | |
726 def _backward_probability(self, unlabeled_sequence): | |
727 """ | |
728 Return the backward probability matrix, a T by N array of | |
729 log-probabilities, where T is the length of the sequence and N is the | |
730 number of states. Each entry (t, s) gives the probability of being in | |
731 state s at time t after observing the partial symbol sequence from t | |
732 .. T. | |
733 | |
734 :return: the backward log probability matrix | |
735 :rtype: array | |
736 :param unlabeled_sequence: the sequence of unlabeled symbols | |
737 :type unlabeled_sequence: list | |
738 """ | |
739 T = len(unlabeled_sequence) | |
740 N = len(self._states) | |
741 beta = _ninf_array((T, N)) | |
742 | |
743 transitions_logprob = self._transitions_matrix().T | |
744 | |
745 # initialise the backward values; | |
746 # "1" is an arbitrarily chosen value from Rabiner tutorial | |
747 beta[T-1, :] = np.log2(1) | |
748 | |
749 # inductively calculate remaining backward values | |
750 for t in range(T-2, -1, -1): | |
751 symbol = unlabeled_sequence[t+1][_TEXT] | |
752 outputs = self._outputs_vector(symbol) | |
753 | |
754 for i in range(N): | |
755 summand = transitions_logprob[i] + beta[t+1] + outputs | |
756 beta[t, i] = logsumexp2(summand) | |
757 | |
758 return beta | |
759 | |
760 def test(self, test_sequence, verbose=False, **kwargs): | |
761 """ | |
762 Tests the HiddenMarkovModelTagger instance. | |
763 | |
764 :param test_sequence: a sequence of labeled test instances | |
765 :type test_sequence: list(list) | |
766 :param verbose: boolean flag indicating whether training should be | |
767 verbose or include printed output | |
768 :type verbose: bool | |
769 """ | |
770 | |
771 def words(sent): | |
772 return [word for (word, tag) in sent] | |
773 | |
774 def tags(sent): | |
775 return [tag for (word, tag) in sent] | |
776 | |
777 def flatten(seq): | |
778 return list(itertools.chain(*seq)) | |
779 | |
780 test_sequence = self._transform(test_sequence) | |
781 predicted_sequence = list(imap(self._tag, imap(words, test_sequence))) | |
782 | |
783 if verbose: | |
784 for test_sent, predicted_sent in izip(test_sequence, predicted_sequence): | |
785 print('Test:', | |
786 ' '.join('%s/%s' % (token, tag) | |
787 for (token, tag) in test_sent)) | |
788 print() | |
789 print('Untagged:', | |
790 ' '.join("%s" % token for (token, tag) in test_sent)) | |
791 print() | |
792 print('HMM-tagged:', | |
793 ' '.join('%s/%s' % (token, tag) | |
794 for (token, tag) in predicted_sent)) | |
795 print() | |
796 print('Entropy:', | |
797 self.entropy([(token, None) for | |
798 (token, tag) in predicted_sent])) | |
799 print() | |
800 print('-' * 60) | |
801 | |
802 test_tags = flatten(imap(tags, test_sequence)) | |
803 predicted_tags = flatten(imap(tags, predicted_sequence)) | |
804 | |
805 acc = accuracy(test_tags, predicted_tags) | |
806 count = sum(len(sent) for sent in test_sequence) | |
807 print('accuracy over %d tokens: %.2f' % (count, acc * 100)) | |
808 | |
809 def __repr__(self): | |
810 return ('<HiddenMarkovModelTagger %d states and %d output symbols>' | |
811 % (len(self._states), len(self._symbols))) | |
812 | |
813 | |
814 class HiddenMarkovModelTrainer(object): | |
815 """ | |
816 Algorithms for learning HMM parameters from training data. These include | |
817 both supervised learning (MLE) and unsupervised learning (Baum-Welch). | |
818 | |
819 Creates an HMM trainer to induce an HMM with the given states and | |
820 output symbol alphabet. A supervised and unsupervised training | |
821 method may be used. If either of the states or symbols are not given, | |
822 these may be derived from supervised training. | |
823 | |
824 :param states: the set of state labels | |
825 :type states: sequence of any | |
826 :param symbols: the set of observation symbols | |
827 :type symbols: sequence of any | |
828 """ | |
829 def __init__(self, states=None, symbols=None): | |
830 self._states = (states if states else []) | |
831 self._symbols = (symbols if symbols else []) | |
832 | |
833 def train(self, labeled_sequences=None, unlabeled_sequences=None, | |
834 **kwargs): | |
835 """ | |
836 Trains the HMM using both (or either of) supervised and unsupervised | |
837 techniques. | |
838 | |
839 :return: the trained model | |
840 :rtype: HiddenMarkovModelTagger | |
841 :param labelled_sequences: the supervised training data, a set of | |
842 labelled sequences of observations | |
843 :type labelled_sequences: list | |
844 :param unlabeled_sequences: the unsupervised training data, a set of | |
845 sequences of observations | |
846 :type unlabeled_sequences: list | |
847 :param kwargs: additional arguments to pass to the training methods | |
848 """ | |
849 assert labeled_sequences or unlabeled_sequences | |
850 model = None | |
851 if labeled_sequences: | |
852 model = self.train_supervised(labeled_sequences, **kwargs) | |
853 if unlabeled_sequences: | |
854 if model: kwargs['model'] = model | |
855 model = self.train_unsupervised(unlabeled_sequences, **kwargs) | |
856 return model | |
857 | |
858 | |
859 def _baum_welch_step(self, sequence, model, symbol_to_number): | |
860 N = len(model._states) | |
861 M = len(model._symbols) | |
862 T = len(sequence) | |
863 | |
864 # compute forward and backward probabilities | |
865 alpha = model._forward_probability(sequence) | |
866 beta = model._backward_probability(sequence) | |
867 | |
868 # find the log probability of the sequence | |
869 lpk = logsumexp2(alpha[T-1]) | |
870 | |
871 A_numer = _ninf_array((N, N)) | |
872 B_numer = _ninf_array((N, M)) | |
873 A_denom = _ninf_array(N) | |
874 B_denom = _ninf_array(N) | |
875 | |
876 transitions_logprob = model._transitions_matrix().T | |
877 | |
878 for t in range(T): | |
879 symbol = sequence[t][_TEXT] # not found? FIXME | |
880 next_symbol = None | |
881 if t < T - 1: | |
882 next_symbol = sequence[t+1][_TEXT] # not found? FIXME | |
883 xi = symbol_to_number[symbol] | |
884 | |
885 next_outputs_logprob = model._outputs_vector(next_symbol) | |
886 alpha_plus_beta = alpha[t] + beta[t] | |
887 | |
888 if t < T - 1: | |
889 numer_add = transitions_logprob + next_outputs_logprob + \ | |
890 beta[t+1] + alpha[t].reshape(N, 1) | |
891 A_numer = np.logaddexp2(A_numer, numer_add) | |
892 A_denom = np.logaddexp2(A_denom, alpha_plus_beta) | |
893 else: | |
894 B_denom = np.logaddexp2(A_denom, alpha_plus_beta) | |
895 | |
896 B_numer[:,xi] = np.logaddexp2(B_numer[:,xi], alpha_plus_beta) | |
897 | |
898 return lpk, A_numer, A_denom, B_numer, B_denom | |
899 | |
900 def train_unsupervised(self, unlabeled_sequences, update_outputs=True, | |
901 **kwargs): | |
902 """ | |
903 Trains the HMM using the Baum-Welch algorithm to maximise the | |
904 probability of the data sequence. This is a variant of the EM | |
905 algorithm, and is unsupervised in that it doesn't need the state | |
906 sequences for the symbols. The code is based on 'A Tutorial on Hidden | |
907 Markov Models and Selected Applications in Speech Recognition', | |
908 Lawrence Rabiner, IEEE, 1989. | |
909 | |
910 :return: the trained model | |
911 :rtype: HiddenMarkovModelTagger | |
912 :param unlabeled_sequences: the training data, a set of | |
913 sequences of observations | |
914 :type unlabeled_sequences: list | |
915 | |
916 kwargs may include following parameters: | |
917 | |
918 :param model: a HiddenMarkovModelTagger instance used to begin | |
919 the Baum-Welch algorithm | |
920 :param max_iterations: the maximum number of EM iterations | |
921 :param convergence_logprob: the maximum change in log probability to | |
922 allow convergence | |
923 """ | |
924 | |
925 # create a uniform HMM, which will be iteratively refined, unless | |
926 # given an existing model | |
927 model = kwargs.get('model') | |
928 if not model: | |
929 priors = RandomProbDist(self._states) | |
930 transitions = DictionaryConditionalProbDist( | |
931 dict((state, RandomProbDist(self._states)) | |
932 for state in self._states)) | |
933 outputs = DictionaryConditionalProbDist( | |
934 dict((state, RandomProbDist(self._symbols)) | |
935 for state in self._states)) | |
936 model = HiddenMarkovModelTagger(self._symbols, self._states, | |
937 transitions, outputs, priors) | |
938 | |
939 self._states = model._states | |
940 self._symbols = model._symbols | |
941 | |
942 N = len(self._states) | |
943 M = len(self._symbols) | |
944 symbol_numbers = dict((sym, i) for i, sym in enumerate(self._symbols)) | |
945 | |
946 # update model prob dists so that they can be modified | |
947 # model._priors = MutableProbDist(model._priors, self._states) | |
948 | |
949 model._transitions = DictionaryConditionalProbDist( | |
950 dict((s, MutableProbDist(model._transitions[s], self._states)) | |
951 for s in self._states)) | |
952 | |
953 if update_outputs: | |
954 model._outputs = DictionaryConditionalProbDist( | |
955 dict((s, MutableProbDist(model._outputs[s], self._symbols)) | |
956 for s in self._states)) | |
957 | |
958 model.reset_cache() | |
959 | |
960 # iterate until convergence | |
961 converged = False | |
962 last_logprob = None | |
963 iteration = 0 | |
964 max_iterations = kwargs.get('max_iterations', 1000) | |
965 epsilon = kwargs.get('convergence_logprob', 1e-6) | |
966 | |
967 while not converged and iteration < max_iterations: | |
968 A_numer = _ninf_array((N, N)) | |
969 B_numer = _ninf_array((N, M)) | |
970 A_denom = _ninf_array(N) | |
971 B_denom = _ninf_array(N) | |
972 | |
973 logprob = 0 | |
974 for sequence in unlabeled_sequences: | |
975 sequence = list(sequence) | |
976 if not sequence: | |
977 continue | |
978 | |
979 (lpk, seq_A_numer, seq_A_denom, | |
980 seq_B_numer, seq_B_denom) = self._baum_welch_step(sequence, model, symbol_numbers) | |
981 | |
982 # add these sums to the global A and B values | |
983 for i in range(N): | |
984 A_numer[i] = np.logaddexp2(A_numer[i], seq_A_numer[i]-lpk) | |
985 B_numer[i] = np.logaddexp2(B_numer[i], seq_B_numer[i]-lpk) | |
986 | |
987 A_denom = np.logaddexp2(A_denom, seq_A_denom-lpk) | |
988 B_denom = np.logaddexp2(B_denom, seq_B_denom-lpk) | |
989 | |
990 logprob += lpk | |
991 | |
992 # use the calculated values to update the transition and output | |
993 # probability values | |
994 for i in range(N): | |
995 logprob_Ai = A_numer[i] - A_denom[i] | |
996 logprob_Bi = B_numer[i] - B_denom[i] | |
997 | |
998 # We should normalize all probabilities (see p.391 Huang et al) | |
999 # Let sum(P) be K. | |
1000 # We can divide each Pi by K to make sum(P) == 1. | |
1001 # Pi' = Pi/K | |
1002 # log2(Pi') = log2(Pi) - log2(K) | |
1003 logprob_Ai -= logsumexp2(logprob_Ai) | |
1004 logprob_Bi -= logsumexp2(logprob_Bi) | |
1005 | |
1006 # update output and transition probabilities | |
1007 si = self._states[i] | |
1008 | |
1009 for j in range(N): | |
1010 sj = self._states[j] | |
1011 model._transitions[si].update(sj, logprob_Ai[j]) | |
1012 | |
1013 if update_outputs: | |
1014 for k in range(M): | |
1015 ok = self._symbols[k] | |
1016 model._outputs[si].update(ok, logprob_Bi[k]) | |
1017 | |
1018 # Rabiner says the priors don't need to be updated. I don't | |
1019 # believe him. FIXME | |
1020 | |
1021 # test for convergence | |
1022 if iteration > 0 and abs(logprob - last_logprob) < epsilon: | |
1023 converged = True | |
1024 | |
1025 print('iteration', iteration, 'logprob', logprob) | |
1026 iteration += 1 | |
1027 last_logprob = logprob | |
1028 | |
1029 return model | |
1030 | |
1031 def train_supervised(self, labelled_sequences, estimator=None): | |
1032 """ | |
1033 Supervised training maximising the joint probability of the symbol and | |
1034 state sequences. This is done via collecting frequencies of | |
1035 transitions between states, symbol observations while within each | |
1036 state and which states start a sentence. These frequency distributions | |
1037 are then normalised into probability estimates, which can be | |
1038 smoothed if desired. | |
1039 | |
1040 :return: the trained model | |
1041 :rtype: HiddenMarkovModelTagger | |
1042 :param labelled_sequences: the training data, a set of | |
1043 labelled sequences of observations | |
1044 :type labelled_sequences: list | |
1045 :param estimator: a function taking | |
1046 a FreqDist and a number of bins and returning a CProbDistI; | |
1047 otherwise a MLE estimate is used | |
1048 """ | |
1049 | |
1050 # default to the MLE estimate | |
1051 if estimator is None: | |
1052 estimator = lambda fdist, bins: MLEProbDist(fdist) | |
1053 | |
1054 # count occurrences of starting states, transitions out of each state | |
1055 # and output symbols observed in each state | |
1056 known_symbols = set(self._symbols) | |
1057 known_states = set(self._states) | |
1058 | |
1059 starting = FreqDist() | |
1060 transitions = ConditionalFreqDist() | |
1061 outputs = ConditionalFreqDist() | |
1062 for sequence in labelled_sequences: | |
1063 lasts = None | |
1064 for token in sequence: | |
1065 state = token[_TAG] | |
1066 symbol = token[_TEXT] | |
1067 if lasts is None: | |
1068 starting[state] += 1 | |
1069 else: | |
1070 transitions[lasts][state] += 1 | |
1071 outputs[state][symbol] += 1 | |
1072 lasts = state | |
1073 | |
1074 # update the state and symbol lists | |
1075 if state not in known_states: | |
1076 self._states.append(state) | |
1077 known_states.add(state) | |
1078 | |
1079 if symbol not in known_symbols: | |
1080 self._symbols.append(symbol) | |
1081 known_symbols.add(symbol) | |
1082 | |
1083 # create probability distributions (with smoothing) | |
1084 N = len(self._states) | |
1085 pi = estimator(starting, N) | |
1086 A = ConditionalProbDist(transitions, estimator, N) | |
1087 B = ConditionalProbDist(outputs, estimator, len(self._symbols)) | |
1088 | |
1089 return HiddenMarkovModelTagger(self._symbols, self._states, A, B, pi) | |
1090 | |
1091 | |
1092 def _ninf_array(shape): | |
1093 res = np.empty(shape, np.float64) | |
1094 res.fill(-np.inf) | |
1095 return res | |
1096 | |
1097 | |
1098 def logsumexp2(arr): | |
1099 max_ = arr.max() | |
1100 return np.log2(np.sum(2**(arr - max_))) + max_ | |
1101 | |
1102 | |
1103 def _log_add(*values): | |
1104 """ | |
1105 Adds the logged values, returning the logarithm of the addition. | |
1106 """ | |
1107 x = max(values) | |
1108 if x > -np.inf: | |
1109 sum_diffs = 0 | |
1110 for value in values: | |
1111 sum_diffs += 2**(value - x) | |
1112 return x + np.log2(sum_diffs) | |
1113 else: | |
1114 return x | |
1115 | |
1116 | |
1117 def _create_hmm_tagger(states, symbols, A, B, pi): | |
1118 def pd(values, samples): | |
1119 d = dict(zip(samples, values)) | |
1120 return DictionaryProbDist(d) | |
1121 | |
1122 def cpd(array, conditions, samples): | |
1123 d = {} | |
1124 for values, condition in zip(array, conditions): | |
1125 d[condition] = pd(values, samples) | |
1126 return DictionaryConditionalProbDist(d) | |
1127 | |
1128 A = cpd(A, states, states) | |
1129 B = cpd(B, states, symbols) | |
1130 pi = pd(pi, states) | |
1131 return HiddenMarkovModelTagger(symbols=symbols, states=states, | |
1132 transitions=A, outputs=B, priors=pi) | |
1133 | |
1134 | |
1135 def _market_hmm_example(): | |
1136 """ | |
1137 Return an example HMM (described at page 381, Huang et al) | |
1138 """ | |
1139 states = ['bull', 'bear', 'static'] | |
1140 symbols = ['up', 'down', 'unchanged'] | |
1141 A = np.array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], np.float64) | |
1142 B = np.array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], np.float64) | |
1143 pi = np.array([0.5, 0.2, 0.3], np.float64) | |
1144 | |
1145 model = _create_hmm_tagger(states, symbols, A, B, pi) | |
1146 return model, states, symbols | |
1147 | |
1148 | |
1149 def demo(): | |
1150 # demonstrates HMM probability calculation | |
1151 | |
1152 print() | |
1153 print("HMM probability calculation demo") | |
1154 print() | |
1155 | |
1156 model, states, symbols = _market_hmm_example() | |
1157 | |
1158 print('Testing', model) | |
1159 | |
1160 for test in [['up', 'up'], ['up', 'down', 'up'], | |
1161 ['down'] * 5, ['unchanged'] * 5 + ['up']]: | |
1162 | |
1163 sequence = [(t, None) for t in test] | |
1164 | |
1165 print('Testing with state sequence', test) | |
1166 print('probability =', model.probability(sequence)) | |
1167 print('tagging = ', model.tag([word for (word,tag) in sequence])) | |
1168 print('p(tagged) = ', model.probability(sequence)) | |
1169 print('H = ', model.entropy(sequence)) | |
1170 print('H_exh = ', model._exhaustive_entropy(sequence)) | |
1171 print('H(point) = ', model.point_entropy(sequence)) | |
1172 print('H_exh(point)=', model._exhaustive_point_entropy(sequence)) | |
1173 print() | |
1174 | |
1175 def load_pos(num_sents): | |
1176 from nltk.corpus import brown | |
1177 | |
1178 sentences = brown.tagged_sents(categories='news')[:num_sents] | |
1179 | |
1180 tag_re = re.compile(r'[*]|--|[^+*-]+') | |
1181 tag_set = set() | |
1182 symbols = set() | |
1183 | |
1184 cleaned_sentences = [] | |
1185 for sentence in sentences: | |
1186 for i in range(len(sentence)): | |
1187 word, tag = sentence[i] | |
1188 word = word.lower() # normalize | |
1189 symbols.add(word) # log this word | |
1190 # Clean up the tag. | |
1191 tag = tag_re.match(tag).group() | |
1192 tag_set.add(tag) | |
1193 sentence[i] = (word, tag) # store cleaned-up tagged token | |
1194 cleaned_sentences += [sentence] | |
1195 | |
1196 return cleaned_sentences, list(tag_set), list(symbols) | |
1197 | |
1198 def demo_pos(): | |
1199 # demonstrates POS tagging using supervised training | |
1200 | |
1201 print() | |
1202 print("HMM POS tagging demo") | |
1203 print() | |
1204 | |
1205 print('Training HMM...') | |
1206 labelled_sequences, tag_set, symbols = load_pos(20000) | |
1207 trainer = HiddenMarkovModelTrainer(tag_set, symbols) | |
1208 hmm = trainer.train_supervised(labelled_sequences[10:], | |
1209 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) | |
1210 | |
1211 print('Testing...') | |
1212 hmm.test(labelled_sequences[:10], verbose=True) | |
1213 | |
1214 def _untag(sentences): | |
1215 unlabeled = [] | |
1216 for sentence in sentences: | |
1217 unlabeled.append([(token[_TEXT], None) for token in sentence]) | |
1218 return unlabeled | |
1219 | |
1220 def demo_pos_bw(test=10, supervised=20, unsupervised=10, verbose=True, | |
1221 max_iterations=5): | |
1222 # demonstrates the Baum-Welch algorithm in POS tagging | |
1223 | |
1224 print() | |
1225 print("Baum-Welch demo for POS tagging") | |
1226 print() | |
1227 | |
1228 print('Training HMM (supervised, %d sentences)...' % supervised) | |
1229 | |
1230 sentences, tag_set, symbols = load_pos(test + supervised + unsupervised) | |
1231 | |
1232 symbols = set() | |
1233 for sentence in sentences: | |
1234 for token in sentence: | |
1235 symbols.add(token[_TEXT]) | |
1236 | |
1237 trainer = HiddenMarkovModelTrainer(tag_set, list(symbols)) | |
1238 hmm = trainer.train_supervised(sentences[test:test+supervised], | |
1239 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) | |
1240 | |
1241 hmm.test(sentences[:test], verbose=verbose) | |
1242 | |
1243 print('Training (unsupervised, %d sentences)...' % unsupervised) | |
1244 # it's rather slow - so only use 10 samples by default | |
1245 unlabeled = _untag(sentences[test+supervised:]) | |
1246 hmm = trainer.train_unsupervised(unlabeled, model=hmm, | |
1247 max_iterations=max_iterations) | |
1248 hmm.test(sentences[:test], verbose=verbose) | |
1249 | |
1250 def demo_bw(): | |
1251 # demo Baum Welch by generating some sequences and then performing | |
1252 # unsupervised training on them | |
1253 | |
1254 print() | |
1255 print("Baum-Welch demo for market example") | |
1256 print() | |
1257 | |
1258 model, states, symbols = _market_hmm_example() | |
1259 | |
1260 # generate some random sequences | |
1261 training = [] | |
1262 import random | |
1263 rng = random.Random() | |
1264 rng.seed(0) | |
1265 for i in range(10): | |
1266 item = model.random_sample(rng, 5) | |
1267 training.append([(i[0], None) for i in item]) | |
1268 | |
1269 # train on those examples, starting with the model that generated them | |
1270 trainer = HiddenMarkovModelTrainer(states, symbols) | |
1271 hmm = trainer.train_unsupervised(training, model=model, | |
1272 max_iterations=1000) | |
1273 | |
1274 | |
1275 if __name__ == "__main__": | |
1276 import doctest | |
1277 doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE) | |
1278 | |
1279 |