应用TensorFlow Transform来转换/缩放生产中的要素

use*_*178 9 python tensorflow tensorflow-serving apache-beam tensorflow-transform

概观

我按照以下指南编写了TF Records,我曾经在那里tf.Transform预处理我的功能.现在,我想部署我的模型,我需要在真实的实时数据上应用这个预处理功能.

我的方法

首先,假设我有2个功能:

features = ['amount', 'age']
Run Code Online (Sandbox Code Playgroud)

我有transform_fn来自Apache Beam,来自working_dir=gs://path-to-transform-fn/

然后我使用以下方法加载转换函数:

tf_transform_output = tft.TFTransformOutput(working_dir)

我认为在生产中服务的最简单方法是获取一系列处理过的数据,然后调用model.predict()(我使用的是Keras模型).

要做到这一点,我认为transform_raw_features()方法正是我所需要的.

但是,似乎在构建架构之后:

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))
Run Code Online (Sandbox Code Playgroud)

我明白了:

AttributeError: 'Tensor' object has no attribute 'indices'
Run Code Online (Sandbox Code Playgroud)

现在,我假设发生了这种情况,因为我tf.VarLenFeature()在我定义架构时使用了preprocessing_fn.

def preprocessing_fn(inputs):
    outputs = inputs.copy()

    for _ in features:
        outputs[_] = tft.scale_to_z_score(outputs[_])
Run Code Online (Sandbox Code Playgroud)

我使用以下方法构建元数据:

RAW_DATA_FEATURE_SPEC = {}
for _ in features:
    RAW_DATA_FEATURE_SPEC[_] = tf.VarLenFeature(dtype=tf.float32)
    RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))
Run Code Online (Sandbox Code Playgroud)

所以简而言之,给一本字典:

d = {'amount': [50], 'age': [32]},我想应用它transform_fn,并适当地缩放这些值以输入我的模型进行预测.这个字典正是我PCollectionpre_processing()函数处理数据之前的格式.

管道结构:

class BeamProccess():

def __init__(self):

    # init 

    self.run()


def run(self):

    def preprocessing_fn(inputs):

         # outputs = { 'id' : [list], 'amount': [list], 'age': [list] }
         return outputs

    with beam.Pipeline(options=self.pipe_opt) as p:
        with beam_impl.Context(temp_dir=self.google_cloud_options.temp_location):
            data = p | "read_table" >> beam.io.Read(table_bq) \
            | "create_data" >> beam.ParDo(ProcessFn())

            transformed_dataset, transform_fn = (
                        (train, RAW_DATA_METADATA) | beam_impl.AnalyzeAndTransformDataset(
                    preprocessing_fn))

            transformed_data, transformed_metadata = transformed_dataset

            transformed_data | "WriteTrainTFRecords" >> tfrecordio.WriteToTFRecord(
                    file_path_prefix=self.JOB_DIR + '/train/data',
                    file_name_suffix='.tfrecord',
                    coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

            _ = (
                        transform_fn
                        | 'WriteTransformFn' >>
                        transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))
Run Code Online (Sandbox Code Playgroud)

最后ParDo()是:

class ProcessFn(beam.DoFn):

    def process(self, element):

        yield { 'id' : [list], 'amount': [list], 'age': [list] }
Run Code Online (Sandbox Code Playgroud)

小智 7

问题在于代码段

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))
Run Code Online (Sandbox Code Playgroud)

在此代码中,您构造了一个字典,其中值是张量.就像你说的那样,这不适用于VarLenFeature.而不是使用tf.constant尝试使用tf.placeholderaa FixedLenFeaturetf.sparse_placeholderfor VarLenFeature.