
Very weird behavior with merge + checkpoints + optimization #3133

Open
aldder opened this issue Jan 15, 2025 · 18 comments
Labels
bug Something isn't working

Comments

@aldder

aldder commented Jan 15, 2025

Environment

Delta-rs version: 0.24.0

Binding: Python

Environment:

  • Cloud provider: AWS
  • OS: Windows 11
  • Other:

Bug

What happened:

I am trying to reproduce with deltalake a situation analogous to temporal/system-versioned tables in SQL, i.e. a table with fields id, value, validity_from, validity_to. When we insert data for a key that already has a value, and the new value differs from the last one entered for that key, the existing entry's validity_to is updated to the validity_from of the new entry, and the new entry is inserted with validity_to set to "infinity" (https://sqlspreads.com/blog/temporal-tables-in-sql-server/).

Example:

  1. At time t=2020-01-01 we get value=10 for key id=A
table:
| id   |   value | validity_from   | validity_to   |
|:-----|--------:|:----------------|:--------------|
| A    |      10 | 2020-01-01      | 9999-12-31    |
  2. At time t=2020-01-02 we get value=11 for key id=A
table:
| id   |   value | validity_from   | validity_to   |
|:-----|--------:|:----------------|:--------------|
| A    |      10 | 2020-01-01      | 2020-01-02    |
| A    |      11 | 2020-01-02      | 9999-12-31    |
  3. At time t=2020-01-03 we get value=11 for key id=A
table:
| id   |   value | validity_from   | validity_to   |
|:-----|--------:|:----------------|:--------------|
| A    |      10 | 2020-01-01      | 2020-01-02    |
| A    |      11 | 2020-01-02      | 9999-12-31    |
(nothing happened because the value is equal to the last one entered)
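
For reference, here is a plain-pandas sketch of the bookkeeping described above (no Delta involved), just to make the intended semantics explicit; the helper name upsert_versioned and the INFINITY constant are illustrative, not part of the repro below:

import pandas as pd

INFINITY = pd.Timestamp('9999-12-31 23:59:59.999999', tz='UTC')

def upsert_versioned(table: pd.DataFrame, key: str, value: float, ts: pd.Timestamp) -> pd.DataFrame:
    new_row = pd.DataFrame([{'id': key, 'value': value,
                             'validity_from': ts, 'validity_to': INFINITY}])
    if table.empty:
        return new_row
    open_mask = (table['id'] == key) & (table['validity_to'] == INFINITY)
    current = table.loc[open_mask]
    if not current.empty and current['value'].iloc[-1] == value:
        # same value as the currently open version: nothing changes
        return table
    # close the open validity window at the new entry's validity_from ...
    table = table.copy()
    table.loc[open_mask, 'validity_to'] = ts
    # ... and append the new version with an open-ended validity_to
    return pd.concat([table, new_row], ignore_index=True)

table = pd.DataFrame()
for ts, value in [('2020-01-01', 10.0), ('2020-01-02', 11.0), ('2020-01-03', 11.0)]:
    table = upsert_versioned(table, 'A', value, pd.Timestamp(ts, tz='UTC'))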

How to reproduce it:

This is the code to build the table and reproduce the error:

from deltalake import DeltaTable, Schema, Field
from deltalake.schema import PrimitiveType
import pandas as pd

schema = Schema([
    Field('id', PrimitiveType('string')),
    Field('value', PrimitiveType('double')),
    Field('validity_from', PrimitiveType('timestamp')),
    Field('validity_to', PrimitiveType('timestamp')),
])

dt = DeltaTable.create(
    'delta',
    schema=schema,
    configuration={"delta.checkpointInterval": "2"}
)

for i in range(10):
    df = pd.DataFrame([{
        'id': 'A',
        'value': float(i),
        'validity_from': pd.Timestamp.utcnow().round('ms'),
        'validity_to': pd.Timestamp('9999-12-31 23:59:59.999999', tz='UTC')
    }])

    dt.merge(
        source=df,
        predicate='t.id = s.id and t.validity_to = s.validity_to',
        target_alias='t',
        source_alias='s'
    ).when_matched_update(
        updates={'validity_to': 's.validity_from'},
        predicate='t.value != s.value'
    ).execute()

    dt.merge(
        source=df,
        predicate='t.id = s.id and t.validity_to = s.validity_to',
        target_alias='t',
        source_alias='s'
    ).when_not_matched_insert_all().execute()

    dt.optimize.compact()
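
(Not part of the repro itself: to inspect the result after the loop I print the table like this; to_markdown needs the tabulate package installed.)

print(dt.to_pandas()
        .sort_values('validity_from')
        .to_markdown(index=False))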

What you expected to happen:

I expect every entry's validity_to field to be updated with the validity_from of the subsequent entry (except for the last entry):

[screenshot: expected table where each row's validity_to equals the next row's validity_from, and only the last row keeps 9999-12-31]

Instead I get this:
[screenshot: actual table with inconsistent validity_to values; several superseded rows still show 9999-12-31]
More details:

It is as if, from a certain point on, the match condition in the merge stopped working, leaving the data in an inconsistent state.

I made a "lot" of different test and i noticed that:

  1. If i increase the checkpoint interval the error starts to appear later.
  2. If i remove the compaction the error does not occur.
  3. If i use different types for validity_start/validity_end fields (like monotonic increasing integers or timestamp converted to string) the error does not occur.
  4. if i break the loop, optimize the table on a different process, and then continue the loop, then error does not occur.

I hope I've managed to explain myself properly :)
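
For point 3, the string-typed variant looks roughly like this (a sketch: only the schema and the DataFrame construction change, the two merges and the compaction stay exactly as above; the table path delta_str is just an example name):

schema_str = Schema([
    Field('id', PrimitiveType('string')),
    Field('value', PrimitiveType('double')),
    Field('validity_from', PrimitiveType('string')),  # ISO-8601 strings instead of timestamps
    Field('validity_to', PrimitiveType('string')),
])

dt = DeltaTable.create(
    'delta_str',
    schema=schema_str,
    configuration={"delta.checkpointInterval": "2"}
)

for i in range(10):
    df = pd.DataFrame([{
        'id': 'A',
        'value': float(i),
        'validity_from': pd.Timestamp.utcnow().round('ms').isoformat(),
        'validity_to': '9999-12-31T23:59:59.999999+00:00',
    }])
    # ... same two merges and dt.optimize.compact() as in the repro above -> no error with this variant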

@aldder aldder added the bug Something isn't working label Jan 15, 2025
@ion-elgreco
Collaborator

@aldder to help me narrow this down and understand it better, can you try this against some older versions and let me know if you also see that behaviour?

@aldder
Author

aldder commented Jan 16, 2025

@ion-elgreco Sure! Thank you so much for the support :)
that's the outcome:
v0.23.2 -> error
v0.22.3 -> error
v0.21.0 -> error
v0.20.2 -> error
v0.19.2 -> error
v0.18.2 -> working!
v0.17.4 -> working!

@ion-elgreco
Collaborator

@aldder can you check v0.19?

It might be because of this PR: #2513

@aldder
Author

aldder commented Jan 16, 2025

@ion-elgreco yes you're right, with 0.19.0 we have the error too

@aldder
Author

aldder commented Jan 16, 2025

@ion-elgreco
Also, I would like to point out something outside the scope of the current error, but which I think may be very important from a computational point of view.
It would be better if the two merges could be unified into a single atomic operation.
This might be possible if there were a .when_matched_insert[_all] method, as we would be able to update the validity_to field to the next validity_from and insert the new data at the same time, saving 50% of the transactions on the log.
Something like:

dt.merge(
    source=df,
    predicate='t.id = s.id and t.validity_to = s.validity_to',
    target_alias='t',
    source_alias='s'
).when_matched_update(
    updates={'validity_to': 's.validity_from'},
    predicate='t.value != s.value'
).when_matched_insert_all(
    predicate='t.value != s.value'
).when_not_matched_insert_all(
).execute()

I'm fairly new to deltalake, so I don't know whether the transaction protocol allows more than one matchedPredicates entry.

@ion-elgreco
Collaborator

@JonasDev1 do you perhaps have time to have a look at this? It's likely due to the min-max filtering

@JonasDev1
Contributor

To me it looks like an inconsistency in the state after checkpointing a Delta table. Since version 0.19.0 we rely on the min/max statistics and the predicate of the transactions.

@ion-elgreco
Copy link
Collaborator

ion-elgreco commented Jan 16, 2025

@JonasDev1 right, then it's actually related to this: #3057

Which I tried to address in this PR #3064, but Robert suggested it might be better to refresh the table.

@aldder can you force a refresh by doing DeltaTable("path") each time before the merge and compact?
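
i.e. something roughly like this (a sketch, with the DataFrame construction copied from your repro):

for i in range(10):
    df = pd.DataFrame([{
        'id': 'A',
        'value': float(i),
        'validity_from': pd.Timestamp.utcnow().round('ms'),
        'validity_to': pd.Timestamp('9999-12-31 23:59:59.999999', tz='UTC')
    }])

    # re-read the table from storage so each operation starts from a fresh snapshot
    dt = DeltaTable('delta')
    dt.merge(
        source=df,
        predicate='t.id = s.id and t.validity_to = s.validity_to',
        target_alias='t',
        source_alias='s'
    ).when_matched_update(
        updates={'validity_to': 's.validity_from'},
        predicate='t.value != s.value'
    ).execute()

    dt = DeltaTable('delta')
    dt.merge(
        source=df,
        predicate='t.id = s.id and t.validity_to = s.validity_to',
        target_alias='t',
        source_alias='s'
    ).when_not_matched_insert_all().execute()

    dt = DeltaTable('delta')
    dt.optimize.compact()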

@aldder
Author

aldder commented Jan 16, 2025

@ion-elgreco Back on 0.24.0, having dt = DeltaTable('delta') before each merge and optimize operation still produces the error, but this time all rows are affected:

| id   |   value | validity_from                    | validity_to                      |
|:-----|--------:|:---------------------------------|:---------------------------------|
| A    |       0 | 2025-01-16 10:23:58.301000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       1 | 2025-01-16 10:23:58.401000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       2 | 2025-01-16 10:23:58.499000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       3 | 2025-01-16 10:23:58.598000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       4 | 2025-01-16 10:23:58.701000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       5 | 2025-01-16 10:23:58.811000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       6 | 2025-01-16 10:23:58.924000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       7 | 2025-01-16 10:23:59.046000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       8 | 2025-01-16 10:23:59.187000+00:00 | 9999-12-31 23:59:59.999999+00:00 |
| A    |       9 | 2025-01-16 10:23:59.297000+00:00 | 9999-12-31 23:59:59.999999+00:00 |

Also, running more tests, I focused on the timestamp fields (recall point 3: "If I use different types for the validity_from/validity_to fields (like monotonically increasing integers or timestamps converted to strings), the error does not occur").
I noticed that if I set the validity_to value to millisecond precision instead of microseconds, the error doesn't occur.
The weird thing is that the timestamp precision is irrelevant for validity_from... I can set it to micro- or nanoseconds and it works, as long as validity_to is no finer than milliseconds.

    df = pd.DataFrame([{
        'id': 'A',
        'value': float(i),
        'validity_from': pd.Timestamp.utcnow().round('us'),
        'validity_to': pd.Timestamp('9999-12-31 23:59:59.999', tz='UTC') # it works with that
    }])

maybe because the validity_to field is in the predicate and the validity_from isn't?

@ion-elgreco
Collaborator

Maybe the min max stats are corrupted?

@JonasDev1
Contributor

JonasDev1 commented Jan 16, 2025

Things I've figured out so far:

The problem occurs during the first merge (the one that updates the existing row) if there was an optimize and a checkpoint beforehand.

In that case the merge finds no matches and isn't applied, because it skips the file during the scan (num_target_files_skipped_during_scan: 1).

The second merge works as expected.
Therefore we get new rows, but the existing rows aren't updated.
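
(For reference, the scan metric comes from the dictionary returned by execute(); with the repro above, something like this prints it for the first merge:)

metrics = dt.merge(
    source=df,
    predicate='t.id = s.id and t.validity_to = s.validity_to',
    target_alias='t',
    source_alias='s'
).when_matched_update(
    updates={'validity_to': 's.validity_from'},
    predicate='t.value != s.value'
).execute()

print(metrics)  # includes num_target_files_skipped_during_scan among the merge metrics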

@ion-elgreco
Collaborator

@JonasDev1 I am seeing num_target_files_skipped_during_scan: 1 also when checkpointing is disabled for this MRE.

@ion-elgreco
Collaborator

My observations so far: the log files correctly show the timestamp values, and so does the parquet checkpoint. Given @aldder's observation that "if I set the validity_to value to millisecond precision instead of microseconds, the error doesn't occur":

This might indicate some rounding error when the statistics are parsed from the checkpoint parquet. Still diving into this, though.

@ion-elgreco
Collaborator

@roeap do you have any ideas? I didn't get any further while debugging it yesterday

@roeap
Collaborator

roeap commented Jan 17, 2025

Interesting case!! @ion-elgreco, from what I can tell, I would follow the reasoning you folks established.

Could this somehow be related to type coercion somewhere, maybe? Since microsecond is technically not supported, we need to cast? I guess microsecond values in stats would always lose precision, but we would need to be careful with the min/max stuff ...

Just a quick thought, but I will try to spend some more time on this this weekend if we don't know by then ...
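
To make that concrete (purely illustrative, not a confirmed root cause): if the max statistic for validity_to were kept only to millisecond precision while the data and the merge predicate carry microseconds, the equality literal would fall above the recorded max and the file would be pruned, which would match the num_target_files_skipped_during_scan: 1 observation.

from datetime import datetime, timezone

actual_value = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)  # value in the file and in the predicate
stats_max = actual_value.replace(microsecond=999000)  # hypothetical max stat truncated to millisecond precision

# file-level pruning keeps a file only if min <= literal <= max
print(actual_value <= stats_max)  # False -> the file is skipped, so the merge never sees the matching row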

@ion-elgreco
Collaborator

@roeap did you mean milli or nanoseconds?

The primitive type should be microsecond, right? At least according to the protocol.

@roeap
Collaborator

roeap commented Jan 17, 2025

My bad - yes, microseconds are the supported type. Since there is a difference when using millis, maybe it's still worth looking into?

@ion-elgreco
Collaborator

> My bad - yes, microseconds are the supported type. Since there is a difference when using millis, maybe it's still worth looking into?

Definitely! I was looking yesterday at the stats coming out of the parquet, but I didn't see anything odd.

It could perhaps also be in the literal parsing in Arrow/DataFusion :s
