Skip to content

[SPARK-55729][SS] Support state data source reader for new state format v4 on stream-stream join#54845

Open
HeartSaVioR wants to merge 9 commits intoapache:masterfrom
HeartSaVioR:SPARK-55729
Open

[SPARK-55729][SS] Support state data source reader for new state format v4 on stream-stream join#54845
HeartSaVioR wants to merge 9 commits intoapache:masterfrom
HeartSaVioR:SPARK-55729

Conversation

@HeartSaVioR
Copy link
Contributor

What changes were proposed in this pull request?

This PR proposes to support reading state via state data source reader, for new state format v4 on stream-stream join.

state data source reader supports both options (joinSide, storeName) for reading the state in stream-stream join operator. This PR enables both options for the format v4. The only difference is that the storeNames are different in v4, although we expect most users would only deal with joinSide option. Reading state rows with storeName would only be needed when debugging.

Why are the changes needed?

State data source reader didn't support reading the state with the new state format v4 in stream-stream join. This PR will enable the support.

Does this PR introduce any user-facing change?

Yes, state data source reader will be able to read the state with the new state format v4 in stream-stream join. In terms of UX there will be no difference with older state format versions, though.

How was this patch tested?

Existing UTs are expanded to test for state format version 4.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude 4.6 opus

val vSchema = manager.readSchemaFile().find { schema =>
schema.colFamilyName == storeNames(1)
}.map(_.valueSchema).get
// Try v3 CF names first; if not found, use v4 CF names
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why so ? we have the format version in the offset log right ?

case 1 if Utils.isTesting => new SchemaV1Writer
case 2 => new SchemaV2Writer
case 3 => new SchemaV3Writer
case v if v >= 3 => new SchemaV3Writer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe just be explicit for supported versions and throw an error for anything > 4 ?

testStreamStreamJoin(3)
}

test("stream-stream join, state ver 4") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test for the change tracking option also ?

@HeartSaVioR
Copy link
Contributor Author

val useMultipleValuesPerKey = SchemaUtil.checkVariableType(stateVariableInfoOpt,
StateVariableType.ListState)
val useMultipleValuesPerKey = StatePartitionReaderUtils.isMultiValuedCF(
joinColFamilyOpt.getOrElse(""), stateVariableInfoOpt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the .getOrElse("") a bit confusing. Maybe we can do something like joinColFamilyOpt.exists(StatePartitionReaderUtils.v4JoinCFNames.contains) or have isMultiValuedCF take a Option[String].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - 1f1fd48

val schemas = manager.readSchemaFile()

val kSchema = schemas.find(_.colFamilyName == v4Names(0)).map(_.keySchema).get
val vSchema = schemas.find(_.colFamilyName == v4Names(0)).map(_.valueSchema).get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see v3 uses storeNames(1) for vSchema. This being v4Names(0) is a bit confusing. Maybe extract into a val and add a comment so it is easier to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - 1f1fd48

@HeartSaVioR
Copy link
Contributor Author

https://github.com/HeartSaVioR/spark/runs/68291049885
Again, protobuf breaking change CI failure again.

Copy link
Contributor

@dylanwong250 dylanwong250 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants