Tensorflow數據讀取指南

tensorflow的靈活性帶來的學習成本是很多人頭疼的問題,在tf中,讀取數據基本有四種方法:
1. tf.data (官方推薦):方便地構建復雜的輸入管道
2. Feeding:通過input_fn來yield數據
3. QueueRunner:基于隊列的輸入管道
4. 預加載數據。用constant或variable在內存中存儲所有的數據

其實從tf的歷史發展軌跡可以看出,tf.data是在feedintg技術和QueueRunner的基礎上發展而來。
Feeding技術

1
2
3
4
with tf.Session():
  input = tf.placeholder(tf.float32)
  classifier = ...
  print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()}))

簡單教程中的數據加載,都是一行偽代碼:(x, y) = load_data() 就搞定了
但現實中,令訓練數據的使用變得復雜的主要原因有很多:
一、數據集不一致,不同的數據源的數據格式不一樣,為了統一,我們需要一個轉接層,我們稱之為統一化過程
二、數據集過大,不適合一次性導入,需要分批次導入,邊訓練邊導入
三、需要數據增強,在線生成,甚至完全在線生成,這樣就需要將訓練時的feed和讀取的統一化數據之間加入一個動態生成過程
四、當然批次化(batch),train和val的劃分也都是重復性的工作

tf.data就是用來解決以上問題的。

兩個重要類介紹:tf.data.Dataset和tf.data.Iterator,
– Dataset用來表示一組數據,是一個序列,序列的每個元素包含一個或多個Tensor對象。
– Iterator用戶從數據集中抽取元素,Iterator.get_next()可以返回數據集的下一個元素,可以看作是輸入管道與模型之間的接口。

Dataset
面對輸入的數據集,我們要建立一個清晰的數據流概念,一個原始的數據集開始,后續的每一次轉換都是流水線式的操作。因而我們可以看到這樣的代碼形式:data.TextLineDataset(filenames).map(decode_func).shuffle(buffer_size=10000).batch(batch_size).make_initializable_iterator()。

Dataset的每一個元素一定是相同的結構,每個元素包含一個或多個Tensor對象,每個Tensor都有一個tf.DType來表示tensor中的元素數據類型,還有伊特tf.TensorShape來表示tensor中每個元素的形狀。
Dataset的map、flat_map、filter等等方法都是常用的操作鏈上過濾器方法。過濾器模式是常見的設計模式。

Iterator
如果說Dataset是從源中一步一步創建出一個統一化的數據集的話,Iterator就是負責怎么訪問這個統一后的數據集然后喂給你的模型。有四種迭代器
– one-shot
– initializable
– reinitializable
– feedable

one-shot最簡單,就是一個元素一個元素的吐,但不能初始化,即參數化,就是遍歷一遍數據集就什么也干不了了。
initializable迭代器就可以初始化,可以接受一個placeholder,作為初始化參數

1
2
3
4
5
6
max_value = tf.placeholder(tf.int64, shape=[])
dataset = tf.data.Dataset.range(max_value)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
# Initialize an iterator over a dataset with 10 elements.
sess.run(iterator.initializer, feed_dict={max_value: 10})

initializable更典型的應用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# initializable iterator to switch between dataset
EPOCHS = 10
x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1])
dataset = tf.data.Dataset.from_tensor_slices((x, y))
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.array([[1,2]]), np.array([[0]]))
iter = dataset.make_initializable_iterator()
features, labels = iter.get_next()
with tf.Session() as sess:
#     initialise iterator with train data
    sess.run(iter.initializer, feed_dict={ x: train_data[0], y: train_data[1]})
    for _ in range(EPOCHS):
        sess.run([features, labels])
#     switch to test data
    sess.run(iter.initializer, feed_dict={ x: test_data[0], y: test_data[1]})
    print(sess.run([features, labels]))

Dataset的創建依賴一個placeholder,所以只是表明了數據集的結構信息,具體的數據是什么還得在運行期的時候賦值。

但如果我們不想在運行期使用feed_dict怎么辦呢,用reinitializable迭代器支持多個數據集對象的初始化,創建的迭代器不再是dataset.make_xxx_iterator這種形式,而是使用結構直接使用Iterator的靜態方法初始化tf.data.Iterator.from_structure(train_dataset.output_types, train_dataset.output_shapes)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Reinitializable iterator to switch between Datasets
EPOCHS = 10
# making fake data using numpy
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.random.sample((10,2)), np.random.sample((10,1)))
# create two datasets, one for training and one for test
train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
test_dataset = tf.data.Dataset.from_tensor_slices(test_data)
# create a iterator of the correct shape and type
iter = tf.data.Iterator.from_structure(train_dataset.output_types,
train_dataset.output_shapes)
features, labels = iter.get_next()
# create the initialisation operations
train_init_op = iter.make_initializer(train_dataset)
test_init_op = iter.make_initializer(test_dataset)
with tf.Session() as sess:
    sess.run(train_init_op) # switch to train dataset
    for _ in range(EPOCHS):
        sess.run([features, labels])
    sess.run(test_init_op) # switch to val dataset
    print(sess.run([features, labels]))

feedable結合了initializable和reinitializable兩者,可以使用placeholder和多個數據集一起初始化。如果說reinitializable方便在數據集間切換,那么feedable就是方便在迭代器間切換。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Define training and validation datasets with the same structure.
training_dataset = tf.data.Dataset.range(100).map(
    lambda x: x + tf.random_uniform([], -10, 10, tf.int64)).repeat()
validation_dataset = tf.data.Dataset.range(50)
#
# A feedable iterator is defined by a handle placeholder and its structure. We
# could use the `output_types` and `output_shapes` properties of either
# `training_dataset` or `validation_dataset` here, because they have
# identical structure.
handle = tf.placeholder(tf.string, shape=[])
iterator = tf.data.Iterator.from_string_handle(
    handle, training_dataset.output_types, training_dataset.output_shapes)
#
next_element = iterator.get_next()
#
# You can use feedable iterators with a variety of different kinds of iterator
# (such as one-shot and initializable iterators).
training_iterator = training_dataset.make_one_shot_iterator()
validation_iterator = validation_dataset.make_initializable_iterator()
#
# The `Iterator.string_handle()` method returns a tensor that can be evaluated
# and used to feed the `handle` placeholder.
training_handle = sess.run(training_iterator.string_handle())
validation_handle = sess.run(validation_iterator.string_handle())
#
# Loop forever, alternating between training and validation.
while True:
  # Run 200 steps using the training dataset. Note that the training dataset is
  # infinite, and we resume from where we left off in the previous `while` loop
  # iteration.
  for _ in range(200):
    sess.run(next_element, feed_dict={handle: training_handle})
  #
  # Run one pass over the validation dataset.
  sess.run(validation_iterator.initializer)
  for _ in range(50):
    sess.run(next_element, feed_dict={handle: validation_handle})

但怎么說呢,tensorflow的設計師一如既往地發揮了他們接口設計紛繁復雜的能力,真的是極其混亂難用,一點也不清晰,要不然tensorflow里也不會引入keras包了。

數據的消費

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
EPOCHS = 10
BATCH_SIZE = 16
# using two numpy arrays
features, labels = (np.array([np.random.sample((100,2))]),
                    np.array([np.random.sample((100,1))]))
dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE)
iter = dataset.make_one_shot_iterator()
x, y = iter.get_next()
# make a simple model
net = tf.layers.dense(x, 8, activation=tf.tanh) # pass the first value from iter.get_next() as input
net = tf.layers.dense(net, 8, activation=tf.tanh)
prediction = tf.layers.dense(net, 1, activation=tf.tanh)
loss = tf.losses.mean_squared_error(prediction, y) # pass the second value from iter.get_net() as label
train_op = tf.train.AdamOptimizer().minimize(loss)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for i in range(EPOCHS):
        _, loss_value = sess.run([train_op, loss])
        print("Iter: {}, Loss: {:.4f}".format(i, loss_value))

其中x, y = iter.get_next()表示x,y都是一個獲取數據的操作,每次sess.run就是執行一次x(), y()這樣就獲得一個樣本數據。

kera使用數據集就特別簡單

1
2
3
4
5
6
7
8
9
dataset = tf.data.Dataset.from_tensor_slices((data, labels))
dataset = dataset.batch(32).repeat()
#
val_dataset = tf.data.Dataset.from_tensor_slices((val_data, val_labels))
val_dataset = val_dataset.batch(32).repeat()
#
model.fit(dataset, epochs=10, steps_per_epoch=30,
          validation_data=val_dataset,
          validation_steps=3)

過濾器函數
從原始數據變成統一的數據格式,最重要的就是map

1
2
3
4
5
6
7
8
9
10
11
def input_parser(img_path, label):
    # convert the label to one-hot encoding
    one_hot = tf.one_hot(label, NUM_CLASSES)
    #
    # read the img from file
    img_file = tf.read_file(img_path)
    img_decoded = tf.image.decode_image(img_file, channels=3)
    #
    return img_decoded, one_hot

tr_data = tr_data.map(input_parser)

input_parser接受了數據集每個元素,將轉換完的結果返回,數據集就脫胎換骨了。

圖像的轉換處理
對于圖像的轉換處理,我們常常用到 tf.image下的decode_bmp decode_jpeg resize_images transpose_image等等函數,具體的參考[2]。
一個image文件,變成一個Tensor一般需要這樣幾步:
1. 獲取文件名列表
2. [Optional]shuffle文件名列表
3. [Optional]根據epoch扔掉一些文件
4. 建立文件名的queue
5. 使用對應文件的Reader,不同的文件格式用不同的reader,比如TFRecordReader解析TFRecord格式,普通文件的話,就用FileReader,但讀取普通文件也比較復雜,比如讀單個文件可以使用tf.gfile.FastGfile(path, “r”).read(),對于filename_queue,則使用WholeFileReader(),如下面的語句

1
2
3
4
image_reader = tf.WholeFileReader()
data_queue = tf.train.string_input_producer([image_dir], shuffle=False)
image_key, image_value = image_reader.read(data_queue)
img = tf.image.decode_jpeg(image_value, channels=3)

實際上還有更方便的read_file方法:

1
2
image_value = tf.read_file(image_dir)
img = tf.image.decode_jpeg(image_value, channels=3)

以csv的讀取為例,看queue的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])
#
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
#
# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])
#
with tf.Session() as sess:
  # Start populating the filename queue.
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)
  #
  for i in range(1200):
    # Retrieve a single instance:
    example, label = sess.run([features, col5])
  #
  coord.request_stop()
  coord.join(threads)

當然,google建議將任何形式的原始數據都轉換成TFRecords文件。關于TFRecords文件如何制作,請參考[9]。

1
2
3
4
5
dataset = tf.data.TFRecordDataset(filename)
dataset = dataset.repeat(num_epochs)
#
# map takes a python function and applies it to every sample
dataset = dataset.map(decode)

6. 使用Decoder從Reader中反解成Tensor
7. [Optional] preprocessing預處理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def read_my_file_format(filename_queue):
  reader = tf.SomeReader()
  key, record_string = reader.read(filename_queue)
  example, label = tf.some_decoder(record_string)
  processed_example = some_processing(example)
  return processed_example, label
#
def input_pipeline(filenames, batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example, label = read_my_file_format(filename_queue)
  # min_after_dequeue defines how big a buffer we will randomly sample
  #   from -- bigger means better shuffling but slower start up and more
  #   memory used.
  # capacity must be larger than min_after_dequeue and the amount larger
  #   determines the maximum we will prefetch.  Recommendation:
  #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

參考資料
[1] tf.data指引 https://www.tensorflow.org/guide/datasets
[2] tensorflow Images處理指引 https://www.tensorflow.org/api_guides/python/image
[3] 使用coordinator加載圖像 https://gist.github.com/eerwitt/518b0c9564e500b4b50f
[4] 使用Tensorlofw讀取大數據踩坑之路 https://zhuanlan.zhihu.com/p/28450111
[5] Example of TensorFlows new Input Pipeline https://kratzert.github.io/2017/06/15/example-of-tensorflows-new-input-pipeline.html
[6]load_jpeg_with_tensorflow https://gist.github.com/eerwitt/518b0c9564e500b4b50f
[7] tensorflow Inputs and Readers指引 https://www.tensorflow.org/api_guides/python/io_ops#Readers
[8]tensorflow example mnist 數據集的讀取 https://github.com/tensorflow/tensorflow/blob/r1.10/tensorflow/examples/tutorials/mnist/fully_connected_feed.py
[9] http://www.machinelearninguru.com/deep_learning/tensorflow/basics/tfrecord/tfrecord.html

3d历史开奖结果 福彩福建快3走势图 河南快3开奖走势图 山西快乐十分软件 那个时时彩平台信誉好 pk10八码百分百 股票分析微信 股票指数怎么买卖 种类 安徽11选五开奖直播 重庆分分彩手机版 上海老张期货配资 日本股票涨跌幅限制 内蒙古11选5去哪里买 重庆时时全天计划飞 贵州十一选五中奖 上证指数是什么意思 吉林11选五规则玩法介绍