-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuild_pgm.py
More file actions
199 lines (139 loc) · 5.93 KB
/
build_pgm.py
File metadata and controls
199 lines (139 loc) · 5.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# build_pgm.py
import math
import numpy as np
import pandas as pd
import networkx as nx
import config
from power_grid_model.validation import assert_valid_input_data, validate_input_data
from power_grid_model import (DatasetType,
LoadGenType,
ComponentType,
PowerGridModel,
CalculationType,
CalculationMethod,
MeasuredTerminalType,
initialize_array,)
def _build_nodes(graph: nx.Graph) -> np.ndarray:
    """Return the PGM node input array with one entry per graph node."""
    node_ids = list(graph.nodes)
    nodes = initialize_array(DatasetType.input, ComponentType.node, len(node_ids))
    nodes["id"] = np.array(node_ids, dtype=np.int32)
    # Every node receives the same rated voltage; the per-node "subtype"
    # attribute is not consulted.
    # NOTE(review): confirm config.LV_LN_VOLTAGE is the value PGM expects for
    # u_rated on this network.
    nodes["u_rated"] = np.full(len(node_ids), config.LV_LN_VOLTAGE, dtype=np.float64)
    return nodes


def _build_lines(graph: nx.Graph) -> np.ndarray:
    """Return the PGM line input array with one entry per graph edge."""
    edges = list(graph.edges(data=True))
    count = len(edges)

    lines = initialize_array(DatasetType.input, ComponentType.line, count)
    # Line ids are consecutive, starting at config.LINE_ID_START, in edge order.
    lines["id"] = np.arange(
        config.LINE_ID_START, config.LINE_ID_START + count, dtype=np.int32
    )
    lines["from_node"] = np.array([node_a for node_a, _, _ in edges], dtype=np.int32)
    lines["to_node"] = np.array([node_b for _, node_b, _ in edges], dtype=np.int32)
    lines["from_status"] = np.ones(count, dtype=np.int8)
    lines["to_status"] = np.ones(count, dtype=np.int8)

    # A missing edge attribute (None) becomes NaN in a float64 array, so all
    # three electrical parameters are handled the same way and a gap is left
    # for input validation to report instead of crashing here.
    resistance = np.array([attrs.get("r_ohm") for _, _, attrs in edges], dtype=np.float64)
    inductance = np.array([attrs.get("l_h") for _, _, attrs in edges], dtype=np.float64)
    capacitance = np.array([attrs.get("c_f") for _, _, attrs in edges], dtype=np.float64)

    lines["r1"] = resistance
    # Reactance from inductance: X = 2 * pi * f * L.
    lines["x1"] = 2.0 * math.pi * config.FREQUENCY_HZ * inductance
    lines["c1"] = capacitance
    lines["tan1"] = np.zeros(count, dtype=np.float64)
    return lines


def _build_source(slack_node: int) -> np.ndarray:
    """Return the single-entry PGM source array attached to ``slack_node``."""
    source = initialize_array(DatasetType.input, ComponentType.source, 1)
    source["id"][0] = config.SLACK_SOURCE_ID
    source["node"][0] = slack_node
    source["status"][0] = 1
    source["u_ref"][0] = config.SLACK_U_REF_PU
    source["u_ref_angle"][0] = config.SLACK_U_REF_ANGLE_RAD
    source["sk"][0] = config.SLACK_SK_VA
    source["rx_ratio"][0] = config.SLACK_RX_RATIO
    source["z01_ratio"][0] = config.SLACK_Z01_RATIO
    return source


def _build_voltage_sensors(voltage_df: pd.DataFrame) -> np.ndarray:
    """Return one symmetric voltage sensor per row of ``voltage_df``."""
    count = len(voltage_df)
    sensors = initialize_array(
        DatasetType.input, ComponentType.sym_voltage_sensor, count
    )
    sensors["id"] = np.arange(
        config.VOLTAGE_SENSOR_ID_START,
        config.VOLTAGE_SENSOR_ID_START + count,
        dtype=np.int32,
    )
    # The "oid" column holds the id of the measured node.
    sensors["measured_object"] = np.array(
        [int(oid) for oid in voltage_df["oid"].tolist()], dtype=np.int32
    )
    # "VOLAGE_SIGMA" (sic) is the attribute name this module reads from config.
    sensors["u_sigma"] = config.VOLAGE_SIGMA
    sensors["u_measured"] = voltage_df["voltage"].to_numpy(dtype=np.float64)
    return sensors


def _build_loads_and_power_sensors(pq_df: pd.DataFrame) -> tuple:
    """Return ``(sym_load, sym_power_sensor)`` arrays, one of each per row."""
    count = len(pq_df)
    node_oids = [int(oid) for oid in pq_df["oid"].tolist()]

    # ---- loads: one constant-power load on the node named by each row ----
    load_ids = range(config.LOAD_ID_START, config.LOAD_ID_START + count)
    sym_load = initialize_array(DatasetType.input, ComponentType.sym_load, count)
    sym_load["id"] = np.array(load_ids, dtype=np.int32)
    sym_load["node"] = np.array(node_oids, dtype=np.int32)
    sym_load["status"] = 1
    sym_load["type"] = LoadGenType.const_power

    # If an oid occurs in several rows, the last load created for it wins and
    # every sensor of that oid measures that load.
    oid_to_load_id = dict(zip(node_oids, load_ids))

    # ---- power sensors: one per row, measuring that row's load ----
    sym_power_sensor = initialize_array(
        DatasetType.input, ComponentType.sym_power_sensor, count
    )
    sym_power_sensor["id"] = np.arange(
        config.POWER_SENSOR_ID_START,
        config.POWER_SENSOR_ID_START + count,
        dtype=np.int32,
    )
    sym_power_sensor["measured_object"] = np.array(
        [oid_to_load_id[oid] for oid in node_oids], dtype=np.int32
    )
    sym_power_sensor["measured_terminal_type"] = MeasuredTerminalType.load

    # P and Q are in kW and kVAr; scale by 1000 before handing them to PGM.
    p_measured = pq_df["P_TOTAL"].to_numpy(dtype=np.float64) * 1000.0
    q_measured = pq_df["Q_TOTAL"].to_numpy(dtype=np.float64) * 1000.0
    sym_power_sensor["p_measured"] = p_measured
    sym_power_sensor["q_measured"] = q_measured
    # Sigma is proportional to the magnitude plus a constant 10, so it stays
    # non-zero when the measurement itself is 0.
    sym_power_sensor["p_sigma"] = config.POWER_SENSOR_P_SIGMA * np.abs(p_measured) + 10
    sym_power_sensor["q_sigma"] = config.POWER_SENSOR_Q_SIGMA * np.abs(q_measured) + 10

    return sym_load, sym_power_sensor


def graph_to_pgm(graph: nx.Graph,
                 slack_node: int,
                 voltage_df: pd.DataFrame,
                 pq_df: pd.DataFrame
                 ) -> dict:
    """Build a power-grid-model input dataset from a graph and run state estimation.

    Despite the name, this does not return the ``PowerGridModel`` instance: it
    constructs the model, runs a symmetric state estimation with the
    iterative-linear method, and returns that calculation's result.

    Args:
        graph: Network topology. Node keys are used as PGM node ids; each edge
            is read for the attributes ``"r_ohm"``, ``"l_h"`` and ``"c_f"``.
        slack_node: Id of the node the single source is attached to.
        voltage_df: One row per voltage measurement, with columns ``"oid"``
            (measured node id) and ``"voltage"``.
        pq_df: One row per load, with columns ``"oid"`` (node id),
            ``"P_TOTAL"`` and ``"Q_TOTAL"`` (kW / kVAr).

    Returns:
        Whatever ``PowerGridModel.calculate_state_estimation`` returns for the
        assembled dataset.

    Raises:
        Whatever ``assert_valid_input_data`` raises when the assembled dataset
        is not valid for symmetric state estimation (including NaN line
        parameters caused by missing edge attributes).
    """
    sym_load, sym_power_sensor = _build_loads_and_power_sensors(pq_df)

    input_data = {
        ComponentType.node: _build_nodes(graph),
        ComponentType.line: _build_lines(graph),
        ComponentType.source: _build_source(slack_node),
        ComponentType.sym_voltage_sensor: _build_voltage_sensors(voltage_df),
        ComponentType.sym_power_sensor: sym_power_sensor,
        ComponentType.sym_load: sym_load,
    }

    # -------- Build & validate --------
    # A single validating call is enough: this one raises on invalid data, so a
    # second validation pass whose result is ignored adds nothing.
    assert_valid_input_data(
        input_data,
        calculation_type=CalculationType.state_estimation,
        symmetric=True,
    )

    model = PowerGridModel(input_data)
    return model.calculate_state_estimation(
        symmetric=True,
        error_tolerance=config.ERROR_TOLERANCE,
        max_iterations=config.MAX_ITERATIONS,
        calculation_method=CalculationMethod.iterative_linear,
    )