-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathp2p_nvlink.py
More file actions
72 lines (49 loc) · 1.7 KB
/
p2p_nvlink.py
File metadata and controls
72 lines (49 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import os
import torch
import torch.distributed as dist
from dlslime import _slime_c
def run_benchmark():
    """Run a one-sided NVLink P2P read example between ranks 0 and 1.

    Expects to be started by a distributed launcher that sets ``LOCAL_RANK``
    and the environment ``init_process_group`` reads; at least two ranks are
    required. Rank 0 (initiator) owns a 16-byte ``uint8`` CUDA buffer of
    zeros and rank 1 (target) a buffer of ones. Rank 0 issues a read from
    rank 1's registered buffer, then checks that its own first 8 bytes
    became ones while its last 8 bytes stayed zeros. Ranks beyond the first
    two take part in the collectives but never connect or transfer data.

    The process group is destroyed on exit, including when the example
    fails.

    Raises:
        RuntimeError: Fewer than two ranks are running.
        KeyError: ``LOCAL_RANK`` is not set in the environment.
        AssertionError: Rank 0's buffer does not hold the expected bytes
            after the read.
    """
    dist.init_process_group(backend="nccl")
    try:
        _run_p2p_read()
    finally:
        # Destroy the group even when the example fails, so the process
        # does not exit with the NCCL process group still alive.
        dist.destroy_process_group()


def _run_p2p_read():
    """Create this rank's endpoint, connect the rank pair and verify the read."""
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    device = torch.device(f"cuda:{local_rank}")

    if world_size < 2:
        # Message: "at least 2 GPUs are required to run this P2P test".
        raise RuntimeError("需要至少 2 个 GPU 才能运行此 P2P 测试")

    ep = _slime_c.NVLinkEndpoint()

    # Rank 0 starts with zeros so bytes read from the peer (ones) are
    # distinguishable from bytes the read did not touch.
    if rank == 0:
        tensor = torch.zeros([16], device=device, dtype=torch.uint8)
    else:
        tensor = torch.ones([16], device=device, dtype=torch.uint8)

    # NOTE(review): Tensor.data_ptr() already points at the first element
    # (it includes storage_offset); confirm how NVLinkEndpoint combines the
    # pointer with the separate offset argument. The offset is 0 here since
    # the tensor is freshly allocated.
    ep.register_memory_region(
        tensor.data_ptr(),
        int(tensor.storage_offset()),
        tensor.numel() * tensor.itemsize,
        "buffer",
    )

    # all_gather_object is a collective: every rank must call it, including
    # ranks that will not connect.
    gather_list = [None] * world_size
    dist.all_gather_object(gather_list, ep.endpoint_info())

    # Only ranks 0 and 1 form the pair under test. ``1 - rank`` is a valid
    # peer index for those two alone; for rank >= 2 it is negative and would
    # silently index gather_list from the end (with 3 ranks, rank 2 would
    # pick its own endpoint).
    if rank < 2:
        target_rank = 1 - rank
        print(f"[Rank {rank}] Connecting to Rank {target_rank}...")
        ep.connect(gather_list[target_rank])
    dist.barrier()

    if rank == 0:
        # NOTE(review): presumably (local region, remote region, offset,
        # offset, length); which offset is local vs. remote is not visible
        # here -- confirm against the dlslime API. The checks below expect
        # the data to land in local bytes [0, 8).
        ep.read(
            [("buffer", "buffer", 8, 0, 8)],
            None,
        )
        # Wait for device work (in case the read is asynchronous) before
        # inspecting the tensor.
        torch.cuda.synchronize()
        print(f"[Rank {rank}] After Read: {tensor}")
        assert torch.all(tensor[:8] == 1), f"First half check failed: {tensor[:8]}"
        assert torch.all(tensor[8:] == 0), f"Second half check failed: {tensor[8:]}"
        print("run nvlink p2p read example successful")

    # Keep every rank's buffer and endpoint alive until rank 0 is done reading.
    dist.barrier()
    # Release the endpoint before the caller destroys the process group.
    del ep
# Script entry point: run the example when executed directly (e.g. under a
# distributed launcher that sets LOCAL_RANK for each process).
if __name__ == "__main__":
    run_benchmark()