-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspark_greedy.py
More file actions
45 lines (38 loc) · 1.58 KB
/
spark_greedy.py
File metadata and controls
45 lines (38 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# -*- coding: utf-8 -*-
"""
Created on Tue Sep 29 12:44:24 2015
Spark greedy functions
@author: charlesliu
"""
from main import cascade_trials
import networkx as nx
def greedy_trials(sc, num_trials, g, k, N, t=float("inf"), partitions=8):
    """Run `greedy_search` repeatedly and collect the per-trial outcomes.

    Parameters:
        sc: SparkContext used to parallelize the graph's nodes.
        num_trials: number of independent greedy searches to run.
        g: networkx graph to search over.
        k: number of seed nodes each greedy search selects.
        N: number of cascade simulations per candidate evaluation.
        t: iteration cap passed through to the cascade simulation
           (defaults to unbounded).
        partitions: number of Spark partitions for the node RDD.

    Returns:
        dict with "nodes" (list of selected node sets, one per trial) and
        "results" (list of the corresponding cascade statistics).
    """
    results = []
    nodes = []
    # Parallelize once and reuse the same RDD for every trial; drop
    # isolated nodes since they cannot spread influence.
    grdd = sc.parallelize(g.nodes(), partitions)
    grdd = grdd.filter(lambda x: len(nx.edges(g, nbunch=x)) > 0)
    for _ in range(num_trials):
        trial_nodes, trial_result = greedy_search(grdd, g, k, N, t)
        nodes.append(trial_nodes)
        results.append(trial_result)
    return {"nodes": nodes, "results": results}
def greedy_search(graph_rdd, graph, select_count, trials, iterations=float("inf")):
    """Greedily grow a seed set of `select_count` nodes maximizing mean cascade size.

    Parameters:
        graph_rdd: RDD of candidate nodes (non-isolated nodes of `graph`).
        graph: networkx graph the cascades run on.
        select_count: target size of the seed set.
        trials: number of cascade simulations per candidate (passed to
            `cascade_trials`).
        iterations: per-cascade iteration cap (defaults to unbounded).

    Returns:
        tuple (selected_node_set, cascade_stats_dict) for the best seed set
        found; `cascade_stats_dict` is whatever `cascade_trials` returns
        (used here via its 'mean' key).
    """
    max_influence = (set(), 0)
    for iteration in range(1, select_count + 1):
        # Bind the current selection to a local name before building the
        # Spark closure: the original lambda captured the loop-mutated
        # `max_influence` by reference, which is fragile under closure
        # serialization (late-binding hazard).
        selected = max_influence[0]
        candidates = graph_rdd.map(
            lambda x, s=selected: (s | {x},
                                   cascade_trials(trials, s | {x}, graph, iterations)))
        # Candidates already in the seed set do not grow it; drop them so
        # every surviving pair adds exactly one new node.
        candidates = candidates.filter(lambda pair: len(pair[0]) == iteration)
        # takeOrdered with a negated key returns the highest-mean candidate.
        max_influence = candidates.takeOrdered(1, key=lambda pair: -pair[1]['mean'])[0]
    return max_influence
# NOTE(review): the reduce below combines pairwise with fixed 0.5/0.5
# weights, so the final 'mean' and 'std' equal the true aggregates only
# when every pair being reduced carries the same number of underlying
# trials — presumably the reason for the original "N%parts = 0"
# requirement (N divisible by the partition/element count). Verify
# against callers before relying on these statistics.
# N%parts = 0
def spark_trials(rdd,N, nodes, graph, max_iterations=float("inf")):
    # Each RDD element's value is ignored (`x` unused): every element just
    # triggers one independent batch of N cascade simulations on the same
    # seed `nodes`, so the RDD's size sets the number of parallel batches.
    rdd = rdd.map(lambda x: cascade_trials(N, nodes, graph, max_iterations))
    # 'time' fields are summed; 'mean'/'std' are equal-weight pairwise
    # averages (see caveat above). Averaging stds this way is itself an
    # approximation, not a pooled standard deviation.
    return rdd.reduce(lambda x,y: {'time': x['time'] + y['time'],'mean': 0.5*x['mean'] + 0.5*y['mean'],'std': 0.5*x['std'] + 0.5*y['std']})
def node_count(results):
    """Tally how often each node was selected across greedy trials.

    Parameters:
        results: dict with a "nodes" key holding an iterable of node
            collections (as produced by `greedy_trials`).

    Returns:
        dict mapping each node to the number of trials that selected it;
        empty dict when "nodes" is empty.
    """
    counts = {}
    for selection in results["nodes"]:
        for node in selection:
            # dict.get with a default replaces the membership-test branch:
            # a single lookup path instead of `in` plus index.
            counts[node] = counts.get(node, 0) + 1
    return counts