-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathloopy_modules.py
More file actions
179 lines (132 loc) · 6.16 KB
/
loopy_modules.py
File metadata and controls
179 lines (132 loc) · 6.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
'''
This file implements the detection algorithms (message passing) on markov random field of graphs generated by ER model.
The algorithms defined in this file would be imported by bin/varying_loopy.py
For the specifics about the algorithms, please see the description in manuscript/amp.pdf.
'''
import numpy as np
import itertools
import factorgraph as fg
import maxsum
import alphaBP
from scipy.stats import multivariate_normal
######################################################################
class ML(object):
def __init__(self, hparam):
self.hparam = hparam
self.constellation = hparam.constellation
pass
def detect(self, S, b):
proposals = list( itertools.product(self.constellation, repeat=self.hparam.num_tx) )
threshold = np.inf
solution = None
for x in proposals:
tmp = np.matmul(np.array(x), S).dot(np.array(x)) + b.dot(x)
if tmp < threshold:
threshold = tmp
solution = x
return solution
class Marginal(object):
"""Compute all the marginals for a given distributions"""
def __init__(self, hparam):
self.hparam = hparam
self.constellation = hparam.constellation
pass
def detect(self, S, b):
proposals = list( itertools.product(self.constellation, repeat=S.shape[0]) )
array_proposals = np.array(proposals)
prob = []
for x in proposals:
tmp = np.matmul(np.array(x), S).dot(np.array(x)) + b.dot(x)
prob.append(np.exp(-tmp))
prob = np.array(prob)
marginals = []
for i in range(b.shape[0]):
this_marginal = []
for code in self.constellation:
subset_idx = array_proposals[:, i]==code
this_marginal.append(np.sum( prob[subset_idx]))
# normalize the marginal
this_marginal = np.array(this_marginal)
this_marginal = this_marginal/this_marginal.sum()
marginals.append( this_marginal)
return np.array(marginals)
class LoopyBP(object):
def __init__(self, noise_var, hparam):
# get the constellation
self.constellation = hparam.constellation
self.hparam = hparam
# set the graph
self.graph = fg.Graph()
# add the discrete random variables to graph
self.n_symbol = hparam.num_tx
for idx in range(hparam.num_tx):
self.graph.rv("x{}".format(idx), len(self.constellation))
def set_potential(self, S, b):
s = S
for var_idx in range(self.hparam.num_tx):
# set the first type of potentials, the standalone potentials
f_x_i = np.exp( - s[var_idx, var_idx] * np.power(self.constellation, 2)
- b[var_idx] * np.array(self.constellation))
self.graph.factor(["x{}".format(var_idx)],
potential=f_x_i)
for var_idx in range(self.hparam.num_tx):
for var_jdx in range(var_idx + 1, self.hparam.num_tx):
# set the cross potentials
if s[var_idx, var_jdx] > 0:
t_ij = np.exp(-2* np.array(self.constellation)[None,:].T
* s[var_idx, var_jdx] * np.array(self.constellation))
self.graph.factor(["x{}".format(var_jdx), "x{}".format(var_idx)],
potential=t_ij)
def fit(self, S, b, stop_iter=10):
""" set potentials and run message passing"""
self.set_potential(S, b)
# run BP
iters, converged = self.graph.lbp(normalize=True,max_iters=stop_iter)
def detect_signal_by_mean(self):
estimated_signal = []
rv_marginals = dict(self.graph.rv_marginals())
for idx in range(self.n_symbol):
x_marginal = rv_marginals["x{}".format(idx)]
estimated_signal.append(self.constellation[x_marginal.argmax()])
return estimated_signal
def marginals(self):
marginal_prob = []
rv_marginals = dict(self.graph.rv_marginals())
for idx in range(self.n_symbol):
x_marginal = rv_marginals["x{}".format(idx)]
x_marginal = np.array(x_marginal)
x_marginal = x_marginal/x_marginal.sum()
marginal_prob.append(x_marginal)
return np.array(marginal_prob)
class AlphaBP(LoopyBP):
def __init__(self, noise_var, hparam):
self.hparam = hparam
# get the constellation
self.constellation = hparam.constellation
self.n_symbol = hparam.num_tx
# set the graph
self.graph = alphaBP.alphaGraph(alpha=hparam.alpha)
# add the discrete random variables to graph
for idx in range(hparam.num_tx ):
self.graph.rv("x{}".format(idx), len(self.constellation))
class MMSEalphaBP(AlphaBP):
def set_potential(self, S, b):
s = S
inv = np.linalg.inv(np.eye(s.shape[0]) + 2 * s )
prior_u = inv.dot(b)
for var_idx in range(s.shape[1]):
# set the first type of potentials, the standalone potentials
f_x_i = np.exp( - s[var_idx, var_idx] * np.power(self.constellation, 2)
- b[var_idx] * np.array(self.constellation))
prior_i = np.exp(-0.5 * np.power(self.constellation - prior_u[var_idx], 2) \
/ (inv[var_idx, var_idx]) )
self.graph.factor(["x{}".format(var_idx)],
potential=f_x_i * prior_i)
for var_idx in range(s.shape[1]):
for var_jdx in range(var_idx + 1, s.shape[1]):
# set the cross potentials
if s[var_idx, var_jdx] > 0:
t_ij = np.exp(- 2 * np.array(self.constellation)[None,:].T
* s[var_idx, var_jdx] * np.array(self.constellation))
self.graph.factor(["x{}".format(var_jdx), "x{}".format(var_idx)],
potential=t_ij)