-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask_witih_multiple_output.py
More file actions
75 lines (54 loc) · 1.87 KB
/
task_witih_multiple_output.py
File metadata and controls
75 lines (54 loc) · 1.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import time
import json
from datetime import datetime,timedelta
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task,dag
# Passed to the @dag decorator below as its default_args.
default_args = dict(owner='Bibek')
# Define the DAG (it is instantiated by the call at the bottom of the file)
# NOTE(review): neither this decorator nor default_args sets a start_date;
# some Airflow 2.x releases reject tasks without one - confirm against the
# deployed version before relying on this DAG loading.
@dag(
    'cross_task_communication_taskflow_with_multiple_output',
    default_args=default_args,
    tags=['python', 'dependencies', 'taskflow_api'],
    schedule_interval=None,
)
def passing_data_with_taskflow_api():
    """Wire three TaskFlow tasks that hand data to each other.

    ``get_order_prices`` produces a mapping of order id to price,
    ``compute_sum_and_avg`` reduces it to a total and an average, and
    ``display_result`` prints both figures. The DAG has no schedule
    (``schedule_interval=None``).
    """

    @task
    def get_order_prices():
        """Return a hard-coded mapping of order id to price."""
        return {
            'o1': 234,
            'o2': 123,
            'o3': 45,
            'o4': 32,
            'o5': 97,
            'o6': 74,
        }

    # multiple_outputs=True exposes each key of the returned dict as its own
    # output, which is what allows the wiring below to index the result by
    # 'total_price' and 'avg_price'.
    @task(multiple_outputs=True)
    def compute_sum_and_avg(order_price_data: dict):
        """Return the total and the average of the given prices.

        Args:
            order_price_data: Mapping of order id to numeric price.

        Returns:
            A dict with keys ``'total_price'`` and ``'avg_price'``. For an
            empty mapping the total is 0 and the average is 0.0.
        """
        total = sum(order_price_data.values())
        count = len(order_price_data)
        # Guard the division so an empty mapping does not raise
        # ZeroDivisionError.
        avg = total / count if count else 0.0
        return {'total_price': total, 'avg_price': avg}

    @task
    def display_result(total, avg):
        """Print the total and the average price."""
        print(f"Total price of goods is {total}")
        print(f"Avg price of goods is {avg}")

    # Calling the decorated tasks declares the dependencies:
    # get_order_prices -> compute_sum_and_avg -> display_result.
    order_price_data = get_order_prices()
    price_summary_data = compute_sum_and_avg(order_price_data)
    display_result(
        price_summary_data['total_price'],
        price_summary_data['avg_price'],
    )
# Keep the DAG object in a module-level name: Airflow releases before 2.4
# only discover DAGs that are reachable from the module's globals, so a bare
# call whose result is discarded may leave the DAG unregistered there.
multiple_output_dag = passing_data_with_taskflow_api()