Introduce UpsertWriter #169
base: main
da26c4b to c1ee507
@luoyuxia @fresh-borzoni Appreciate a review here
fresh-borzoni
left a comment
@leekeiabstraction Thank you for the PR!
Left some comments. PTAL
# Conflicts: # crates/fluss/src/record/arrow.rs
# Conflicts: # crates/fluss/src/row/compacted/compacted_row.rs
298169e to d6c672d
@fresh-borzoni Addressed all of your comments, PTAL.
luoyuxia
left a comment
@leekeiabstraction Thanks for the pr. Left minor comment. PTAL
Pull request overview
This pull request introduces the UpsertWriter functionality to enable insert/update and delete operations for primary key tables in Fluss. The changes refactor the type system to use RowType directly instead of wrapping it in DataType::Row, introduce support for auto-increment columns, and implement the core upsert writer with partial update capabilities.
Changes:
- Refactored `BinaryRow` from trait to struct and changed encoding APIs to work with byte slices instead of `BinaryRow` trait bounds
- Introduced `KeyEncoderFactory` and `RowEncoderFactory` with improved API patterns for creating encoders
- Added auto-increment column support to the `Schema` builder and validation in `UpsertWriter`
- Implemented `TableUpsert`, `UpsertWriter` trait, and `UpsertWriterImpl` with support for partial updates and field validation
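For readers unfamiliar with the pattern, the shape of a writer with `upsert()` and `delete()` plus partial updates can be sketched as below. This is a toy in-memory model, not the code in this PR: the `Row` alias, `UpsertError`, `InMemoryUpsertWriter`, and the rule that a partial update rejects values for non-target columns are all assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical row model: one optional value per column (None = not supplied).
type Row = Vec<Option<String>>;

/// Hypothetical error type; the real crate's errors are not shown in this thread.
#[derive(Debug, PartialEq)]
enum UpsertError {
    MissingPrimaryKey,
    ColumnNotTargeted(usize),
    WrongArity { expected: usize, actual: usize },
}

/// Illustrative trait mirroring the two operations named in the overview.
trait UpsertWriter {
    fn upsert(&mut self, row: &Row) -> Result<(), UpsertError>;
    fn delete(&mut self, row: &Row) -> Result<(), UpsertError>;
}

/// Toy writer keyed by a single primary-key column (an assumption).
struct InMemoryUpsertWriter {
    num_columns: usize,
    pk_index: usize,
    /// Some(indices) enables partial updates restricted to those columns.
    target_columns: Option<Vec<usize>>,
    table: BTreeMap<String, Row>,
}

impl InMemoryUpsertWriter {
    fn new(num_columns: usize, pk_index: usize, target_columns: Option<Vec<usize>>) -> Self {
        // Validate indices up front so later indexing cannot go out of bounds.
        assert!(pk_index < num_columns);
        if let Some(targets) = &target_columns {
            assert!(targets.iter().all(|&i| i < num_columns));
        }
        Self { num_columns, pk_index, target_columns, table: BTreeMap::new() }
    }

    /// Checks the row width and extracts the primary-key value.
    fn key_of(&self, row: &Row) -> Result<String, UpsertError> {
        if row.len() != self.num_columns {
            return Err(UpsertError::WrongArity { expected: self.num_columns, actual: row.len() });
        }
        row[self.pk_index].clone().ok_or(UpsertError::MissingPrimaryKey)
    }
}

impl UpsertWriter for InMemoryUpsertWriter {
    fn upsert(&mut self, row: &Row) -> Result<(), UpsertError> {
        let key = self.key_of(row)?;
        match &self.target_columns {
            // Full upsert: replace the whole row.
            None => {
                self.table.insert(key, row.clone());
            }
            // Partial update: only the primary key and target columns may be set.
            Some(targets) => {
                for (i, value) in row.iter().enumerate() {
                    if value.is_some() && i != self.pk_index && !targets.contains(&i) {
                        return Err(UpsertError::ColumnNotTargeted(i));
                    }
                }
                let num_columns = self.num_columns;
                let existing = self.table.entry(key).or_insert_with(|| vec![None; num_columns]);
                existing[self.pk_index] = row[self.pk_index].clone();
                for &i in targets {
                    existing[i] = row[i].clone();
                }
            }
        }
        Ok(())
    }

    fn delete(&mut self, row: &Row) -> Result<(), UpsertError> {
        let key = self.key_of(row)?;
        self.table.remove(&key);
        Ok(())
    }
}
```

In this sketch a writer built with `Some(vec![1])` accepts values only for the key and column 1; anything else is rejected before the table is touched.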
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| crates/fluss/src/row/mod.rs | Changed BinaryRow from trait to struct, added as_encoded_bytes() method to InternalRow, exposed field_getter module |
| crates/fluss/src/row/encode/mod.rs | Introduced KeyEncoderFactory for creating key encoders, changed RowEncoder::finish_row() to return Bytes |
| crates/fluss/src/row/encode/compacted_row_encoder.rs | Updated to return Bytes from finish_row() |
| crates/fluss/src/row/compacted/compacted_row_writer.rs | Added flush_bytes() method to reuse the writer for multiple rows |
| crates/fluss/src/row/compacted/compacted_row.rs | Moved as_bytes() from trait implementation to inherent method, implemented as_encoded_bytes() |
| crates/fluss/src/record/kv/kv_record_read_context.rs | Simplified to use row_type() directly without validation |
| crates/fluss/src/record/kv/kv_record_batch_builder.rs | Changed to accept byte slices instead of BinaryRow trait objects |
| crates/fluss/src/record/kv/kv_record_batch.rs | Updated tests to work with byte slices |
| crates/fluss/src/record/arrow.rs | Changed to_arrow_schema() and builders to accept RowType directly |
| crates/fluss/src/metadata/table.rs | Changed Schema and TableInfo to use RowType directly, added auto-increment column support |
| crates/fluss/src/client/write/mod.rs | Added RowBytes enum and updated WriteRecord for upsert operations with byte-based APIs |
| crates/fluss/src/client/write/batch.rs | Updated to work with RowType and byte slices |
| crates/fluss/src/client/table/writer.rs | Added UpsertWriter trait with upsert() and delete() methods, introduced result types |
| crates/fluss/src/client/table/upsert.rs | New file implementing TableUpsert, UpsertWriterImpl, and comprehensive validation logic |
| crates/fluss/src/client/table/partition_getter.rs | New file for partition field extraction (TODO implementation) |
| crates/fluss/src/client/table/mod.rs | Added new_upsert() method to FlussTable |
| crates/fluss/src/client/table/lookup.rs | Updated to use KeyEncoderFactory |
| crates/fluss/src/client/table/log_fetch_buffer.rs | Updated tests to use RowType directly |
| crates/fluss/Cargo.toml | Added bitvec = "1" dependency for bit set operations |
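The `flush_bytes()` entry above describes reusing one writer across rows. A minimal sketch of that idea follows, assuming a writer that owns a growable buffer. `RowWriter`, `write_u32`, `write_str`, and `encode_rows` are hypothetical names, and the sketch returns `Vec<u8>` where the PR's `finish_row()` returns `Bytes`.

```rust
/// Hypothetical stand-in for a row writer that owns a growable buffer.
struct RowWriter {
    buf: Vec<u8>,
}

impl RowWriter {
    fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Appends a u32 in little-endian order (an arbitrary choice for the sketch).
    fn write_u32(&mut self, value: u32) {
        self.buf.extend_from_slice(&value.to_le_bytes());
    }

    /// Appends a length-prefixed UTF-8 string.
    fn write_str(&mut self, value: &str) {
        self.write_u32(value.len() as u32);
        self.buf.extend_from_slice(value.as_bytes());
    }

    /// Returns a copy of the bytes written so far and resets the writer.
    /// `clear()` keeps the allocated capacity, so the next row reuses it.
    fn flush_bytes(&mut self) -> Vec<u8> {
        let out = self.buf.clone();
        self.buf.clear();
        out
    }
}

/// Encodes several rows with a single writer instance.
fn encode_rows(rows: &[(u32, &str)]) -> Vec<Vec<u8>> {
    let mut writer = RowWriter::new();
    rows.iter()
        .map(|(id, name)| {
            writer.write_u32(*id);
            writer.write_str(name);
            writer.flush_bytes()
        })
        .collect()
}
```

Each call to `flush_bytes()` leaves the writer empty, so rows never bleed into one another even though only one writer is constructed.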
fresh-borzoni
left a comment
@leekeiabstraction Thank you, left some comments. PTAL
@fresh-borzoni @luoyuxia I've addressed your comments, PTAL.
@luoyuxia I'm thinking of raising integration tests in a separate PR. Smaller PRs are easier for me to manage and merge, and I can raise separate issues or PRs for any bugs found while writing the integration tests. LMK what you think.
fresh-borzoni
left a comment
Thanks for the PR and nice code!
LGTM
Purpose
Linked issue: close #168
Wiring up TableUpsert. Tests to follow