diff --git a/src/pathsim_chem/process/__init__.py b/src/pathsim_chem/process/__init__.py index daf192b..ca810c1 100644 --- a/src/pathsim_chem/process/__init__.py +++ b/src/pathsim_chem/process/__init__.py @@ -8,3 +8,8 @@ from .heat_exchanger import * from .flash_drum import * from .distillation import * +from .multicomponent_flash import * +from .pfr import * +from .mixer import * +from .valve import * +from .heater import * diff --git a/src/pathsim_chem/process/heater.py b/src/pathsim_chem/process/heater.py new file mode 100644 index 0000000..36e8b94 --- /dev/null +++ b/src/pathsim_chem/process/heater.py @@ -0,0 +1,63 @@ +######################################################################################### +## +## Duty-Specified Heater/Cooler Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +from pathsim.blocks.function import Function + +# BLOCKS ================================================================================ + +class Heater(Function): + """Algebraic duty-specified heater/cooler with no thermal mass. + + Raises or lowers the stream temperature by a specified heat duty. + Flow passes through unchanged. + + Mathematical Formulation + ------------------------- + .. math:: + + T_{out} = T_{in} + \\frac{Q}{F \\, \\rho \\, C_p} + + .. math:: + + F_{out} = F_{in} + + Parameters + ---------- + rho : float + Fluid density [kg/m³]. + Cp : float + Fluid heat capacity [J/(kg·K)]. + """ + + input_port_labels = { + "F": 0, + "T_in": 1, + "Q": 2, + } + + output_port_labels = { + "F_out": 0, + "T_out": 1, + } + + def __init__(self, rho=1000.0, Cp=4184.0): + if rho <= 0: + raise ValueError(f"'rho' must be positive but is {rho}") + if Cp <= 0: + raise ValueError(f"'Cp' must be positive but is {Cp}") + + self.rho = rho + self.Cp = Cp + super().__init__(func=self._eval) + + def _eval(self, F, T_in, Q): + if F > 0: + T_out = T_in + Q / (F * self.rho * self.Cp) + else: + T_out = T_in + return (F, T_out) diff --git a/src/pathsim_chem/process/mixer.py b/src/pathsim_chem/process/mixer.py new file mode 100644 index 0000000..7b81905 --- /dev/null +++ b/src/pathsim_chem/process/mixer.py @@ -0,0 +1,55 @@ +######################################################################################### +## +## 2-Stream Mixer Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +from pathsim.blocks.function import Function + +# BLOCKS ================================================================================ + +class Mixer(Function): + """Algebraic 2-stream mixer with mass and energy balance. + + Mixes two streams by mass balance (additive flows) and energy balance + (flow-weighted temperature mixing). No phase change or reaction. + + Mathematical Formulation + ------------------------- + .. math:: + + F_{out} = F_1 + F_2 + + .. math:: + + T_{out} = \\frac{F_1 \\, T_1 + F_2 \\, T_2}{F_{out}} + + Parameters + ---------- + None — this is a purely algebraic block. + """ + + input_port_labels = { + "F_1": 0, + "T_1": 1, + "F_2": 2, + "T_2": 3, + } + + output_port_labels = { + "F_out": 0, + "T_out": 1, + } + + def __init__(self): + super().__init__(func=self._eval) + + def _eval(self, F_1, T_1, F_2, T_2): + F_out = F_1 + F_2 + if F_out > 0: + T_out = (F_1 * T_1 + F_2 * T_2) / F_out + else: + T_out = 0.5 * (T_1 + T_2) + return (F_out, T_out) diff --git a/src/pathsim_chem/process/multicomponent_flash.py b/src/pathsim_chem/process/multicomponent_flash.py new file mode 100644 index 0000000..a5e3b24 --- /dev/null +++ b/src/pathsim_chem/process/multicomponent_flash.py @@ -0,0 +1,251 @@ +######################################################################################### +## +## Multi-Component Isothermal Flash Drum Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np +from scipy.optimize import brentq + +from pathsim.blocks.dynsys import DynamicalSystem + +# BLOCKS ================================================================================ + +class MultiComponentFlash(DynamicalSystem): + """Generalized isothermal flash drum for N components with Raoult's law VLE. + + Models a flash drum with liquid holdup for an N-component mixture. Feed enters + as liquid, vapor and liquid streams exit. Temperature and pressure are specified + as inputs. VLE is computed using K-values from Raoult's law with Antoine + equation vapor pressures. + + The Rachford-Rice equation is solved iteratively using Brent's method: + + .. math:: + + f(\\beta) = \\sum_i \\frac{z_i (K_i - 1)}{1 + \\beta (K_i - 1)} = 0 + + K-values from Raoult's law: + + .. math:: + + K_i = \\frac{\\exp(A_i - B_i / (T + C_i))}{P} + + Dynamic States + --------------- + The holdup moles of each component in the liquid phase: + + .. math:: + + \\frac{dN_i}{dt} = F z_i - V y_i - L x_i + + Parameters + ---------- + N_comp : int + Number of components (must be >= 2). + holdup : float + Total liquid holdup [mol]. Assumed approximately constant. + antoine_A : array_like + Antoine A parameters for each component [ln(Pa)]. + antoine_B : array_like + Antoine B parameters for each component [K]. + antoine_C : array_like + Antoine C parameters for each component [K]. + N0 : array_like or None + Initial component holdup moles [mol]. If None, equal split assumed. + """ + + def __init__(self, N_comp=3, holdup=100.0, + antoine_A=None, antoine_B=None, antoine_C=None, + N0=None): + + # input validation + if N_comp < 2: + raise ValueError(f"'N_comp' must be >= 2 but is {N_comp}") + if holdup <= 0: + raise ValueError(f"'holdup' must be positive but is {holdup}") + + self.N_comp = int(N_comp) + self.holdup = holdup + nc = self.N_comp + + # default Antoine parameters: benzene / toluene / p-xylene (ln(Pa), K) + if antoine_A is not None: + self.antoine_A = np.array(antoine_A, dtype=float) + else: + self.antoine_A = np.array([20.7936, 20.9064, 20.9891])[:nc] + + if antoine_B is not None: + self.antoine_B = np.array(antoine_B, dtype=float) + else: + self.antoine_B = np.array([2788.51, 3096.52, 3346.65])[:nc] + + if antoine_C is not None: + self.antoine_C = np.array(antoine_C, dtype=float) + else: + self.antoine_C = np.array([-52.36, -53.67, -57.84])[:nc] + + if len(self.antoine_A) != nc: + raise ValueError(f"Antoine parameters must have length {nc}") + if len(self.antoine_B) != nc: + raise ValueError(f"Antoine parameters must have length {nc}") + if len(self.antoine_C) != nc: + raise ValueError(f"Antoine parameters must have length {nc}") + + # initial holdup (equal moles by default) + if N0 is not None: + x0 = np.array(N0, dtype=float) + else: + x0 = np.full(nc, holdup / nc) + + if len(x0) != nc: + raise ValueError(f"'N0' must have length {nc}") + + # dynamic port labels: set before super().__init__() + # inputs: F, z_1, ..., z_{nc-1}, T, P + inp = {"F": 0} + for i in range(1, nc): + inp[f"z_{i}"] = i + inp["T"] = nc + inp["P"] = nc + 1 + self.input_port_labels = inp + + n_inputs = nc + 2 + + # outputs: V_rate, L_rate, y_1, ..., y_{nc-1}, x_1, ..., x_{nc-1} + out = {"V_rate": 0, "L_rate": 1} + for i in range(1, nc): + out[f"y_{i}"] = 1 + i + for i in range(1, nc): + out[f"x_{i}"] = nc + i + self.output_port_labels = out + + # shared VLE computation + def _solve_vle(z, T, P): + """Solve N-component Rachford-Rice, return (beta, y, x).""" + P_sat = np.exp(self.antoine_A - self.antoine_B / (T + self.antoine_C)) + K = P_sat / P + + # bubble/dew point checks + bubble = np.sum(z * K) + K_safe = np.where(K > 1e-12, K, 1e-12) + dew = np.sum(z / K_safe) + + if bubble <= 1.0: + # subcooled: all liquid + beta = 0.0 + y = z * K + y_s = y.sum() + if y_s > 0: + y = y / y_s + return beta, y, z.copy() + + if dew <= 1.0: + # superheated: all vapor + beta = 1.0 + x_eq = z / K_safe + x_s = x_eq.sum() + if x_s > 0: + x_eq = x_eq / x_s + return beta, z.copy(), x_eq + + # two-phase: solve Rachford-Rice via Brent's method + Km1 = K - 1.0 + + def rr_func(beta): + return np.sum(z * Km1 / (1.0 + beta * Km1)) + + # Whitson & Michelsen bounds + K_max = K.max() + K_min = K.min() + beta_min = max(0.0, 1.0 / (1.0 - K_max)) if K_max != 1.0 else 0.0 + beta_max = min(1.0, 1.0 / (1.0 - K_min)) if K_min != 1.0 else 1.0 + + # ensure valid bracket + beta_min = max(beta_min, 0.0) + beta_max = min(beta_max, 1.0) + if beta_min >= beta_max: + beta_min, beta_max = 0.0, 1.0 + + try: + beta = brentq(rr_func, beta_min, beta_max, xtol=1e-12) + except ValueError: + # fallback: try full [0, 1] bracket + try: + beta = brentq(rr_func, 0.0, 1.0, xtol=1e-12) + except ValueError: + beta = 0.5 + + beta = np.clip(beta, 0.0, 1.0) + + y = z * K / (1.0 + beta * Km1) + x_eq = z / (1.0 + beta * Km1) + + # normalize for numerical safety + y_s = y.sum() + x_s = x_eq.sum() + if y_s > 0: + y = y / y_s + if x_s > 0: + x_eq = x_eq / x_s + + return beta, y, x_eq + + def _pad_u(u): + u = np.atleast_1d(u) + if len(u) < n_inputs: + padded = np.zeros(n_inputs) + padded[:len(u)] = u + return padded + return u + + def _extract_z(u): + """Extract full composition vector from inputs (last component inferred).""" + z_partial = u[1:nc] # z_1 ... z_{nc-1} + z_last = 1.0 - np.sum(z_partial) + return np.append(z_partial, z_last) + + # rhs of flash drum ode + def _fn_d(x, u, t): + u = _pad_u(u) + F_in = u[0] + z = _extract_z(u) + T = u[nc] + P = u[nc + 1] + + beta, y, x_eq = _solve_vle(z, T, P) + + V_rate = beta * F_in + L_rate = (1.0 - beta) * F_in + + return F_in * z - V_rate * y - L_rate * x_eq + + # algebraic output + def _fn_a(x, u, t): + u = _pad_u(u) + F_in = u[0] + z = _extract_z(u) + T = u[nc] + P = u[nc + 1] + + beta, y, x_eq = _solve_vle(z, T, P) + + V_rate = beta * F_in + L_rate = (1.0 - beta) * F_in + + # output: V_rate, L_rate, y_1..y_{nc-1}, x_1..x_{nc-1} + result = np.empty(2 + 2 * (nc - 1)) + result[0] = V_rate + result[1] = L_rate + result[2:2 + nc - 1] = y[:nc - 1] + result[2 + nc - 1:] = x_eq[:nc - 1] + + return result + + super().__init__( + func_dyn=_fn_d, + func_alg=_fn_a, + initial_value=x0, + ) diff --git a/src/pathsim_chem/process/pfr.py b/src/pathsim_chem/process/pfr.py new file mode 100644 index 0000000..05b2875 --- /dev/null +++ b/src/pathsim_chem/process/pfr.py @@ -0,0 +1,209 @@ +######################################################################################### +## +## Plug Flow Reactor (PFR) Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np + +from pathsim.blocks.dynsys import DynamicalSystem + +# CONSTANTS ============================================================================= + +R_GAS = 8.314 # universal gas constant [J/(mol·K)] + +# BLOCKS ================================================================================ + +class PFR(DynamicalSystem): + """Plug flow reactor with Arrhenius kinetics and energy balance. + + Discretized tubular reactor divided into N cells along its length. + Each cell has concentration and temperature states with nth-order + kinetics and an energy balance including heat of reaction. + + Mathematical Formulation + ------------------------- + For each cell :math:`i = 1, \\ldots, N`: + + .. math:: + + \\frac{dC_i}{dt} = \\frac{F}{V_{cell}} (C_{i-1} - C_i) - k(T_i) \\, C_i^n + + .. math:: + + \\frac{dT_i}{dt} = \\frac{F}{V_{cell}} (T_{i-1} - T_i) + + \\frac{(-\\Delta H_{rxn})}{\\rho \\, C_p} \\, k(T_i) \\, C_i^n + + where the Arrhenius rate constant is: + + .. math:: + + k(T) = k_0 \\, \\exp\\!\\left(-\\frac{E_a}{R \\, T}\\right) + + The state vector is ordered as + :math:`[C_1, T_1, C_2, T_2, \\ldots, C_N, T_N]`. + + Parameters + ---------- + N_cells : int + Number of discretization cells [-]. + V : float + Total reactor volume [m³]. + F : float + Volumetric flow rate [m³/s]. + k0 : float + Pre-exponential Arrhenius factor [1/s for n=1]. + Ea : float + Activation energy [J/mol]. + n : float + Reaction order [-]. + dH_rxn : float + Heat of reaction [J/mol]. Negative for exothermic. + rho : float + Fluid density [kg/m³]. + Cp : float + Fluid heat capacity [J/(kg·K)]. + C0 : float + Initial concentration [mol/m³]. + T0 : float + Initial temperature [K]. + """ + + input_port_labels = { + "C_in": 0, + "T_in": 1, + } + + output_port_labels = { + "C_out": 0, + "T_out": 1, + } + + def __init__(self, N_cells=5, V=1.0, F=0.1, k0=1e6, Ea=50000.0, n=1.0, + dH_rxn=-50000.0, rho=1000.0, Cp=4184.0, + C0=0.0, T0=300.0): + + # input validation + if N_cells < 1: + raise ValueError(f"'N_cells' must be >= 1 but is {N_cells}") + if V <= 0: + raise ValueError(f"'V' must be positive but is {V}") + if F <= 0: + raise ValueError(f"'F' must be positive but is {F}") + if rho <= 0: + raise ValueError(f"'rho' must be positive but is {rho}") + if Cp <= 0: + raise ValueError(f"'Cp' must be positive but is {Cp}") + + # store parameters + self.N_cells = int(N_cells) + self.V = V + self.F = F + self.k0 = k0 + self.Ea = Ea + self.n = n + self.dH_rxn = dH_rxn + self.rho = rho + self.Cp = Cp + + N = self.N_cells + + # initial state: interleaved [C_1, T_1, C_2, T_2, ...] + x0 = np.empty(2 * N) + x0[0::2] = C0 + x0[1::2] = T0 + + # ensure u has expected 2 elements (handles framework probing) + def _pad_u(u): + u = np.atleast_1d(u) + if len(u) < 2: + padded = np.zeros(2) + padded[:len(u)] = u + return padded + return u + + # rhs of PFR ode (vectorized) + def _fn_d(x, u, t): + u = _pad_u(u) + C_in, T_in = u + N = self.N_cells + + V_cell = self.V / N + f_flow = self.F / V_cell + rcp = (-self.dH_rxn) / (self.rho * self.Cp) + + C = x[0::2] + T = x[1::2] + + # upstream values with boundary conditions + C_prev = np.empty(N) + C_prev[0] = C_in + C_prev[1:] = C[:-1] + + T_prev = np.empty(N) + T_prev[0] = T_in + T_prev[1:] = T[:-1] + + # Arrhenius rate per cell + k = self.k0 * np.exp(-self.Ea / (R_GAS * T)) + r = k * np.abs(C)**self.n # abs for numerical safety + + dx = np.empty(2 * N) + dx[0::2] = f_flow * (C_prev - C) - r + dx[1::2] = f_flow * (T_prev - T) + rcp * r + + return dx + + # analytical jacobian (block-tridiagonal structure) + def _jc_d(x, u, t): + N = self.N_cells + + V_cell = self.V / N + f_flow = self.F / V_cell + rcp = (-self.dH_rxn) / (self.rho * self.Cp) + + C = x[0::2] + T = x[1::2] + + k = self.k0 * np.exp(-self.Ea / (R_GAS * T)) + dk_dT = k * self.Ea / (R_GAS * T**2) + + dim = 2 * N + J = np.zeros((dim, dim)) + + for i in range(N): + ci = 2 * i # concentration index + ti = 2 * i + 1 # temperature index + + C_i = max(abs(C[i]), 1e-30) + dr_dC = k[i] * self.n * C_i**(self.n - 1) if C_i > 0 else 0.0 + dr_dT = dk_dT[i] * C_i**self.n + + # dC_i/dC_i, dC_i/dT_i + J[ci, ci] = -f_flow - dr_dC + J[ci, ti] = -dr_dT + + # dT_i/dC_i, dT_i/dT_i + J[ti, ci] = rcp * dr_dC + J[ti, ti] = -f_flow + rcp * dr_dT + + # upstream coupling: dC_i/dC_{i-1}, dT_i/dT_{i-1} + if i > 0: + J[ci, 2*(i-1)] = f_flow + J[ti, 2*(i-1) + 1] = f_flow + + return J + + # output: last cell values + def _fn_a(x, u, t): + N = self.N_cells + return np.array([x[2*(N-1)], x[2*(N-1) + 1]]) + + super().__init__( + func_dyn=_fn_d, + jac_dyn=_jc_d, + func_alg=_fn_a, + initial_value=x0, + ) diff --git a/src/pathsim_chem/process/valve.py b/src/pathsim_chem/process/valve.py new file mode 100644 index 0000000..f0ec89c --- /dev/null +++ b/src/pathsim_chem/process/valve.py @@ -0,0 +1,58 @@ +######################################################################################### +## +## Pressure-Drop Valve Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np + +from pathsim.blocks.function import Function + +# BLOCKS ================================================================================ + +class Valve(Function): + """Algebraic pressure-drop valve with standard flow equation. + + Models an isenthalpic valve (no temperature change for liquids) + with flow proportional to the square root of the pressure drop. + + Mathematical Formulation + ------------------------- + .. math:: + + F = C_v \\sqrt{|P_{in} - P_{out}|} \\cdot \\mathrm{sign}(P_{in} - P_{out}) + + .. math:: + + T_{out} = T_{in} + + Parameters + ---------- + Cv : float + Valve flow coefficient. Must be positive. + """ + + input_port_labels = { + "P_in": 0, + "P_out": 1, + "T_in": 2, + } + + output_port_labels = { + "F": 0, + "T_out": 1, + } + + def __init__(self, Cv=1.0): + if Cv <= 0: + raise ValueError(f"'Cv' must be positive but is {Cv}") + + self.Cv = Cv + super().__init__(func=self._eval) + + def _eval(self, P_in, P_out, T_in): + dP = P_in - P_out + F = self.Cv * np.sqrt(np.abs(dP)) * np.sign(dP) + return (F, T_in) diff --git a/tests/process/test_heater.py b/tests/process/test_heater.py new file mode 100644 index 0000000..65e84fc --- /dev/null +++ b/tests/process/test_heater.py @@ -0,0 +1,117 @@ +######################################################################################## +## +## TESTS FOR +## 'process.heater.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest + +from pathsim_chem.process import Heater + + +# TESTS ================================================================================ + +class TestHeater(unittest.TestCase): + """Test the duty-specified heater/cooler block.""" + + def test_init_default(self): + """Test default parameters.""" + H = Heater() + self.assertEqual(H.rho, 1000.0) + self.assertEqual(H.Cp, 4184.0) + + def test_init_custom(self): + """Test custom parameters.""" + H = Heater(rho=800.0, Cp=3000.0) + self.assertEqual(H.rho, 800.0) + self.assertEqual(H.Cp, 3000.0) + + def test_init_validation(self): + """Test input validation.""" + with self.assertRaises(ValueError): + Heater(rho=0) + with self.assertRaises(ValueError): + Heater(rho=-1) + with self.assertRaises(ValueError): + Heater(Cp=0) + with self.assertRaises(ValueError): + Heater(Cp=-1) + + def test_port_labels(self): + """Test port label definitions.""" + self.assertEqual(Heater.input_port_labels["F"], 0) + self.assertEqual(Heater.input_port_labels["T_in"], 1) + self.assertEqual(Heater.input_port_labels["Q"], 2) + self.assertEqual(Heater.output_port_labels["F_out"], 0) + self.assertEqual(Heater.output_port_labels["T_out"], 1) + + def test_heating(self): + """Positive Q should increase temperature.""" + H = Heater(rho=1000.0, Cp=4184.0) + F = 0.1 # m³/s + T_in = 300.0 + Q = 41840.0 # W + + H.inputs[0] = F + H.inputs[1] = T_in + H.inputs[2] = Q + + H.update(None) + + # dT = Q / (F * rho * Cp) = 41840 / (0.1 * 1000 * 4184) = 0.1 K + expected_T = T_in + Q / (F * 1000.0 * 4184.0) + self.assertAlmostEqual(H.outputs[1], expected_T, places=6) + + def test_cooling(self): + """Negative Q should decrease temperature.""" + H = Heater(rho=1000.0, Cp=4184.0) + H.inputs[0] = 0.1 + H.inputs[1] = 350.0 + H.inputs[2] = -41840.0 + + H.update(None) + + self.assertLess(H.outputs[1], 350.0) + + def test_zero_duty(self): + """Q=0 should pass temperature through unchanged.""" + H = Heater() + H.inputs[0] = 0.1 + H.inputs[1] = 350.0 + H.inputs[2] = 0.0 + + H.update(None) + + self.assertAlmostEqual(H.outputs[1], 350.0) + + def test_flow_passthrough(self): + """F_out should equal F_in.""" + H = Heater() + H.inputs[0] = 0.5 + H.inputs[1] = 300.0 + H.inputs[2] = 10000.0 + + H.update(None) + + self.assertAlmostEqual(H.outputs[0], 0.5) + + def test_zero_flow(self): + """With zero flow, temperature should remain unchanged.""" + H = Heater() + H.inputs[0] = 0.0 + H.inputs[1] = 350.0 + H.inputs[2] = 10000.0 + + H.update(None) + + self.assertAlmostEqual(H.outputs[0], 0.0) + self.assertAlmostEqual(H.outputs[1], 350.0) + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/process/test_mixer.py b/tests/process/test_mixer.py new file mode 100644 index 0000000..a164c19 --- /dev/null +++ b/tests/process/test_mixer.py @@ -0,0 +1,99 @@ +######################################################################################## +## +## TESTS FOR +## 'process.mixer.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest + +from pathsim_chem.process import Mixer + + +# TESTS ================================================================================ + +class TestMixer(unittest.TestCase): + """Test the 2-stream mixer block.""" + + def test_port_labels(self): + """Test port label definitions.""" + self.assertEqual(Mixer.input_port_labels["F_1"], 0) + self.assertEqual(Mixer.input_port_labels["T_1"], 1) + self.assertEqual(Mixer.input_port_labels["F_2"], 2) + self.assertEqual(Mixer.input_port_labels["T_2"], 3) + self.assertEqual(Mixer.output_port_labels["F_out"], 0) + self.assertEqual(Mixer.output_port_labels["T_out"], 1) + + def test_mass_balance(self): + """F_out should equal F_1 + F_2.""" + M = Mixer() + M.inputs[0] = 5.0 # F_1 + M.inputs[1] = 350.0 # T_1 + M.inputs[2] = 3.0 # F_2 + M.inputs[3] = 300.0 # T_2 + + M.update(None) + + self.assertAlmostEqual(M.outputs[0], 8.0) # F_out + + def test_energy_balance(self): + """F_1*T_1 + F_2*T_2 should equal F_out*T_out.""" + M = Mixer() + F_1, T_1 = 5.0, 350.0 + F_2, T_2 = 3.0, 300.0 + + M.inputs[0] = F_1 + M.inputs[1] = T_1 + M.inputs[2] = F_2 + M.inputs[3] = T_2 + + M.update(None) + + F_out = M.outputs[0] + T_out = M.outputs[1] + self.assertAlmostEqual(F_1 * T_1 + F_2 * T_2, F_out * T_out, places=8) + + def test_equal_temperatures(self): + """When both streams at same T, output T should match.""" + M = Mixer() + M.inputs[0] = 5.0 + M.inputs[1] = 350.0 + M.inputs[2] = 3.0 + M.inputs[3] = 350.0 + + M.update(None) + + self.assertAlmostEqual(M.outputs[1], 350.0) + + def test_zero_flow_one_stream(self): + """With one stream at zero flow, output should match the other.""" + M = Mixer() + M.inputs[0] = 0.0 + M.inputs[1] = 300.0 + M.inputs[2] = 5.0 + M.inputs[3] = 400.0 + + M.update(None) + + self.assertAlmostEqual(M.outputs[0], 5.0) + self.assertAlmostEqual(M.outputs[1], 400.0) + + def test_zero_total_flow(self): + """With zero total flow, should not crash.""" + M = Mixer() + M.inputs[0] = 0.0 + M.inputs[1] = 300.0 + M.inputs[2] = 0.0 + M.inputs[3] = 400.0 + + M.update(None) + + self.assertAlmostEqual(M.outputs[0], 0.0) + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/process/test_multicomponent_flash.py b/tests/process/test_multicomponent_flash.py new file mode 100644 index 0000000..09e572e --- /dev/null +++ b/tests/process/test_multicomponent_flash.py @@ -0,0 +1,253 @@ +######################################################################################## +## +## TESTS FOR +## 'process.multicomponent_flash.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim_chem.process import MultiComponentFlash + +from pathsim.solvers import EUF + + +# TESTS ================================================================================ + +class TestMultiComponentFlash(unittest.TestCase): + """Test the multi-component isothermal flash drum block.""" + + def test_init_default(self): + """Test default initialization (ternary).""" + F = MultiComponentFlash() + self.assertEqual(F.N_comp, 3) + self.assertEqual(F.holdup, 100.0) + self.assertEqual(len(F.antoine_A), 3) + self.assertEqual(len(F.antoine_B), 3) + self.assertEqual(len(F.antoine_C), 3) + + def test_init_custom(self): + """Test custom initialization.""" + F = MultiComponentFlash( + N_comp=4, holdup=200.0, + antoine_A=[20.0, 21.0, 22.0, 23.0], + antoine_B=[2800.0, 3000.0, 3200.0, 3400.0], + antoine_C=[-50.0, -52.0, -54.0, -56.0], + N0=[60.0, 50.0, 50.0, 40.0], + ) + self.assertEqual(F.N_comp, 4) + self.assertEqual(F.holdup, 200.0) + + F.set_solver(EUF, parent=None) + self.assertTrue(np.allclose(F.engine.initial_value, [60.0, 50.0, 50.0, 40.0])) + + def test_init_validation(self): + """Test input validation.""" + with self.assertRaises(ValueError): + MultiComponentFlash(N_comp=1) + with self.assertRaises(ValueError): + MultiComponentFlash(holdup=0) + with self.assertRaises(ValueError): + MultiComponentFlash(holdup=-10) + with self.assertRaises(ValueError): + MultiComponentFlash(N_comp=3, antoine_A=[1.0, 2.0]) # wrong length + + def test_port_labels_ternary(self): + """Test port labels for ternary system.""" + F = MultiComponentFlash(N_comp=3) + # inputs: F, z_1, z_2, T, P + self.assertEqual(F.input_port_labels["F"], 0) + self.assertEqual(F.input_port_labels["z_1"], 1) + self.assertEqual(F.input_port_labels["z_2"], 2) + self.assertEqual(F.input_port_labels["T"], 3) + self.assertEqual(F.input_port_labels["P"], 4) + # outputs: V_rate, L_rate, y_1, y_2, x_1, x_2 + self.assertEqual(F.output_port_labels["V_rate"], 0) + self.assertEqual(F.output_port_labels["L_rate"], 1) + self.assertEqual(F.output_port_labels["y_1"], 2) + self.assertEqual(F.output_port_labels["y_2"], 3) + self.assertEqual(F.output_port_labels["x_1"], 4) + self.assertEqual(F.output_port_labels["x_2"], 5) + + def test_port_labels_binary(self): + """Test port labels for binary system.""" + F = MultiComponentFlash(N_comp=2) + # inputs: F, z_1, T, P + self.assertEqual(F.input_port_labels["F"], 0) + self.assertEqual(F.input_port_labels["z_1"], 1) + self.assertEqual(F.input_port_labels["T"], 2) + self.assertEqual(F.input_port_labels["P"], 3) + # outputs: V_rate, L_rate, y_1, x_1 + self.assertEqual(F.output_port_labels["V_rate"], 0) + self.assertEqual(F.output_port_labels["L_rate"], 1) + self.assertEqual(F.output_port_labels["y_1"], 2) + self.assertEqual(F.output_port_labels["x_1"], 3) + + def test_default_holdup(self): + """Default holdup should split equally between components.""" + F = MultiComponentFlash(N_comp=3, holdup=90.0) + F.set_solver(EUF, parent=None) + self.assertTrue(np.allclose(F.engine.initial_value, [30.0, 30.0, 30.0])) + + def test_output_flow_balance(self): + """V_rate + L_rate should equal F_in.""" + F = MultiComponentFlash(N_comp=3) + F.set_solver(EUF, parent=None) + + F_in = 10.0 + F.inputs[0] = F_in # F + F.inputs[1] = 0.33 # z_1 + F.inputs[2] = 0.34 # z_2 (z_3 = 0.33) + F.inputs[3] = 370.0 # T [K] + F.inputs[4] = 101325.0 # P [Pa] + + F.update(None) + + V_rate = F.outputs[0] + L_rate = F.outputs[1] + self.assertAlmostEqual(V_rate + L_rate, F_in, places=8) + + def test_vle_consistency(self): + """Vapor and liquid compositions should sum to 1.""" + F = MultiComponentFlash(N_comp=3) + F.set_solver(EUF, parent=None) + + F.inputs[0] = 10.0 # F + F.inputs[1] = 0.33 # z_1 + F.inputs[2] = 0.34 # z_2 + F.inputs[3] = 370.0 # T [K] + F.inputs[4] = 101325.0 # P [Pa] + + F.update(None) + + # outputs: V_rate, L_rate, y_1, y_2, x_1, x_2 + y_1 = F.outputs[2] + y_2 = F.outputs[3] + y_3 = 1.0 - y_1 - y_2 + x_1 = F.outputs[4] + x_2 = F.outputs[5] + x_3 = 1.0 - x_1 - x_2 + + self.assertAlmostEqual(y_1 + y_2 + y_3, 1.0, places=6) + self.assertAlmostEqual(x_1 + x_2 + x_3, 1.0, places=6) + + def test_light_component_enriched_in_vapor(self): + """The most volatile component should be enriched in vapor phase.""" + F = MultiComponentFlash(N_comp=3) + F.set_solver(EUF, parent=None) + + F.inputs[0] = 10.0 + F.inputs[1] = 0.33 + F.inputs[2] = 0.34 + F.inputs[3] = 370.0 + F.inputs[4] = 101325.0 + + F.update(None) + + V_rate = F.outputs[0] + L_rate = F.outputs[1] + + # Only check if two-phase + if V_rate > 0 and L_rate > 0: + y_1 = F.outputs[2] + x_1 = F.outputs[4] + # Component 1 (benzene) is most volatile => y_1 > x_1 + self.assertGreater(y_1, x_1) + + def test_no_feed(self): + """With zero feed, rates should be zero.""" + F = MultiComponentFlash(N_comp=3) + F.set_solver(EUF, parent=None) + + F.inputs[0] = 0.0 # F = 0 + F.inputs[1] = 0.33 + F.inputs[2] = 0.34 + F.inputs[3] = 350.0 + F.inputs[4] = 101325.0 + + F.update(None) + + self.assertAlmostEqual(F.outputs[0], 0.0) # V_rate + self.assertAlmostEqual(F.outputs[1], 0.0) # L_rate + + def test_subcooled_all_liquid(self): + """At low temperature, all feed should remain liquid.""" + F = MultiComponentFlash(N_comp=3) + F.set_solver(EUF, parent=None) + + F.inputs[0] = 10.0 + F.inputs[1] = 0.33 + F.inputs[2] = 0.34 + F.inputs[3] = 280.0 # very low T + F.inputs[4] = 101325.0 + + F.update(None) + + V_rate = F.outputs[0] + L_rate = F.outputs[1] + self.assertAlmostEqual(V_rate, 0.0, places=6) + self.assertAlmostEqual(L_rate, 10.0, places=6) + + def test_superheated_all_vapor(self): + """At very high temperature / low pressure, all feed should vaporize.""" + F = MultiComponentFlash(N_comp=3) + F.set_solver(EUF, parent=None) + + F.inputs[0] = 10.0 + F.inputs[1] = 0.33 + F.inputs[2] = 0.34 + F.inputs[3] = 500.0 # very high T + F.inputs[4] = 1000.0 # very low P + + F.update(None) + + V_rate = F.outputs[0] + L_rate = F.outputs[1] + self.assertAlmostEqual(V_rate, 10.0, places=6) + self.assertAlmostEqual(L_rate, 0.0, places=6) + + def test_binary_consistency(self): + """Binary MultiComponentFlash should give same results as FlashDrum.""" + from pathsim_chem.process import FlashDrum + + # Use same Antoine parameters + A = [20.7936, 20.9064] + B = [2788.51, 3096.52] + C = [-52.36, -53.67] + + mc = MultiComponentFlash(N_comp=2, holdup=100.0, + antoine_A=A, antoine_B=B, antoine_C=C) + fd = FlashDrum(holdup=100.0, + antoine_A=A, antoine_B=B, antoine_C=C) + + mc.set_solver(EUF, parent=None) + fd.set_solver(EUF, parent=None) + + # Set same inputs + mc.inputs[0] = 10.0 # F + mc.inputs[1] = 0.5 # z_1 + mc.inputs[2] = 360.0 # T + mc.inputs[3] = 101325.0 # P + + fd.inputs[0] = 10.0 # F + fd.inputs[1] = 0.5 # z_1 + fd.inputs[2] = 360.0 # T + fd.inputs[3] = 101325.0 # P + + mc.update(None) + fd.update(None) + + # Compare V_rate, L_rate, y_1, x_1 + self.assertAlmostEqual(mc.outputs[0], fd.outputs[0], places=4) # V_rate + self.assertAlmostEqual(mc.outputs[1], fd.outputs[1], places=4) # L_rate + self.assertAlmostEqual(mc.outputs[2], fd.outputs[2], places=4) # y_1 + self.assertAlmostEqual(mc.outputs[3], fd.outputs[3], places=4) # x_1 + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/process/test_pfr.py b/tests/process/test_pfr.py new file mode 100644 index 0000000..df7ba6b --- /dev/null +++ b/tests/process/test_pfr.py @@ -0,0 +1,160 @@ +######################################################################################## +## +## TESTS FOR +## 'process.pfr.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim_chem.process import PFR + +from pathsim.solvers import EUF + + +# TESTS ================================================================================ + +class TestPFR(unittest.TestCase): + """Test the plug flow reactor block.""" + + def test_init_default(self): + """Test default initialization.""" + P = PFR() + self.assertEqual(P.N_cells, 5) + self.assertEqual(P.V, 1.0) + self.assertEqual(P.F, 0.1) + self.assertEqual(P.k0, 1e6) + self.assertEqual(P.Ea, 50000.0) + self.assertEqual(P.n, 1.0) + + def test_init_custom(self): + """Test custom initialization.""" + P = PFR(N_cells=10, V=2.0, F=0.5, k0=1e4, Ea=40000.0, n=2.0, + dH_rxn=-30000.0, rho=800.0, Cp=3000.0, C0=1.5, T0=350.0) + self.assertEqual(P.N_cells, 10) + self.assertEqual(P.V, 2.0) + + P.set_solver(EUF, parent=None) + state = P.engine.initial_value + self.assertEqual(len(state), 20) # 2 * N_cells + self.assertTrue(np.all(state[0::2] == 1.5)) # concentrations + self.assertTrue(np.all(state[1::2] == 350.0)) # temperatures + + def test_init_validation(self): + """Test input validation.""" + with self.assertRaises(ValueError): + PFR(N_cells=0) + with self.assertRaises(ValueError): + PFR(V=-1) + with self.assertRaises(ValueError): + PFR(F=0) + with self.assertRaises(ValueError): + PFR(rho=-1) + with self.assertRaises(ValueError): + PFR(Cp=0) + + def test_port_labels(self): + """Test port label definitions.""" + self.assertEqual(PFR.input_port_labels["C_in"], 0) + self.assertEqual(PFR.input_port_labels["T_in"], 1) + self.assertEqual(PFR.output_port_labels["C_out"], 0) + self.assertEqual(PFR.output_port_labels["T_out"], 1) + + def test_state_size(self): + """Test that state vector has correct size.""" + for N in [1, 3, 10]: + P = PFR(N_cells=N) + P.set_solver(EUF, parent=None) + self.assertEqual(len(P.engine.initial_value), 2 * N) + + def test_output_initial(self): + """Test outputs at initial state.""" + P = PFR(N_cells=3, C0=2.0, T0=350.0) + P.set_solver(EUF, parent=None) + P.update(None) + + # Last cell values + self.assertAlmostEqual(P.outputs[0], 2.0) # C_out + self.assertAlmostEqual(P.outputs[1], 350.0) # T_out + + def test_no_reaction(self): + """With k0=0, no reaction occurs. At steady state C_out = C_in.""" + P = PFR(N_cells=3, V=1.0, F=1.0, k0=0.0, Ea=0.0, n=1.0, + dH_rxn=0.0, C0=1.0, T0=350.0) + P.set_solver(EUF, parent=None) + + P.inputs[0] = 1.0 # C_in + P.inputs[1] = 350.0 # T_in + + # When C=C_in and T=T_in everywhere, with no reaction, derivatives = 0 + x = P.engine.get() + u = np.array([1.0, 350.0]) + dx = P.op_dyn(x, u, 0) + + self.assertTrue(np.allclose(dx, 0.0, atol=1e-10)) + + def test_reaction_direction(self): + """With reaction, concentration should decrease along the reactor.""" + R_GAS = 8.314 + T = 350.0 + k0, Ea = 1e6, 50000.0 + k = k0 * np.exp(-Ea / (R_GAS * T)) + + P = PFR(N_cells=1, V=1.0, F=1.0, k0=k0, Ea=Ea, n=1.0, + dH_rxn=0.0, C0=1.0, T0=T) + P.set_solver(EUF, parent=None) + + # Set inlet = current state, so flow terms cancel + P.inputs[0] = 1.0 # C_in + P.inputs[1] = T # T_in + + x = np.array([1.0, T]) + u = np.array([1.0, T]) + dx = P.op_dyn(x, u, 0) + + # dC/dt = 0 (flow) - k*C = -k + self.assertAlmostEqual(dx[0], -k, places=5) + # dT/dt = 0 since dH_rxn=0 + self.assertAlmostEqual(dx[1], 0.0, places=5) + + def test_energy_conservation_exothermic(self): + """For exothermic reaction (dH_rxn < 0), temperature should increase.""" + P = PFR(N_cells=1, V=1.0, F=1.0, k0=1e3, Ea=20000.0, n=1.0, + dH_rxn=-50000.0, rho=1000.0, Cp=4184.0, + C0=2.0, T0=350.0) + P.set_solver(EUF, parent=None) + + P.inputs[0] = 2.0 + P.inputs[1] = 350.0 + + x = np.array([2.0, 350.0]) + u = np.array([2.0, 350.0]) + dx = P.op_dyn(x, u, 0) + + # Concentration should decrease (reaction consuming) + self.assertLess(dx[0], 0) + # Temperature should increase (exothermic) + self.assertGreater(dx[1], 0) + + def test_jacobian_stability(self): + """Diagonal of Jacobian should be negative (stable).""" + P = PFR(N_cells=2, V=1.0, F=0.1, k0=1e3, Ea=30000.0, n=1.0, + dH_rxn=0.0, C0=1.0, T0=350.0) + P.set_solver(EUF, parent=None) + + x = P.engine.get() + u = np.array([1.0, 350.0]) + J = P.op_dyn.jac_x(x, u, 0) + + # All diagonal elements should be negative + for i in range(len(x)): + self.assertLess(J[i, i], 0) + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/process/test_valve.py b/tests/process/test_valve.py new file mode 100644 index 0000000..d38e417 --- /dev/null +++ b/tests/process/test_valve.py @@ -0,0 +1,98 @@ +######################################################################################## +## +## TESTS FOR +## 'process.valve.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim_chem.process import Valve + + +# TESTS ================================================================================ + +class TestValve(unittest.TestCase): + """Test the pressure-drop valve block.""" + + def test_init_default(self): + """Test default Cv.""" + V = Valve() + self.assertEqual(V.Cv, 1.0) + + def test_init_custom(self): + """Test custom Cv.""" + V = Valve(Cv=5.0) + self.assertEqual(V.Cv, 5.0) + + def test_init_validation(self): + """Test input validation.""" + with self.assertRaises(ValueError): + Valve(Cv=0) + with self.assertRaises(ValueError): + Valve(Cv=-1) + + def test_port_labels(self): + """Test port label definitions.""" + self.assertEqual(Valve.input_port_labels["P_in"], 0) + self.assertEqual(Valve.input_port_labels["P_out"], 1) + self.assertEqual(Valve.input_port_labels["T_in"], 2) + self.assertEqual(Valve.output_port_labels["F"], 0) + self.assertEqual(Valve.output_port_labels["T_out"], 1) + + def test_forward_flow(self): + """Flow should be positive when P_in > P_out.""" + V = Valve(Cv=2.0) + V.inputs[0] = 200000.0 # P_in + V.inputs[1] = 100000.0 # P_out + V.inputs[2] = 350.0 # T_in + + V.update(None) + + F = V.outputs[0] + self.assertGreater(F, 0) + expected = 2.0 * np.sqrt(100000.0) + self.assertAlmostEqual(F, expected, places=4) + + def test_reverse_flow(self): + """Flow should be negative when P_in < P_out.""" + V = Valve(Cv=2.0) + V.inputs[0] = 100000.0 # P_in + V.inputs[1] = 200000.0 # P_out + V.inputs[2] = 350.0 + + V.update(None) + + F = V.outputs[0] + self.assertLess(F, 0) + + def test_zero_dp(self): + """No flow when pressures are equal.""" + V = Valve(Cv=2.0) + V.inputs[0] = 101325.0 + V.inputs[1] = 101325.0 + V.inputs[2] = 350.0 + + V.update(None) + + self.assertAlmostEqual(V.outputs[0], 0.0) + + def test_temperature_passthrough(self): + """Temperature should pass through unchanged (isenthalpic).""" + V = Valve() + V.inputs[0] = 200000.0 + V.inputs[1] = 100000.0 + V.inputs[2] = 375.0 + + V.update(None) + + self.assertAlmostEqual(V.outputs[1], 375.0) + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2)