comparison hmm/hmm.py @ 3:26d9c0308fcf

updated/added from ecclerig version
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Mon, 09 Mar 2020 17:35:28 +0000
parents
children
comparison
equal deleted inserted replaced
2:e07789816ca5 3:26d9c0308fcf
1 # Natural Language Toolkit: Hidden Markov Model
2 #
3 # Copyright (C) 2001-2015 NLTK Project
4 # Author: Trevor Cohn <tacohn@csse.unimelb.edu.au>
5 # Philip Blunsom <pcbl@csse.unimelb.edu.au>
6 # Tiago Tresoldi <tiago@tresoldi.pro.br> (fixes)
7 # Steven Bird <stevenbird1@gmail.com> (fixes)
8 # Joseph Frazee <jfrazee@mail.utexas.edu> (fixes)
9 # Steven Xu <xxu@student.unimelb.edu.au> (fixes)
10 # URL: <http://nltk.org/>
11 # For license information, see LICENSE.TXT
12
13 """
14 Hidden Markov Models (HMMs) largely used to assign the correct label sequence
15 to sequential data or assess the probability of a given label and data
16 sequence. These models are finite state machines characterised by a number of
17 states, transitions between these states, and output symbols emitted while in
18 each state. The HMM is an extension to the Markov chain, where each state
19 corresponds deterministically to a given event. In the HMM the observation is
20 a probabilistic function of the state. HMMs share the Markov chain's
21 assumption, being that the probability of transition from one state to another
22 only depends on the current state - i.e. the series of states that led to the
23 current state are not used. They are also time invariant.
24
25 The HMM is a directed graph, with probability weighted edges (representing the
26 probability of a transition between the source and sink states) where each
27 vertex emits an output symbol when entered. The symbol (or observation) is
28 non-deterministically generated. For this reason, knowing that a sequence of
29 output observations was generated by a given HMM does not mean that the
30 corresponding sequence of states (and what the current state is) is known.
31 This is the 'hidden' in the hidden markov model.
32
33 Formally, a HMM can be characterised by:
34
35 - the output observation alphabet. This is the set of symbols which may be
36 observed as output of the system.
37 - the set of states.
38 - the transition probabilities *a_{ij} = P(s_t = j | s_{t-1} = i)*. These
39 represent the probability of transition to each state from a given state.
40 - the output probability matrix *b_i(k) = P(X_t = o_k | s_t = i)*. These
41 represent the probability of observing each symbol in a given state.
42 - the initial state distribution. This gives the probability of starting
43 in each state.
44
45 To ground this discussion, take a common NLP application, part-of-speech (POS)
46 tagging. An HMM is desirable for this task as the highest probability tag
47 sequence can be calculated for a given sequence of word forms. This differs
48 from other tagging techniques which often tag each word individually, seeking
49 to optimise each individual tagging greedily without regard to the optimal
50 combination of tags for a larger unit, such as a sentence. The HMM does this
51 with the Viterbi algorithm, which efficiently computes the optimal path
52 through the graph given the sequence of words forms.
53
54 In POS tagging the states usually have a 1:1 correspondence with the tag
55 alphabet - i.e. each state represents a single tag. The output observation
56 alphabet is the set of word forms (the lexicon), and the remaining three
57 parameters are derived by a training regime. With this information the
58 probability of a given sentence can be easily derived, by simply summing the
59 probability of each distinct path through the model. Similarly, the highest
60 probability tagging sequence can be derived with the Viterbi algorithm,
61 yielding a state sequence which can be mapped into a tag sequence.
62
63 This discussion assumes that the HMM has been trained. This is probably the
64 most difficult task with the model, and requires either MLE estimates of the
65 parameters or unsupervised learning using the Baum-Welch algorithm, a variant
66 of EM.
67
68 For more information, please consult the source code for this module,
69 which includes extensive demonstration code.
70 """
71 from __future__ import print_function, unicode_literals, division
72
73 import re
74 import itertools
75
76 try:
77 import numpy as np
78 except ImportError:
79 pass
80
81 from nltk.probability import (FreqDist, ConditionalFreqDist,
82 ConditionalProbDist, DictionaryProbDist,
83 DictionaryConditionalProbDist,
84 LidstoneProbDist, MutableProbDist,
85 MLEProbDist, RandomProbDist)
86 from nltk.metrics import accuracy
87 from nltk.util import LazyMap, unique_list
88 from nltk.compat import python_2_unicode_compatible, izip, imap
89 from nltk.tag.api import TaggerI
90
91
92 _TEXT = 0 # index of text in a tuple
93 _TAG = 1 # index of tag in a tuple
94
95 def _identity(labeled_symbols):
96 return labeled_symbols
97
98 @python_2_unicode_compatible
99 class HiddenMarkovModelTagger(TaggerI):
100 """
101 Hidden Markov model class, a generative model for labelling sequence data.
102 These models define the joint probability of a sequence of symbols and
103 their labels (state transitions) as the product of the starting state
104 probability, the probability of each state transition, and the probability
105 of each observation being generated from each state. This is described in
106 more detail in the module documentation.
107
108 This implementation is based on the HMM description in Chapter 8, Huang,
109 Acero and Hon, Spoken Language Processing and includes an extension for
110 training shallow HMM parsers or specialized HMMs as in Molina et.
111 al, 2002. A specialized HMM modifies training data by applying a
112 specialization function to create a new training set that is more
113 appropriate for sequential tagging with an HMM. A typical use case is
114 chunking.
115
116 :param symbols: the set of output symbols (alphabet)
117 :type symbols: seq of any
118 :param states: a set of states representing state space
119 :type states: seq of any
120 :param transitions: transition probabilities; Pr(s_i | s_j) is the
121 probability of transition from state i given the model is in
122 state_j
123 :type transitions: ConditionalProbDistI
124 :param outputs: output probabilities; Pr(o_k | s_i) is the probability
125 of emitting symbol k when entering state i
126 :type outputs: ConditionalProbDistI
127 :param priors: initial state distribution; Pr(s_i) is the probability
128 of starting in state i
129 :type priors: ProbDistI
130 :param transform: an optional function for transforming training
131 instances, defaults to the identity function.
132 :type transform: callable
133 """
134 def __init__(self, symbols, states, transitions, outputs, priors,
135 transform=_identity):
136 self._symbols = unique_list(symbols)
137 self._states = unique_list(states)
138 self._transitions = transitions
139 self._outputs = outputs
140 self._priors = priors
141 self._cache = None
142 self._transform = transform
143
144 @classmethod
145 def _train(cls, labeled_sequence, test_sequence=None,
146 unlabeled_sequence=None, transform=_identity,
147 estimator=None, **kwargs):
148
149 if estimator is None:
150 def estimator(fd, bins):
151 return LidstoneProbDist(fd, 0.1, bins)
152
153 labeled_sequence = LazyMap(transform, labeled_sequence)
154 symbols = unique_list(word for sent in labeled_sequence
155 for word, tag in sent)
156 tag_set = unique_list(tag for sent in labeled_sequence
157 for word, tag in sent)
158
159 trainer = HiddenMarkovModelTrainer(tag_set, symbols)
160 hmm = trainer.train_supervised(labeled_sequence, estimator=estimator)
161 hmm = cls(hmm._symbols, hmm._states, hmm._transitions, hmm._outputs,
162 hmm._priors, transform=transform)
163
164 if test_sequence:
165 hmm.test(test_sequence, verbose=kwargs.get('verbose', False))
166
167 if unlabeled_sequence:
168 max_iterations = kwargs.get('max_iterations', 5)
169 hmm = trainer.train_unsupervised(unlabeled_sequence, model=hmm,
170 max_iterations=max_iterations)
171 if test_sequence:
172 hmm.test(test_sequence, verbose=kwargs.get('verbose', False))
173
174 return hmm
175
176 @classmethod
177 def train(cls, labeled_sequence, test_sequence=None,
178 unlabeled_sequence=None, **kwargs):
179 """
180 Train a new HiddenMarkovModelTagger using the given labeled and
181 unlabeled training instances. Testing will be performed if test
182 instances are provided.
183
184 :return: a hidden markov model tagger
185 :rtype: HiddenMarkovModelTagger
186 :param labeled_sequence: a sequence of labeled training instances,
187 i.e. a list of sentences represented as tuples
188 :type labeled_sequence: list(list)
189 :param test_sequence: a sequence of labeled test instances
190 :type test_sequence: list(list)
191 :param unlabeled_sequence: a sequence of unlabeled training instances,
192 i.e. a list of sentences represented as words
193 :type unlabeled_sequence: list(list)
194 :param transform: an optional function for transforming training
195 instances, defaults to the identity function, see ``transform()``
196 :type transform: function
197 :param estimator: an optional function or class that maps a
198 condition's frequency distribution to its probability
199 distribution, defaults to a Lidstone distribution with gamma = 0.1
200 :type estimator: class or function
201 :param verbose: boolean flag indicating whether training should be
202 verbose or include printed output
203 :type verbose: bool
204 :param max_iterations: number of Baum-Welch interations to perform
205 :type max_iterations: int
206 """
207 return cls._train(labeled_sequence, test_sequence,
208 unlabeled_sequence, **kwargs)
209
210 def probability(self, sequence):
211 """
212 Returns the probability of the given symbol sequence. If the sequence
213 is labelled, then returns the joint probability of the symbol, state
214 sequence. Otherwise, uses the forward algorithm to find the
215 probability over all label sequences.
216
217 :return: the probability of the sequence
218 :rtype: float
219 :param sequence: the sequence of symbols which must contain the TEXT
220 property, and optionally the TAG property
221 :type sequence: Token
222 """
223 return 2**(self.log_probability(self._transform(sequence)))
224
225 def log_probability(self, sequence):
226 """
227 Returns the log-probability of the given symbol sequence. If the
228 sequence is labelled, then returns the joint log-probability of the
229 symbol, state sequence. Otherwise, uses the forward algorithm to find
230 the log-probability over all label sequences.
231
232 :return: the log-probability of the sequence
233 :rtype: float
234 :param sequence: the sequence of symbols which must contain the TEXT
235 property, and optionally the TAG property
236 :type sequence: Token
237 """
238 sequence = self._transform(sequence)
239
240 T = len(sequence)
241
242 if T > 0 and sequence[0][_TAG]:
243 last_state = sequence[0][_TAG]
244 p = self._priors.logprob(last_state) + \
245 self._output_logprob(last_state, sequence[0][_TEXT])
246 for t in range(1, T):
247 state = sequence[t][_TAG]
248 p += self._transitions[last_state].logprob(state) + \
249 self._output_logprob(state, sequence[t][_TEXT])
250 last_state = state
251 return p
252 else:
253 alpha = self._forward_probability(sequence)
254 p = logsumexp2(alpha[T-1])
255 return p
256
257 def tag(self, unlabeled_sequence):
258 """
259 Tags the sequence with the highest probability state sequence. This
260 uses the best_path method to find the Viterbi path.
261
262 :return: a labelled sequence of symbols
263 :rtype: list
264 :param unlabeled_sequence: the sequence of unlabeled symbols
265 :type unlabeled_sequence: list
266 """
267 unlabeled_sequence = self._transform(unlabeled_sequence)
268 return self._tag(unlabeled_sequence)
269
270 def _tag(self, unlabeled_sequence):
271 path = self._best_path(unlabeled_sequence)
272 return list(izip(unlabeled_sequence, path))
273
274 def _output_logprob(self, state, symbol):
275 """
276 :return: the log probability of the symbol being observed in the given
277 state
278 :rtype: float
279 """
280 return self._outputs[state].logprob(symbol)
281
282 def _create_cache(self):
283 """
284 The cache is a tuple (P, O, X, S) where:
285
286 - S maps symbols to integers. I.e., it is the inverse
287 mapping from self._symbols; for each symbol s in
288 self._symbols, the following is true::
289
290 self._symbols[S[s]] == s
291
292 - O is the log output probabilities::
293
294 O[i,k] = log( P(token[t]=sym[k]|tag[t]=state[i]) )
295
296 - X is the log transition probabilities::
297
298 X[i,j] = log( P(tag[t]=state[j]|tag[t-1]=state[i]) )
299
300 - P is the log prior probabilities::
301
302 P[i] = log( P(tag[0]=state[i]) )
303 """
304 if not self._cache:
305 N = len(self._states)
306 M = len(self._symbols)
307 P = np.zeros(N, np.float32)
308 X = np.zeros((N, N), np.float32)
309 O = np.zeros((N, M), np.float32)
310 for i in range(N):
311 si = self._states[i]
312 P[i] = self._priors.logprob(si)
313 for j in range(N):
314 X[i, j] = self._transitions[si].logprob(self._states[j])
315 for k in range(M):
316 O[i, k] = self._output_logprob(si, self._symbols[k])
317 S = {}
318 for k in range(M):
319 S[self._symbols[k]] = k
320 self._cache = (P, O, X, S)
321
322 def _update_cache(self, symbols):
323 # add new symbols to the symbol table and repopulate the output
324 # probabilities and symbol table mapping
325 if symbols:
326 self._create_cache()
327 P, O, X, S = self._cache
328 for symbol in symbols:
329 if symbol not in self._symbols:
330 self._cache = None
331 self._symbols.append(symbol)
332 # don't bother with the work if there aren't any new symbols
333 if not self._cache:
334 N = len(self._states)
335 M = len(self._symbols)
336 Q = O.shape[1]
337 # add new columns to the output probability table without
338 # destroying the old probabilities
339 O = np.hstack([O, np.zeros((N, M - Q), np.float32)])
340 for i in range(N):
341 si = self._states[i]
342 # only calculate probabilities for new symbols
343 for k in range(Q, M):
344 O[i, k] = self._output_logprob(si, self._symbols[k])
345 # only create symbol mappings for new symbols
346 for k in range(Q, M):
347 S[self._symbols[k]] = k
348 self._cache = (P, O, X, S)
349
350 def reset_cache(self):
351 self._cache = None
352
353 def best_path(self, unlabeled_sequence):
354 """
355 Returns the state sequence of the optimal (most probable) path through
356 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic
357 programming.
358
359 :return: the state sequence
360 :rtype: sequence of any
361 :param unlabeled_sequence: the sequence of unlabeled symbols
362 :type unlabeled_sequence: list
363 """
364 unlabeled_sequence = self._transform(unlabeled_sequence)
365 return self._best_path(unlabeled_sequence)
366
367 def _best_path(self, unlabeled_sequence):
368 T = len(unlabeled_sequence)
369 N = len(self._states)
370 self._create_cache()
371 self._update_cache(unlabeled_sequence)
372 P, O, X, S = self._cache
373
374 V = np.zeros((T, N), np.float32)
375 B = -np.ones((T, N), np.int)
376
377 V[0] = P + O[:, S[unlabeled_sequence[0]]]
378 for t in range(1, T):
379 for j in range(N):
380 vs = V[t-1, :] + X[:, j]
381 best = np.argmax(vs)
382 V[t, j] = vs[best] + O[j, S[unlabeled_sequence[t]]]
383 B[t, j] = best
384
385 current = np.argmax(V[T-1,:])
386 sequence = [current]
387 for t in range(T-1, 0, -1):
388 last = B[t, current]
389 sequence.append(last)
390 current = last
391
392 sequence.reverse()
393 return list(map(self._states.__getitem__, sequence))
394
395 def best_path_simple(self, unlabeled_sequence):
396 """
397 Returns the state sequence of the optimal (most probable) path through
398 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic
399 programming. This uses a simple, direct method, and is included for
400 teaching purposes.
401
402 :return: the state sequence
403 :rtype: sequence of any
404 :param unlabeled_sequence: the sequence of unlabeled symbols
405 :type unlabeled_sequence: list
406 """
407 unlabeled_sequence = self._transform(unlabeled_sequence)
408 return self._best_path_simple(unlabeled_sequence)
409
410 def _best_path_simple(self, unlabeled_sequence):
411 T = len(unlabeled_sequence)
412 N = len(self._states)
413 V = np.zeros((T, N), np.float64)
414 B = {}
415
416 # find the starting log probabilities for each state
417 symbol = unlabeled_sequence[0]
418 for i, state in enumerate(self._states):
419 V[0, i] = self._priors.logprob(state) + \
420 self._output_logprob(state, symbol)
421 B[0, state] = None
422
423 # find the maximum log probabilities for reaching each state at time t
424 for t in range(1, T):
425 symbol = unlabeled_sequence[t]
426 for j in range(N):
427 sj = self._states[j]
428 best = None
429 for i in range(N):
430 si = self._states[i]
431 va = V[t-1, i] + self._transitions[si].logprob(sj)
432 if not best or va > best[0]:
433 best = (va, si)
434 V[t, j] = best[0] + self._output_logprob(sj, symbol)
435 B[t, sj] = best[1]
436
437 # find the highest probability final state
438 best = None
439 for i in range(N):
440 val = V[T-1, i]
441 if not best or val > best[0]:
442 best = (val, self._states[i])
443
444 # traverse the back-pointers B to find the state sequence
445 current = best[1]
446 sequence = [current]
447 for t in range(T-1, 0, -1):
448 last = B[t, current]
449 sequence.append(last)
450 current = last
451
452 sequence.reverse()
453 return sequence
454
455 def random_sample(self, rng, length):
456 """
457 Randomly sample the HMM to generate a sentence of a given length. This
458 samples the prior distribution then the observation distribution and
459 transition distribution for each subsequent observation and state.
460 This will mostly generate unintelligible garbage, but can provide some
461 amusement.
462
463 :return: the randomly created state/observation sequence,
464 generated according to the HMM's probability
465 distributions. The SUBTOKENS have TEXT and TAG
466 properties containing the observation and state
467 respectively.
468 :rtype: list
469 :param rng: random number generator
470 :type rng: Random (or any object with a random() method)
471 :param length: desired output length
472 :type length: int
473 """
474
475 # sample the starting state and symbol prob dists
476 tokens = []
477 state = self._sample_probdist(self._priors, rng.random(), self._states)
478 symbol = self._sample_probdist(self._outputs[state],
479 rng.random(), self._symbols)
480 tokens.append((symbol, state))
481
482 for i in range(1, length):
483 # sample the state transition and symbol prob dists
484 state = self._sample_probdist(self._transitions[state],
485 rng.random(), self._states)
486 symbol = self._sample_probdist(self._outputs[state],
487 rng.random(), self._symbols)
488 tokens.append((symbol, state))
489
490 return tokens
491
492 def _sample_probdist(self, probdist, p, samples):
493 cum_p = 0
494 for sample in samples:
495 add_p = probdist.prob(sample)
496 if cum_p <= p <= cum_p + add_p:
497 return sample
498 cum_p += add_p
499 raise Exception('Invalid probability distribution - '
500 'does not sum to one')
501
502 def entropy(self, unlabeled_sequence):
503 """
504 Returns the entropy over labellings of the given sequence. This is
505 given by::
506
507 H(O) = - sum_S Pr(S | O) log Pr(S | O)
508
509 where the summation ranges over all state sequences, S. Let
510 *Z = Pr(O) = sum_S Pr(S, O)}* where the summation ranges over all state
511 sequences and O is the observation sequence. As such the entropy can
512 be re-expressed as::
513
514 H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ]
515 = log Z - sum_S Pr(S | O) log Pr(S, 0)
516 = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t Pr(S_t | S_{t-1}) + sum_t Pr(O_t | S_t) ]
517
518 The order of summation for the log terms can be flipped, allowing
519 dynamic programming to be used to calculate the entropy. Specifically,
520 we use the forward and backward probabilities (alpha, beta) giving::
521
522 H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0)
523 + sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t(sj) / Z * log Pr(sj | si)
524 + sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st)
525
526 This simply uses alpha and beta to find the probabilities of partial
527 sequences, constrained to include the given state(s) at some point in
528 time.
529 """
530 unlabeled_sequence = self._transform(unlabeled_sequence)
531
532 T = len(unlabeled_sequence)
533 N = len(self._states)
534
535 alpha = self._forward_probability(unlabeled_sequence)
536 beta = self._backward_probability(unlabeled_sequence)
537 normalisation = logsumexp2(alpha[T-1])
538
539 entropy = normalisation
540
541 # starting state, t = 0
542 for i, state in enumerate(self._states):
543 p = 2**(alpha[0, i] + beta[0, i] - normalisation)
544 entropy -= p * self._priors.logprob(state)
545 #print 'p(s_0 = %s) =' % state, p
546
547 # state transitions
548 for t0 in range(T - 1):
549 t1 = t0 + 1
550 for i0, s0 in enumerate(self._states):
551 for i1, s1 in enumerate(self._states):
552 p = 2**(alpha[t0, i0] + self._transitions[s0].logprob(s1) +
553 self._outputs[s1].logprob(
554 unlabeled_sequence[t1][_TEXT]) +
555 beta[t1, i1] - normalisation)
556 entropy -= p * self._transitions[s0].logprob(s1)
557 #print 'p(s_%d = %s, s_%d = %s) =' % (t0, s0, t1, s1), p
558
559 # symbol emissions
560 for t in range(T):
561 for i, state in enumerate(self._states):
562 p = 2**(alpha[t, i] + beta[t, i] - normalisation)
563 entropy -= p * self._outputs[state].logprob(
564 unlabeled_sequence[t][_TEXT])
565 #print 'p(s_%d = %s) =' % (t, state), p
566
567 return entropy
568
569 def point_entropy(self, unlabeled_sequence):
570 """
571 Returns the pointwise entropy over the possible states at each
572 position in the chain, given the observation sequence.
573 """
574 unlabeled_sequence = self._transform(unlabeled_sequence)
575
576 T = len(unlabeled_sequence)
577 N = len(self._states)
578
579 alpha = self._forward_probability(unlabeled_sequence)
580 beta = self._backward_probability(unlabeled_sequence)
581 normalisation = logsumexp2(alpha[T-1])
582
583 entropies = np.zeros(T, np.float64)
584 probs = np.zeros(N, np.float64)
585 for t in range(T):
586 for s in range(N):
587 probs[s] = alpha[t, s] + beta[t, s] - normalisation
588
589 for s in range(N):
590 entropies[t] -= 2**(probs[s]) * probs[s]
591
592 return entropies
593
594 def _exhaustive_entropy(self, unlabeled_sequence):
595 unlabeled_sequence = self._transform(unlabeled_sequence)
596
597 T = len(unlabeled_sequence)
598 N = len(self._states)
599
600 labellings = [[state] for state in self._states]
601 for t in range(T - 1):
602 current = labellings
603 labellings = []
604 for labelling in current:
605 for state in self._states:
606 labellings.append(labelling + [state])
607
608 log_probs = []
609 for labelling in labellings:
610 labeled_sequence = unlabeled_sequence[:]
611 for t, label in enumerate(labelling):
612 labeled_sequence[t] = (labeled_sequence[t][_TEXT], label)
613 lp = self.log_probability(labeled_sequence)
614 log_probs.append(lp)
615 normalisation = _log_add(*log_probs)
616
617 #ps = zeros((T, N), float64)
618 #for labelling, lp in zip(labellings, log_probs):
619 #for t in range(T):
620 #ps[t, self._states.index(labelling[t])] += \
621 # 2**(lp - normalisation)
622
623 #for t in range(T):
624 #print 'prob[%d] =' % t, ps[t]
625
626 entropy = 0
627 for lp in log_probs:
628 lp -= normalisation
629 entropy -= 2**(lp) * lp
630
631 return entropy
632
633 def _exhaustive_point_entropy(self, unlabeled_sequence):
634 unlabeled_sequence = self._transform(unlabeled_sequence)
635
636 T = len(unlabeled_sequence)
637 N = len(self._states)
638
639 labellings = [[state] for state in self._states]
640 for t in range(T - 1):
641 current = labellings
642 labellings = []
643 for labelling in current:
644 for state in self._states:
645 labellings.append(labelling + [state])
646
647 log_probs = []
648 for labelling in labellings:
649 labelled_sequence = unlabeled_sequence[:]
650 for t, label in enumerate(labelling):
651 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label)
652 lp = self.log_probability(labelled_sequence)
653 log_probs.append(lp)
654
655 normalisation = _log_add(*log_probs)
656
657 probabilities = _ninf_array((T,N))
658
659 for labelling, lp in zip(labellings, log_probs):
660 lp -= normalisation
661 for t, label in enumerate(labelling):
662 index = self._states.index(label)
663 probabilities[t, index] = _log_add(probabilities[t, index], lp)
664
665 entropies = np.zeros(T, np.float64)
666 for t in range(T):
667 for s in range(N):
668 entropies[t] -= 2**(probabilities[t, s]) * probabilities[t, s]
669
670 return entropies
671
672 def _transitions_matrix(self):
673 """ Return a matrix of transition log probabilities. """
674 trans_iter = (self._transitions[sj].logprob(si)
675 for sj in self._states
676 for si in self._states)
677
678 transitions_logprob = np.fromiter(trans_iter, dtype=np.float64)
679 N = len(self._states)
680 return transitions_logprob.reshape((N, N)).T
681
682 def _outputs_vector(self, symbol):
683 """
684 Return a vector with log probabilities of emitting a symbol
685 when entering states.
686 """
687 out_iter = (self._output_logprob(sj, symbol) for sj in self._states)
688 return np.fromiter(out_iter, dtype=np.float64)
689
690 def _forward_probability(self, unlabeled_sequence):
691 """
692 Return the forward probability matrix, a T by N array of
693 log-probabilities, where T is the length of the sequence and N is the
694 number of states. Each entry (t, s) gives the probability of being in
695 state s at time t after observing the partial symbol sequence up to
696 and including t.
697
698 :param unlabeled_sequence: the sequence of unlabeled symbols
699 :type unlabeled_sequence: list
700 :return: the forward log probability matrix
701 :rtype: array
702 """
703 T = len(unlabeled_sequence)
704 N = len(self._states)
705 alpha = _ninf_array((T, N))
706
707 transitions_logprob = self._transitions_matrix()
708
709 # Initialization
710 symbol = unlabeled_sequence[0][_TEXT]
711 for i, state in enumerate(self._states):
712 alpha[0, i] = self._priors.logprob(state) + \
713 self._output_logprob(state, symbol)
714
715 # Induction
716 for t in range(1, T):
717 symbol = unlabeled_sequence[t][_TEXT]
718 output_logprob = self._outputs_vector(symbol)
719
720 for i in range(N):
721 summand = alpha[t-1] + transitions_logprob[i]
722 alpha[t, i] = logsumexp2(summand) + output_logprob[i]
723
724 return alpha
725
726 def _backward_probability(self, unlabeled_sequence):
727 """
728 Return the backward probability matrix, a T by N array of
729 log-probabilities, where T is the length of the sequence and N is the
730 number of states. Each entry (t, s) gives the probability of being in
731 state s at time t after observing the partial symbol sequence from t
732 .. T.
733
734 :return: the backward log probability matrix
735 :rtype: array
736 :param unlabeled_sequence: the sequence of unlabeled symbols
737 :type unlabeled_sequence: list
738 """
739 T = len(unlabeled_sequence)
740 N = len(self._states)
741 beta = _ninf_array((T, N))
742
743 transitions_logprob = self._transitions_matrix().T
744
745 # initialise the backward values;
746 # "1" is an arbitrarily chosen value from Rabiner tutorial
747 beta[T-1, :] = np.log2(1)
748
749 # inductively calculate remaining backward values
750 for t in range(T-2, -1, -1):
751 symbol = unlabeled_sequence[t+1][_TEXT]
752 outputs = self._outputs_vector(symbol)
753
754 for i in range(N):
755 summand = transitions_logprob[i] + beta[t+1] + outputs
756 beta[t, i] = logsumexp2(summand)
757
758 return beta
759
760 def test(self, test_sequence, verbose=False, **kwargs):
761 """
762 Tests the HiddenMarkovModelTagger instance.
763
764 :param test_sequence: a sequence of labeled test instances
765 :type test_sequence: list(list)
766 :param verbose: boolean flag indicating whether training should be
767 verbose or include printed output
768 :type verbose: bool
769 """
770
771 def words(sent):
772 return [word for (word, tag) in sent]
773
774 def tags(sent):
775 return [tag for (word, tag) in sent]
776
777 def flatten(seq):
778 return list(itertools.chain(*seq))
779
780 test_sequence = self._transform(test_sequence)
781 predicted_sequence = list(imap(self._tag, imap(words, test_sequence)))
782
783 if verbose:
784 for test_sent, predicted_sent in izip(test_sequence, predicted_sequence):
785 print('Test:',
786 ' '.join('%s/%s' % (token, tag)
787 for (token, tag) in test_sent))
788 print()
789 print('Untagged:',
790 ' '.join("%s" % token for (token, tag) in test_sent))
791 print()
792 print('HMM-tagged:',
793 ' '.join('%s/%s' % (token, tag)
794 for (token, tag) in predicted_sent))
795 print()
796 print('Entropy:',
797 self.entropy([(token, None) for
798 (token, tag) in predicted_sent]))
799 print()
800 print('-' * 60)
801
802 test_tags = flatten(imap(tags, test_sequence))
803 predicted_tags = flatten(imap(tags, predicted_sequence))
804
805 acc = accuracy(test_tags, predicted_tags)
806 count = sum(len(sent) for sent in test_sequence)
807 print('accuracy over %d tokens: %.2f' % (count, acc * 100))
808
809 def __repr__(self):
810 return ('<HiddenMarkovModelTagger %d states and %d output symbols>'
811 % (len(self._states), len(self._symbols)))
812
813
814 class HiddenMarkovModelTrainer(object):
815 """
816 Algorithms for learning HMM parameters from training data. These include
817 both supervised learning (MLE) and unsupervised learning (Baum-Welch).
818
819 Creates an HMM trainer to induce an HMM with the given states and
820 output symbol alphabet. A supervised and unsupervised training
821 method may be used. If either of the states or symbols are not given,
822 these may be derived from supervised training.
823
824 :param states: the set of state labels
825 :type states: sequence of any
826 :param symbols: the set of observation symbols
827 :type symbols: sequence of any
828 """
829 def __init__(self, states=None, symbols=None):
830 self._states = (states if states else [])
831 self._symbols = (symbols if symbols else [])
832
833 def train(self, labeled_sequences=None, unlabeled_sequences=None,
834 **kwargs):
835 """
836 Trains the HMM using both (or either of) supervised and unsupervised
837 techniques.
838
839 :return: the trained model
840 :rtype: HiddenMarkovModelTagger
841 :param labelled_sequences: the supervised training data, a set of
842 labelled sequences of observations
843 :type labelled_sequences: list
844 :param unlabeled_sequences: the unsupervised training data, a set of
845 sequences of observations
846 :type unlabeled_sequences: list
847 :param kwargs: additional arguments to pass to the training methods
848 """
849 assert labeled_sequences or unlabeled_sequences
850 model = None
851 if labeled_sequences:
852 model = self.train_supervised(labeled_sequences, **kwargs)
853 if unlabeled_sequences:
854 if model: kwargs['model'] = model
855 model = self.train_unsupervised(unlabeled_sequences, **kwargs)
856 return model
857
858
859 def _baum_welch_step(self, sequence, model, symbol_to_number):
860 N = len(model._states)
861 M = len(model._symbols)
862 T = len(sequence)
863
864 # compute forward and backward probabilities
865 alpha = model._forward_probability(sequence)
866 beta = model._backward_probability(sequence)
867
868 # find the log probability of the sequence
869 lpk = logsumexp2(alpha[T-1])
870
871 A_numer = _ninf_array((N, N))
872 B_numer = _ninf_array((N, M))
873 A_denom = _ninf_array(N)
874 B_denom = _ninf_array(N)
875
876 transitions_logprob = model._transitions_matrix().T
877
878 for t in range(T):
879 symbol = sequence[t][_TEXT] # not found? FIXME
880 next_symbol = None
881 if t < T - 1:
882 next_symbol = sequence[t+1][_TEXT] # not found? FIXME
883 xi = symbol_to_number[symbol]
884
885 next_outputs_logprob = model._outputs_vector(next_symbol)
886 alpha_plus_beta = alpha[t] + beta[t]
887
888 if t < T - 1:
889 numer_add = transitions_logprob + next_outputs_logprob + \
890 beta[t+1] + alpha[t].reshape(N, 1)
891 A_numer = np.logaddexp2(A_numer, numer_add)
892 A_denom = np.logaddexp2(A_denom, alpha_plus_beta)
893 else:
894 B_denom = np.logaddexp2(A_denom, alpha_plus_beta)
895
896 B_numer[:,xi] = np.logaddexp2(B_numer[:,xi], alpha_plus_beta)
897
898 return lpk, A_numer, A_denom, B_numer, B_denom
899
900 def train_unsupervised(self, unlabeled_sequences, update_outputs=True,
901 **kwargs):
902 """
903 Trains the HMM using the Baum-Welch algorithm to maximise the
904 probability of the data sequence. This is a variant of the EM
905 algorithm, and is unsupervised in that it doesn't need the state
906 sequences for the symbols. The code is based on 'A Tutorial on Hidden
907 Markov Models and Selected Applications in Speech Recognition',
908 Lawrence Rabiner, IEEE, 1989.
909
910 :return: the trained model
911 :rtype: HiddenMarkovModelTagger
912 :param unlabeled_sequences: the training data, a set of
913 sequences of observations
914 :type unlabeled_sequences: list
915
916 kwargs may include following parameters:
917
918 :param model: a HiddenMarkovModelTagger instance used to begin
919 the Baum-Welch algorithm
920 :param max_iterations: the maximum number of EM iterations
921 :param convergence_logprob: the maximum change in log probability to
922 allow convergence
923 """
924
925 # create a uniform HMM, which will be iteratively refined, unless
926 # given an existing model
927 model = kwargs.get('model')
928 if not model:
929 priors = RandomProbDist(self._states)
930 transitions = DictionaryConditionalProbDist(
931 dict((state, RandomProbDist(self._states))
932 for state in self._states))
933 outputs = DictionaryConditionalProbDist(
934 dict((state, RandomProbDist(self._symbols))
935 for state in self._states))
936 model = HiddenMarkovModelTagger(self._symbols, self._states,
937 transitions, outputs, priors)
938
939 self._states = model._states
940 self._symbols = model._symbols
941
942 N = len(self._states)
943 M = len(self._symbols)
944 symbol_numbers = dict((sym, i) for i, sym in enumerate(self._symbols))
945
946 # update model prob dists so that they can be modified
947 # model._priors = MutableProbDist(model._priors, self._states)
948
949 model._transitions = DictionaryConditionalProbDist(
950 dict((s, MutableProbDist(model._transitions[s], self._states))
951 for s in self._states))
952
953 if update_outputs:
954 model._outputs = DictionaryConditionalProbDist(
955 dict((s, MutableProbDist(model._outputs[s], self._symbols))
956 for s in self._states))
957
958 model.reset_cache()
959
960 # iterate until convergence
961 converged = False
962 last_logprob = None
963 iteration = 0
964 max_iterations = kwargs.get('max_iterations', 1000)
965 epsilon = kwargs.get('convergence_logprob', 1e-6)
966
967 while not converged and iteration < max_iterations:
968 A_numer = _ninf_array((N, N))
969 B_numer = _ninf_array((N, M))
970 A_denom = _ninf_array(N)
971 B_denom = _ninf_array(N)
972
973 logprob = 0
974 for sequence in unlabeled_sequences:
975 sequence = list(sequence)
976 if not sequence:
977 continue
978
979 (lpk, seq_A_numer, seq_A_denom,
980 seq_B_numer, seq_B_denom) = self._baum_welch_step(sequence, model, symbol_numbers)
981
982 # add these sums to the global A and B values
983 for i in range(N):
984 A_numer[i] = np.logaddexp2(A_numer[i], seq_A_numer[i]-lpk)
985 B_numer[i] = np.logaddexp2(B_numer[i], seq_B_numer[i]-lpk)
986
987 A_denom = np.logaddexp2(A_denom, seq_A_denom-lpk)
988 B_denom = np.logaddexp2(B_denom, seq_B_denom-lpk)
989
990 logprob += lpk
991
992 # use the calculated values to update the transition and output
993 # probability values
994 for i in range(N):
995 logprob_Ai = A_numer[i] - A_denom[i]
996 logprob_Bi = B_numer[i] - B_denom[i]
997
998 # We should normalize all probabilities (see p.391 Huang et al)
999 # Let sum(P) be K.
1000 # We can divide each Pi by K to make sum(P) == 1.
1001 # Pi' = Pi/K
1002 # log2(Pi') = log2(Pi) - log2(K)
1003 logprob_Ai -= logsumexp2(logprob_Ai)
1004 logprob_Bi -= logsumexp2(logprob_Bi)
1005
1006 # update output and transition probabilities
1007 si = self._states[i]
1008
1009 for j in range(N):
1010 sj = self._states[j]
1011 model._transitions[si].update(sj, logprob_Ai[j])
1012
1013 if update_outputs:
1014 for k in range(M):
1015 ok = self._symbols[k]
1016 model._outputs[si].update(ok, logprob_Bi[k])
1017
1018 # Rabiner says the priors don't need to be updated. I don't
1019 # believe him. FIXME
1020
1021 # test for convergence
1022 if iteration > 0 and abs(logprob - last_logprob) < epsilon:
1023 converged = True
1024
1025 print('iteration', iteration, 'logprob', logprob)
1026 iteration += 1
1027 last_logprob = logprob
1028
1029 return model
1030
1031 def train_supervised(self, labelled_sequences, estimator=None):
1032 """
1033 Supervised training maximising the joint probability of the symbol and
1034 state sequences. This is done via collecting frequencies of
1035 transitions between states, symbol observations while within each
1036 state and which states start a sentence. These frequency distributions
1037 are then normalised into probability estimates, which can be
1038 smoothed if desired.
1039
1040 :return: the trained model
1041 :rtype: HiddenMarkovModelTagger
1042 :param labelled_sequences: the training data, a set of
1043 labelled sequences of observations
1044 :type labelled_sequences: list
1045 :param estimator: a function taking
1046 a FreqDist and a number of bins and returning a CProbDistI;
1047 otherwise a MLE estimate is used
1048 """
1049
1050 # default to the MLE estimate
1051 if estimator is None:
1052 estimator = lambda fdist, bins: MLEProbDist(fdist)
1053
1054 # count occurrences of starting states, transitions out of each state
1055 # and output symbols observed in each state
1056 known_symbols = set(self._symbols)
1057 known_states = set(self._states)
1058
1059 starting = FreqDist()
1060 transitions = ConditionalFreqDist()
1061 outputs = ConditionalFreqDist()
1062 for sequence in labelled_sequences:
1063 lasts = None
1064 for token in sequence:
1065 state = token[_TAG]
1066 symbol = token[_TEXT]
1067 if lasts is None:
1068 starting[state] += 1
1069 else:
1070 transitions[lasts][state] += 1
1071 outputs[state][symbol] += 1
1072 lasts = state
1073
1074 # update the state and symbol lists
1075 if state not in known_states:
1076 self._states.append(state)
1077 known_states.add(state)
1078
1079 if symbol not in known_symbols:
1080 self._symbols.append(symbol)
1081 known_symbols.add(symbol)
1082
1083 # create probability distributions (with smoothing)
1084 N = len(self._states)
1085 pi = estimator(starting, N)
1086 A = ConditionalProbDist(transitions, estimator, N)
1087 B = ConditionalProbDist(outputs, estimator, len(self._symbols))
1088
1089 return HiddenMarkovModelTagger(self._symbols, self._states, A, B, pi)
1090
1091
1092 def _ninf_array(shape):
1093 res = np.empty(shape, np.float64)
1094 res.fill(-np.inf)
1095 return res
1096
1097
1098 def logsumexp2(arr):
1099 max_ = arr.max()
1100 return np.log2(np.sum(2**(arr - max_))) + max_
1101
1102
1103 def _log_add(*values):
1104 """
1105 Adds the logged values, returning the logarithm of the addition.
1106 """
1107 x = max(values)
1108 if x > -np.inf:
1109 sum_diffs = 0
1110 for value in values:
1111 sum_diffs += 2**(value - x)
1112 return x + np.log2(sum_diffs)
1113 else:
1114 return x
1115
1116
1117 def _create_hmm_tagger(states, symbols, A, B, pi):
1118 def pd(values, samples):
1119 d = dict(zip(samples, values))
1120 return DictionaryProbDist(d)
1121
1122 def cpd(array, conditions, samples):
1123 d = {}
1124 for values, condition in zip(array, conditions):
1125 d[condition] = pd(values, samples)
1126 return DictionaryConditionalProbDist(d)
1127
1128 A = cpd(A, states, states)
1129 B = cpd(B, states, symbols)
1130 pi = pd(pi, states)
1131 return HiddenMarkovModelTagger(symbols=symbols, states=states,
1132 transitions=A, outputs=B, priors=pi)
1133
1134
1135 def _market_hmm_example():
1136 """
1137 Return an example HMM (described at page 381, Huang et al)
1138 """
1139 states = ['bull', 'bear', 'static']
1140 symbols = ['up', 'down', 'unchanged']
1141 A = np.array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], np.float64)
1142 B = np.array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], np.float64)
1143 pi = np.array([0.5, 0.2, 0.3], np.float64)
1144
1145 model = _create_hmm_tagger(states, symbols, A, B, pi)
1146 return model, states, symbols
1147
1148
1149 def demo():
1150 # demonstrates HMM probability calculation
1151
1152 print()
1153 print("HMM probability calculation demo")
1154 print()
1155
1156 model, states, symbols = _market_hmm_example()
1157
1158 print('Testing', model)
1159
1160 for test in [['up', 'up'], ['up', 'down', 'up'],
1161 ['down'] * 5, ['unchanged'] * 5 + ['up']]:
1162
1163 sequence = [(t, None) for t in test]
1164
1165 print('Testing with state sequence', test)
1166 print('probability =', model.probability(sequence))
1167 print('tagging = ', model.tag([word for (word,tag) in sequence]))
1168 print('p(tagged) = ', model.probability(sequence))
1169 print('H = ', model.entropy(sequence))
1170 print('H_exh = ', model._exhaustive_entropy(sequence))
1171 print('H(point) = ', model.point_entropy(sequence))
1172 print('H_exh(point)=', model._exhaustive_point_entropy(sequence))
1173 print()
1174
1175 def load_pos(num_sents):
1176 from nltk.corpus import brown
1177
1178 sentences = brown.tagged_sents(categories='news')[:num_sents]
1179
1180 tag_re = re.compile(r'[*]|--|[^+*-]+')
1181 tag_set = set()
1182 symbols = set()
1183
1184 cleaned_sentences = []
1185 for sentence in sentences:
1186 for i in range(len(sentence)):
1187 word, tag = sentence[i]
1188 word = word.lower() # normalize
1189 symbols.add(word) # log this word
1190 # Clean up the tag.
1191 tag = tag_re.match(tag).group()
1192 tag_set.add(tag)
1193 sentence[i] = (word, tag) # store cleaned-up tagged token
1194 cleaned_sentences += [sentence]
1195
1196 return cleaned_sentences, list(tag_set), list(symbols)
1197
1198 def demo_pos():
1199 # demonstrates POS tagging using supervised training
1200
1201 print()
1202 print("HMM POS tagging demo")
1203 print()
1204
1205 print('Training HMM...')
1206 labelled_sequences, tag_set, symbols = load_pos(20000)
1207 trainer = HiddenMarkovModelTrainer(tag_set, symbols)
1208 hmm = trainer.train_supervised(labelled_sequences[10:],
1209 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins))
1210
1211 print('Testing...')
1212 hmm.test(labelled_sequences[:10], verbose=True)
1213
1214 def _untag(sentences):
1215 unlabeled = []
1216 for sentence in sentences:
1217 unlabeled.append([(token[_TEXT], None) for token in sentence])
1218 return unlabeled
1219
1220 def demo_pos_bw(test=10, supervised=20, unsupervised=10, verbose=True,
1221 max_iterations=5):
1222 # demonstrates the Baum-Welch algorithm in POS tagging
1223
1224 print()
1225 print("Baum-Welch demo for POS tagging")
1226 print()
1227
1228 print('Training HMM (supervised, %d sentences)...' % supervised)
1229
1230 sentences, tag_set, symbols = load_pos(test + supervised + unsupervised)
1231
1232 symbols = set()
1233 for sentence in sentences:
1234 for token in sentence:
1235 symbols.add(token[_TEXT])
1236
1237 trainer = HiddenMarkovModelTrainer(tag_set, list(symbols))
1238 hmm = trainer.train_supervised(sentences[test:test+supervised],
1239 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins))
1240
1241 hmm.test(sentences[:test], verbose=verbose)
1242
1243 print('Training (unsupervised, %d sentences)...' % unsupervised)
1244 # it's rather slow - so only use 10 samples by default
1245 unlabeled = _untag(sentences[test+supervised:])
1246 hmm = trainer.train_unsupervised(unlabeled, model=hmm,
1247 max_iterations=max_iterations)
1248 hmm.test(sentences[:test], verbose=verbose)
1249
1250 def demo_bw():
1251 # demo Baum Welch by generating some sequences and then performing
1252 # unsupervised training on them
1253
1254 print()
1255 print("Baum-Welch demo for market example")
1256 print()
1257
1258 model, states, symbols = _market_hmm_example()
1259
1260 # generate some random sequences
1261 training = []
1262 import random
1263 rng = random.Random()
1264 rng.seed(0)
1265 for i in range(10):
1266 item = model.random_sample(rng, 5)
1267 training.append([(i[0], None) for i in item])
1268
1269 # train on those examples, starting with the model that generated them
1270 trainer = HiddenMarkovModelTrainer(states, symbols)
1271 hmm = trainer.train_unsupervised(training, model=model,
1272 max_iterations=1000)
1273
1274
1275 if __name__ == "__main__":
1276 import doctest
1277 doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)
1278
1279