[SPARK-55729][SS] Support state data source reader for new state format v4 on stream-stream join#54845
HeartSaVioR wants to merge 9 commits into apache:master
reflect review comments (by claude, need another confirm)
19a0871 to
3902f7b
...rc/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
| val vSchema = manager.readSchemaFile().find { schema => | ||
| schema.colFamilyName == storeNames(1) | ||
| }.map(_.valueSchema).get | ||
| // Try v3 CF names first; if not found, use v4 CF names |
Why so? We have the format version in the offset log, right?
| case 1 if Utils.isTesting => new SchemaV1Writer | ||
| case 2 => new SchemaV2Writer | ||
| case 3 => new SchemaV3Writer | ||
| case v if v >= 3 => new SchemaV3Writer |
nit: maybe just be explicit about the supported versions and throw an error for anything > 4?
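One way to read this suggestion, as a minimal sketch rather than the actual Spark code: the writer types below are hypothetical stand-ins for the classes named in the diff, `isTesting` stands in for `Utils.isTesting`, and the assumption that versions 3 and 4 share the V3 writer is inferred from the `v >= 3` case.

```scala
// Hypothetical stand-ins for the schema writers named in the diff.
sealed trait SchemaWriter { def version: Int }
case object SchemaV1Writer extends SchemaWriter { val version = 1 }
case object SchemaV2Writer extends SchemaWriter { val version = 2 }
case object SchemaV3Writer extends SchemaWriter { val version = 3 }

object SchemaWriterSketch {
  // Assumption: versions 3 and 4 both use the V3 writer, as the `v >= 3` case implies.
  def createWriter(version: Int, isTesting: Boolean): SchemaWriter = version match {
    case 1 if isTesting => SchemaV1Writer
    case 2 => SchemaV2Writer
    case 3 | 4 => SchemaV3Writer
    // Any other version fails loudly instead of silently picking the V3 writer.
    case other =>
      throw new IllegalArgumentException(s"Unsupported version: $other")
  }
}
```

Listing `3 | 4` explicitly means a future version 5 produces a clear error at this call site until someone deliberately adds support for it.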
| testStreamStreamJoin(3) | ||
| } | ||
| test("stream-stream join, state ver 4") { |
Do we also have a test for the change tracking option?
CI failure seems unrelated.
...org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
| val useMultipleValuesPerKey = SchemaUtil.checkVariableType(stateVariableInfoOpt, | ||
| StateVariableType.ListState) | ||
| val useMultipleValuesPerKey = StatePartitionReaderUtils.isMultiValuedCF( | ||
| joinColFamilyOpt.getOrElse(""), stateVariableInfoOpt) |
I find the .getOrElse("") a bit confusing. Maybe we can do something like joinColFamilyOpt.exists(StatePartitionReaderUtils.v4JoinCFNames.contains), or have isMultiValuedCF take an Option[String].
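A rough sketch of the second alternative, under stated assumptions: the column family names are placeholders (the real v4 names are not shown here), the `isListState` flag is a simplification of the `stateVariableInfoOpt` check, and the rule that either condition makes the column family multi-valued is a guess at the intent, not the actual implementation.

```scala
object MultiValuedCFSketch {
  // Placeholder names; the real v4 join column family names are not shown in this excerpt.
  val v4JoinCFNames: Set[String] = Set("exampleLeftCF", "exampleRightCF")

  // Hypothetical signature: the column family arrives as an Option,
  // so callers no longer need a "" default to express "no column family".
  def isMultiValuedCF(colFamilyOpt: Option[String], isListState: Boolean): Boolean =
    isListState || colFamilyOpt.exists(v4JoinCFNames.contains)
}
```

With this shape, `None` plainly means "not a join column family", and the empty string never has to act as a sentinel.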
| val schemas = manager.readSchemaFile() | ||
| val kSchema = schemas.find(_.colFamilyName == v4Names(0)).map(_.keySchema).get | ||
| val vSchema = schemas.find(_.colFamilyName == v4Names(0)).map(_.valueSchema).get |
I see v3 uses storeNames(1) for vSchema. This being v4Names(0) is a bit confusing. Maybe extract into a val and add a comment so it is easier to understand.
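Something along these lines might address it. This is only an illustrative sketch: `ColFamilySchema` is a hypothetical stand-in for the schema entries returned by readSchemaFile(), schemas are modeled as plain strings, and the comment about why v4 uses the first column family is an assumption drawn from the diff.

```scala
// Hypothetical stand-in for a schema file entry; real schemas are not plain strings.
final case class ColFamilySchema(colFamilyName: String, keySchema: String, valueSchema: String)

object V4SchemaLookupSketch {
  def keyAndValueSchema(
      schemas: Seq[ColFamilySchema],
      v4Names: Seq[String]): (String, String) = {
    // Assumption: in v4 both the key and value schema come from the first
    // column family, whereas v3 reads the value schema from storeNames(1).
    val primaryCFName = v4Names.head
    val primary = schemas.find(_.colFamilyName == primaryCFName).getOrElse(
      throw new IllegalStateException(s"No schema found for column family $primaryCFName"))
    (primary.keySchema, primary.valueSchema)
  }
}
```

Naming the column family once also avoids scanning the schema list twice and replaces the bare `.get` with an error message that names the missing column family.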
...est/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
...g/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
https://github.com/HeartSaVioR/spark/runs/68291049885
What changes were proposed in this pull request?
This PR proposes to support reading state via the state data source reader for the new state format v4 on stream-stream join.
The state data source reader supports two options (joinSide, storeName) for reading the state of the stream-stream join operator. This PR enables both options for format v4. The only difference is that the store names differ in v4; we expect most users to use only the joinSide option, and reading state rows by storeName should only be needed when debugging.
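For illustration, reading one side of the join state might look roughly like the sketch below. The checkpoint path and application name are hypothetical, and the snippet assumes Spark is on the classpath and that the checkpoint belongs to a query containing a stream-stream join.

```scala
import org.apache.spark.sql.SparkSession

object ReadJoinStateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-join-state-sketch") // hypothetical name
      .master("local[*]")
      .getOrCreate()

    // Hypothetical checkpoint location of a streaming query with a stream-stream join.
    val checkpointLocation = "/tmp/checkpoints/example-join-query"

    // joinSide selects the left or right input of the join; the reader resolves
    // the underlying stores itself, so the caller does not pass store names.
    val leftState = spark.read
      .format("statestore")
      .option("path", checkpointLocation)
      .option("joinSide", "left")
      .load()

    leftState.show(truncate = false)
    spark.stop()
  }
}
```

Swapping joinSide for the storeName option reads a single underlying store directly, which is the debugging path mentioned above.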
Why are the changes needed?
The state data source reader did not support reading state written in the new state format v4 for stream-stream join. This PR adds that support.
Does this PR introduce any user-facing change?
Yes, the state data source reader will be able to read state in the new state format v4 for stream-stream join. In terms of UX, however, there is no difference from older state format versions.
How was this patch tested?
Existing UTs are expanded to test for state format version 4.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude 4.6 opus