** 关于Flappy Bird **
Flappy Bird(非官方译名:笨鸟先飞)是一款2013年鸟飞类游戏,由越南河内独立游戏开发者阮哈东(Dong Nguyen)开发,另一个独立游戏开发商GEARS Studios发布。—— 以上内来自《维基百科》
Flappy Bird操作简单,通过点击手机屏幕使Bird上升,穿过柱状障碍物之后得分,碰到则游戏结束。由于障碍物高低不等,控制Bird上升和下降需要反应快并且灵活,要得到较高的分数并不容易,笔者目前最多得过10分。
关于CUDA以及cuDNN的配置,其中有一些坑包括:安装CUDA之后循环登录,屏幕分辨率无法正常调节等等,都是由于NVIDIA驱动安装的问题,这不是本文要讨论的主要内容,读者可自行Google。
本机软硬件配置:
系统:Ubuntu 16.04
显卡:NVIDIA GeForce GTX 745 4G
版本:TensorFlow 1.0
软件包:OpenCV 3.2.0、Pygame、Numpy、…
细心的朋友可能发现,笔者的显卡配置并不高,GeForce GTX 745,显存3.94G,可用3.77G(桌面占用了一部分),属于入门中的入门。对于专业做深度学习算法的朋友,这个显卡必然是不够的。知乎上有帖子教大家怎么配置更专业的显卡,有兴趣的可以移步。
比如,我们看到一个美女,可能最先观察到的是美女身上的某些部位(自己体会)。
不过这也带来另一方面的问题,那就是深度学习高度依赖大量的标签数据,而这些数据获取成本极高。
这张图是从UCL的课程中拷出来的,课程链接地址(YouTube):
https://www.youtube.com/watch?v=2pWv7GOvuf0
st,然后产生动作
at作用于环境,环境接收动作
at,并且对其进行评价,反馈给智能代理
rt。不断的循环这个过程,就会产生一个状态/动作/反馈的序列:(s1, a1, r1, s2, a2, r2.....,sn, an, rn),而这个序列让我们很自然的想起了:
HMM(马尔科夫模型)在语音识别,行为识别等机器学习领域有较为广泛的应用。条件随机场模型(Conditional Random Field)则用于自然语言处理。两大模型是语音识别、自然语言处理领域的基石。
s1),你在工作上刻苦努力,追求上进(对应图中的
a1),然后领导觉得你不错,准备给你升职(对应图中的
r1),于是,你升到了T2;你继续刻苦努力,追求上进......不断的努力,不断的升职,最后升到了
sn。当然,你也有可能不努力上进,这也是一种动作,换句话说,该动作a也属于动作集合A,然后得到的反馈r就是没有升职加薪的机会。
策略就是如何根据环境选取动作来执行的依据。策略分为稳定的策略和不稳定的策略,稳定的策略在相同的环境下,总是会给出相同的动作,不稳定的策略则反之,这里我们主要讨论稳定的策略。
s',
a'即下一个状态和动作。确定了损失函数,确定了获取样本的方式,DQN的整个算法也就成型了!
D—Experience Replay,也就是经验池,就是如何存储样本及采样的问题。
D中;存储到一定程度,就从中随机抽取数据,对损失函数进行梯度下降。
if input_actions[1] == 1:
if self.playery > -2 * PLAYER_HEIGHT:
self.playerVelY = self.playerFlapAcc
self.playerFlapped = True
# SOUNDS['wing'].play()
return image_data, reward, terminal
# 权重
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.01)
return tf.Variable(initial)
# 偏置
def bias_variable(shape):
initial = tf.constant(0.01, shape=shape)
return tf.Variable(initial)
# 卷积
def conv2d(x, W, stride):
return tf.nn.conv2d(x, W, strides=[1, stride, stride, 1], padding="SAME")
# 池化
def max_pool_2x2(x):
return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
def createNetwork():
# 第一层卷积
W_conv1 = weight_variable([8, 8, 4, 32])
b_conv1 = bias_variable([32])
# 第二层卷积
W_conv2 = weight_variable([4, 4, 32, 64])
b_conv2 = bias_variable([64])
# 第三层卷积
W_conv3 = weight_variable([3, 3, 64, 64])
b_conv3 = bias_variable([64])
# 第一层全连接
W_fc1 = weight_variable([1600, 512])
b_fc1 = bias_variable([512])
# 第二层全连接
W_fc2 = weight_variable([512, ACTIONS])
b_fc2 = bias_variable([ACTIONS])
# 输入层
s = tf.placeholder("float", [None, 80, 80, 4])
# 第一层隐藏层+池化层
h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)
# 第二层隐藏层(这里只用了一层池化层)
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2)
# h_pool2 = max_pool_2x2(h_conv2)
# 第三层隐藏层
h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3)
# h_pool3 = max_pool_2x2(h_conv3)
# Reshape
# h_pool3_flat = tf.reshape(h_pool3, [-1, 256])
h_conv3_flat = tf.reshape(h_conv3, [-1, 1600])
# 全连接层
h_fc1 = tf.nn.relu(tf.matmul(h_conv3_flat, W_fc1) + b_fc1)
# 输出层
# readout layer
readout = tf.matmul(h_fc1, W_fc2) + b_fc2
return s, readout, h_fc1
在Ubuntu中安装opencv的步骤比较麻烦,当时也踩了不少坑,各种Google解决。建议安装opencv3。
x_t, r_0, terminal = game_state.frame_step(do_nothing)
# 首先将图像转换为80*80,然后进行灰度化
x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_BGR2GRAY)
# 对灰度图像二值化
ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
# 四通道输入图像
s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)
# define the cost function
a = tf.placeholder("float", [None, ACTIONS])
y = tf.placeholder("float", [None])
readout_action = tf.reduce_sum(tf.multiply(readout, a), axis=1)
cost = tf.reduce_mean(tf.square(y - readout_action))
train_step = tf.train.AdamOptimizer(1e-6).minimize(cost)
# open up a game state to communicate with emulator
game_state = game.GameState()
# store the previous observations in replay memory
D = deque()
a表示输出的动作,即强化学习模型中的Action,
y表示标签值,
readout_action表示模型输出与
a相乘后,在一维求和,损失函数对标签值与输出值的差进行平方,
train_step表示对损失函数进行
Adam优化。
# perform gradient step
train_step.run(feed_dict={
y: y_batch,
a: a_batch,
s: s_j_batch}
)
# open up a game state to communicate with emulator
game_state = game.GameState()
# store the previous observations in replay memory
D = deque()
dequeue()和
enqueue([y])
方法进行取出和压入数据。经验池 D用来存储实验过程中的数据,后面的训练过程会从中随机取出一定量的batch进行训练。# Create two variables.
weights = tf.Variable(tf.random_normal([784, 200], stddev=0.35),
name="weights")
biases = tf.Variable(tf.zeros([200]), name="biases")
...
# Add an op to initialize the variables.
init_op = tf.global_variables_initializer()
# Later, when launching the model
with tf.Session() as sess:
# Run the init operation.
sess.run(init_op)
...
# Use the model
...
tf.train.Saver()获取Saver实例。
saver = tf.train.Saver()
saver的
restore方法:
# Create some variables.
v1 = tf.Variable(..., name="v1")
v2 = tf.Variable(..., name="v2")
...
# Add ops to save and restore all the variables.
saver = tf.train.Saver()
# Later, launch the model, use the saver to restore variables from disk, and
# do some work with the model.
with tf.Session() as sess:
# Restore variables from disk.
saver.restore(sess, "/tmp/model.ckpt")
print("Model restored.")
# Do some work with the model
...
# saving and loading networks
saver = tf.train.Saver()
checkpoint = tf.train.get_checkpoint_state("saved_networks")
if checkpoint and checkpoint.model_checkpoint_path:
saver.restore(sess, checkpoint.model_checkpoint_path)
print("Successfully loaded:", checkpoint.model_checkpoint_path)
else:
print("Could not find old network weights")
saver.restore对已存在参数进行恢复。
# save progress every 10000 iterations
if t % 10000 == 0:
saver.save(sess, 'saved_networks/' + GAME + '-dqn', global_step=t)
# choose an action epsilon greedily
readout_t = readout.eval(feed_dict={s: [s_t]})[0]
a_t = np.zeros([ACTIONS])
action_index = 0
if t % FRAME_PER_ACTION == 0:
if random.random() <= epsilon:
print("----------Random Action----------")
action_index = random.randrange(ACTIONS)
a_t[random.randrange(ACTIONS)] = 1
else:
action_index = np.argmax(readout_t)
a_t[action_index] = 1
else:
a_t[0] = 1 # do nothing
readout_t是训练数据为之前提到的四通道图像的模型输出。
a_t是根据ε 概率选择的Action。
# run the selected action and observe next state and reward
x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY)
ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
x_t1 = np.reshape(x_t1, (80, 80, 1))
# s_t1 = np.append(x_t1, s_t[:,:,1:], axis = 2)
s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)
# store the transition in D
D.append((s_t, a_t, r_t, s_t1, terminal))
D保存的是一个马尔科夫序列。
(s_t, a_t, r_t, s_t1, terminal)分别表示
t时的状态
s_t,执行的动作
a_t,得到的反馈
r_t,以及得到的下一步的状态
s_t1和游戏是否结束的标志
terminal。
# update the old values
s_t = s_t1
t += 1
D中已经保存了一些样本数据后,就可以从这些样本数据中随机抽样,进行模型训练了。这里设置样本数为
OBSERVE = 100000.。随机抽样的样本数为
BATCH = 32。
if t > OBSERVE:
# sample a minibatch to train on
minibatch = random.sample(D, BATCH)
# get the batch variables
s_j_batch = [d[0] for d in minibatch]
a_batch = [d[1] for d in minibatch]
r_batch = [d[2] for d in minibatch]
s_j1_batch = [d[3] for d in minibatch]
y_batch = []
readout_j1_batch = readout.eval(feed_dict={s: s_j1_batch})
for i in range(0, len(minibatch)):
terminal = minibatch[i][4]
# if terminal, only equals reward
if terminal:
y_batch.append(r_batch[i])
else:
y_batch.append(r_batch[i] + GAMMA * np.max(readout_j1_batch[i]))
# perform gradient step
train_step.run(feed_dict={
y: y_batch,
a: a_batch,
s: s_j_batch}
)
s_j_batch、
a_batch、
r_batch、
s_j1_batch是从经验池
D中提取到的马尔科夫序列(Java童鞋羡慕Python的列表推导式啊),
y_batch为标签值,若游戏结束,则不存在下一步中状态对应的Q值(回忆Q值更新过程),直接添加
r_batch,若未结束,则用折合因子(0.99)和下一步中状态的最大Q值的乘积,添加至
y_batch。
s_j_batch、
a_batch和
y_batch。差不多经过2000000步(在本机上大概10个小时)训练之后,就能达到本文开头动图中的效果啦。
本文为 @ 21CTO 创作并授权 21CTO 发布,未经许可,请勿转载。
内容授权事宜请您联系 webmaster@21cto.com或关注 21CTO 公众号。
该文观点仅代表作者本人,21CTO 平台仅提供信息存储空间服务。