class BatchGenerator(object):
  def __init__(self, text, batch_size, num_unrollings):
    self._text = text
    self._text_size = len(text)
    self._batch_size = batch_size
    self._num_unrollings = num_unrollings
    segment = self._text_size // batch_size
    self._cursor = [offset * segment for offset in range(batch_size)]
    self._last_batch = self._next_batch()

  def _next_batch(self):
    """Generate a single batch from the current cursor position in the data."""
    batch = np.zeros(shape=(self._batch_size, vocabulary_size), dtype=np.float)
    for b in range(self._batch_size):
      batch[b, char2id(self._text[self._cursor[b]])] = 1.0
      self._cursor[b] = (self._cursor[b] + 1) % self._text_size
    return batch

  def next(self):
    """Generate the next array of batches from the data. The array consists of
    the last batch of the previous array, followed by num_unrollings new ones.
    """
    batches = [self._last_batch]
    for step in range(self._num_unrollings):
      batches.append(self._next_batch())
    self._last_batch = batches[-1]
    return batches
def characters(probabilities):
  """Turn a 1-hot encoding or a probability distribution over the possible
  characters back into its (most likely) character representation."""
  return [id2char(c) for c in np.argmax(probabilities, 1)]
def batches2string(batches):
  """Convert a sequence of batches back into their (most likely) string
  representation."""
  s = [''] * batches[0].shape[0]
  for b in batches:
    s = [''.join(x) for x in zip(s, characters(b))]
  return s
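For reference, a minimal sketch of how these helpers fit together. It assumes the corpus has already been split into train_text and valid_text and that batch_size and num_unrollings have been chosen (e.g. 64 and 10); those names are not defined in the excerpt above.

# Illustrative only: wire the batch generator and the decoder together,
# assuming train_text / valid_text and batch_size / num_unrollings exist.
train_batches = BatchGenerator(train_text, batch_size, num_unrollings)
valid_batches = BatchGenerator(valid_text, 1, 1)

# Each call to next() yields num_unrollings + 1 one-hot batches; decoding them
# shows batch_size parallel character streams read from the text.
print(batches2string(train_batches.next()))
print(batches2string(valid_batches.next()))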
def logprob(predictions, labels):
  """Log-probability of the true labels in a predicted batch."""
  predictions[predictions < 1e-10] = 1e-10
  return np.sum(np.multiply(labels, -np.log(predictions))) / labels.shape[0]
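A quick sanity check of what logprob returns, using hypothetical arrays that are not part of the original code: a perfectly confident, correct prediction scores 0, and exponentiating the result gives a perplexity of 1.

# Hypothetical sanity check for logprob (lower is better; exp() of the result
# is the perplexity that training loops usually report).
labels = np.zeros((1, vocabulary_size))
labels[0, 3] = 1.0
predictions = labels.copy()                  # probability 1.0 on the true id
print(logprob(predictions, labels))          # -> 0.0
print(np.exp(logprob(predictions, labels)))  # perplexity -> 1.0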
def sample_distribution(distribution):
  """Sample one element from a distribution assumed to be an array of
  normalized probabilities.
  """
  r = random.uniform(0, 1)
  s = 0
  for i in range(len(distribution)):
    s += distribution[i]
    if s >= r:
      return i
  return len(distribution) - 1
def sample(prediction):
  """Turn a (column) prediction into 1-hot encoded samples."""
  p = np.zeros(shape=[1, vocabulary_size], dtype=np.float)
  p[0, sample_distribution(prediction[0])] = 1.0
  return p
def random_distribution():
  """Generate a random column of probabilities."""
  b = np.random.uniform(0.0, 1.0, size=[1, vocabulary_size])
  return b / np.sum(b, 1)[:, None]
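These three helpers can be exercised on their own; a purely illustrative sketch:

# Illustrative only: draw a random distribution over characters, sample a
# one-hot character from it, and decode it back to a printable character.
p = random_distribution()   # shape [1, vocabulary_size], rows sum to 1
one_hot = sample(p)         # one-hot row vector of the sampled character
print(characters(one_hot))  # e.g. ['q']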
graph = tf.Graph()
with graph.as_default():

  # Parameters:
  # Input gate: input, previous output, and bias.
  ix = tf.Variable(tf.truncated_normal([vocabulary_size, num_nodes], -0.1, 0.1))
  im = tf.Variable(tf.truncated_normal([num_nodes, num_nodes], -0.1, 0.1))
  ib = tf.Variable(tf.zeros([1, num_nodes]))
  # Forget gate: input, previous output, and bias.
  fx = tf.Variable(tf.truncated_normal([vocabulary_size, num_nodes], -0.1, 0.1))
  fm = tf.Variable(tf.truncated_normal([num_nodes, num_nodes], -0.1, 0.1))
  fb = tf.Variable(tf.zeros([1, num_nodes]))
  # Memory cell: input, state and bias.
  cx = tf.Variable(tf.truncated_normal([vocabulary_size, num_nodes], -0.1, 0.1))
  cm = tf.Variable(tf.truncated_normal([num_nodes, num_nodes], -0.1, 0.1))
  cb = tf.Variable(tf.zeros([1, num_nodes]))
  # Output gate: input, previous output, and bias.
  ox = tf.Variable(tf.truncated_normal([vocabulary_size, num_nodes], -0.1, 0.1))
  om = tf.Variable(tf.truncated_normal([num_nodes, num_nodes], -0.1, 0.1))
  ob = tf.Variable(tf.zeros([1, num_nodes]))
  # Variables saving state across unrollings.
  saved_output = tf.Variable(tf.zeros([batch_size, num_nodes]), trainable=False)
  saved_state = tf.Variable(tf.zeros([batch_size, num_nodes]), trainable=False)
  # Classifier weights and biases.
  w = tf.Variable(tf.truncated_normal([num_nodes, vocabulary_size], -0.1, 0.1))
  b = tf.Variable(tf.zeros([vocabulary_size]))

  # Definition of the cell computation.
  def lstm_cell(i, o, state):
    """Create an LSTM cell. See e.g.: http://arxiv.org/pdf/1402.1128v1.pdf
    Note that in this formulation, we omit the various connections between the
    previous state and the gates."""
    input_gate = tf.sigmoid(tf.matmul(i, ix) + tf.matmul(o, im) + ib)
    forget_gate = tf.sigmoid(tf.matmul(i, fx) + tf.matmul(o, fm) + fb)
    update = tf.matmul(i, cx) + tf.matmul(o, cm) + cb
    state = forget_gate * state + input_gate * tf.tanh(update)
    output_gate = tf.sigmoid(tf.matmul(i, ox) + tf.matmul(o, om) + ob)
    return output_gate * tf.tanh(state), state
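In equation form, the cell above computes the following, where i is the current input, o the previous output, s the previous state, and the W and b symbols stand for the ix/im/ib, fx/fm/fb, cx/cm/cb and ox/om/ob variables defined above:

$$
\begin{aligned}
g_i &= \sigma(i\,W_{ix} + o\,W_{im} + b_i) \\
g_f &= \sigma(i\,W_{fx} + o\,W_{fm} + b_f) \\
u   &= i\,W_{cx} + o\,W_{cm} + b_c \\
s'  &= g_f \odot s + g_i \odot \tanh(u) \\
g_o &= \sigma(i\,W_{ox} + o\,W_{om} + b_o) \\
\text{output} &= g_o \odot \tanh(s')
\end{aligned}
$$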
  # Input data.
  train_data = list()
  for _ in range(num_unrollings + 1):
    train_data.append(
      tf.placeholder(tf.float32, shape=[batch_size, vocabulary_size]))
  train_inputs = train_data[:num_unrollings]
  train_labels = train_data[1:]  # labels are inputs shifted by one time step.
  # Unrolled LSTM loop.
  outputs = list()
  output = saved_output
  state = saved_state
  for i in train_inputs:
    output, state = lstm_cell(i, output, state)
    outputs.append(output)
  # State saving across unrollings.
  with tf.control_dependencies([saved_output.assign(output),
                                saved_state.assign(state)]):
    # Classifier.
    logits = tf.nn.xw_plus_b(tf.concat(0, outputs), w, b)
    loss = tf.reduce_mean(
      tf.nn.softmax_cross_entropy_with_logits(
        logits, tf.concat(0, train_labels)))
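The excerpt stops at the loss. One way to train on it, sketched here with the same TF 0.x-era API and illustrative hyperparameters rather than as the original notebook's exact code, is plain gradient descent with global-norm gradient clipping, which keeps the unrolled LSTM from blowing up:

  # Optimizer sketch (illustrative): clip the global gradient norm before
  # applying gradient descent with a decaying learning rate.
  global_step = tf.Variable(0)
  learning_rate = tf.train.exponential_decay(
    10.0, global_step, 5000, 0.1, staircase=True)
  optimizer = tf.train.GradientDescentOptimizer(learning_rate)
  gradients, v = zip(*optimizer.compute_gradients(loss))
  gradients, _ = tf.clip_by_global_norm(gradients, 1.25)
  optimizer = optimizer.apply_gradients(
    zip(gradients, v), global_step=global_step)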
def lstm_cell(i, o, state):
  """Create an LSTM cell. See e.g.: http://arxiv.org/pdf/1402.1128v1.pdf
  Note that in this formulation, we omit the various connections between the
  previous state and the gates."""
  all_gates = tf.matmul(i, ifcox) + tf.matmul(o, ifcom) + ifcob
  input_gate = tf.sigmoid(all_gates[:, 0:num_nodes])
  forget_gate = tf.sigmoid(all_gates[:, num_nodes:2*num_nodes])
  update = all_gates[:, 2*num_nodes:3*num_nodes]
  state = forget_gate * state + input_gate * tf.tanh(update)
  output_gate = tf.sigmoid(all_gates[:, 3*num_nodes:])
  return output_gate * tf.tanh(state), state
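This variant assumes the four per-gate weight matrices have been fused into single variables ifcox, ifcom and ifcob, which are not shown above; a sketch of how they might be declared, in the same graph scope as the other parameters, follows. The shapes are forced by the column slicing in the cell: each input is multiplied once and the result split into the input, forget, cell and output blocks.

# Fused gate parameters assumed by the cell above (illustrative): all four
# gate blocks are concatenated along the column axis, so one matmul per
# operand computes every gate at once.
ifcox = tf.Variable(tf.truncated_normal([vocabulary_size, 4 * num_nodes], -0.1, 0.1))
ifcom = tf.Variable(tf.truncated_normal([num_nodes, 4 * num_nodes], -0.1, 0.1))
ifcob = tf.Variable(tf.zeros([1, 4 * num_nodes]))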