Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slow parquet to TFRecord using parquet_executor.Executor #6389

Open
tgrunzweig-cpacket opened this issue Oct 22, 2023 · 4 comments
Open

Slow parquet to TFRecord using parquet_executor.Executor #6389

tgrunzweig-cpacket opened this issue Oct 22, 2023 · 4 comments

Comments

@tgrunzweig-cpacket
Copy link

If the bug is related to a specific library below, please raise an issue in the
respective repo directly:

TensorFlow Data Validation Repo

TensorFlow Model Analysis Repo

TensorFlow Transform Repo

TensorFlow Serving Repo

System information

  • Have I specified the code to reproduce the issue (Yes, No): Yes
  • Environment in which the code is executed (e.g., Local(Linux/MacOS/Windows),
    Interactive Notebook, Google Cloud, etc): Linux, interactive notebook, EC2 instance
  • TensorFlow version: 2.13.1
  • TFX Version:14.0
  • Python version: 3.9.18
  • Python dependencies (from pip freeze output):

Describe the current behavior
Reading a single 20MB file using
example_gen = FileBasedExampleGen(input_base=_data_root,
custom_executor_spec=custom_executor_spec)
context.run(example_gen)

takes 37s on this machine
Describe the expected behavior
reading the same file using pd.read_parquet() takes 37ms, thats 1000x times faster.

Standalone code to reproduce the issue

in jupyter notebook

import pandas as pd
import numpy as np
import string

import tensorflow as tf
from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from google.protobuf.json_format import MessageToDict

from tfx.components import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import parquet_executor
from tfx.dsl.components.base import executor_spec

arr_random = np.random.randint(low=2, high=10, size=(100000,26))
columns = list(string.ascii_uppercase)
df = pd.DataFrame(arr_random, columns=columns)
df.to_parquet('./gen_data/test.parquet')

_pipeline_root = './pipeline/'
_data_root = './gen_data/'

context = InteractiveContext(pipeline_root=_pipeline_root)
custom_executor_spec = executor_spec.BeamExecutorSpec(parquet_executor.Executor)
example_gen = FileBasedExampleGen(input_base=_data_root,
custom_executor_spec=custom_executor_spec)

context.run(example_gen)

Providing a bare minimum test case or step(s) to reproduce the problem will
greatly help us to debug the issue. If possible, please share a link to
Colab/Jupyter/any notebook.

Name of your Organization (Optional)
cpacket networks
Other info / logs

in jupyter lab cells.

%%time
context.run(example_gen)

CPU times: user 37.3 s, sys: 176 ms, total: 37.5 s
Wall time: 37.8 s

%%time
df = pd.read_parquet('./gen_data/test.parquet')

CPU times: user 37.1 ms, sys: 32.3 ms, total: 69.4 ms
Wall time: 37.7 ms

maybe related to issue #4561?

Include any logs or source code that would be helpful to diagnose the problem.
If including tracebacks, please include the full traceback. Large logs and files
should be attached.

@singhniraj08
Copy link
Contributor

@tgrunzweig-cpacket,

The difference in execution time of FileBasedExampleGen component and pandas.read_parquet is because of the fact that pandas read_parquet will just read the parquet object and return a pandas dataframe whereas FileBasedExampleGen reads the Parquet files and transform the data to TF examples.

Internally it does the data type conversion by running dict_to_example() and splits the data into train and eval sets which leads to more execution time.

Thank you!

@tgrunzweig-cpacket
Copy link
Author

Hi Niraj, thanks for the quick reply.

Sure, converting a file to tfrecords is more than loading the file to memory, but I would think that those other things should not take that long either. For example if I trivially implement something like your other steps in straight python it looks like this:

df = pd.read_parquet('./gen_data/test.parquet')
df_train,df_test = train_test_split(df)
d_train = dict(zip(df_train.index, df_train.values))
d_test = dict(zip(df_test.index, df_test.values))

For the 20MB file I generated above the time to execute (using %%time in my notebook)

loading from disk to df: 37ms
splitting to train, test: 30ms
converting to dicts: 52ms

I'm not sure whats the dict format that dict_to_example() expects (its not described in the function's docstring), are you suggesting that this function would take 37s for this 100,000 long dictionary? Is that truly what is expected?

@singhniraj08
Copy link
Contributor

@tgrunzweig-cpacket,

Going through parquet_executor.py code, the executor class runs a beam pipeline which reads parquet files using beam.io.ReadFromParquet and then converts the parquet object to TFExample using dict_to_example.

The slower execution can be because of the fact that that each input split is transformed by this function separately.

@briron, TFX ExampleGen with parquet executor takes around 45s in execution whereas this is done by pandas in 44ms with same size of data. Is this expected? Thanks!

@pritamdodeja
Copy link
Contributor

This runs in ~9 seconds for me in tfx 1.15.1

CPU times: user 9.2 s, sys: 125 ms, total: 9.33 s
Wall time: 9.6 s
Out[10]: 
ExecutionResult(
    component_id: FileBasedExampleGen
    execution_id: 1
    outputs:
        examples: OutputChannel(artifact_type=Examples, producer_component_id=FileBasedExampleGen, output_key=examples, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants