Mercurial > hg > python
changeset 3:26d9c0308fcf
updated/added from ecclerig version
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Mon, 09 Mar 2020 17:35:28 +0000 |
parents | e07789816ca5 |
children | 2d7c91f89f6b |
files | hmm/hmm.py hmm/semiSup.py hmm/tinySup.py |
diffstat | 3 files changed, 1305 insertions(+), 65 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hmm/hmm.py Mon Mar 09 17:35:28 2020 +0000 @@ -0,0 +1,1279 @@ +# Natural Language Toolkit: Hidden Markov Model +# +# Copyright (C) 2001-2015 NLTK Project +# Author: Trevor Cohn <tacohn@csse.unimelb.edu.au> +# Philip Blunsom <pcbl@csse.unimelb.edu.au> +# Tiago Tresoldi <tiago@tresoldi.pro.br> (fixes) +# Steven Bird <stevenbird1@gmail.com> (fixes) +# Joseph Frazee <jfrazee@mail.utexas.edu> (fixes) +# Steven Xu <xxu@student.unimelb.edu.au> (fixes) +# URL: <http://nltk.org/> +# For license information, see LICENSE.TXT + +""" +Hidden Markov Models (HMMs) largely used to assign the correct label sequence +to sequential data or assess the probability of a given label and data +sequence. These models are finite state machines characterised by a number of +states, transitions between these states, and output symbols emitted while in +each state. The HMM is an extension to the Markov chain, where each state +corresponds deterministically to a given event. In the HMM the observation is +a probabilistic function of the state. HMMs share the Markov chain's +assumption, being that the probability of transition from one state to another +only depends on the current state - i.e. the series of states that led to the +current state are not used. They are also time invariant. + +The HMM is a directed graph, with probability weighted edges (representing the +probability of a transition between the source and sink states) where each +vertex emits an output symbol when entered. The symbol (or observation) is +non-deterministically generated. For this reason, knowing that a sequence of +output observations was generated by a given HMM does not mean that the +corresponding sequence of states (and what the current state is) is known. +This is the 'hidden' in the hidden markov model. + +Formally, a HMM can be characterised by: + +- the output observation alphabet. This is the set of symbols which may be + observed as output of the system. +- the set of states. +- the transition probabilities *a_{ij} = P(s_t = j | s_{t-1} = i)*. These + represent the probability of transition to each state from a given state. +- the output probability matrix *b_i(k) = P(X_t = o_k | s_t = i)*. These + represent the probability of observing each symbol in a given state. +- the initial state distribution. This gives the probability of starting + in each state. + +To ground this discussion, take a common NLP application, part-of-speech (POS) +tagging. An HMM is desirable for this task as the highest probability tag +sequence can be calculated for a given sequence of word forms. This differs +from other tagging techniques which often tag each word individually, seeking +to optimise each individual tagging greedily without regard to the optimal +combination of tags for a larger unit, such as a sentence. The HMM does this +with the Viterbi algorithm, which efficiently computes the optimal path +through the graph given the sequence of words forms. + +In POS tagging the states usually have a 1:1 correspondence with the tag +alphabet - i.e. each state represents a single tag. The output observation +alphabet is the set of word forms (the lexicon), and the remaining three +parameters are derived by a training regime. With this information the +probability of a given sentence can be easily derived, by simply summing the +probability of each distinct path through the model. Similarly, the highest +probability tagging sequence can be derived with the Viterbi algorithm, +yielding a state sequence which can be mapped into a tag sequence. + +This discussion assumes that the HMM has been trained. This is probably the +most difficult task with the model, and requires either MLE estimates of the +parameters or unsupervised learning using the Baum-Welch algorithm, a variant +of EM. + +For more information, please consult the source code for this module, +which includes extensive demonstration code. +""" +from __future__ import print_function, unicode_literals, division + +import re +import itertools + +try: + import numpy as np +except ImportError: + pass + +from nltk.probability import (FreqDist, ConditionalFreqDist, + ConditionalProbDist, DictionaryProbDist, + DictionaryConditionalProbDist, + LidstoneProbDist, MutableProbDist, + MLEProbDist, RandomProbDist) +from nltk.metrics import accuracy +from nltk.util import LazyMap, unique_list +from nltk.compat import python_2_unicode_compatible, izip, imap +from nltk.tag.api import TaggerI + + +_TEXT = 0 # index of text in a tuple +_TAG = 1 # index of tag in a tuple + +def _identity(labeled_symbols): + return labeled_symbols + +@python_2_unicode_compatible +class HiddenMarkovModelTagger(TaggerI): + """ + Hidden Markov model class, a generative model for labelling sequence data. + These models define the joint probability of a sequence of symbols and + their labels (state transitions) as the product of the starting state + probability, the probability of each state transition, and the probability + of each observation being generated from each state. This is described in + more detail in the module documentation. + + This implementation is based on the HMM description in Chapter 8, Huang, + Acero and Hon, Spoken Language Processing and includes an extension for + training shallow HMM parsers or specialized HMMs as in Molina et. + al, 2002. A specialized HMM modifies training data by applying a + specialization function to create a new training set that is more + appropriate for sequential tagging with an HMM. A typical use case is + chunking. + + :param symbols: the set of output symbols (alphabet) + :type symbols: seq of any + :param states: a set of states representing state space + :type states: seq of any + :param transitions: transition probabilities; Pr(s_i | s_j) is the + probability of transition from state i given the model is in + state_j + :type transitions: ConditionalProbDistI + :param outputs: output probabilities; Pr(o_k | s_i) is the probability + of emitting symbol k when entering state i + :type outputs: ConditionalProbDistI + :param priors: initial state distribution; Pr(s_i) is the probability + of starting in state i + :type priors: ProbDistI + :param transform: an optional function for transforming training + instances, defaults to the identity function. + :type transform: callable + """ + def __init__(self, symbols, states, transitions, outputs, priors, + transform=_identity): + self._symbols = unique_list(symbols) + self._states = unique_list(states) + self._transitions = transitions + self._outputs = outputs + self._priors = priors + self._cache = None + self._transform = transform + + @classmethod + def _train(cls, labeled_sequence, test_sequence=None, + unlabeled_sequence=None, transform=_identity, + estimator=None, **kwargs): + + if estimator is None: + def estimator(fd, bins): + return LidstoneProbDist(fd, 0.1, bins) + + labeled_sequence = LazyMap(transform, labeled_sequence) + symbols = unique_list(word for sent in labeled_sequence + for word, tag in sent) + tag_set = unique_list(tag for sent in labeled_sequence + for word, tag in sent) + + trainer = HiddenMarkovModelTrainer(tag_set, symbols) + hmm = trainer.train_supervised(labeled_sequence, estimator=estimator) + hmm = cls(hmm._symbols, hmm._states, hmm._transitions, hmm._outputs, + hmm._priors, transform=transform) + + if test_sequence: + hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) + + if unlabeled_sequence: + max_iterations = kwargs.get('max_iterations', 5) + hmm = trainer.train_unsupervised(unlabeled_sequence, model=hmm, + max_iterations=max_iterations) + if test_sequence: + hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) + + return hmm + + @classmethod + def train(cls, labeled_sequence, test_sequence=None, + unlabeled_sequence=None, **kwargs): + """ + Train a new HiddenMarkovModelTagger using the given labeled and + unlabeled training instances. Testing will be performed if test + instances are provided. + + :return: a hidden markov model tagger + :rtype: HiddenMarkovModelTagger + :param labeled_sequence: a sequence of labeled training instances, + i.e. a list of sentences represented as tuples + :type labeled_sequence: list(list) + :param test_sequence: a sequence of labeled test instances + :type test_sequence: list(list) + :param unlabeled_sequence: a sequence of unlabeled training instances, + i.e. a list of sentences represented as words + :type unlabeled_sequence: list(list) + :param transform: an optional function for transforming training + instances, defaults to the identity function, see ``transform()`` + :type transform: function + :param estimator: an optional function or class that maps a + condition's frequency distribution to its probability + distribution, defaults to a Lidstone distribution with gamma = 0.1 + :type estimator: class or function + :param verbose: boolean flag indicating whether training should be + verbose or include printed output + :type verbose: bool + :param max_iterations: number of Baum-Welch interations to perform + :type max_iterations: int + """ + return cls._train(labeled_sequence, test_sequence, + unlabeled_sequence, **kwargs) + + def probability(self, sequence): + """ + Returns the probability of the given symbol sequence. If the sequence + is labelled, then returns the joint probability of the symbol, state + sequence. Otherwise, uses the forward algorithm to find the + probability over all label sequences. + + :return: the probability of the sequence + :rtype: float + :param sequence: the sequence of symbols which must contain the TEXT + property, and optionally the TAG property + :type sequence: Token + """ + return 2**(self.log_probability(self._transform(sequence))) + + def log_probability(self, sequence): + """ + Returns the log-probability of the given symbol sequence. If the + sequence is labelled, then returns the joint log-probability of the + symbol, state sequence. Otherwise, uses the forward algorithm to find + the log-probability over all label sequences. + + :return: the log-probability of the sequence + :rtype: float + :param sequence: the sequence of symbols which must contain the TEXT + property, and optionally the TAG property + :type sequence: Token + """ + sequence = self._transform(sequence) + + T = len(sequence) + + if T > 0 and sequence[0][_TAG]: + last_state = sequence[0][_TAG] + p = self._priors.logprob(last_state) + \ + self._output_logprob(last_state, sequence[0][_TEXT]) + for t in range(1, T): + state = sequence[t][_TAG] + p += self._transitions[last_state].logprob(state) + \ + self._output_logprob(state, sequence[t][_TEXT]) + last_state = state + return p + else: + alpha = self._forward_probability(sequence) + p = logsumexp2(alpha[T-1]) + return p + + def tag(self, unlabeled_sequence): + """ + Tags the sequence with the highest probability state sequence. This + uses the best_path method to find the Viterbi path. + + :return: a labelled sequence of symbols + :rtype: list + :param unlabeled_sequence: the sequence of unlabeled symbols + :type unlabeled_sequence: list + """ + unlabeled_sequence = self._transform(unlabeled_sequence) + return self._tag(unlabeled_sequence) + + def _tag(self, unlabeled_sequence): + path = self._best_path(unlabeled_sequence) + return list(izip(unlabeled_sequence, path)) + + def _output_logprob(self, state, symbol): + """ + :return: the log probability of the symbol being observed in the given + state + :rtype: float + """ + return self._outputs[state].logprob(symbol) + + def _create_cache(self): + """ + The cache is a tuple (P, O, X, S) where: + + - S maps symbols to integers. I.e., it is the inverse + mapping from self._symbols; for each symbol s in + self._symbols, the following is true:: + + self._symbols[S[s]] == s + + - O is the log output probabilities:: + + O[i,k] = log( P(token[t]=sym[k]|tag[t]=state[i]) ) + + - X is the log transition probabilities:: + + X[i,j] = log( P(tag[t]=state[j]|tag[t-1]=state[i]) ) + + - P is the log prior probabilities:: + + P[i] = log( P(tag[0]=state[i]) ) + """ + if not self._cache: + N = len(self._states) + M = len(self._symbols) + P = np.zeros(N, np.float32) + X = np.zeros((N, N), np.float32) + O = np.zeros((N, M), np.float32) + for i in range(N): + si = self._states[i] + P[i] = self._priors.logprob(si) + for j in range(N): + X[i, j] = self._transitions[si].logprob(self._states[j]) + for k in range(M): + O[i, k] = self._output_logprob(si, self._symbols[k]) + S = {} + for k in range(M): + S[self._symbols[k]] = k + self._cache = (P, O, X, S) + + def _update_cache(self, symbols): + # add new symbols to the symbol table and repopulate the output + # probabilities and symbol table mapping + if symbols: + self._create_cache() + P, O, X, S = self._cache + for symbol in symbols: + if symbol not in self._symbols: + self._cache = None + self._symbols.append(symbol) + # don't bother with the work if there aren't any new symbols + if not self._cache: + N = len(self._states) + M = len(self._symbols) + Q = O.shape[1] + # add new columns to the output probability table without + # destroying the old probabilities + O = np.hstack([O, np.zeros((N, M - Q), np.float32)]) + for i in range(N): + si = self._states[i] + # only calculate probabilities for new symbols + for k in range(Q, M): + O[i, k] = self._output_logprob(si, self._symbols[k]) + # only create symbol mappings for new symbols + for k in range(Q, M): + S[self._symbols[k]] = k + self._cache = (P, O, X, S) + + def reset_cache(self): + self._cache = None + + def best_path(self, unlabeled_sequence): + """ + Returns the state sequence of the optimal (most probable) path through + the HMM. Uses the Viterbi algorithm to calculate this part by dynamic + programming. + + :return: the state sequence + :rtype: sequence of any + :param unlabeled_sequence: the sequence of unlabeled symbols + :type unlabeled_sequence: list + """ + unlabeled_sequence = self._transform(unlabeled_sequence) + return self._best_path(unlabeled_sequence) + + def _best_path(self, unlabeled_sequence): + T = len(unlabeled_sequence) + N = len(self._states) + self._create_cache() + self._update_cache(unlabeled_sequence) + P, O, X, S = self._cache + + V = np.zeros((T, N), np.float32) + B = -np.ones((T, N), np.int) + + V[0] = P + O[:, S[unlabeled_sequence[0]]] + for t in range(1, T): + for j in range(N): + vs = V[t-1, :] + X[:, j] + best = np.argmax(vs) + V[t, j] = vs[best] + O[j, S[unlabeled_sequence[t]]] + B[t, j] = best + + current = np.argmax(V[T-1,:]) + sequence = [current] + for t in range(T-1, 0, -1): + last = B[t, current] + sequence.append(last) + current = last + + sequence.reverse() + return list(map(self._states.__getitem__, sequence)) + + def best_path_simple(self, unlabeled_sequence): + """ + Returns the state sequence of the optimal (most probable) path through + the HMM. Uses the Viterbi algorithm to calculate this part by dynamic + programming. This uses a simple, direct method, and is included for + teaching purposes. + + :return: the state sequence + :rtype: sequence of any + :param unlabeled_sequence: the sequence of unlabeled symbols + :type unlabeled_sequence: list + """ + unlabeled_sequence = self._transform(unlabeled_sequence) + return self._best_path_simple(unlabeled_sequence) + + def _best_path_simple(self, unlabeled_sequence): + T = len(unlabeled_sequence) + N = len(self._states) + V = np.zeros((T, N), np.float64) + B = {} + + # find the starting log probabilities for each state + symbol = unlabeled_sequence[0] + for i, state in enumerate(self._states): + V[0, i] = self._priors.logprob(state) + \ + self._output_logprob(state, symbol) + B[0, state] = None + + # find the maximum log probabilities for reaching each state at time t + for t in range(1, T): + symbol = unlabeled_sequence[t] + for j in range(N): + sj = self._states[j] + best = None + for i in range(N): + si = self._states[i] + va = V[t-1, i] + self._transitions[si].logprob(sj) + if not best or va > best[0]: + best = (va, si) + V[t, j] = best[0] + self._output_logprob(sj, symbol) + B[t, sj] = best[1] + + # find the highest probability final state + best = None + for i in range(N): + val = V[T-1, i] + if not best or val > best[0]: + best = (val, self._states[i]) + + # traverse the back-pointers B to find the state sequence + current = best[1] + sequence = [current] + for t in range(T-1, 0, -1): + last = B[t, current] + sequence.append(last) + current = last + + sequence.reverse() + return sequence + + def random_sample(self, rng, length): + """ + Randomly sample the HMM to generate a sentence of a given length. This + samples the prior distribution then the observation distribution and + transition distribution for each subsequent observation and state. + This will mostly generate unintelligible garbage, but can provide some + amusement. + + :return: the randomly created state/observation sequence, + generated according to the HMM's probability + distributions. The SUBTOKENS have TEXT and TAG + properties containing the observation and state + respectively. + :rtype: list + :param rng: random number generator + :type rng: Random (or any object with a random() method) + :param length: desired output length + :type length: int + """ + + # sample the starting state and symbol prob dists + tokens = [] + state = self._sample_probdist(self._priors, rng.random(), self._states) + symbol = self._sample_probdist(self._outputs[state], + rng.random(), self._symbols) + tokens.append((symbol, state)) + + for i in range(1, length): + # sample the state transition and symbol prob dists + state = self._sample_probdist(self._transitions[state], + rng.random(), self._states) + symbol = self._sample_probdist(self._outputs[state], + rng.random(), self._symbols) + tokens.append((symbol, state)) + + return tokens + + def _sample_probdist(self, probdist, p, samples): + cum_p = 0 + for sample in samples: + add_p = probdist.prob(sample) + if cum_p <= p <= cum_p + add_p: + return sample + cum_p += add_p + raise Exception('Invalid probability distribution - ' + 'does not sum to one') + + def entropy(self, unlabeled_sequence): + """ + Returns the entropy over labellings of the given sequence. This is + given by:: + + H(O) = - sum_S Pr(S | O) log Pr(S | O) + + where the summation ranges over all state sequences, S. Let + *Z = Pr(O) = sum_S Pr(S, O)}* where the summation ranges over all state + sequences and O is the observation sequence. As such the entropy can + be re-expressed as:: + + H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ] + = log Z - sum_S Pr(S | O) log Pr(S, 0) + = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t Pr(S_t | S_{t-1}) + sum_t Pr(O_t | S_t) ] + + The order of summation for the log terms can be flipped, allowing + dynamic programming to be used to calculate the entropy. Specifically, + we use the forward and backward probabilities (alpha, beta) giving:: + + H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0) + + sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t(sj) / Z * log Pr(sj | si) + + sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st) + + This simply uses alpha and beta to find the probabilities of partial + sequences, constrained to include the given state(s) at some point in + time. + """ + unlabeled_sequence = self._transform(unlabeled_sequence) + + T = len(unlabeled_sequence) + N = len(self._states) + + alpha = self._forward_probability(unlabeled_sequence) + beta = self._backward_probability(unlabeled_sequence) + normalisation = logsumexp2(alpha[T-1]) + + entropy = normalisation + + # starting state, t = 0 + for i, state in enumerate(self._states): + p = 2**(alpha[0, i] + beta[0, i] - normalisation) + entropy -= p * self._priors.logprob(state) + #print 'p(s_0 = %s) =' % state, p + + # state transitions + for t0 in range(T - 1): + t1 = t0 + 1 + for i0, s0 in enumerate(self._states): + for i1, s1 in enumerate(self._states): + p = 2**(alpha[t0, i0] + self._transitions[s0].logprob(s1) + + self._outputs[s1].logprob( + unlabeled_sequence[t1][_TEXT]) + + beta[t1, i1] - normalisation) + entropy -= p * self._transitions[s0].logprob(s1) + #print 'p(s_%d = %s, s_%d = %s) =' % (t0, s0, t1, s1), p + + # symbol emissions + for t in range(T): + for i, state in enumerate(self._states): + p = 2**(alpha[t, i] + beta[t, i] - normalisation) + entropy -= p * self._outputs[state].logprob( + unlabeled_sequence[t][_TEXT]) + #print 'p(s_%d = %s) =' % (t, state), p + + return entropy + + def point_entropy(self, unlabeled_sequence): + """ + Returns the pointwise entropy over the possible states at each + position in the chain, given the observation sequence. + """ + unlabeled_sequence = self._transform(unlabeled_sequence) + + T = len(unlabeled_sequence) + N = len(self._states) + + alpha = self._forward_probability(unlabeled_sequence) + beta = self._backward_probability(unlabeled_sequence) + normalisation = logsumexp2(alpha[T-1]) + + entropies = np.zeros(T, np.float64) + probs = np.zeros(N, np.float64) + for t in range(T): + for s in range(N): + probs[s] = alpha[t, s] + beta[t, s] - normalisation + + for s in range(N): + entropies[t] -= 2**(probs[s]) * probs[s] + + return entropies + + def _exhaustive_entropy(self, unlabeled_sequence): + unlabeled_sequence = self._transform(unlabeled_sequence) + + T = len(unlabeled_sequence) + N = len(self._states) + + labellings = [[state] for state in self._states] + for t in range(T - 1): + current = labellings + labellings = [] + for labelling in current: + for state in self._states: + labellings.append(labelling + [state]) + + log_probs = [] + for labelling in labellings: + labeled_sequence = unlabeled_sequence[:] + for t, label in enumerate(labelling): + labeled_sequence[t] = (labeled_sequence[t][_TEXT], label) + lp = self.log_probability(labeled_sequence) + log_probs.append(lp) + normalisation = _log_add(*log_probs) + + #ps = zeros((T, N), float64) + #for labelling, lp in zip(labellings, log_probs): + #for t in range(T): + #ps[t, self._states.index(labelling[t])] += \ + # 2**(lp - normalisation) + + #for t in range(T): + #print 'prob[%d] =' % t, ps[t] + + entropy = 0 + for lp in log_probs: + lp -= normalisation + entropy -= 2**(lp) * lp + + return entropy + + def _exhaustive_point_entropy(self, unlabeled_sequence): + unlabeled_sequence = self._transform(unlabeled_sequence) + + T = len(unlabeled_sequence) + N = len(self._states) + + labellings = [[state] for state in self._states] + for t in range(T - 1): + current = labellings + labellings = [] + for labelling in current: + for state in self._states: + labellings.append(labelling + [state]) + + log_probs = [] + for labelling in labellings: + labelled_sequence = unlabeled_sequence[:] + for t, label in enumerate(labelling): + labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) + lp = self.log_probability(labelled_sequence) + log_probs.append(lp) + + normalisation = _log_add(*log_probs) + + probabilities = _ninf_array((T,N)) + + for labelling, lp in zip(labellings, log_probs): + lp -= normalisation + for t, label in enumerate(labelling): + index = self._states.index(label) + probabilities[t, index] = _log_add(probabilities[t, index], lp) + + entropies = np.zeros(T, np.float64) + for t in range(T): + for s in range(N): + entropies[t] -= 2**(probabilities[t, s]) * probabilities[t, s] + + return entropies + + def _transitions_matrix(self): + """ Return a matrix of transition log probabilities. """ + trans_iter = (self._transitions[sj].logprob(si) + for sj in self._states + for si in self._states) + + transitions_logprob = np.fromiter(trans_iter, dtype=np.float64) + N = len(self._states) + return transitions_logprob.reshape((N, N)).T + + def _outputs_vector(self, symbol): + """ + Return a vector with log probabilities of emitting a symbol + when entering states. + """ + out_iter = (self._output_logprob(sj, symbol) for sj in self._states) + return np.fromiter(out_iter, dtype=np.float64) + + def _forward_probability(self, unlabeled_sequence): + """ + Return the forward probability matrix, a T by N array of + log-probabilities, where T is the length of the sequence and N is the + number of states. Each entry (t, s) gives the probability of being in + state s at time t after observing the partial symbol sequence up to + and including t. + + :param unlabeled_sequence: the sequence of unlabeled symbols + :type unlabeled_sequence: list + :return: the forward log probability matrix + :rtype: array + """ + T = len(unlabeled_sequence) + N = len(self._states) + alpha = _ninf_array((T, N)) + + transitions_logprob = self._transitions_matrix() + + # Initialization + symbol = unlabeled_sequence[0][_TEXT] + for i, state in enumerate(self._states): + alpha[0, i] = self._priors.logprob(state) + \ + self._output_logprob(state, symbol) + + # Induction + for t in range(1, T): + symbol = unlabeled_sequence[t][_TEXT] + output_logprob = self._outputs_vector(symbol) + + for i in range(N): + summand = alpha[t-1] + transitions_logprob[i] + alpha[t, i] = logsumexp2(summand) + output_logprob[i] + + return alpha + + def _backward_probability(self, unlabeled_sequence): + """ + Return the backward probability matrix, a T by N array of + log-probabilities, where T is the length of the sequence and N is the + number of states. Each entry (t, s) gives the probability of being in + state s at time t after observing the partial symbol sequence from t + .. T. + + :return: the backward log probability matrix + :rtype: array + :param unlabeled_sequence: the sequence of unlabeled symbols + :type unlabeled_sequence: list + """ + T = len(unlabeled_sequence) + N = len(self._states) + beta = _ninf_array((T, N)) + + transitions_logprob = self._transitions_matrix().T + + # initialise the backward values; + # "1" is an arbitrarily chosen value from Rabiner tutorial + beta[T-1, :] = np.log2(1) + + # inductively calculate remaining backward values + for t in range(T-2, -1, -1): + symbol = unlabeled_sequence[t+1][_TEXT] + outputs = self._outputs_vector(symbol) + + for i in range(N): + summand = transitions_logprob[i] + beta[t+1] + outputs + beta[t, i] = logsumexp2(summand) + + return beta + + def test(self, test_sequence, verbose=False, **kwargs): + """ + Tests the HiddenMarkovModelTagger instance. + + :param test_sequence: a sequence of labeled test instances + :type test_sequence: list(list) + :param verbose: boolean flag indicating whether training should be + verbose or include printed output + :type verbose: bool + """ + + def words(sent): + return [word for (word, tag) in sent] + + def tags(sent): + return [tag for (word, tag) in sent] + + def flatten(seq): + return list(itertools.chain(*seq)) + + test_sequence = self._transform(test_sequence) + predicted_sequence = list(imap(self._tag, imap(words, test_sequence))) + + if verbose: + for test_sent, predicted_sent in izip(test_sequence, predicted_sequence): + print('Test:', + ' '.join('%s/%s' % (token, tag) + for (token, tag) in test_sent)) + print() + print('Untagged:', + ' '.join("%s" % token for (token, tag) in test_sent)) + print() + print('HMM-tagged:', + ' '.join('%s/%s' % (token, tag) + for (token, tag) in predicted_sent)) + print() + print('Entropy:', + self.entropy([(token, None) for + (token, tag) in predicted_sent])) + print() + print('-' * 60) + + test_tags = flatten(imap(tags, test_sequence)) + predicted_tags = flatten(imap(tags, predicted_sequence)) + + acc = accuracy(test_tags, predicted_tags) + count = sum(len(sent) for sent in test_sequence) + print('accuracy over %d tokens: %.2f' % (count, acc * 100)) + + def __repr__(self): + return ('<HiddenMarkovModelTagger %d states and %d output symbols>' + % (len(self._states), len(self._symbols))) + + +class HiddenMarkovModelTrainer(object): + """ + Algorithms for learning HMM parameters from training data. These include + both supervised learning (MLE) and unsupervised learning (Baum-Welch). + + Creates an HMM trainer to induce an HMM with the given states and + output symbol alphabet. A supervised and unsupervised training + method may be used. If either of the states or symbols are not given, + these may be derived from supervised training. + + :param states: the set of state labels + :type states: sequence of any + :param symbols: the set of observation symbols + :type symbols: sequence of any + """ + def __init__(self, states=None, symbols=None): + self._states = (states if states else []) + self._symbols = (symbols if symbols else []) + + def train(self, labeled_sequences=None, unlabeled_sequences=None, + **kwargs): + """ + Trains the HMM using both (or either of) supervised and unsupervised + techniques. + + :return: the trained model + :rtype: HiddenMarkovModelTagger + :param labelled_sequences: the supervised training data, a set of + labelled sequences of observations + :type labelled_sequences: list + :param unlabeled_sequences: the unsupervised training data, a set of + sequences of observations + :type unlabeled_sequences: list + :param kwargs: additional arguments to pass to the training methods + """ + assert labeled_sequences or unlabeled_sequences + model = None + if labeled_sequences: + model = self.train_supervised(labeled_sequences, **kwargs) + if unlabeled_sequences: + if model: kwargs['model'] = model + model = self.train_unsupervised(unlabeled_sequences, **kwargs) + return model + + + def _baum_welch_step(self, sequence, model, symbol_to_number): + N = len(model._states) + M = len(model._symbols) + T = len(sequence) + + # compute forward and backward probabilities + alpha = model._forward_probability(sequence) + beta = model._backward_probability(sequence) + + # find the log probability of the sequence + lpk = logsumexp2(alpha[T-1]) + + A_numer = _ninf_array((N, N)) + B_numer = _ninf_array((N, M)) + A_denom = _ninf_array(N) + B_denom = _ninf_array(N) + + transitions_logprob = model._transitions_matrix().T + + for t in range(T): + symbol = sequence[t][_TEXT] # not found? FIXME + next_symbol = None + if t < T - 1: + next_symbol = sequence[t+1][_TEXT] # not found? FIXME + xi = symbol_to_number[symbol] + + next_outputs_logprob = model._outputs_vector(next_symbol) + alpha_plus_beta = alpha[t] + beta[t] + + if t < T - 1: + numer_add = transitions_logprob + next_outputs_logprob + \ + beta[t+1] + alpha[t].reshape(N, 1) + A_numer = np.logaddexp2(A_numer, numer_add) + A_denom = np.logaddexp2(A_denom, alpha_plus_beta) + else: + B_denom = np.logaddexp2(A_denom, alpha_plus_beta) + + B_numer[:,xi] = np.logaddexp2(B_numer[:,xi], alpha_plus_beta) + + return lpk, A_numer, A_denom, B_numer, B_denom + + def train_unsupervised(self, unlabeled_sequences, update_outputs=True, + **kwargs): + """ + Trains the HMM using the Baum-Welch algorithm to maximise the + probability of the data sequence. This is a variant of the EM + algorithm, and is unsupervised in that it doesn't need the state + sequences for the symbols. The code is based on 'A Tutorial on Hidden + Markov Models and Selected Applications in Speech Recognition', + Lawrence Rabiner, IEEE, 1989. + + :return: the trained model + :rtype: HiddenMarkovModelTagger + :param unlabeled_sequences: the training data, a set of + sequences of observations + :type unlabeled_sequences: list + + kwargs may include following parameters: + + :param model: a HiddenMarkovModelTagger instance used to begin + the Baum-Welch algorithm + :param max_iterations: the maximum number of EM iterations + :param convergence_logprob: the maximum change in log probability to + allow convergence + """ + + # create a uniform HMM, which will be iteratively refined, unless + # given an existing model + model = kwargs.get('model') + if not model: + priors = RandomProbDist(self._states) + transitions = DictionaryConditionalProbDist( + dict((state, RandomProbDist(self._states)) + for state in self._states)) + outputs = DictionaryConditionalProbDist( + dict((state, RandomProbDist(self._symbols)) + for state in self._states)) + model = HiddenMarkovModelTagger(self._symbols, self._states, + transitions, outputs, priors) + + self._states = model._states + self._symbols = model._symbols + + N = len(self._states) + M = len(self._symbols) + symbol_numbers = dict((sym, i) for i, sym in enumerate(self._symbols)) + + # update model prob dists so that they can be modified + # model._priors = MutableProbDist(model._priors, self._states) + + model._transitions = DictionaryConditionalProbDist( + dict((s, MutableProbDist(model._transitions[s], self._states)) + for s in self._states)) + + if update_outputs: + model._outputs = DictionaryConditionalProbDist( + dict((s, MutableProbDist(model._outputs[s], self._symbols)) + for s in self._states)) + + model.reset_cache() + + # iterate until convergence + converged = False + last_logprob = None + iteration = 0 + max_iterations = kwargs.get('max_iterations', 1000) + epsilon = kwargs.get('convergence_logprob', 1e-6) + + while not converged and iteration < max_iterations: + A_numer = _ninf_array((N, N)) + B_numer = _ninf_array((N, M)) + A_denom = _ninf_array(N) + B_denom = _ninf_array(N) + + logprob = 0 + for sequence in unlabeled_sequences: + sequence = list(sequence) + if not sequence: + continue + + (lpk, seq_A_numer, seq_A_denom, + seq_B_numer, seq_B_denom) = self._baum_welch_step(sequence, model, symbol_numbers) + + # add these sums to the global A and B values + for i in range(N): + A_numer[i] = np.logaddexp2(A_numer[i], seq_A_numer[i]-lpk) + B_numer[i] = np.logaddexp2(B_numer[i], seq_B_numer[i]-lpk) + + A_denom = np.logaddexp2(A_denom, seq_A_denom-lpk) + B_denom = np.logaddexp2(B_denom, seq_B_denom-lpk) + + logprob += lpk + + # use the calculated values to update the transition and output + # probability values + for i in range(N): + logprob_Ai = A_numer[i] - A_denom[i] + logprob_Bi = B_numer[i] - B_denom[i] + + # We should normalize all probabilities (see p.391 Huang et al) + # Let sum(P) be K. + # We can divide each Pi by K to make sum(P) == 1. + # Pi' = Pi/K + # log2(Pi') = log2(Pi) - log2(K) + logprob_Ai -= logsumexp2(logprob_Ai) + logprob_Bi -= logsumexp2(logprob_Bi) + + # update output and transition probabilities + si = self._states[i] + + for j in range(N): + sj = self._states[j] + model._transitions[si].update(sj, logprob_Ai[j]) + + if update_outputs: + for k in range(M): + ok = self._symbols[k] + model._outputs[si].update(ok, logprob_Bi[k]) + + # Rabiner says the priors don't need to be updated. I don't + # believe him. FIXME + + # test for convergence + if iteration > 0 and abs(logprob - last_logprob) < epsilon: + converged = True + + print('iteration', iteration, 'logprob', logprob) + iteration += 1 + last_logprob = logprob + + return model + + def train_supervised(self, labelled_sequences, estimator=None): + """ + Supervised training maximising the joint probability of the symbol and + state sequences. This is done via collecting frequencies of + transitions between states, symbol observations while within each + state and which states start a sentence. These frequency distributions + are then normalised into probability estimates, which can be + smoothed if desired. + + :return: the trained model + :rtype: HiddenMarkovModelTagger + :param labelled_sequences: the training data, a set of + labelled sequences of observations + :type labelled_sequences: list + :param estimator: a function taking + a FreqDist and a number of bins and returning a CProbDistI; + otherwise a MLE estimate is used + """ + + # default to the MLE estimate + if estimator is None: + estimator = lambda fdist, bins: MLEProbDist(fdist) + + # count occurrences of starting states, transitions out of each state + # and output symbols observed in each state + known_symbols = set(self._symbols) + known_states = set(self._states) + + starting = FreqDist() + transitions = ConditionalFreqDist() + outputs = ConditionalFreqDist() + for sequence in labelled_sequences: + lasts = None + for token in sequence: + state = token[_TAG] + symbol = token[_TEXT] + if lasts is None: + starting[state] += 1 + else: + transitions[lasts][state] += 1 + outputs[state][symbol] += 1 + lasts = state + + # update the state and symbol lists + if state not in known_states: + self._states.append(state) + known_states.add(state) + + if symbol not in known_symbols: + self._symbols.append(symbol) + known_symbols.add(symbol) + + # create probability distributions (with smoothing) + N = len(self._states) + pi = estimator(starting, N) + A = ConditionalProbDist(transitions, estimator, N) + B = ConditionalProbDist(outputs, estimator, len(self._symbols)) + + return HiddenMarkovModelTagger(self._symbols, self._states, A, B, pi) + + +def _ninf_array(shape): + res = np.empty(shape, np.float64) + res.fill(-np.inf) + return res + + +def logsumexp2(arr): + max_ = arr.max() + return np.log2(np.sum(2**(arr - max_))) + max_ + + +def _log_add(*values): + """ + Adds the logged values, returning the logarithm of the addition. + """ + x = max(values) + if x > -np.inf: + sum_diffs = 0 + for value in values: + sum_diffs += 2**(value - x) + return x + np.log2(sum_diffs) + else: + return x + + +def _create_hmm_tagger(states, symbols, A, B, pi): + def pd(values, samples): + d = dict(zip(samples, values)) + return DictionaryProbDist(d) + + def cpd(array, conditions, samples): + d = {} + for values, condition in zip(array, conditions): + d[condition] = pd(values, samples) + return DictionaryConditionalProbDist(d) + + A = cpd(A, states, states) + B = cpd(B, states, symbols) + pi = pd(pi, states) + return HiddenMarkovModelTagger(symbols=symbols, states=states, + transitions=A, outputs=B, priors=pi) + + +def _market_hmm_example(): + """ + Return an example HMM (described at page 381, Huang et al) + """ + states = ['bull', 'bear', 'static'] + symbols = ['up', 'down', 'unchanged'] + A = np.array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], np.float64) + B = np.array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], np.float64) + pi = np.array([0.5, 0.2, 0.3], np.float64) + + model = _create_hmm_tagger(states, symbols, A, B, pi) + return model, states, symbols + + +def demo(): + # demonstrates HMM probability calculation + + print() + print("HMM probability calculation demo") + print() + + model, states, symbols = _market_hmm_example() + + print('Testing', model) + + for test in [['up', 'up'], ['up', 'down', 'up'], + ['down'] * 5, ['unchanged'] * 5 + ['up']]: + + sequence = [(t, None) for t in test] + + print('Testing with state sequence', test) + print('probability =', model.probability(sequence)) + print('tagging = ', model.tag([word for (word,tag) in sequence])) + print('p(tagged) = ', model.probability(sequence)) + print('H = ', model.entropy(sequence)) + print('H_exh = ', model._exhaustive_entropy(sequence)) + print('H(point) = ', model.point_entropy(sequence)) + print('H_exh(point)=', model._exhaustive_point_entropy(sequence)) + print() + +def load_pos(num_sents): + from nltk.corpus import brown + + sentences = brown.tagged_sents(categories='news')[:num_sents] + + tag_re = re.compile(r'[*]|--|[^+*-]+') + tag_set = set() + symbols = set() + + cleaned_sentences = [] + for sentence in sentences: + for i in range(len(sentence)): + word, tag = sentence[i] + word = word.lower() # normalize + symbols.add(word) # log this word + # Clean up the tag. + tag = tag_re.match(tag).group() + tag_set.add(tag) + sentence[i] = (word, tag) # store cleaned-up tagged token + cleaned_sentences += [sentence] + + return cleaned_sentences, list(tag_set), list(symbols) + +def demo_pos(): + # demonstrates POS tagging using supervised training + + print() + print("HMM POS tagging demo") + print() + + print('Training HMM...') + labelled_sequences, tag_set, symbols = load_pos(20000) + trainer = HiddenMarkovModelTrainer(tag_set, symbols) + hmm = trainer.train_supervised(labelled_sequences[10:], + estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) + + print('Testing...') + hmm.test(labelled_sequences[:10], verbose=True) + +def _untag(sentences): + unlabeled = [] + for sentence in sentences: + unlabeled.append([(token[_TEXT], None) for token in sentence]) + return unlabeled + +def demo_pos_bw(test=10, supervised=20, unsupervised=10, verbose=True, + max_iterations=5): + # demonstrates the Baum-Welch algorithm in POS tagging + + print() + print("Baum-Welch demo for POS tagging") + print() + + print('Training HMM (supervised, %d sentences)...' % supervised) + + sentences, tag_set, symbols = load_pos(test + supervised + unsupervised) + + symbols = set() + for sentence in sentences: + for token in sentence: + symbols.add(token[_TEXT]) + + trainer = HiddenMarkovModelTrainer(tag_set, list(symbols)) + hmm = trainer.train_supervised(sentences[test:test+supervised], + estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) + + hmm.test(sentences[:test], verbose=verbose) + + print('Training (unsupervised, %d sentences)...' % unsupervised) + # it's rather slow - so only use 10 samples by default + unlabeled = _untag(sentences[test+supervised:]) + hmm = trainer.train_unsupervised(unlabeled, model=hmm, + max_iterations=max_iterations) + hmm.test(sentences[:test], verbose=verbose) + +def demo_bw(): + # demo Baum Welch by generating some sequences and then performing + # unsupervised training on them + + print() + print("Baum-Welch demo for market example") + print() + + model, states, symbols = _market_hmm_example() + + # generate some random sequences + training = [] + import random + rng = random.Random() + rng.seed(0) + for i in range(10): + item = model.random_sample(rng, 5) + training.append([(i[0], None) for i in item]) + + # train on those examples, starting with the model that generated them + trainer = HiddenMarkovModelTrainer(states, symbols) + hmm = trainer.train_unsupervised(training, model=model, + max_iterations=1000) + + +if __name__ == "__main__": + import doctest + doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE) + +
--- a/hmm/semiSup.py Mon Mar 09 16:48:09 2020 +0000 +++ b/hmm/semiSup.py Mon Mar 09 17:35:28 2020 +0000 @@ -1,38 +1,19 @@ '''Exploring the claim that a small dictionary can seed an otherwise unsupervised HMM to learn a decent POS-tagger''' -import nltk, random, itertools +import nltk, random from nltk.corpus import brown -from nltk.tag.hmm import HiddenMarkovModelTagger, HiddenMarkovModelTrainer, logsumexp2 +from nltk.tag.hmm import HiddenMarkovModelTagger, HiddenMarkovModelTrainer from nltk.probability import FreqDist,ConditionalFreqDist from nltk.probability import MLEProbDist, RandomProbDist, DictionaryConditionalProbDist -def totLogProb(self,sequences): - N = len(self._states) - M = len(self._symbols) - logProb = 0 - for sequence in sequences: - T = len(sequence) - # compute forward and backward probabilities - alpha = self._forward_probability(sequence) - beta = self._backward_probability(sequence) - # find the log probability of the sequence - logProb += logsumexp2(alpha[T-1]) - return logProb +trainTagsPercent=0.99 +trainHMMPercent=0.9 +knownWordsPercent=0.99 -HiddenMarkovModelTagger.totLogProb=totLogProb - -trainTagsPercent=1.0 -trainHMMPercent=0.9 -knownWordsPercent=1.0 - -SST=SSW='<s>' -EST=ESW='</s>' -SS=[(SSW,SST)] -ES=[(ESW,EST)] TAGSETS={ 'univ': [u'ADJ', u'ADP', u'ADV', u'CONJ', u'DET', u'NOUN', u'NUM', - u'PRON', u'PRT', u'VERB', u'X', u'.',SST,EST], + u'PRON', u'PRT', u'VERB', u'X', u'.'], 'brown': [u"ABL", u"ABN", u"ABX", u"AP", u"AP$", u"AP+AP", u"AT", u"BE", u"BED", u"BED*", u"BEDZ", u"BEDZ*", u"BEG", u"BEM", u"BEM*", @@ -78,13 +59,9 @@ TAGSETS['penn']=TAGSETS['upenn'] def setup(cat='news',tagset='brown',corpus=brown): - return ([list(itertools.chain(iter(SS), - ((word.lower(),tag) for (word,tag) in s) - ,iter(ES))) + return ([[(word.lower(),tag) for (word,tag) in s] for s in corpus.tagged_sents(categories=cat,tagset=tagset)], - list(itertools.chain(iter(SS), iter(ES), - ((word.lower(),tag) for (word,tag) in - corpus.tagged_words(categories=cat,tagset=tagset)))), + [(word.lower(),tag) for (word,tag) in corpus.tagged_words(categories=cat,tagset=tagset)], TAGSETS[tagset]) def notCurrent(s,missList): @@ -113,21 +90,17 @@ #wToT=ConditionalFreqDist(tagged) tToW=ConditionalFreqDist((t,w) for (w,t) in tagged) #print len(tToW[u'ADV']) - dd=dict((tag,(lambda wl,p=percent:\ + return dict((tag,(lambda wl,p=percent:\ wl[:int(p*len(wl))])( - sorted(tToW[tag].items(),key=lambda (k,v):v,reverse=True))) + sorted(tToW[tag].items(),key=lambda (k,v):v,reverse=True))) for tag in tToW.keys()) - return dd (tagged_s,tagged_w,tagset)=setup(tagset='universal') -true_tagged_w=tagged_w[2:] # not SS, SE - -wordTokens=FreqDist(word for word,tag in true_tagged_w) -wordsAsSuch=list(wordTokens.keys()) +wordTokens=FreqDist(word for word,tag in tagged_w) print len(wordTokens), wordTokens.N() -(trainTags,trainHMM,testHMM)=splitData(true_tagged_w,trainTagsPercent, +(trainTags,trainHMM,testHMM)=splitData(tagged_w,trainTagsPercent, tagged_s,trainHMMPercent) knownWords=pickWords(trainTags,knownWordsPercent) @@ -163,8 +136,8 @@ def words(self): return self._words - def buildPD(self,allTokens): - self._sfd=SubsetFreqDist(self._wordsAndCounts,allTokens) + def buildPD(self,tokens): + self._sfd=SubsetFreqDist(self._wordsAndCounts,tokens) self._pd=MLEProbDist(self._sfd) def getSFD(self): @@ -173,13 +146,6 @@ def getPD(self): return self._pd -class FixedTag(Tag): - def buildPD(self): - self._pd=MLEProbDist(FreqDist(dict(self._wordsAndCounts))) - - def getSFD(self): - raise NotImplementedError("not implemented for this subclass") - tags=dict((tagName,Tag(tagName,wl)) for tagName,wl in knownWords.items()) kws=dict((tagName,tag.words()) for tagName,tag in tags.items()) @@ -187,18 +153,13 @@ ((lambda i:False if not i[1] else i) (((tagset[i],tagset[j]), kws[tagset[i]].intersection(kws[tagset[j]])),) - for i in xrange(0,len(tagset)-2) - for j in xrange(i+1,len(tagset)-2)))) + for i in xrange(0,len(tagset)) + for j in xrange(i+1,len(tagset))))) for tag in tags.values(): tag.buildPD(wordTokens) -tags[SST]=FixedTag(SST,[(SSW,1)]) -tags[SST].buildPD() -tags[EST]=FixedTag(EST,[(ESW,1)]) -tags[EST].buildPD() - -priors = MLEProbDist(FreqDist(dict((tag,1 if tag==SST else 0) for tag in tagset))) +priors = RandomProbDist(tagset) transitions = DictionaryConditionalProbDist( dict((state, RandomProbDist(tagset)) @@ -208,16 +169,16 @@ dict((state, tags[state].getPD()) for state in tagset)) -model = HiddenMarkovModelTagger(wordsAsSuch, tagset, +model = HiddenMarkovModelTagger(wordTokens, tagset, transitions, outputs, priors) -print "model", model.evaluate(testHMM), model.totLogProb(testHMM) +print model.evaluate(testHMM) -nm=HiddenMarkovModelTrainer(states=tagset,symbols=wordsAsSuch) +nm=HiddenMarkovModelTrainer(states=tagset,symbols=wordTokens) # Note that contrary to naive reading of the documentation, # train_unsupervised expects a sequence of sequences of word/tag pairs, # it just ignores the tags -nnm=nm.train_unsupervised(trainHMM,True,model=model,max_iterations=10,testMe=testHMM) +nnm=nm.train_unsupervised(trainHMM,model=model,max_iterations=15,updateOutputs=False) -print nnm.totLogProb(testHMM) +print nnm.evaluate(testHMM)
--- a/hmm/tinySup.py Mon Mar 09 16:48:09 2020 +0000 +++ b/hmm/tinySup.py Mon Mar 09 17:35:28 2020 +0000 @@ -12,9 +12,9 @@ [('<s>','<s>'),('run','V'),('the','D'),('sheep','N'),('</s>','</s>')]] taglists=[('<s>',[('<s>',1),('the',0),('sheep',0),('run',0),('</s>',0)]), - ('D',[('the',1),('sheep',0),('run',0),('<s>',0),('</s>',0)]), - ('N',[('the',0),('sheep',.5),('run',.5),('<s>',0),('</s>',0)]), - ('V',[('the',0),('sheep',.5),('run',.5),('<s>',0),('</s>',0)]), + ('D',[('the',.8),('sheep',.1),('run',.1),('<s>',0),('</s>',0)]), + ('N',[('the',.2),('sheep',.4),('run',.4),('<s>',0),('</s>',0)]), + ('V',[('the',.2),('sheep',.4),('run',.4),('<s>',0),('</s>',0)]), ('</s>',[('<s>',0),('the',0),('sheep',0),('run',0),('</s>',1)])] tagdict=dict((k,MLEProbDist(FreqDist(dict(v)))) for k,v in taglists) @@ -48,7 +48,7 @@ # Note that contrary to naive reading of the documentation, # train_unsupervised expects a sequence of sequences of word/tag pairs, # it just ignores the tags -nnm=nm.train_unsupervised(sents,model=model,max_iterations=10,updateOutputs=False) +nnm=nm.train_unsupervised(sents,model=model,max_iterations=15,updateOutputs=False) for tag in tagset: if tag=='</s>':