Description
System information
- Environment: Google Colab, Vertex AI (KubeflowV2DagRunner)
- TensorFlow version: 2.8.0
- TFX version: 1.7.0
- Python version: 3.7.12
Here's my preprocessing_fn (redacted for clarity):
```python
_FEATURES = []  # list of str
_SPECIAL_IMPUTE = {'special_foo': 1, }
HOURS = [1, 2, 3, 4]
TABLE_KEYS = {
    'XXX': ['XXX_1', 'XXX_2', 'XXX_3'],
    'YYY': ['YYY_1', 'YYY_2', 'YYY_3'],
}


@tf.function
def _divide(a, b):
    return tf.math.divide_no_nan(tf.cast(a, tf.float32), tf.cast(b, tf.float32))


def preprocessing_fn(inputs):
    x = {}
    for name, tensor in sorted(inputs.items()):
        if tensor.dtype == tf.bool:
            tensor = tf.cast(tensor, tf.int64)
        if isinstance(tensor, tf.sparse.SparseTensor):
            default_value = '' if tensor.dtype == tf.string else 0
            tensor = tft.sparse_tensor_to_dense_with_shape(
                tensor, [None, 1], default_value)
        x[name] = tensor

    x['foo'] = _divide((x['foo1'] - x['foo2']), x['foo_denom'])
    x['bar'] = tf.cast(x['bar'] > 0, tf.int64)

    for hour in HOURS:
        total = tf.constant(0, dtype=tf.int64)
        for device_type in DEVICE_TYPES.keys():  # DEVICE_TYPES defined elsewhere (redacted)
            total = total + x[f'some_device_{device_type}_{hour}h']

    # one hot encode categorical values
    for name, keys in TABLE_KEYS.items():
        with tf.init_scope():
            initializer = tf.lookup.KeyValueTensorInitializer(
                tf.constant(keys),
                tf.constant([i for i in range(len(keys))]))
            table = tf.lookup.StaticHashTable(initializer, default_value=-1)
        indices = table.lookup(tf.squeeze(x[name], axis=1))
        one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)
        for i, _tensor in enumerate(
                tf.split(one_hot, num_or_size_splits=len(keys), axis=1)):
            x[f'{name}_{keys[i]}'] = _tensor

    return {name: tft.scale_to_0_1(x[name]) for name in _FEATURES}
```

Here's the beam_pipeline_args:
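For context, the one-hot step above can be reproduced in isolation. This is a minimal sketch (the key list is illustrative, not the real vocabulary): a `tf.lookup.StaticHashTable` maps string keys to integer indices, with `default_value=-1` for out-of-vocabulary values, and `tf.one_hot` turns each index into a one-hot row — a `-1` index falls outside `[0, depth)` and therefore encodes as an all-zero row.

```python
import tensorflow as tf

# Illustrative key list; the real vocabularies live in TABLE_KEYS.
keys = ['XXX_1', 'XXX_2', 'XXX_3']

# Build a static string -> index lookup table.
initializer = tf.lookup.KeyValueTensorInitializer(
    tf.constant(keys),
    tf.constant(list(range(len(keys))), dtype=tf.int32))
table = tf.lookup.StaticHashTable(initializer, default_value=-1)

# Look up a batch of values; unknown keys map to -1.
values = tf.constant(['XXX_2', 'XXX_1', 'unknown'])
indices = table.lookup(values)                       # [1, 0, -1]
one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)

print(indices.numpy().tolist())   # [1, 0, -1]
print(one_hot.numpy().tolist())   # [[0, 1, 0], [1, 0, 0], [0, 0, 0]]
```

The `tf.init_scope()` wrapper in the preprocessing_fn lifts the table construction out of any enclosing `tf.function` trace, which is the usual way to create lookup tables that must be initialized once rather than per trace.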
```python
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
    '--runner=DataflowRunner',
    '--region=us-central1',
    '--experiments=upload_graph',  # must be enabled, otherwise fails with 413
    '--dataflow_service_options=enable_prime',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
]
```

Not sure if it's related, but with the above preprocessing_fn, my transform first failed with this error:
```
RuntimeError: The order of analyzers in your `preprocessing_fn` appears to be
non-deterministic. This can be fixed either by changing your `preprocessing_fn`
such that tf.Transform analyzers are encountered in a deterministic order or by
passing a unique name to each analyzer API call.
```

I then added names to the tft.scale_to_0_1 analyzers:
```python
    return {name: tft.scale_to_0_1(x[name], name=f'{name}_scale_to_0_1')
            for name in _FEATURES}
```

After which my transform just silently failed without logs (see first screenshot). I checked the worker logs, but there's nothing substantial, only warnings (see second screenshot).
It's worth noting that I have the enable_prime flag set.

