Add code to automatically convert object dtypes to string, to resolve the error raised by TFDV during statistics generation #219

Open · wants to merge 2 commits into master
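In short: statistics generation fails when a pandas DataFrame (or decoded example dict) contains non-numeric columns with dtype `object`. A minimal sketch of the cast this PR applies (column names and data are invented for illustration; the exact error depends on the TFDV version):

```python
import numpy as np
import pandas as pd

# Hypothetical frame: "city" has dtype object, the case that triggers
# the reported error during statistics generation.
df = pd.DataFrame({'age': [31, 45], 'city': ['Paris', None]})

# The PR's workaround: leave numeric columns alone, cast the rest to str.
for col in df.columns:
  if not np.issubdtype(df[col].dtype, np.number):
    df[col] = df[col].astype(str)

print(df.dtypes)  # age stays int64; city stays object but now holds plain str
```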
34 changes: 34 additions & 0 deletions tensorflow_data_validation/utils/stats_gen_lib.py
@@ -91,6 +91,17 @@ def generate_statistics_from_tfrecord(
batch_size = stats_options.desired_batch_size
# PyLint doesn't understand Beam PTransforms.
# pylint: disable=no-value-for-parameter

def _convert_dict(
    legacy_examples: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
  """Casts non-numeric (object-dtype) arrays to str so TFDV can handle them."""
  result = {}
  for k, v in legacy_examples.items():
    if np.issubdtype(v.dtype, np.number):
      result[k] = v
    else:
      result[k] = v.astype(str)
  return result

with beam.Pipeline(options=pipeline_options) as p:
# Auto detect tfrecord file compression format based on input data
# path suffix.
@@ -101,6 +112,7 @@ def generate_statistics_from_tfrecord(
schema=None,
telemetry_descriptors=['tfdv', 'generate_statistics_from_tfrecord'])
.BeamSource(batch_size))
| 'format' >> beam.Map(_convert_dict)
| 'GenerateStatistics' >> stats_api.GenerateStatistics(stats_options)
| 'WriteStatsOutput' >>
(stats_api.WriteStatisticsToTFRecord(output_path)))
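As a standalone illustration of the added 'format' step, here is a sketch of `beam.Map(_convert_dict)` run on a single element. The element shape, `Dict[str, np.ndarray]`, is this PR's assumption about what the upstream source emits; depending on the TFDV/tfx-bsl version the source may instead yield Arrow RecordBatches, in which case the step would need adapting:

```python
import apache_beam as beam
import numpy as np

def _convert_dict(legacy_examples):
  # Mirrors the PR's helper: numeric arrays pass through untouched,
  # everything else is cast to a string array.
  return {k: v if np.issubdtype(v.dtype, np.number) else v.astype(str)
          for k, v in legacy_examples.items()}

with beam.Pipeline() as p:
  _ = (p
       | beam.Create([{'age': np.array([42]),
                       'city': np.array(['Paris', None], dtype=object)}])
       | 'format' >> beam.Map(_convert_dict)
       | beam.Map(print))  # age unchanged; city becomes ['Paris', 'None']
```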
@@ -163,6 +175,18 @@ def generate_statistics_from_csv(
constants.DEFAULT_DESIRED_INPUT_BATCH_SIZE)
# PyLint doesn't understand Beam PTransforms.
# pylint: disable=no-value-for-parameter

def _convert_dict(
    legacy_examples: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
  """Casts non-numeric (object-dtype) arrays to str so TFDV can handle them."""
  result = {}
  for k, v in legacy_examples.items():
    if np.issubdtype(v.dtype, np.number):
      result[k] = v
    else:
      result[k] = v.astype(str)
  return result

with beam.Pipeline(options=pipeline_options) as p:
# If a header is not provided, assume the first line in a file
# to be the header.
@@ -175,6 +199,7 @@ def generate_statistics_from_csv(
file_pattern=data_location,
skip_header_lines=skip_header_lines,
compression_type=compression_type)
| 'format' >> beam.Map(_convert_dict)
| 'DecodeData' >> csv_decoder.DecodeCSV(
column_names=column_names,
delimiter=delimiter,
@@ -229,6 +254,11 @@ def generate_statistics_from_dataframe(
dataframe, stats_options, stats_generators)
else:
# TODO(b/144580609): Consider using Beam for inmemory mode as well.

# Cast any non-numeric (object-dtype) column to string so in-memory
# statistics generation does not fail on object dtypes.
for col in dataframe.columns:
  if not np.issubdtype(dataframe[col].dtype, np.number):
    dataframe[col] = dataframe[col].astype(str)

splits = np.array_split(dataframe, n_jobs)
partial_stats = Parallel(n_jobs=n_jobs)(
delayed(_generate_partial_statistics_from_df)(
@@ -267,6 +297,10 @@ def _generate_partial_statistics_from_df(
type=schema_pb2.INT,
bool_domain=schema_pb2.BoolDomain())
dataframe = dataframe.drop(columns=drop_columns)
# Same cast as above: non-numeric columns become string columns.
for col in dataframe.columns:
  if not np.issubdtype(dataframe[col].dtype, np.number):
    dataframe[col] = dataframe[col].astype(str)

if schema.feature:
stats_options_modified.schema = schema
record_batch_with_primitive_arrays = table_util.DataFrameToRecordBatch(
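A quick way to exercise the DataFrame path with mixed dtypes after this change (a sketch assuming tensorflow_data_validation is installed; column names and values are invented):

```python
import pandas as pd
import tensorflow_data_validation as tfdv

# "note" is an object-dtype column; with this change it is cast to str
# automatically instead of causing statistics generation to fail.
df = pd.DataFrame({'label': [0, 1, 1], 'note': ['ok', 'bad', None]})

stats = tfdv.generate_statistics_from_dataframe(df)
print(stats.datasets[0].num_examples)  # expect 3
```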