-
Notifications
You must be signed in to change notification settings - Fork 285
Iscp integration (Step 2) #22609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
cpegeric
wants to merge
476
commits into
matrixorigin:main
Choose a base branch
from
cpegeric:iscp_final
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Iscp integration (Step 2) #22609
+4,023
−1,284
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
User description
What type of PR is this?
Which issue(s) this PR fixes:
issue ##21835
What this PR does / why we need it:
load the models once and perform all batches from DataRetriever and finally save model files into database.
This changes save a lot of time for upload/download the model every time there is a 8192 vector block.
PR Type
Enhancement, Tests
Description
• ISCP Integration: Complete integration of Index Sync Change Processing (ISCP) for asynchronous vector and fulltext index operations
• SqlProcess Abstraction: Introduced new
SqlProcessinterface to abstract both frontend (process.Process) and background (SqlContext) execution contexts across all vector index operations• HNSW Performance Improvements: Refactored HNSW update operations to use direct APIs instead of SQL, enabling model loading once and batch processing for significant performance gains
• Async Index Support: Added comprehensive support for asynchronous IVF, HNSW, and FullText indexes with CDC task management
• Transaction Event System: Enhanced transaction event callback system with context support and structured
TxnEventCallbackwrapper• DDL Integration: Integrated ISCP job management with DDL operations (create, drop, alter table) for proper index lifecycle management
• Test Coverage: Added extensive test suites for async vector operations and updated existing tests for new interfaces
Diagram Walkthrough
File Walkthrough
37 files
sync.go
Refactor HNSW sync to use SqlProcess abstractionpkg/vectorindex/hnsw/sync.go
• Refactored
CdcSyncfunction to use newHnswSyncstruct with separateinitialization, update, and save phases
• Replaced
process.Processparameter with
sqlexec.SqlProcessfor better abstraction• Added new
methods
NewHnswSync,RunOnce,Update,Save, andDownloadAllto theHnswSyncstruct• Modified function signatures to accept
SqlProcessinstead of
process.Processthroughout the filesqlexec.go
Create SqlProcess abstraction for frontend and background executionpkg/vectorindex/sqlexec/sqlexec.go
• Introduced
SqlProcessstruct to abstract both frontend(
process.Process) and background (SqlContext) execution contexts•
Added
SqlContextfor background SQL execution with required context,UUID, transaction operator, and account information
• Implemented
RunTxnWithSqlContextfunction for running transactions in backgroundmode
• Modified existing functions to work with
SqlProcessabstractioninstead of direct
process.Processoperator.go
Add context support to transaction event handlingpkg/txn/client/operator.go
• Added context parameter to various transaction event handling
methods
• Updated
triggerEventand related methods to acceptcontext.Contextparameter• Modified transaction lifecycle methods
(
Commit,Rollback,closeLocked) to pass context to event handlers•
Enhanced error handling by joining multiple errors in transaction
operations
mock_consumer.go
Update mock consumer for new transaction interfacepkg/iscp/mock_consumer.go
• Updated
NewInteralSqlConsumerto acceptcnEngineandcnTxnClientparameters
• Modified transaction handling to use direct transaction
operations instead of executor interface
• Updated
consumeDatamethodsignature to work with
client.TxnOperator• Enhanced error handling
and transaction lifecycle management in consumer operations
build_dml_util.go
Add async index support to DML operationspkg/sql/plan/build_dml_util.go
• Added async index detection logic to skip synchronous operations for
async fulltext and vector indexes
• Enhanced DML plan building to
handle async indexes appropriately
• Updated index processing logic in
insert, delete, and update operations to check for async configuration
• Improved error handling for index async parameter parsing
fulltext.go
Update fulltext functions for SqlProcess interfacepkg/sql/colexec/table_function/fulltext.go
• Updated function calls to use
sqlexec.NewSqlProcess(proc)wrapper•
Modified
runWordStatsandrunCountStarfunctions to work with newSqlProcess interface
• Maintained existing functionality while
adapting to new SQL execution abstraction
cache_test.go
Update vector index cache tests for SQL process interfacepkg/vectorindex/cache/cache_test.go
• Replaced
process.Processwithsqlexec.SqlProcessin mock searchinterfaces
• Updated test functions to create and use
SqlProcessinstances
• Modified all Search and Load method signatures to use the
new SQL process interface
model.go
Refactor HNSW model to use SQL process interfacepkg/vectorindex/hnsw/model.go
• Replaced
process.Processwithsqlexec.SqlProcessthroughout the file• Added
NThreadfield toHnswModelstruct for thread configuration•
Enhanced error handling and initialization logic in
initIndexmethod•
Updated streaming SQL execution to use new SQL process interface
search.go
Refactor IVF flat search to use SQL process interfacepkg/vectorindex/ivfflat/search.go
• Replaced
process.Processwithsqlexec.SqlProcessin all methodsignatures
• Updated context handling to use SQL process context
•
Modified streaming operations to work with new SQL process interface
service_txn_event.go
Update transaction trace service for new event callback systempkg/txn/trace/service_txn_event.go
• Updated transaction event callback registration to use new callback
wrapper
• Modified event handler signatures to include context and
return error
• Enhanced event handling with proper error propagation
search.go
Refactor HNSW search to use SQL process interfacepkg/vectorindex/hnsw/search.go
• Replaced
process.Processwithsqlexec.SqlProcessin all methodsignatures
• Updated context handling and error messages to use SQL
process context
• Modified metadata loading and index operations for
new interface
cache.go
Update vector index cache for SQL process interfacepkg/vectorindex/cache/cache.go
• Updated
VectorIndexSearchIfinterface to usesqlexec.SqlProcess•
Modified cache search and load operations for new SQL process
interface
• Updated all method signatures and implementations
consistently
client.go
Implement new transaction event callback systempkg/txn/client/client.go
• Updated transaction event callback system with new wrapper structure
• Modified callback registration to use
TxnEventCallbackwrapper•
Enhanced event handlers with context and error return parameters
index_sqlwriter.go
Refactor HNSW SQL writer for CDC integrationpkg/iscp/index_sqlwriter.go
• Modified HNSW SQL writer to output JSON instead of SQL statements
•
Added
NewSyncmethod for creating HNSW synchronization objects•
Updated CDC writer capacity configuration for better performance
ivf_search.go
Update IVF search table function for SQL process interfacepkg/sql/colexec/table_function/ivf_search.go
• Updated IVF search table function to use
sqlexec.SqlProcess•
Modified version retrieval and cache search operations
• Wrapped
process instances with SQL process interface
service.go
Update shard service for new transaction callback systempkg/shardservice/service.go
• Updated transaction event callback registration with new wrapper
system
• Modified callback functions to include context and error
return
• Enhanced error handling in shard service operations
operator_events.go
Implement enhanced transaction event callback systempkg/txn/client/operator_events.go
• Introduced
TxnEventCallbackwrapper structure for callbacks•
Updated callback registration and triggering mechanisms
• Enhanced
event system with context and error handling support
build.go
Update HNSW build operations for SQL process interfacepkg/vectorindex/hnsw/build.go
• Updated HNSW build operations to use
sqlexec.SqlProcess• Modified
context handling in worker threads
• Updated method signatures for
consistency with new interface
storage_test.go
Update transaction event callback interface in partition storage testspkg/partitionservice/storage_test.go
• Updated transaction event callback to use new
TxnEventCallbackstructure
• Changed callback function signature to include context,
operator, event and callback data parameters
• Added error return type
to callback function
build_test.go
Integrate SqlProcess wrapper for HNSW build operationspkg/vectorindex/hnsw/build_test.go
• Added import for
sqlexecpackage• Updated
NewHnswBuildcalls to usesqlexec.NewSqlProcess(proc)instead ofprocdirectlyhnsw_search.go
Integrate SqlProcess wrapper for HNSW search operationspkg/sql/colexec/table_function/hnsw_search.go
• Added import for
sqlexecpackage• Updated vector cache search call
to use
sqlexec.NewSqlProcess(proc)wrapperdata_retriever.go
Refactor watermark update to use transaction operator interfacepkg/iscp/data_retriever.go
• Updated
UpdateWatermarkmethod signature to use context andtransaction operator instead of executor
• Added system account
context setup with timeout
• Replaced SQL execution with
ExecWithResultfunction callservice.go
Update transaction event handling in increment servicepkg/incrservice/service.go
• Updated transaction event callback registration to use new
TxnEventCallbackstructure• Modified
txnClosedmethod signature tomatch new callback interface
storage_test.go
Update transaction event callback interface in shard storage testspkg/shardservice/storage_test.go
• Updated transaction event callbacks to use new
TxnEventCallbackstructure
• Changed callback function signatures to include context,
operator, event and callback data parameters
• Added error return type
to callback functions
hnsw_create.go
Integrate SqlProcess wrapper for HNSW creation operationspkg/sql/colexec/table_function/hnsw_create.go
• Updated SQL execution calls to use
sqlexec.NewSqlProcess(proc)wrapper
• Modified HNSW build creation calls to use the new SQL
process interface
consumer.go
Extend consumer creation with engine and transaction clientpkg/iscp/consumer.go
• Added engine and transaction client parameters to
NewConsumerfunction
• Updated consumer creation calls to pass additional
parameters
types.go
Introduce structured transaction event callback systempkg/txn/client/types.go
• Updated
AppendEventCallbackinterface to useTxnEventCallbackinstead of function type
• Added new
TxnEventCallbackstruct withfunction and value fields
• Added constructor functions for creating
callback instances
types.go
Update DataRetriever interface for transaction operator usagepkg/iscp/types.go
• Updated
DataRetrieverinterface method signature forUpdateWatermark• Removed executor import and updated to use context and transaction
operator
metadata_scan.go
Integrate SqlProcess wrapper for metadata scan operationspkg/sql/colexec/table_function/metadata_scan.go
• Added import for
sqlexecpackage and reorganized imports• Updated
SQL execution call to use
sqlexec.NewSqlProcess(proc)wrapperstore_mem.go
Update transaction event callback in memory storepkg/incrservice/store_mem.go
• Updated transaction event callback to use new
TxnEventCallbackstructure
• Modified callback function signature to include context,
operator, event and callback data parameters
sql.go
Integrate SqlProcess wrapper for IVF flat SQL operationspkg/vectorindex/ivfflat/sql.go
• Updated
GetVersionfunction to useSqlProcessinstead ofprocess.Process• Modified SQL execution and error context to use the
new interface
types.go
Enhance vector index CDC with configurable capacity and sonic JSONpkg/vectorindex/types.go
• Added capacity parameter to
NewVectorIndexCdcconstructor function•
Replaced
jsonpackage withsonicfor JSON marshalingivf_create.go
Integrate SqlProcess wrapper for IVF creation operationspkg/sql/colexec/table_function/ivf_create.go
• Updated version retrieval and SQL execution calls to use
sqlexec.NewSqlProcess(proc)wrapperservice.go
Update transaction event callback in partition servicepkg/partitionservice/service.go
• Updated transaction event callback to use new
TxnEventCallbackstructure
• Modified callback function signature to include context,
operator, event and callback data parameters
iteration.go
Update consumer creation with additional parameters in iterationpkg/iscp/iteration.go
• Updated
NewConsumercall to include engine and transaction clientparameters
storage_txn_client.go
Update memory storage transaction client for new callback interfacepkg/txn/storage/memorystorage/storage_txn_client.go
• Updated
AppendEventCallbackmethod signature to useTxnEventCallbackinstead of function type
watermark_updater.go
Make ExecWithResult function mockable for testingpkg/iscp/watermark_updater.go
• Made
ExecWithResultfunction a variable to allow for testing/mocking24 files
index_consumer_test.go
Update ISCP consumer tests for new interfacepkg/iscp/index_consumer_test.go
• Updated test functions to use new
NewConsumersignature withcnEngineandcnClientparameters• Replaced mock SQL executor with
ExecWithResultstub for cleaner testing• Added new test functions for
IVF index handling (
newTestIvfTableDef,newTestIvfConsumerInfo)•
Modified existing tests to work with the new consumer interface and
removed complex mock implementations
sync_test.go
Update HNSW sync tests for SqlProcess interfacepkg/vectorindex/hnsw/sync_test.go
• Updated all test functions to use
sqlexec.NewSqlProcess(proc)instead of passing
process.Processdirectly• Modified test calls from
CdcSynctoNewHnswSyncfollowed byRunOncepattern• Added new test
for continuous update operations with
TestSyncContinuousUpdateInsertShuffle2FilesF32WithSmallCap• Updated
mock function signatures to accept
SqlProcessparameteriscp_util_test.go
Add comprehensive tests for ISCP utility functionspkg/sql/compile/iscp_util_test.go
• Comprehensive test suite for ISCP utility functions
• Tests for
index CDC validation, task creation/deletion
• Mock functions for
testing error scenarios
• Coverage for async index parameter
validation
model_test.go
Update HNSW model tests for SQL process interfacepkg/vectorindex/hnsw/model_test.go
• Updated test functions to use
sqlexec.SqlProcessinstead ofprocess.Process• Modified mock streaming functions to accept new SQL
process interface
• Updated all test cases to create and use
SqlProcessinstancesengine_mock.go
Update engine mock for interface compatibilitypkg/frontend/test/engine_mock.go
• Updated mock method signatures to match engine interface changes
•
Added
HasTempEnginemethod to mock engine• Fixed parameter names and
method signatures for consistency
search_test.go
Update HNSW search tests for SQL process interfacepkg/vectorindex/hnsw/search_test.go
• Updated mock functions to use
sqlexec.SqlProcessinterface•
Modified test setup to create and use
SqlProcessinstances• Updated
all search operations to use new SQL process interface
search_test.go
Update IVF flat search tests for SQL process interfacepkg/vectorindex/ivfflat/search_test.go
• Updated mock streaming functions to use
sqlexec.SqlProcess•
Modified test setup to create and use
SqlProcessinstances• Updated
all search operations and mock interfaces consistently
ivf_search_test.go
Update IVF search table function tests for SQL process interfacepkg/sql/colexec/table_function/ivf_search_test.go
• Updated mock functions to use
sqlexec.SqlProcessinterface•
Modified mock search implementations for new interface
• Updated test
helper functions consistently
build_dml_util_test.go
Add tests for async FullText index functionalitypkg/sql/plan/build_dml_util_test.go
• Added tests for async FullText index functionality
• Tests for
parameter validation and error handling
• Coverage for both sync and
async index modes
fulltext_test.go
Integrate SqlProcess wrapper for fulltext test operationspkg/sql/colexec/table_function/fulltext_test.go
• Added import for
sqlexecpackage• Updated fake SQL execution
functions to accept
SqlProcessinstead ofprocess.Process• Modified
function signatures to use the new SQL process wrapper
hnsw_search_test.go
Update mock search interface for SqlProcess integrationpkg/sql/colexec/table_function/hnsw_search_test.go
• Added import for
sqlexecpackage• Updated mock search interface
methods to use
SqlProcessinstead ofprocess.Processhnsw_create_test.go
Update HNSW creation test for SqlProcess integrationpkg/sql/colexec/table_function/hnsw_create_test.go
• Added import for
sqlexecpackage• Updated mock SQL execution
function to use
SqlProcessinstead ofprocess.Processsqlexec_test.go
Update SQL execution tests for SqlProcess integrationpkg/vectorindex/sqlexec/sqlexec_test.go
• Updated test functions to create and use
SqlProcesswrapper•
Modified
RunTxncalls to use the new SQL process interfaceoperator_events_test.go
Update transaction event callback tests for new interfacepkg/txn/client/operator_events_test.go
• Updated transaction event callback test to use new
TxnEventCallbackstructure
• Modified callback function signature to match new
interface requirements
txn_mock.go
Update mock transaction operator for new callback interfacepkg/frontend/test/txn_mock.go
• Updated mock transaction operator interface to use
TxnEventCallbackinstead of function type
store_sql_test.go
Update test transaction operator for new callback interfacepkg/incrservice/store_sql_test.go
• Updated test transaction operator interface to use
TxnEventCallbackinstead of function type
service_test.go
Update bootstrap test transaction operator for new callback interfacepkg/bootstrap/service_test.go
• Updated test transaction operator interface to use
TxnEventCallbackinstead of function type
txn_test.go
Update frontend test transaction operator for new callback interfacepkg/frontend/txn_test.go
• Updated test transaction operator interface to use
TxnEventCallbackinstead of function type
entire_engine_test.go
Update engine test operator for new callback interfacepkg/vm/engine/entire_engine_test.go
• Updated test operator interface to use
TxnEventCallbackinstead offunction type
types_test.go
Update vector index CDC test for new constructor signaturepkg/vectorindex/types_test.go
• Updated
NewVectorIndexCdccall to include capacity parameter (8192)vector_ivf_async.result
Add test results for asynchronous IVF vector index functionalitytest/distributed/cases/vector/vector_ivf_async.result
• Added test results for asynchronous IVF vector index operations
•
Includes test cases for index creation, data loading, and vector
similarity searches
vector_hnsw.result
Update HNSW vector test results for asynchronous operationstest/distributed/cases/vector/vector_hnsw.result
• Added sleep statement and updated expected query results
• Modified
test results to account for asynchronous index updates
vector_hnsw_f64_async.sql
Add asynchronous HNSW F64 vector index test suitetest/distributed/cases/vector/vector_hnsw_f64_async.sql
• Added comprehensive test suite for asynchronous HNSW vector index
operations with F64 precision
• Includes tests for CRUD operations,
index creation, and vector similarity searches
vector_hnsw.sql
Update HNSW vector test for asynchronous index operationstest/distributed/cases/vector/vector_hnsw.sql
• Added sleep statement to allow for asynchronous index updates
•
Added table cleanup and updated test comments
5 files
alter.go
Integrate ISCP with alter table operationspkg/sql/compile/alter.go
• Added ISCP integration by dropping index CDC tasks for temporary
tables during alter table operations
• Implemented logic to handle
unaffected indexes during table alteration with ISCP job registration
• Added support for async index handling and improved index table
cloning logic
• Enhanced error handling and logging for ISCP-related
operations during alter table
ddl.go
Integrate ISCP job management with DDL operationspkg/sql/compile/ddl.go
• Added ISCP job management for database and table operations (create,
drop, rename)
• Integrated index CDC task creation and deletion for
vector and fulltext indexes
• Added async index handling logic to skip
synchronous operations for async indexes
• Enhanced alter table
operations with ISCP job lifecycle management
index_consumer.go
Implement ISCP integration with HNSW and SQL process refactoringpkg/iscp/index_consumer.go
• Added new imports for sonic JSON, types, and vector index components
• Replaced SQL executor factory with direct engine and transaction
client usage
• Implemented separate execution paths for HNSW vs other
index types
• Added
runHnswfunction for HNSW-specific CDC processingwith model synchronization
iscp_util.go
Add ISCP utility functions for CDC task managementpkg/sql/compile/iscp_util.go
• New utility file for ISCP (Index Sync Change Processing) integration
• Functions for CDC task management (create, delete,
register/unregister jobs)
• Index validation logic for async vector
indexes (HNSW, IVF, FullText)
• Transaction callback system for
deferred ISCP job registration
ddl_index_algo.go
Implement async index support with ISCP integrationpkg/sql/compile/ddl_index_algo.go
• Added async index support for FullText indexes with CDC task
creation
• Modified IVF index handling to support async mode with ISCP
job registration
• Updated HNSW index creation to handle both sync and
async modes
• Integrated transaction callback system for deferred
index operations
2 files
function_id.go
Remove HNSW CDC update function IDpkg/sql/plan/function/function_id.go
• Removed
HNSW_CDC_UPDATEfunction ID constant• Adjusted
FUNCTION_END_NUMBERaccordinglyfunction_id_test.go
Remove HNSW CDC update function from predefined function IDspkg/sql/plan/function/function_id_test.go
• Removed
HNSW_CDC_UPDATEfunction ID entry• Updated
FUNCTION_END_NUMBERfrom 351 to 3501 files
util.go
Add error handling to fulltext index SQL generationpkg/sql/compile/util.go
• Added error return type to
genInsertIndexTableSqlForFullTextIndexfunction
• Updated function to return error alongside SQL strings
9 files