Skip to content

Commit c480c67

Browse files
authored
Merge pull request #25 from elecbug/v0.7
Update v0.7.2
2 parents 4ab19d0 + 6b7db8d commit c480c67

5 files changed

Lines changed: 118 additions & 31 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,5 @@ log
3232
venv/
3333
__pycache__/
3434
*.pyc
35-
temp
35+
temp
36+
temp/

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ go get github.com/elecbug/netkit@latest
1919
- [`graph`](./network-graph/graph/): Library for creating and building graphs.
2020
- [`algorithm`](./network-graph/algorithm/): Library containing various graph algorithms.
2121

22-
# Extensible
22+
### Extensible
2323

2424
- [`bimap`](./bimap/): Bidirectional map with O(1) lookups key->value and value->key.
2525
- [`slice`](./slice/): Generic helpers: binary search, stable merge sort, parallel sort, and `IsSorted`.

p2p/network.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ package p2p
22

33
import (
44
"strconv"
5+
"sync"
56

67
"github.com/elecbug/netkit/network-graph/graph"
78
"github.com/elecbug/netkit/network-graph/node"
89
)
910

1011
// GenerateNetwork creates a P2P network from the given graph.
1112
// nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively.
12-
func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (map[ID]*Node, error) {
13+
func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency func() float64) (map[ID]*Node, error) {
1314
nodes := make(map[ID]*Node)
1415
maps := make(map[node.ID]ID)
1516

@@ -22,9 +23,9 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (m
2223
}
2324

2425
n := &Node{
25-
ID: ID(num),
26-
Latency: nodeLatency(),
27-
Edges: make(map[ID]Edge),
26+
ID: ID(num),
27+
ValidationLatency: nodeLatency(),
28+
Edges: make(map[ID]Edge),
2829
}
2930

3031
nodes[n.ID] = n
@@ -57,9 +58,14 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (m
5758

5859
// RunNetworkSimulation starts the message handling routines for all nodes in the network.
5960
func RunNetworkSimulation(nodes map[ID]*Node) {
61+
wg := &sync.WaitGroup{}
62+
wg.Add(len(nodes))
63+
6064
for _, n := range nodes {
61-
n.eachRun(nodes)
65+
n.eachRun(nodes, wg)
6266
}
67+
68+
wg.Wait()
6369
}
6470

6571
// Publish sends a message to the specified node's message queue.

p2p/node.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ type Edge struct {
2222

2323
// Node represents a node in the P2P network.
2424
type Node struct {
25-
ID ID
26-
Latency float64
27-
Edges map[ID]Edge
25+
ID ID
26+
ValidationLatency float64
27+
QueuingLatency float64
28+
Edges map[ID]Edge
2829

2930
RecvFrom map[string]map[ID]struct{} // content -> set of senders
3031
SentTo map[string]map[ID]struct{} // content -> set of targets
@@ -40,13 +41,15 @@ func (n *Node) Degree() int {
4041
}
4142

4243
// eachRun starts the message handling routine for the node.
43-
func (n *Node) eachRun(network map[ID]*Node) {
44-
go func() {
45-
n.msgQueue = make(chan Message, 1000)
46-
n.RecvFrom = make(map[string]map[ID]struct{})
47-
n.SentTo = make(map[string]map[ID]struct{})
48-
n.SeenAt = make(map[string]time.Time)
44+
func (n *Node) eachRun(network map[ID]*Node, wg *sync.WaitGroup) {
45+
defer wg.Done()
46+
47+
n.msgQueue = make(chan Message, 1000)
48+
n.RecvFrom = make(map[string]map[ID]struct{})
49+
n.SentTo = make(map[string]map[ID]struct{})
50+
n.SeenAt = make(map[string]time.Time)
4951

52+
go func() {
5053
for msg := range n.msgQueue {
5154
first := false
5255
var excludeSnapshot map[ID]struct{}
@@ -66,7 +69,7 @@ func (n *Node) eachRun(network map[ID]*Node) {
6669

6770
if first {
6871
go func(content string, exclude map[ID]struct{}) {
69-
time.Sleep(time.Duration(n.Latency) * time.Millisecond)
72+
time.Sleep(time.Duration(n.ValidationLatency) * time.Millisecond)
7073
n.publish(network, content, exclude)
7174
}(msg.Content, excludeSnapshot)
7275
}
@@ -86,6 +89,10 @@ func copyIDSet(src map[ID]struct{}) map[ID]struct{} {
8689
// publish sends the message to neighbors, excluding 'exclude' and already-sent targets.
8790
func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]struct{}) {
8891
n.mu.Lock()
92+
defer n.mu.Unlock()
93+
94+
time.Sleep(time.Duration(n.QueuingLatency) * time.Millisecond)
95+
8996
if _, ok := n.SentTo[content]; !ok {
9097
n.SentTo[content] = make(map[ID]struct{})
9198
}
@@ -103,10 +110,10 @@ func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]stru
103110
n.SentTo[content][edge.TargetID] = struct{}{}
104111

105112
edgeCopy := edge
113+
106114
go func(e Edge) {
107115
time.Sleep(time.Duration(e.Latency) * time.Millisecond)
108116
network[e.TargetID].msgQueue <- Message{From: n.ID, Content: content}
109117
}(edgeCopy)
110118
}
111-
n.mu.Unlock()
112119
}

p2p/p2p_test.go

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,118 @@
11
package p2p_test
22

33
import (
4+
"encoding/json"
5+
"fmt"
6+
"math"
47
"math/rand"
8+
"os"
9+
"sync"
510
"testing"
611
"time"
712

13+
"github.com/elecbug/netkit/network-graph/algorithm"
814
"github.com/elecbug/netkit/network-graph/graph/standard_graph"
915
"github.com/elecbug/netkit/p2p"
1016
)
1117

1218
func TestGenerateNetwork(t *testing.T) {
13-
g := standard_graph.ErdosRenyiGraph(1000, 0.005, true)
19+
g := standard_graph.ErdosRenyiGraph(1000, 0.05, true)
1420
t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount())
1521
src := rand.NewSource(time.Now().UnixNano())
1622

1723
nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) }
1824
edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) }
25+
queuingLatency := func() float64 { return p2p.LogNormalRand(5.0, 0.2, src) }
1926

20-
nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency)
27+
nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency)
2128
t.Logf("Generated network with %d nodes\n", len(nw))
2229
for id, node := range nw {
23-
t.Logf("Node %d: latency=%.2fms, edges=%v\n", id, node.Latency, node.Edges)
30+
t.Logf("Node %d: validation_latency=%.2fms, queuing_latency=%.2fms, edges=%v\n", id, node.ValidationLatency, node.QueuingLatency, node.Edges)
2431
}
2532

26-
p2p.RunNetworkSimulation(nw)
27-
p2p.Publish(nw[0], "Hello, P2P Network!")
33+
msg := "Hello, P2P World!"
2834

29-
time.Sleep(5 * time.Second)
35+
p2p.RunNetworkSimulation(nw)
36+
p2p.Publish(nw[0], msg)
37+
time.Sleep(1 * time.Second)
3038

3139
count := 0
40+
result := make(map[string]map[string]any)
41+
3242
for id, node := range nw {
33-
c := len(node.SentTo["Hello, P2P Network!"])
43+
c := len(node.SentTo[msg])
3444
t.Logf("Node %d sent %d/%d\n", id, c, len(node.Edges))
35-
t.Logf("Node %d data: recv: %v, sent: %v, seen: %v\n",
36-
id,
37-
node.RecvFrom["Hello, P2P Network!"],
38-
node.SentTo["Hello, P2P Network!"],
39-
node.SeenAt["Hello, P2P Network!"],
40-
)
45+
46+
result[fmt.Sprintf("node_%d", id)] = map[string]any{}
47+
result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg]
48+
result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg]
49+
result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg]
50+
4151
count += c
4252
}
4353

4454
t.Logf("Total received count: %d\n", count)
55+
56+
data, _ := json.Marshal(result)
57+
58+
os.WriteFile("p2p_result.log", data, 0644)
59+
}
60+
61+
func TestExpCase(t *testing.T) {
62+
run := false
63+
64+
if run {
65+
for i := 4; i <= 11; i++ {
66+
wg := &sync.WaitGroup{}
67+
68+
for j := 0; j < 60; j++ {
69+
wg.Add(1)
70+
go func(j int) {
71+
defer wg.Done()
72+
filename := fmt.Sprintf("temp/p2p_result-%02d-%03d.log", i, j)
73+
74+
if _, err := os.Stat(filename); err == nil {
75+
t.Logf("File %s already exists, skipping...\n", filename)
76+
return
77+
}
78+
79+
t.Logf("Experiment case: %02d-%03d\n", i, j)
80+
r := rand.New(rand.NewSource(time.Now().UnixNano()))
81+
82+
n := r.Int()%20 + 170
83+
g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true)
84+
85+
nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) }
86+
edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(500), 0.01, rand.NewSource(time.Now().UnixNano())) }
87+
queuingLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) }
88+
89+
nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency)
90+
msg := "Hello, P2P World!"
91+
92+
t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount())
93+
94+
p2p.RunNetworkSimulation(nw)
95+
p2p.Publish(nw[0], msg)
96+
time.Sleep(5 * time.Second)
97+
98+
result := make(map[string]map[string]any)
99+
100+
for id, node := range nw {
101+
result[fmt.Sprintf("node_%d", id)] = map[string]any{}
102+
result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg]
103+
result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg]
104+
result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg]
105+
}
106+
107+
data, _ := json.Marshal(result)
108+
109+
os.WriteFile(filename, data, 0644)
110+
111+
algorithm.CacheClear()
112+
}(j)
113+
}
114+
115+
wg.Wait()
116+
}
117+
}
45118
}

0 commit comments

Comments
 (0)