Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"dataFlowId": "append_view_once_flow",
"dataFlowGroup": "feature_samples_general",
"dataFlowType": "flow",
"targetFormat": "delta",
"targetDetails": {
"table": "append_view_once_flow",
"tableProperties": {
"delta.enableChangeDataFeed": "true"
}
},
"flowGroups": [
{
"flowGroupId": "main",
"flows": {
"f_customer_append_view_once": {
"flowType": "append_view",
"flowDetails": {
"targetTable": "append_view_once_flow",
"sourceView": "v_append_view_once_flow",
"once": true
},
"views": {
"v_append_view_once_flow": {
"mode": "batch",
"sourceType": "delta",
"sourceDetails": {
"database": "{staging_schema}",
"table": "customer"
}
}
}
}
}
}
]
}
7 changes: 5 additions & 2 deletions src/dataflow/flows/append_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def columnPrefixExceptions(self) -> List[str]:

@property
def once(self) -> bool:
    """Whether this flow runs as a one-time (batch) append.

    Defaults to False when the 'once' key is absent from flowDetails.
    Note: enabling 'once' requires the source view to be read in batch mode.
    """
    details = self.flowDetails
    return details.get("once", False)

def create_flow(
Expand All @@ -55,14 +55,17 @@ def get_column_prefix_exceptions(flow_config: FlowConfig) -> List[str]:
return column_prefix_exceptions

spark = self.spark
spark_reader = spark.readStream
if self.once:
spark_reader = spark.read
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens in streaming flows if this is turned on — have we tested that? Also, could we please add a sample for this feature if possible?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've successfully tested this manually, but I'm happy to add a sample as well.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome — yes, it would be good if you could add one to the feature samples under bronze. We use these as regression tests for now, and they will later become the tests for the CI/CD pipeline. Having a test for every feature helps make sure we don't break anything in future development.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sounds good.

I just finished testing this change on new samples locally with these results:

  • When the source view has mode: batch, the flow succeeds (the batch view is inserted "once" into the target streaming table during the initial pipeline run, and then ignored in subsequent runs).
  • When the source view has mode: stream, the flow fails with View 'v_append_view_once_stream_flow' is a streaming view and must be referenced using readStream.

This behaviour is expected, as customers should only ever use a batch view as the source for an append once flow. As outlined in the append_flow documentation:

Using once=True changes the flow: the return value must be a batch DataFrame in this case, not a streaming DataFrame.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exclude_columns = flow_config.exclude_columns
column_prefix_exceptions = get_column_prefix_exceptions(flow_config)

source_view_name = f'live.{self.sourceView}'

@dp.append_flow(name=self.flowName, target=self.targetTable, once=self.once)
def flow_transform():
df = spark.readStream.table(source_view_name)
df = spark_reader.table(source_view_name)
if "column_prefix" in self.flowDetails:
prefix = f"{self.columnPrefix.lower()}_"
df = df.select([
Expand Down