I have tried out with several configurations to use PipeModeDataset together with tf.keras and I run into trouble re-using the same dataset (e.g. validation) for use in both fit and evaluate. It seems that on the second call the Sagemaker instance exhausts all available GPU memory and goes into some kind of loop.
This is my current training script (I will try to strip it down further). It works perfectly when using `File` mode but fails on the `evaluate` call when executed in `Pipe` mode:
import argparse, os
import logging
import math
import json
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("tensorflow").setLevel(logging.INFO)
import tensorflow as tf
import glob
def load_data_as_dataset(channel_name, channel, data_config):
    """Build a tf.data dataset for one SageMaker input channel.

    In 'Pipe' mode the records are streamed through a PipeModeDataset;
    otherwise every *.tfrecord file found under the channel directory is
    read with a plain TFRecordDataset.
    """
    input_mode = data_config[channel_name]['TrainingInputMode']
    logging.info("Running {} in mode: {}".format(channel_name, input_mode))
    if input_mode == 'Pipe':
        # Construct a `PipeModeDataset` reading from the channel's pipe,
        # using the TF Record encoding.
        from sagemaker_tensorflow import PipeModeDataset
        return PipeModeDataset(channel=channel_name, record_format='TFRecord')
    record_files = glob.glob(channel + "/*.tfrecord")
    logging.info("Loading files {}".format(record_files))
    return tf.data.TFRecordDataset(record_files)
def extract_example(example_proto):
    """Parse one serialized example into an (image, targets) pair.

    The JPEG payload is decoded to 3 channels and converted to float32 in
    [0, 1]; the targets are a 4-vector of the two object center
    coordinates (obj1 x/y, obj2 x/y).
    """
    schema = {
        'image/encoded': tf.io.FixedLenFeature([], tf.string),
        'image/class/obj1_center_x': tf.io.FixedLenFeature([], tf.float32),
        'image/class/obj1_center_y': tf.io.FixedLenFeature([], tf.float32),
        'image/class/obj2_center_x': tf.io.FixedLenFeature([], tf.float32),
        'image/class/obj2_center_y': tf.io.FixedLenFeature([], tf.float32),
    }
    parsed = tf.io.parse_single_example(example_proto, schema)
    img = tf.image.decode_jpeg(parsed['image/encoded'], channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)
    targets = tf.convert_to_tensor([parsed['image/class/obj1_center_x'],
                                    parsed['image/class/obj1_center_y'],
                                    parsed['image/class/obj2_center_x'],
                                    parsed['image/class/obj2_center_y']])
    return (img, targets)
def train_preprocess(image, label):
    """Apply random photometric augmentation to a training image."""
    augmented = tf.image.random_brightness(image, max_delta=32.0 / 255.0)
    augmented = tf.image.random_saturation(augmented, lower=0.5, upper=1.5)
    # Augmentation can push pixel values outside the valid range, so clamp
    # the result back into [0, 1].
    augmented = tf.clip_by_value(augmented, 0.0, 1.0)
    return augmented, label
def build_model(input_shape):
    """Construct a small convolutional regressor.

    Three conv stages (8 -> 16 -> 32 filters, LeakyReLU activations) with
    max-pooling between stages, global average pooling, one dense hidden
    layer with dropout, and a 4-unit linear output head.
    """
    layers = tf.keras.layers
    return tf.keras.Sequential([
        layers.Conv2D(8, kernel_size=(3, 3), padding='same',
                      input_shape=input_shape),
        layers.LeakyReLU(),
        layers.Conv2D(8, kernel_size=(3, 3), padding='same'),
        layers.LeakyReLU(),
        layers.MaxPooling2D(pool_size=(2, 2), strides=2),
        layers.Conv2D(16, kernel_size=(3, 3), padding='same'),
        layers.LeakyReLU(),
        layers.Conv2D(16, kernel_size=(3, 3), padding='same'),
        layers.LeakyReLU(),
        layers.MaxPooling2D(pool_size=(2, 2), strides=2),
        layers.Conv2D(32, kernel_size=(3, 3), padding='same'),
        layers.LeakyReLU(),
        layers.Conv2D(32, kernel_size=(3, 3), padding='same'),
        layers.LeakyReLU(),
        layers.GlobalAveragePooling2D(),
        layers.Dense(32),
        layers.LeakyReLU(),
        layers.Dropout(rate=0.1),
        layers.Dense(4, activation='linear'),
    ])
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    #
    # Standard parameters supplied by the SageMaker container environment
    #
    parser.add_argument('--gpu-count', type=int, default=os.environ['SM_NUM_GPUS'])
    parser.add_argument('--output-dir', type=str, default=os.environ.get('SM_OUTPUT_DIR'))
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--data-config', type=json.loads, default=os.environ.get('SM_INPUT_DATA_CONFIG'))
    #
    # Input channels
    #
    parser.add_argument('--training', type=str, required=False, default=os.environ.get('SM_CHANNEL_TRAINING'))
    parser.add_argument('--validation', type=str, required=False, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    #
    # Input parameters
    #
    parser.add_argument('--num-channels', type=int, default=3)
    parser.add_argument('--img-height', type=int, default=416)
    parser.add_argument('--img-width', type=int, default=416)
    parser.add_argument('--num-samples', type=int, required=True)
    parser.add_argument('--num-validation', type=int, default=64)
    #
    # Training parameters
    #
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--learning-rate', type=float, default=0.01)
    parser.add_argument('--batch-size', type=int, default=16)
    args, _ = parser.parse_known_args()

    # Channels-last input shape. NOTE(review): the original comment said
    # "NCWH" but this tuple is (width, height, channels); decode_jpeg
    # yields (height, width, channels). With the default square 416x416
    # images the swap is harmless — confirm if non-square sizes are used.
    input_shape = (args.img_width,
                   args.img_height,
                   args.num_channels)
    #
    # Build and compile model
    #
    model = build_model(input_shape)
    model.compile(optimizer=tf.keras.optimizers.Adam(lr=args.learning_rate),
                  loss="mse",
                  metrics=['mse', 'mae'])

    len_train = args.num_samples
    logging.info("Training samples: {}".format(len_train))
    len_val = args.num_validation
    logging.info("Validation samples: {}".format(len_val))

    #
    # Prepare tf.data input pipelines
    #
    x_train = load_data_as_dataset("training", args.training, args.data_config)
    x_train = x_train.shuffle(buffer_size=8 * args.batch_size)
    x_train = x_train.repeat()
    x_train = x_train.map(extract_example, num_parallel_calls=4)
    x_train = x_train.map(train_preprocess, num_parallel_calls=4)
    x_train = x_train.batch(args.batch_size)
    x_train = x_train.prefetch(1)

    if args.validation is not None and args.num_validation > 0:
        x_val = load_data_as_dataset("validation", args.validation, args.data_config)
        # NOTE(review): in Pipe mode a channel is backed by a FIFO that can
        # only be consumed sequentially; re-using this one dataset in both
        # fit() (as validation_data) and evaluate() below is the likely
        # cause of the reported hang/OOM on the second consumer. Consider a
        # dedicated channel or a fresh PipeModeDataset per consumer — TODO
        # confirm against sagemaker-tensorflow-extensions documentation.
        x_val = x_val.repeat()
        x_val = x_val.map(extract_example, num_parallel_calls=4)
        x_val = x_val.batch(args.batch_size)
        x_val = x_val.prefetch(1)
    else:
        x_val = None

    #
    # Train model
    #
    logging.info("Batch size: {}".format(args.batch_size))
    steps_per_epoch = len_train // args.batch_size
    logging.info("Training steps per epoch {}".format(steps_per_epoch))
    validation_steps = len_val // args.batch_size
    logging.info("Validation steps per epoch {}".format(validation_steps))

    if x_val is not None:
        history = model.fit(x=x_train,
                            validation_data=x_val,
                            epochs=args.epochs,
                            steps_per_epoch=steps_per_epoch,
                            validation_steps=validation_steps,
                            verbose=2)
    else:
        history = model.fit(x=x_train,
                            epochs=args.epochs,
                            steps_per_epoch=steps_per_epoch,
                            verbose=2)
    logging.info("Result: {}".format(history.history))

    #
    # Evaluate model
    #
    if x_val is not None:
        score = model.evaluate(x_val,
                               steps=validation_steps,
                               verbose=2)
        logging.info('Validation: {}'.format(list(zip(score, model.metrics_names))))

    #
    # Save/export model: SavedModel layout for TF Serving plus an HDF5 copy
    #
    tf.contrib.saved_model.save_keras_model(model, os.path.join(args.model_dir, 'model/1'))
    model.save(os.path.join(args.output_dir, 'model.h5'))
I have tried several configurations to use `PipeModeDataset` together with `tf.keras`, and I run into trouble re-using the same dataset (e.g. validation) in both `fit` and `evaluate`. On the second call the SageMaker instance exhausts all available GPU memory and goes into some kind of loop. The script above works perfectly in `File` mode but fails on the `evaluate` call when executed in `Pipe` mode. I tried framework versions `v1.13` and `v1.14`; both show the same behaviour, and it appears to be related to re-using the dataset after `model.fit` is done — if I don't call `model.evaluate`, everything is fine. Unfortunately there is not much logging output except for this warning: