学习Tensorflow(2)

Wed 06 December 2017

学习Tensorflow (2)

如果你对Tensorflow不是怎么了解,建议你先阅读入门的教程。

Spark生成TFRecords格式

官方推荐的也是效率最高的训练数据的组织形式就是TFRecords格式。Spark是大数据处理的神器,处理完的数据放在hdfs上,Tensorflow可以直接读取。

很多公司的训练数据准备都是用Spark进行处理的,Google也为我们提供了在Spark中生成TFRecords格式的方法:https://github.com/tensorflow/ecosystem/tree/master/hadoop

因为Tensorflow用的protobuf版本和Hadoop的不一样,所以要新引入一个jar包,这个jar包在编译的时候用shade把protobuf的包名改名。注意protoc的版本,按照文档说明编译,基本上不会出错。

import org.tensorflow.hadoop.shaded.protobuf.ByteString
import org.apache.hadoop.io.{NullWritable, BytesWritable}
import org.apache.spark.{SparkConf, SparkContext}
import org.tensorflow.example.{BytesList, Int64List, Feature, Features, Example}
import org.tensorflow.hadoop.io.TFRecordFileOutputFormat

val inputPath = "path/to/input.txt"
val outputPath = "path/to/output.tfr"

val sparkConf = new SparkConf().setAppName("TFRecord Demo")
val sc = new SparkContext(sparkConf)

var features = sc.textFile(inputPath).map(line => {
  val text = BytesList.newBuilder().addValue(ByteString.copyFrom(line.getBytes)).build()
  val features = Features.newBuilder()
    .putFeature("text", Feature.newBuilder().setBytesList(text).build())
    .build()
  val example = Example.newBuilder()
    .setFeatures(features)
    .build()
  (new BytesWritable(example.toByteArray), NullWritable.get())
})

features.saveAsNewAPIHadoopFile[TFRecordFileOutputFormat](outputPath)

这是官方的简单示例,完美的避开了我们用Scala写Spark程序会遇到的问题,这就是转换问题。本质是Scala里面的对象和Java对象不一样。

先用import collection.JavaConversions.asJavaIterable来解决Scala容器转javaIterable的问题。

还有一个问题就是Scala里面的Float和Java的Float在这里没法自动转换,需要像下面这样进行显式的转换。

Feature.newBuilder().setFloatList(
    FloatList.newBuilder().addAllValue(data.toList.map(_.asInstanceOf[java.lang.Float])).build()
).build()

如果出现下面的错误:

tensorflow.python.framework.errors_impl.InvalidArgumentError: Could not parse example input, value: '

label

这不是说Tensorflow解析不出label,冒号后面是serialized_example,就是说serialized_example解析出来是错的,这个时候你要检查一下你生成Example的结构是否有问题。

TFRecords格式支持两种压缩方式:ZLIB和GZIP。并且压缩是对整个TFrecords文件压缩的,不是对每个example。理论上我们可以让Spark程序生成压缩的TFRecords文件,因为一般的Spark(Hadoop)内置了ZLIB编码库。

但不幸的是,在多次修改hadoop conf的尝试后,没能成功。

tensorflow.python.framework.errors_impl.DataLossError: inflate() failed with error -3: incorrect header check

在出现DataLossError错误的时候,可能是格式不对,混进了不同格式的数据;可能是指定的路径没有数据;也可能是真的没数据了都train完了。上面这个错误是用ZLIB选项去解析非压缩格式时出现的。

稀疏向量的生成和解析

稀疏向量在训练数据中可以减少训练数据的体积

Example中用两个可变长度的Feature来存储一个SparseTensor,其中一个Feature是Int64List,代表SparseTensor的Index部分,另一个是FloatList,代码SparseTensor的Value部分。

在用parse_single_example解析的时候,我们不能当成两个VarLenFeature来处理,虽然它们的实际存储是VarLenFeature。我们要使用SparseFeature来解析,参数index_keyvalue_key分别是那两个特征的key。另外还需要一个size参数指定Dense size。

如果按VarLenFeature来解析,就得到两个SparseTensor。虽然很多有介绍TFRecord解析的文章,介绍得很片面,很多人就是需要SparseFeature,但就是找不到。

SparseTensor的indices是不能有重复的,这点在生成TFRecords的时候需要确保。

parse_single_example vs parse_example

很好理解parse_single_example只解析单个example,parse_example解析的是一个batch_size的example。

在新的版本中Tensorflow推荐使用Dataset,而不是底层的Reader。如果我们面向的是tfrecords格式,那么我们就是需要使用TFRecordDataset,而不是TFRecordReader。这就存在一个蛋疼的问题了:Dataset的map需要的是parse_single_example的函数,它是先解析单条example,再合成一个batch_size的数据,如果单条example解析出的特征包含SparseTensor,那合成一个batch_size的数据的时候必然会报错……

解决的方法就是用sparse_tensor_to_denseSparseTensor转为DenseTensor。对于数值类型来说这没问题,但如果是可变的byteList特征,你让我怎么操作?要组合使用Feature Column处理string类型的特征都没办法了。

如果遇到上面的情况,那就在输入特征中提前把string特征进行ID化。如果需要用到feature column,就用categorical_column_with_identity这个fc替代。

再说回旧的方式,旧的方式因为parse_example输入的已经是一个batch_size的原始数据,因此允许解析出SparseTensor的类型,这种情况下可以使用包含sparse_matmul在内的sparse_xxx函数。

sparse_matmul虽然看起来很美好,但是属于Legacy: will be removed.的对象。

作为使用者要把握住版本的发展趋势,尽量不要使用不推荐的用法。

顺便说一下使用Reader可能会使用的问题。Reader是需要QueueRunner配合的,需要tf.train.start_queue_runners开启辅助的线程,否则你的程序就一直处于卡死的状态(没数据消费)。

Feature Column

下面会用fc代指Feature column

训练数据解析成Tensor,模型的训练输入也是Tensor。Feature Column是一个对Tensor的包装,很有用的工具。

一个Feature Column至少包含3个属性,它的base class是_FeatureColumn,需要实现:

  • name
  • _transform_feature
  • _parse_example_spec

_transform_feature是生成一个中间的Tensor,这样Feature Column可以进行高级的转换。 _parse_example_spec是用来指示解析Example的,tf.feature_column.make_parse_example_spec就会调用到每个fc的这个属性。

如果基于一个已有的非抽象fc实现一个新的fc,可能还有其他的约束。具体看代码。

官方的建议是除非你理解否则不要自己实现fc。

fc不是一个简单的封装,它是有效的特征工程工具:

tf.feature_column.numeric_column是最简单的fc,用来表示一个数值特征。

tf.feature_column.bucketized_column这个fc,可以帮你把连续特征离散化。例如:

 `boundaries=[0., 1., 2.]` generates buckets `(-inf, 0.)`, `[0., 1.)`,
  `[1., 2.)`, and `[2., +inf)`.

连续特征进去后,会生成[0, 1, 2, 3]这4个离散值。

gender = tf.feature_column.categorical_column_with_vocabulary_list(
    "gender", ["Female", "Male"])

categorical_column_with_vocabulary_list的特征输入是一个string或者int,它会在内存中生成一个映射表,把原始的输入特征映射成一个int型的ID,这适合vocabulary_list有限的情况。

类似的fc有categorical_column_with_vocabulary_file,适合更大规模的映射集合。

native_country = tf.feature_column.categorical_column_with_hash_bucket(
    "native_country", hash_bucket_size=1000)

这是一种利用hash函数来进行映射的fc, output_id = Hash(input_feature_string) % bucket_size,这个fc可以帮我们利用hash来对特征降维,适合集合很大,集合元素会变化同时我们不方便追踪的情况。

在特征工程中,特征交叉是一个很有效的工作,尤其在LR算法上面。tf.feature_column.crossed_column就是提供交叉的fc,它可以帮我们对多于1个的特征进行交叉。

在如果你需要边训练大网络,边embedding输入,fc就是你的不二选择。因为tf.feature_column.embedding_column背后会帮你做很多。为什么要边train边embedding就不解释了。

存在隐藏的问题,tf.feature_column.categorical_column_with_identitynum_buckets影响内存的消耗。在考虑内存问题的时候,不要忽略掉fc帮你做embedding需要的内存。

fc作为训练或者预测的输入,我们给的是一个fc的list,Tensorflow底层已经帮我们做了fc转Tensor,并concat的工作。

如果你只想用fc帮你做特征处理,不想把fc耦合进你的网络里面去,可以使用tf.feature_column.input_layer把一组columns转为一个Tensor。

更多内置的feature column的资料可以看官方的wide_and_deep的教程。这个教程展示了:把连续特征离散化作为wide特征(LR特征),把数值特征作为deep特征,把string特征embedding后作为deep特征等。

下面展示一个自己实现的Feature Column:

class _WideColumn(fc_core._DenseColumn,
                  collections.namedtuple('_WideColumn', [
                      'key', 'dtype', 'size',
                  ])):
    """see `wide_column`."""

    @property
    def _variable_shape(self):
        return tensor_shape.TensorShape([self.size])

    @property
    def name(self):
        return self.key

    @property
    def _parse_example_spec(self):
        return {
            self.key: parsing_ops.SparseFeature(
                index_key=self.key + '_idx',
                value_key=self.key + '_val',
                dtype=self.dtype,
                size=self.size
            )
        }

    def _transform_feature(self, inputs):
        return inputs.get(self.key)
        # input_tensor = inputs.get(self.key)
        # return sparse_ops.sparse_tensor_to_dense(input_tensor)

    def _get_dense_tensor(self, inputs, weight_collections=None, trainable=None):
        """Returns dense `Tensor` representing numeric feature.

        Args:
          inputs: A `_LazyBuilder` object to access inputs.
          weight_collections: Unused `weight_collections` since no variables are
            created in this function.
          trainable: Unused `trainable` bool since no variables are created in
            this function.

        Returns:
          Dense `Tensor` created within `_transform_feature`.
        """
        # Do nothing with weight_collections and trainable since no variables are
        # created in this function.
        del weight_collections
        del trainable
        # Feature has been already transformed. Return the intermediate
        # representation created by _transform_feature.
        return inputs.get(self)


def wide_column(key, size, dtype=tf.float32):
    """自定义的feature column"""
    return _WideColumn(key=key, size=size, dtype=dtype)

这个WideColumn会按照一个简单的规则从example里面解析出一个SparseTensor作为一个fc,为什么没有转为DenseTensor(我们实现的是_DenseColumn啊)?因为我会在parse_single_example的时候做。

这个自定义的Feature Column虽然简单,但很有用:

虽然内置的fc功能很强大,但parse_single_example却限制了它们。我无法绕过parse_single_example让fc帮我对string list进行交叉。

所以我在TFRecords里面存的是已经交叉好,且已经ID话的特征。想把这些特征套进官方提供的wide_and_deep的模型几乎不可能,你可能会把wide_and_deep的底层实现自己实现一遍,这显然不是一个合理且完美的解决方法(至少数据采集部分你搞不完美)。

反过来想一下,我们把自己的数据封装成一个fc就解决问题啦。

如果sample里面包含可变长度的特征,在解析出单个sample再组成batch sample的时候,需要用database的padded_batch方法,而不是batch。参数padded_shapes是一个list,和parse example出来要一致,给出的padded_shapes要满足VarLen的最大长度。

OOM

内存不足是一个和特征相关的,容易遇到的问题

OOM时的表现和你选择的Tensorflow底层计算引擎相关。如果你用的是基于CPU的普通版本,可能直接显示python进程被kill掉,没有更多提示信息。MKL版本会明确的显示是不能分配内存,并打出所有的内存分配。

[1]    71035 killed     python trainer.py

上面这个就是一个OOM。

影响内存的因素有:输入Tensor的大小,batch_size,模型本身的消耗,fc的隐藏消耗。

很难精确的算出程序需要多大的内存,但模型本身的消耗,除了存储weight外,还会有梯度相关的消耗(可以通过Tensorboard的Graphic中看)。

解决方法:

  • 如果有CPU有GPU,合理分配设备,CPU处理大内存部分。
  • 缩小网络规模(宽度,深度)
  • 缩小embedding的空间
  • 减小batch_size

内存的问题带来另一个问题是模型存储的问题,请准备足够的存储空间,使用Estimator一般是存储最近的n个模型。磁盘空间不足的表现和OOM一样比较隐讳。

Segmentation fault

看到上面的信息,先检查一下磁盘等硬件信息。不要惊呼我把Tensorflow搞挂了。

Category: 机器学习 Tagged: Tensorflow 深度学习 DNN TFRecords

Comments