Lunar Lander using policy gradients

Say our agent is piloting a space vehicle, and its goal is to land correctly on the landing pad. If the lander moves away from the landing pad, it loses reward, and the episode terminates if the lander crashes or comes to rest. The environment offers four discrete actions: do nothing, fire left orientation engine, fire main engine, and fire right orientation engine.
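
As a small illustration, the four actions can be written down as an enumeration. This is only a sketch: the integer indices assume that the environment numbers the actions in the order listed above (which is how Gym documents LunarLander-v2), and the name `LanderAction` is hypothetical and not part of the original code.

```python
from enum import IntEnum


class LanderAction(IntEnum):
    """Hypothetical names for the four discrete actions (indices assumed)."""
    DO_NOTHING = 0
    FIRE_LEFT_ENGINE = 1
    FIRE_MAIN_ENGINE = 2
    FIRE_RIGHT_ENGINE = 3


# the policy network needs one output per action
n_actions = len(LanderAction)
```

The value `n_actions` corresponds to what the code in this section calls `n_y`.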

Now we will see how to train our agent to land correctly on the landing pad using policy gradients. Credit for the code used in this section goes to Gabriel (

First, we import the necessary libraries. The code targets the TensorFlow 1.x API (for example, tf.Session and tf.placeholder):

import tensorflow as tf
import numpy as np
from tensorflow.python.framework import ops
import gym
import time

Then we define the PolicyGradient class, which implements the policy gradient algorithm. Let's break down the class and see each function separately. You can look at the whole program as a Jupyter notebook (

class PolicyGradient:

    # first we define the __init__ method where we initialize all variables
    def __init__(self, n_x, n_y, learning_rate=0.01, reward_decay=0.95):

        # number of features in a state (size of one observation)
        self.n_x = n_x

        # number of actions in the environment
        self.n_y = n_y

        # learning rate of the network
        self.lr = learning_rate

        # discount factor
        self.gamma = reward_decay

        # initialize the lists for storing observations,
        # actions and rewards
        self.episode_observations, self.episode_actions, self.episode_rewards = [], [], []

        # we define a function called build_network for
        # building the neural network
        self.build_network()

        # stores the cost, i.e. the loss
        self.cost_history = []

        # initialize tensorflow session and the network variables
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

Next, we define a store_transition function which stores the transitions, that is, state, action, and reward. We can use this information for training the network:

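    def store_transition(self, s, a, r):

        # store the observation and the reward
        self.episode_observations.append(s)
        self.episode_rewards.append(r)

        # store actions as list of one-hot arrays
        action = np.zeros(self.n_y)
        action[a] = 1
        self.episode_actions.append(action)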
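
To make the action encoding concrete, here is a minimal sketch of the one-hot step on its own. The helper name `one_hot` and the sample action indices are made up for illustration; the original code performs the same two lines inline.

```python
import numpy as np


def one_hot(action_index, n_actions):
    """Return a vector of zeros with a single 1 at position action_index."""
    encoded = np.zeros(n_actions)
    encoded[action_index] = 1
    return encoded


# three illustrative time steps in an environment with four actions
episode_actions = [one_hot(a, 4) for a in (2, 0, 3)]
stacked = np.vstack(episode_actions)  # one row per time step
```

Each row of `stacked` later serves as the label for the cross-entropy loss, so the loss picks out the log-probability of the action that was actually taken.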

We define the choose_action function for choosing the action given the state:

    def choose_action(self, observation):

        # reshape observation to (num_features, 1)
        observation = observation[:, np.newaxis]

        # run forward propagation to get softmax probabilities
        prob_weights = self.sess.run(self.outputs_softmax, feed_dict={self.X: observation})

        # select an action by sampling from the softmax probabilities;
        # this returns the index of the action we have sampled
        action = np.random.choice(range(len(prob_weights.ravel())), p=prob_weights.ravel())

        return action
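
The sampling step can be tried in isolation, without a network. The following is a sketch in plain NumPy: the logits are made-up numbers, and the helper names `softmax` and `sample_action` are hypothetical. It shows that actions are drawn in proportion to their probabilities rather than always picking the most likely one, which is what gives the agent its exploration.

```python
import numpy as np


def softmax(logits):
    """Numerically stable softmax over a 1-D array of logits."""
    shifted = logits - np.max(logits)
    exponentials = np.exp(shifted)
    return exponentials / np.sum(exponentials)


def sample_action(probabilities, rng):
    """Draw one action index according to the given probabilities."""
    return int(rng.choice(len(probabilities), p=probabilities))


rng = np.random.default_rng(0)            # seeded for repeatability
logits = np.array([2.0, 0.5, 0.1, -1.0])  # made-up network outputs
probabilities = softmax(logits)

# sample many times and compare empirical frequencies to the probabilities
samples = [sample_action(probabilities, rng) for _ in range(5000)]
frequencies = np.bincount(samples, minlength=4) / len(samples)
```

With enough samples, `frequencies` should be close to `probabilities`, and every action with non-zero probability is chosen occasionally.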

We define the build_network function for building the neural network:

    def build_network(self):

        # placeholders for input x, and output y
        self.X = tf.placeholder(tf.float32, shape=(self.n_x, None), name="X")
        self.Y = tf.placeholder(tf.float32, shape=(self.n_y, None), name="Y")

        # placeholder for the discounted and normalized rewards
        self.discounted_episode_rewards_norm = tf.placeholder(tf.float32, [None, ], name="actions_value")

        # we build 3 layer neural network with 2 hidden layers and
        # 1 output layer

        # number of neurons in the hidden layers
        units_layer_1 = 10
        units_layer_2 = 10

        # number of neurons in the output layer
        units_output_layer = self.n_y

        # now let us initialize weights and bias value using
        # tensorflow's tf.contrib.layers.xavier_initializer

        W1 = tf.get_variable("W1", [units_layer_1, self.n_x], initializer=tf.contrib.layers.xavier_initializer(seed=1))
        b1 = tf.get_variable("b1", [units_layer_1, 1], initializer=tf.contrib.layers.xavier_initializer(seed=1))
        W2 = tf.get_variable("W2", [units_layer_2, units_layer_1], initializer=tf.contrib.layers.xavier_initializer(seed=1))
        b2 = tf.get_variable("b2", [units_layer_2, 1], initializer=tf.contrib.layers.xavier_initializer(seed=1))
        W3 = tf.get_variable("W3", [units_output_layer, units_layer_2], initializer=tf.contrib.layers.xavier_initializer(seed=1))
        b3 = tf.get_variable("b3", [units_output_layer, 1], initializer=tf.contrib.layers.xavier_initializer(seed=1))

        # and then, we perform forward propagation

        Z1 = tf.add(tf.matmul(W1, self.X), b1)
        A1 = tf.nn.relu(Z1)
        Z2 = tf.add(tf.matmul(W2, A1), b2)
        A2 = tf.nn.relu(Z2)
        Z3 = tf.add(tf.matmul(W3, A2), b3)
        A3 = tf.nn.softmax(Z3)

        # as we require probabilities, we apply softmax activation
        # function in the output layer; logits and labels are transposed
        # to shape (batch, n_y) so that softmax runs over the actions

        logits = tf.transpose(Z3)
        labels = tf.transpose(self.Y)
        self.outputs_softmax = tf.nn.softmax(logits, name='A3')

        # next we define our loss function as cross entropy loss
        neg_log_prob = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels)

        # reward guided loss
        loss = tf.reduce_mean(neg_log_prob * self.discounted_episode_rewards_norm)

        # we use adam optimizer for minimizing the loss
        self.train_op = tf.train.AdamOptimizer(self.lr).minimize(loss)
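
To see what the reward-guided loss computes, the following NumPy sketch reproduces it outside TensorFlow. With one-hot labels, the softmax cross entropy reduces to the negative log-probability of the action taken, which is then weighted by the normalized return. The function name `policy_gradient_loss` and the input values are illustrative assumptions, not part of the original code.

```python
import numpy as np


def policy_gradient_loss(logits, one_hot_actions, returns):
    """Mean of -log pi(a_t | s_t) * G_t over an episode.

    logits and one_hot_actions have shape (T, n_actions); returns has shape (T,).
    """
    # log-softmax, computed stably by subtracting the row-wise maximum
    shifted = logits - logits.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))

    # cross entropy with one-hot labels keeps only the taken action's term
    neg_log_prob = -(one_hot_actions * log_probs).sum(axis=1)
    return float(np.mean(neg_log_prob * returns))


# a uniform policy over four actions: every action has probability 1/4
logits = np.zeros((2, 4))
actions = np.array([[1, 0, 0, 0], [0, 0, 1, 0]], dtype=float)

loss_mixed = policy_gradient_loss(logits, actions, np.array([1.0, -1.0]))
loss_positive = policy_gradient_loss(logits, actions, np.array([1.0, 1.0]))
```

Minimizing this quantity raises the probability of actions followed by above-average returns and lowers it for actions followed by below-average returns.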

Next, we define the discount_and_norm_rewards function, which returns the discounted and normalized rewards:

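    def discount_and_norm_rewards(self):
        # compute the discounted return for every time step, working backwards;
        # a float array avoids integer truncation if the rewards are integers
        discounted_episode_rewards = np.zeros_like(self.episode_rewards, dtype=np.float64)
        cumulative = 0
        for t in reversed(range(len(self.episode_rewards))):
            cumulative = cumulative * self.gamma + self.episode_rewards[t]
            discounted_episode_rewards[t] = cumulative

        # normalize the returns to zero mean and unit standard deviation
        discounted_episode_rewards -= np.mean(discounted_episode_rewards)
        discounted_episode_rewards /= np.std(discounted_episode_rewards)
        return discounted_episode_rewards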
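
A small worked example may help here. The sketch below applies the same backward recursion to three rewards of 1 with a discount factor of 0.5 (both values chosen only for illustration); the standalone function `discount_and_normalize` is a hypothetical stand-in for the method above.

```python
import numpy as np


def discount_and_normalize(rewards, gamma):
    """Return (discounted returns, normalized discounted returns)."""
    returns = np.zeros(len(rewards), dtype=np.float64)
    cumulative = 0.0
    # walk backwards so each step reuses the return of the step after it
    for t in reversed(range(len(rewards))):
        cumulative = cumulative * gamma + rewards[t]
        returns[t] = cumulative
    normalized = (returns - returns.mean()) / returns.std()
    return returns, normalized


raw, normalized = discount_and_normalize([1.0, 1.0, 1.0], gamma=0.5)
# raw is [1.75, 1.5, 1.0]:
#   t=2: 1.0
#   t=1: 1.0 * 0.5 + 1.0 = 1.5
#   t=0: 1.5 * 0.5 + 1.0 = 1.75
```

After normalization the returns have zero mean and unit standard deviation, so earlier steps in this example get a positive weight and the last step a negative one.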

Now we actually perform the learning:

    def learn(self):
        # discount and normalize episodic reward
        discounted_episode_rewards_norm = self.discount_and_norm_rewards()

        # train the network
        self.sess.run(self.train_op, feed_dict={
            self.X: np.vstack(self.episode_observations).T,
            self.Y: np.vstack(np.array(self.episode_actions)).T,
            self.discounted_episode_rewards_norm: discounted_episode_rewards_norm,
        })

        # reset the episodic data
        self.episode_observations, self.episode_actions, self.episode_rewards = [], [], []

        return discounted_episode_rewards_norm
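
The stacking and transposing in the feed dictionary turns the per-step lists into matrices with one column per time step, matching the placeholder shapes (n_x, None) and (n_y, None). The sketch below checks this with made-up data; the sizes 8 and 4 are assumptions based on a lander observation of eight values and the four actions.

```python
import numpy as np

n_x, n_y, steps = 8, 4, 3  # assumed observation size, action count, episode length

# made-up episode data: one observation vector and one one-hot action per step
episode_observations = [np.arange(n_x, dtype=float) + t for t in range(steps)]
episode_actions = [np.eye(n_y)[a] for a in (2, 0, 3)]

# same expressions as in the feed dictionary
X = np.vstack(episode_observations).T        # shape (n_x, steps)
Y = np.vstack(np.array(episode_actions)).T   # shape (n_y, steps)
```

Column `t` of `X` is the observation at step `t`, and column `t` of `Y` is the one-hot action taken at that step.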

You can see the output as follows:

