Skip to the content.

实验名称

异步计算

实验目的

本实验通过简单MXNet库的异步计算的学习与实践,要求学生:

1.掌握异步计算的原理及使用场景

2.掌握异步计算的常见操作

3.掌握通过异步计算提升计算性能

4.掌握异步计算对内存的影响

实验背景

由于众所周知的Python语言的性能问题,MXNet框架把计算分为用户直接用来交互的前端和系统用来执行计算的后端,前端可以使用Python语言开发,后端使用C++来实现,前端把计算操作发送到后端,主要计算都发生在C++实现的后端来提升计算性能。本实验要求学生掌握MXNet的异步计算来提升计算性能和减少内存消耗。实验数据采用MXNet的NDArray类ones函数进行随机生成。

实验原理

广义上讲,MXNet包括用户直接用来交互的前端和系统用来执行计算的后端。例如,用户可以使用不同的前端语言编写MXNet程序,如Python、R、Scala和C++。无论使用何种前端编程语言,MXNet程序的执行主要都发生在C++实现的后端。换句话说,用户写好的前端MXNet程序会传给后端执行计算。后端有自己的线程在队列中不断收集任务并执行它们。

MXNet通过前端线程和后端线程的交互实现异步计算。异步计算指,前端线程无须等待当前指令从后端线程返回结果就继续执行后面的指令。

在异步计算中,Python前端线程执行的时候,仅仅是把任务放进后端的队列里就返回了。当需要打印计算结果时,Python前端线程会等待C++后端线程把结果计算完。此设计的一个好处是,这里的Python前端线程不需要做实际计算。因此,无论Python的性能如何,它对整个程序性能的影响很小。只要C++后端足够高效,那么不管前端编程语言性能如何,MXNet都可以提供一致的高性能。

实验环境

ubuntu 16.04

Python 3.6.5

mxnet 1.4.0

建议课时

1课时

实验步骤

一、环境准备

本实验在jupyter notebook进行开发。

打开终端:

终端执行:

jupyter notebook

创建python3文件,进行代码的编辑:

在新建的jupyter notebook中进行实验:

二、代码编写

1、MXNet中的异步计算

包导入

%matplotlib inline
import os
import subprocess
import time

from mxnet import autograd, gluon, nd
from mxnet.gluon import loss as gloss, nn

使用MXNet框架中NDArray的ones(shape, ctx=None, dtype=None, **kwargs)函数创建1行2列的数组,数组元素值为1,(1, 2)参数为数组的shape

a = nd.ones((1, 2))
b = nd.ones((1, 2))
c = a * b + 2
c

输出结果:

[[3. 3.]] <NDArray 1x2 @cpu(0)>

在异步计算中,Python前端线程执行前3条语句的时候,仅仅是把任务放进后端的队列里就返回了。当最后一条语句需要打印计算结果时,Python前端线程会等待C++后端线程把变量c的结果计算完。

为了演示异步计算的性能,我们先实现一个简单的计时类。

# 简单的计时类
class Benchmark():
    # 类的初始化函数,设置prefix成员变量
    def __init__(self, prefix=None):
        self.prefix = prefix + ' ' if prefix else ''

    # 类的入口函数,开始计时
    def __enter__(self):
        self.start = time.time()

    # 类的退出函数,打印计算时间
    def __exit__(self, *args):
        print('%stime: %.4f sec' % (self.prefix, time.time() - self.start))

下面的例子通过计时来展示异步计算的效果。可以看到,当y = nd.dot(x, x).sum()返回的时候并没有等待变量y真正被计算完。只有当print函数需要打印变量y时才必须等待它计算完。

# 计算加入到后端队列中
with Benchmark('Workloads are queued.'):
    # 使用NDArray的random类的uniform方法创建shape为2000 * 2000随机样本
    x = nd.random.uniform(shape=(2000, 2000))
    # 计算数组的点积并求和
    y = nd.dot(x, x).sum()

# 等待计算完成,并返回打印
with Benchmark('Workloads are finished.'):
    print('sum =', y)

输出结果:

Workloads are queued. time: 0.0007 sec

sum =

[2.0003661e+09]

<NDArray 1 @cpu(0)>

Workloads are finished. time: 0.1164 sec

的确,除非我们需要打印或者保存计算结果,否则我们基本无须关心目前结果在内存中是否已经计算好了。只要数据是保存在NDArray里并使用MXNet提供的运算符,MXNet将默认使用异步计算来获取高计算性能。

2、用同步函数让前端等待计算结果

除了刚刚介绍的print函数外,我们还有其他方法让前端线程等待后端的计算结果完成。我们可以使用wait_to_read函数让前端等待某个的NDArray的计算结果完成,再执行前端中后面的语句。或者,我们可以用waitall函数令前端等待前面所有计算结果完成。后者是性能测试中常用的方法。

下面是使用wait_to_read函数的例子。输出用时包含了变量y的计算时间。

with Benchmark():
    x = nd.random.uniform(shape=(2000, 2000))
    y = nd.dot(x, x)
    # 使用wait_to_read函数让前端等待计算结果
    y.wait_to_read()

输出结果:time: 0.0692 sec

下面是使用waitall函数的例子。输出用时包含了变量y和变量z的计算时间。

with Benchmark():
    x = nd.random.uniform(shape=(2000, 2000))
    y = nd.dot(x, x)
    z = nd.dot(x, x)
    # 使用waitall同步函数让前端等待计算结果
    nd.waitall()

输出结果:time: 0.1343 sec

此外,任何将NDArray转换成其他不支持异步计算的数据结构的操作都会让前端等待计算结果。例如,当我们调用asnumpy函数和asscalar函数时。

with Benchmark():
    x = nd.random.uniform(shape=(2000, 2000))
    y = nd.dot(x, x)
    # 使用asnumpy同步函数让前端等待计算结果
    y.asnumpy()

输出结果:time: 0.0712 sec

with Benchmark():
    x = nd.random.uniform(shape=(2000, 2000))
    y = nd.dot(x, x)
    # 使用asscalar同步函数让前端等待计算结果
    y.norm().asscalar()

输出结果:time: 0.1101 sec

上面介绍的wait_to_read函数、waitall函数、asnumpy函数、asscalar函数和print函数会触发让前端等待后端计算结果的行为。这类函数通常称为同步函数。

3、使用异步计算提升计算性能

在下面的例子中,我们用for循环不断对变量y赋值。当在for循环内使用同步函数wait_to_read时,每次赋值不使用异步计算;当在for循环外使用同步函数waitall时,则使用异步计算。

# 同步计算,在for循环中使用wait_to_read同步函数等待计算结果
with Benchmark('synchronous.'):
    x = nd.random.uniform(shape=(2000, 2000))
    for _ in range(1000):
        y = x + 1
        # 使用wait_to_read同步函数等待计算结果,每计算一次都有等待
        y.wait_to_read()

# 异步计算,在for循环外使用waitall同步函数等待计算结果
with Benchmark('asynchronous.'):
    x = nd.random.uniform(shape=(2000, 2000))
    for _ in range(1000):
        y = x + 1
    # 使用waitall同步函数等待所有计算结果
    nd.waitall()

输出结果:

synchronous. time: 0.5839 sec

asynchronous. time: 0.4061 sec

我们观察到,使用异步计算能提升一定的计算性能。为了解释这一现象,让我们对Python前端线程和C++后端线程的交互稍作简化。在每一次循环中,前端和后端的交互大约可以分为3个阶段:

  1. 前端令后端将计算任务y = x + 1放进队列;
  2. 后端从队列中获取计算任务并执行真正的计算;
  3. 后端将计算结果返回给前端。

我们将这3个阶段的耗时分别设为t1,t2,t3。如果不使用异步计算,执行1000次计算的总耗时大约为1000(t1+t2+t3);如果使用异步计算,由于每次循环中前端都无须等待后端返回计算结果,执行1000次计算的总耗时可以降为t1+1000t2+t3。

4、异步计算对内存的影响

下面我们来演示异步计算对内存的影响。我们先定义一个数据获取函数data_iter,它会从被调用时开始计时,并定期打印到目前为止获取数据批量的总耗时。

# 一个数据获取函数data_iter,从被调用时开始计时,并定期打印到目前为止获取数据批量的总耗时
def data_iter():
    start = time.time()
    # 定义数据的批次和每批次数据的数量
    num_batches, batch_size = 100, 1024
    for i in range(num_batches):
        # 使用normal函数生成正则分布的数据,shape为batch_size * 512
        X = nd.random.normal(shape=(batch_size, 512))
        # 使用ones函数生成shape为batch_size的值为1数组
        y = nd.ones((batch_size,))
        yield X, y

        # 每50批次打印时间
        if (i + 1) % 50 == 0:
            print('batch %d, time %f sec' % (i + 1, time.time() - start))

下面定义多层感知机、优化算法和损失函数。

net = nn.Sequential()
# 添加层感知机,算法使用relu
net.add(nn.Dense(2048, activation='relu'),
        nn.Dense(512, activation='relu'),
        nn.Dense(1))
net.initialize()
# 学习,optimizer为算法sgd
trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.005})
# 损失函数
loss = gloss.L2Loss()

这里定义辅助函数来监测内存的使用。需要注意的是,这个函数只能在Linux或macOS上运行。

# 获取系统内存消耗
def get_mem():
    # 使用Linux的ps命令获取内存
    res = subprocess.check_output(['ps', 'u', '-p', str(os.getpid())])
    return int(str(res).split()[15]) / 1e3

现在我们可以做测试了。我们先试运行一次,让系统把net的参数初始化。

# 使用data_iter函数获取数据
for X, y in data_iter():
    break
# 使用wait_to_read同步函数等待计算结果
loss(y, net(X)).wait_to_read()

对于训练模型net来说,我们可以自然地使用同步函数asscalar将每个小批量的损失从NDArray格式中取出,并打印每个迭代周期后的模型损失。此时,每个小批量的生成间隔较长,不过内存开销较小。

# 使用get_mem获取计算前内存
l_sum, mem = 0, get_mem()
for X, y in data_iter():
    with autograd.record():
        l = loss(y, net(X))
    # 使用同步函数asscalar等待计算结果
    l_sum += l.mean().asscalar()
    l.backward()
    trainer.step(X.shape[0])
# 使用waitall同步函数等待所以计算结果
nd.waitall()
# 计算和打印内存消耗
print('increased memory: %f MB' % (get_mem() - mem))

输出结果:

batch 50, time 3.770091 sec

batch 100, time 7.576762 sec

increased memory: 3.200000 MB

如果去掉同步函数,虽然每个小批量的生成间隔较短,但训练过程中可能会导致内存占用较高。这是因为在默认异步计算下,前端会将所有小批量计算在短时间内全部丢给后端。这可能在内存积压大量中间结果无法释放。实验中我们看到,不到一秒,所有数据(Xy)就都已经产生。但因为训练速度没有跟上,所以这些数据只能放在内存里不能及时清除,从而占用额外内存。

# 使用get_mem获取计算前内存
mem = get_mem()
for X, y in data_iter():
    with autograd.record():
        l = loss(y, net(X))
    # 此处没有使用同步函数asscalar等待计算结果
    l.backward()
    trainer.step(X.shape[0])
# 使用waitall同步函数等待所以计算结果
nd.waitall()
print('increased memory: %f MB' % (get_mem() - mem))

输出结果:

batch 50, time 0.076749 sec

batch 100, time 0.153021 sec

increased memory: 2.844000 MB

实验总结

MXNet包括用户直接用来交互的前端和系统用来执行计算的后端。

MXNet能够通过异步计算提升计算性能。

建议使用每个小批量训练或预测时至少使用一个同步函数,从而避免在短时间内将过多计算任务丢给后端。