我想在数百 GB 的数据上创建一个预测模型。数据需要一些非密集的预处理,我可以在 pyspark 中进行,但在 tensorflow 中不行。在我的情况下,将预处理的结果直接传递给 TF 会方便得多,理想情况下将 pyspark 数据帧视为 TF 的虚拟输入文件,而不是将预处理后的数据保存到磁盘。但是,我对如何做到这一点一无所知,而且我在互联网上找不到任何地方。
经过一番思考,在我看来,我实际上需要一个迭代器(如由 定义的tf.data.Iterator)来处理 spark 的数据。然而,我在网上发现一些评论暗示 Spark 的分布式结构使它变得非常困难,如果不是不可能的话。为什么这样?想象一下,我不关心行的顺序,为什么不可能迭代火花数据?
张量流初学者在这里。这是我的第一个项目,我正在使用预定义的估算器。
我有一个极其不平衡的数据集,其中积极结果大约占总数据的 0.1%,我怀疑这种不平衡会极大地影响我的模型的性能。作为解决这个问题的第一次尝试,由于我有大量数据,我想扔掉大部分底片以创建一个平衡的数据集。我可以看到两种方法:预处理数据以仅保留千分之一的负数,然后将其保存在新文件中,然后再将其传递到张量流,例如使用 pyspark;并要求张量流仅使用它发现的一千个负数中的一个。
我尝试对最后一个想法进行编码,但没有成功。我修改了我的输入函数,使其读起来像
def train_input_fn(data_file="../data/train_input.csv", shuffle_size=100_000, batch_size=128):
"""Generate an input function for the Estimator."""
dataset = tf.data.TextLineDataset(data_file) # Extract lines from input files using the Dataset API.
dataset = dataset.map(parse_csv, num_parallel_calls=3)
dataset = dataset.shuffle(shuffle_size).repeat().batch(batch_size)
iterator = dataset.make_one_shot_iterator()
features, labels = iterator.get_next()
# TRY TO IMPLEMENT THE SELECTION OF NEGATIVES
thrown = 0
flag = np.random.randint(1000)
while labels == 0 and flag != 0:
features, labels = iterator.get_next()
thrown += 1
flag = np.random.randint(1000)
print("I've thrown away {} negative …Run Code Online (Sandbox Code Playgroud) 我已经将图像数据库转换为两个TFRecords,一个用于训练,另一个用于验证。我想使用这两个文件为数据输入keras训练一个简单的模型,但是出现了我无法理解的与数据形状有关的错误。
这是代码(所有大写字母的变量在其他地方定义):
def _parse_function(proto):
f = {
"x": tf.FixedLenSequenceFeature([IMG_SIZE[0] * IMG_SIZE[1]], tf.float32, default_value=0., allow_missing=True),
"label": tf.FixedLenSequenceFeature([1], tf.int64, default_value=0, allow_missing=True)
}
parsed_features = tf.parse_single_example(proto, f)
x = tf.reshape(parsed_features['x'] / 255, (IMG_SIZE[0], IMG_SIZE[1], 1))
y = tf.cast(parsed_features['label'], tf.float32)
return x, y
def load_dataset(input_path, batch_size, shuffle_buffer):
dataset = tf.data.TFRecordDataset(input_path)
dataset = dataset.shuffle(shuffle_buffer).repeat() # shuffle and repeat
dataset = dataset.map(_parse_function, num_parallel_calls=16)
dataset = dataset.batch(batch_size).prefetch(1) # batch and prefetch
return dataset.make_one_shot_iterator()
train_iterator = load_dataset(TRAIN_TFRECORDS, BATCH_SIZE, SHUFFLE_BUFFER)
val_iterator = load_dataset(VALIDATION_TFRECORDS, BATCH_SIZE, SHUFFLE_BUFFER)
model = tf.keras.Sequential()
model.add(tf.keras.layers.Flatten(input_shape=(IMG_SIZE[0], …Run Code Online (Sandbox Code Playgroud) 我知道这不是很重要,但我想知道是否可以节省两行代码。
我有一个数据集 inputData,我想将其分成两部分。我正在使用数据集类的 randomSplit 方法。但是,我被迫使用三行代码来执行此操作:
val sets = inputData.randomSplit(Array[Double](0.7, 0.3), 18)
val training = sets(0)
val test = sets(1)
Run Code Online (Sandbox Code Playgroud)
理想情况下,我想做类似的事情
val (training, test) = inputData.randomSplit(Array[Double](0.7, 0.3), 18)
Run Code Online (Sandbox Code Playgroud)
但由于错误,此代码无法编译:
Error:(146, 13) constructor cannot be instantiated to expected type;
found : (T1, T2)
required: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]
Run Code Online (Sandbox Code Playgroud)
有可能实现我想要的吗?
我已经训练了一个模型很长时间(200 000 次迭代)。在每次迭代中,我都通过类保存了大量数据,例如损失、准确度、权重等tf.summary.FileWriter()。是的,我知道:那是愚蠢的。结果,我生成了一个将近 50 GB 的巨大摘要。现在我想删除大部分信息并保留每 50 行一行。这将允许我节省大量硬盘空间并加速张量板可视化,同时不会对摘要的质量产生重大影响。有可能这样做吗?