Hello everyone! In this article, we will build a model that writes poems using PyTorch by predicting the next characters of the text. First, we will learn what RNNs and LSTMs are and how they work. Then we will create our model: we load our data and pre-process it, use PyTorch to train the model, and save it. After that, we will make predictions with the model by giving it a small initial text, from which it will generate a complete paragraph or stanza of a poem.
 

What is an RNN?

In machine learning, simple problems like classifying an image as a dog or a cat can be solved by training a classifier on a set of labelled data. But what if our problem is more complex, for example predicting the next word in a paragraph? If we examine this problem closely, we will find that we humans don't solve it using only our existing knowledge of language and grammar. In this type of problem, we also use the previous words of the paragraph and the context of the paragraph or poem to predict the next word.

Traditional neural networks can't do this, because they treat each input independently: they are trained on a fixed set of data and then make predictions with no memory of what came before. RNNs are well suited to this type of problem. RNN stands for Recurrent Neural Network. We can think of an RNN as a neural network with a loop in it: it passes information from one step to the next, so information persists through the sequence and the network can use the previous context to make more accurate predictions.
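To make this idea concrete, below is a tiny illustrative sketch (not part of our poem model) of how PyTorch's built-in nn.RNN carries a hidden state along a sequence; the sizes are arbitrary and chosen only for demonstration.

import torch
from torch import nn

# a toy RNN: 10-dimensional inputs, 16-dimensional hidden state
rnn = nn.RNN(input_size=10, hidden_size=16, batch_first=True)

x = torch.randn(1, 5, 10)    # one sequence of 5 time steps
h0 = torch.zeros(1, 1, 16)   # initial hidden state: (num_layers, batch, hidden_size)

out, h_n = rnn(x, h0)        # out: output at every step, h_n: final hidden state
print(out.shape, h_n.shape)  # torch.Size([1, 5, 16]) torch.Size([1, 1, 16])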


 

What is an LSTM?

I know you must be thinking: if an RNN can handle sequence data, then why do we need an LSTM? To answer this question, let's have a look at the two examples below.

Example 1:

"Birds live in the nest." Here it is easy to predict the word "nest" because the context of "birds" appears right before it, and an RNN will work fine in this case.

Example 2:

"I grew up in India (......blah blah blah…..) so I can speak Hindi." Here the task of predicting the word "Hindi" is difficult for an RNN because the gap between the relevant context and the prediction is large. By looking only at "I can speak ..." we cannot predict the language; we need the extra context of "India" from much earlier. So we need some long-term dependency across the paragraph to understand the context.

For this purpose, we use the LSTM (Long Short-Term Memory) network. As the name suggests, it has both a long-term and a short-term memory, and the two are used together to make predictions. In terms of architecture, an LSTM cell contains four gates, namely the learn gate, forget gate, remember gate and use gate (these roughly correspond to the input, forget, cell-update and output gates in the standard formulation). To keep this article simple and hands-on, I am not going into the theory of the LSTM architecture, but maybe we will talk about it in an upcoming article (maybe in the next one 😉).
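For reference, here is a similar minimal sketch of PyTorch's nn.LSTM (again with arbitrary sizes, only for illustration). The main difference from a plain RNN is that its state is a tuple of a hidden state and a cell state, which act as the short-term and long-term memory.

import torch
from torch import nn

lstm = nn.LSTM(input_size=10, hidden_size=16, num_layers=2, batch_first=True)

x = torch.randn(1, 5, 10)    # (batch, seq_len, input_size)
h0 = torch.zeros(2, 1, 16)   # hidden state: (num_layers, batch, hidden_size)
c0 = torch.zeros(2, 1, 16)   # cell state:   (num_layers, batch, hidden_size)

out, (h_n, c_n) = lstm(x, (h0, c0))
print(out.shape)             # torch.Size([1, 5, 16])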


 

Let's Build Our Model.

Now that we are done with the theory, let's start the interesting part: building our model.

Loading and Preprocessing the Data

I will use a poetry dataset from Kaggle. It contains around 15,000 poems, which should be enough for our model to learn patterns from. Now let's load it into our notebook.

1. First import the libraries.

import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

2. Now load data from the text file.

# open text file and read in data as `text`
with open('/data/poems_data.txt', 'r') as f:
    text = f.read()

3. We can verify our data by printing the first 100 characters.

text[:100]

4. As we know, our neural network does not understand text, so we have to convert our text data to integers. For this purpose we create two token dictionaries, one mapping each character to an integer and one mapping each integer back to its character. (A quick round-trip check follows the code below.)

# encode the text and map each character to an integer and vice versa

# we create two dictionaries:
# 1. int2char, which maps integers to characters
# 2. char2int, which maps characters to unique integers
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

# encode the text
encoded = np.array([char2int[ch] for ch in text])
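As an optional sanity check (not required for the rest of the pipeline), we can decode the first few encoded integers back into characters and confirm that they match the start of the text.

# decode the first 20 integers back to text; it should match text[:20]
print(encoded[:20])
print(''.join(int2char[i] for i in encoded[:20]))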

5. One-hot encoding is used to represent each character. For example, if we have three characters a, b, c, we can represent them as [1,0,0], [0,1,0] and [0,0,1]: the position of that character is set to 1 and all the others are 0. In our case we have many characters and symbols, so our one-hot vectors will be long, but that's fine.

def one_hot_encode(arr, n_labels):
    
    # Initialize the encoded array
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)
    
    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    
    # Finally reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    
    return one_hot

Now test it in this way.

# check that the function works as expected
test_seq = np.array([[0, 5, 1]])
one_hot = one_hot_encode(test_seq, 8)

print(one_hot)
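For test_seq = [[0, 5, 1]] with 8 labels, the printed output should look like this:

[[[1. 0. 0. 0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 1. 0. 0. 0. 0. 0. 0.]]]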

6. Let's create batches for our model; this is a crucial part. Here we choose a batch_size, which is the number of rows (sequences) in each batch, and a seq_length, which is the number of columns (characters) in each sequence. A quick check of the generator follows the function below.

def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.
       
       Arguments
       ---------
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    '''
    
    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr)//batch_size_total
    
    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size_total]
    # Reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))
    
    # iterate through the array, one sequence at a time
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = arr[:, n:n+seq_length]
        # The targets, shifted by one
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y
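As a quick optional check of the generator, we can grab one batch and look at the shapes and at the one-character shift between inputs and targets.

# grab one batch of 8 sequences, 50 characters each
batches = get_batches(encoded, batch_size=8, seq_length=50)
x, y = next(batches)

print(x.shape, y.shape)   # (8, 50) (8, 50)
print(x[0, :10])          # first 10 encoded characters of the first sequence
print(y[0, :10])          # the same characters shifted left by one position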

7. Now we can check if a GPU is available. (If a GPU is not available, keep the number of epochs low.)

# check if GPU is available
train_on_gpu = torch.cuda.is_available()
if(train_on_gpu):
    print('Training on GPU!')
else:
    print('No GPU available, training on CPU; consider making n_epochs very small.')

8. Here we create a class called CharRNN; it is the class for our model. In the __init__ method we define the layers of the model: a two-layer LSTM, a dropout layer (which helps avoid overfitting) and, for the output, a simple linear layer. A small shape check follows the class definition below.

class CharRNN(nn.Module):
        
        def __init__(self, tokens, n_hidden=256, n_layers=2,
                                   drop_prob=0.5, lr=0.001):
            super().__init__()
            self.drop_prob = drop_prob
            self.n_layers = n_layers
            self.n_hidden = n_hidden
            self.lr = lr
            
            # creating character dictionaries
            self.chars = tokens
            self.int2char = dict(enumerate(self.chars))
            self.char2int = {ch: ii for ii, ch in self.int2char.items()}
            
            # LSTM layer
            self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                                dropout=drop_prob, batch_first=True)

            # dropout layer
            self.dropout = nn.Dropout(drop_prob)

            # output layer
            self.fc = nn.Linear(n_hidden, len(self.chars))
    
        
        def forward(self, x, hidden):
            ''' Forward pass through the network. 
                These inputs are x, and the hidden/cell state `hidden`. '''
            ## Get the outputs and the new hidden state from the lstm
            r_output, hidden = self.lstm(x, hidden)
            
            ## pass through a dropout layer
            out = self.dropout(r_output)
            
            # Stack up LSTM outputs using view
            # you may need to use contiguous to reshape the output
            out = out.contiguous().view(-1, self.n_hidden)
            
            ## put x through the fully-connected layer
            out = self.fc(out)
            return out, hidden
        
        
        def init_hidden(self, batch_size):
            ''' Initializes hidden state '''
            # Create two new tensors with sizes n_layers x batch_size x n_hidden,
            # initialized to zero, for hidden state and cell state of LSTM
            weight = next(self.parameters()).data
            
            if (train_on_gpu):
                hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
            else:
                hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                          weight.new(self.n_layers, batch_size, self.n_hidden).zero_())
            
            return hidden
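Before training, it can be useful to push one small dummy batch through an untrained network, just to confirm that the shapes line up. This is an optional sanity check and not part of the training pipeline.

# optional sanity check: forward one small dummy batch through an untrained network
test_net = CharRNN(chars, n_hidden=256, n_layers=2)
xb, yb = next(get_batches(encoded, batch_size=4, seq_length=10))
xb = torch.from_numpy(one_hot_encode(xb, len(chars)))

if(train_on_gpu):
    test_net.cuda()
    xb = xb.cuda()

hb = test_net.init_hidden(4)
out, hb = test_net(xb, hb)
print(out.shape)   # torch.Size([40, len(chars)]): one prediction per input character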

9. Our model is ready and it's time to train it. For training we use an optimizer and a loss function: after each step we calculate the loss, loss.backward() back-propagates it, and the optimizer's step function updates the weights accordingly. The loss will slowly decrease, which means the model is getting better.

We also calculate a validation loss at regular intervals during training to check whether our model is under-fitting or over-fitting.

def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
        ''' Training a network 
        
            Arguments
            ---------
            
            net: CharRNN network
            data: text data to train the network
            epochs: Number of epochs to train
            batch_size: Number of mini-sequences per mini-batch, aka batch size
            seq_length: Number of character steps per mini-batch
            lr: learning rate
            clip: gradient clipping
            val_frac: Fraction of data to hold out for validation
            print_every: Number of steps for printing training and validation loss
        
        '''
        net.train()
        
        opt = torch.optim.Adam(net.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        
        # create training and validation data
        val_idx = int(len(data)*(1-val_frac))
        data, val_data = data[:val_idx], data[val_idx:]
        
        if(train_on_gpu):
            net.cuda()
        
        counter = 0
        n_chars = len(net.chars)
        for e in range(epochs):
            # initialize hidden state
            h = net.init_hidden(batch_size)
            
            for x, y in get_batches(data, batch_size, seq_length):
                counter += 1
                
                # One-hot encode our data and make them Torch tensors
                x = one_hot_encode(x, n_chars)
                inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
                
                if(train_on_gpu):
                    inputs, targets = inputs.cuda(), targets.cuda()
    
                # Creating new variables for the hidden state, otherwise
                # we'd backprop through the entire training history
                h = tuple([each.data for each in h])
    
                # zero accumulated gradients
                net.zero_grad()
                
                # get the output from the model
                output, h = net(inputs, h)
                
                # calculate the loss and perform backprop
                loss = criterion(output, targets.view(batch_size*seq_length).long())
                loss.backward()
                # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
                nn.utils.clip_grad_norm_(net.parameters(), clip)
                opt.step()
                
                # loss stats
                if counter % print_every == 0:
                    # Get validation loss
                    val_h = net.init_hidden(batch_size)
                    val_losses = []
                    net.eval()
                    for x, y in get_batches(val_data, batch_size, seq_length):
                        # One-hot encode our data and make them Torch tensors
                        x = one_hot_encode(x, n_chars)
                        x, y = torch.from_numpy(x), torch.from_numpy(y)
                        
                        # Creating new variables for the hidden state, otherwise
                        # we'd backprop through the entire training history
                        val_h = tuple([each.data for each in val_h])
                        
                        inputs, targets = x, y
                        if(train_on_gpu):
                            inputs, targets = inputs.cuda(), targets.cuda()
    
                        output, val_h = net(inputs, val_h)
                        val_loss = criterion(output, targets.view(batch_size*seq_length).long())
                    
                        val_losses.append(val_loss.item())
                    
                    net.train() # reset to train mode after iterating through validation data
                    
                    print("Epoch: {}/{}...".format(e+1, epochs),
                          "Step: {}...".format(counter),
                          "Loss: {:.4f}...".format(loss.item()),
                          "Val Loss: {:.4f}".format(np.mean(val_losses)))

Now train it in the following way:

# define and print the net
n_hidden = 512
n_layers = 2

net = CharRNN(chars, n_hidden, n_layers)
print(net)
batch_size = 128
seq_length = 100
n_epochs = 10  # start small if you are just testing initial behavior

# train the model
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)

10. We can save the model in the following way.

# change the name, for saving multiple files
model_name = 'poem_4_epoch.net'

checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)
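If we want to reuse the trained model later (for example in a new session), we can load the checkpoint back and rebuild the network from it. This loading snippet is my own addition, based on the keys we stored in the checkpoint above.

# load the checkpoint and rebuild the model from it
# (add map_location='cpu' to torch.load if the checkpoint was saved on a GPU machine
#  and you are loading it on a CPU-only machine)
with open(model_name, 'rb') as f:
    checkpoint = torch.load(f)

loaded = CharRNN(checkpoint['tokens'],
                 n_hidden=checkpoint['n_hidden'],
                 n_layers=checkpoint['n_layers'])
loaded.load_state_dict(checkpoint['state_dict'])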

11. Now that our model is trained, we'll use a sample method to predict the next characters. To sample, we pass in a character and have the network predict the next character. Then we take that character, pass it back in, and get another predicted character; keep doing this and, tada, you end up with a whole paragraph! The top_k argument restricts sampling to the k most probable next characters, so the model only chooses among the most relevant options.

def predict(net, char, h=None, top_k=None):
        ''' Given a character, predict the next character.
            Returns the predicted character and the hidden state.
        '''
        
        # tensor inputs
        x = np.array([[net.char2int[char]]])
        x = one_hot_encode(x, len(net.chars))
        inputs = torch.from_numpy(x)
        
        if(train_on_gpu):
            inputs = inputs.cuda()
        
        # initialize the hidden state if none was passed in,
        # then detach it from its history
        if h is None:
            h = net.init_hidden(1)
        h = tuple([each.data for each in h])
        # get the output of the model
        out, h = net(inputs, h)

        # get the character probabilities
        p = F.softmax(out, dim=1).data
        if(train_on_gpu):
            p = p.cpu() # move to cpu
        
        # get top characters
        if top_k is None:
            top_ch = np.arange(len(net.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()
        
        # select the likely next character with some element of randomness
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())
        
        # return the encoded value of the predicted char and the hidden state
        return net.int2char[char], h


def sample(net, size, prime='The', top_k=None):
        
    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()
    
    net.eval() # eval mode
    
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

12. Let’s use this sample method to make predictions.

print(sample(net, 500, prime='christmas', top_k=2))

The output will look something like this:

christmas a son of this
the sun wants the street of the stars, and the way the way
they went and too man and the star of the words
of a body of a street and the strange shoulder of the sky
and the sun, an end on the sun and the sun and so to the stars are stars
and the words of the water and the streets of the world
to see them to start a posture of the streets
on the street of the streets, and the sun and soul of the station
and so too too the world of a sound and stranger and to the world
to the sun a

As we can see, our model was able to generate some decent lines. The content doesn't make a lot of sense, but the model produces lines with mostly correct grammar. If we train it for longer, it can perform even better.

Conclusion

I hope you have found this article helpful. We have built our first poem model using PyTorch. You can also use this approach to draft an article or write a blog by training on a different data set. If you have any queries or suggestions, feel free to post them in the comment section below or contact me at yash@gkmit.co; I will be really glad to assist you.

Thanks for reading :)