1+ from dataclasses import dataclass
2+
13from rid_lib .types import KoiNetNode , KoiNetEdge
2- import structlog
3- from koi_net .config . full_node import (
4+ from koi_net . core import FullNode
5+ from koi_net .config import (
46 FullNodeConfig ,
57 ServerConfig ,
68 KoiNetConfig ,
7- NodeProfile ,
9+ FullNodeProfile ,
810 NodeProvides
911)
10- from koi_net .core import FullNode
11- from koi_net .processor .context import HandlerContext
12- from koi_net .processor .handler import HandlerType , KnowledgeHandler
13- from koi_net .processor .knowledge_object import KnowledgeObject
14- from koi_net .protocol .event import Event , EventType
15- from koi_net .protocol .edge import EdgeType , generate_edge_bundle
16-
17- log = structlog .stdlib .get_logger ()
1812
13+ from koi_net .components .interfaces import HandlerType , KnowledgeHandler
14+ from koi_net .components import NodeIdentity , Cache , EventQueue , KobjQueue
15+ from koi_net .protocol import (
16+ KnowledgeObject ,
17+ Event ,
18+ EventType ,
19+ EdgeType ,
20+ generate_edge_bundle
21+ )
1922
2023class CoordinatorConfig (FullNodeConfig ):
2124 server : ServerConfig = ServerConfig (port = 8080 )
2225 koi_net : KoiNetConfig = KoiNetConfig (
2326 node_name = "coordinator" ,
24- node_profile = NodeProfile (
27+ node_profile = FullNodeProfile (
2528 provides = NodeProvides (
2629 event = [KoiNetNode , KoiNetEdge ],
2730 state = [KoiNetNode , KoiNetEdge ]
@@ -30,39 +33,48 @@ class CoordinatorConfig(FullNodeConfig):
3033 rid_types_of_interest = [KoiNetNode , KoiNetEdge ]
3134 )
3235
33- @KnowledgeHandler .create (
34- HandlerType .Network ,
35- rid_types = [KoiNetNode ])
36- def handshake_handler (ctx : HandlerContext , kobj : KnowledgeObject ):
37- log .info ("Handling node handshake" )
38-
39- # only respond if node declares itself as NEW
40- if kobj .event_type != EventType .NEW :
41- return
42-
43- log .info ("Sharing this node's bundle with peer" )
44- identity_bundle = ctx .cache .read (ctx .identity .rid )
45- ctx .event_queue .push (
46- event = Event .from_bundle (EventType .NEW , identity_bundle ),
47- target = kobj .rid
48- )
36+ @dataclass
37+ class HandshakeHandler (KnowledgeHandler ):
38+ identity : NodeIdentity
39+ cache : Cache
40+ event_queue : EventQueue
41+ kobj_queue : KobjQueue
4942
50- log . info ( "Proposing new edge" )
51- # defer handling of proposed edge
43+ handler_type = HandlerType . Network
44+ rid_types = ( KoiNetNode ,)
5245
53- edge_bundle = generate_edge_bundle (
54- source = kobj .rid ,
55- target = ctx .identity .rid ,
56- edge_type = EdgeType .WEBHOOK ,
57- rid_types = [KoiNetNode , KoiNetEdge ]
58- )
46+ def handle (self , kobj : KnowledgeObject ):
47+ # only respond if node declares itself as NEW
48+ if not (kobj .event_type == EventType .NEW and kobj .source == kobj .rid ):
49+ return
50+
51+ self .log .info ("Handling node handshake" )
52+
53+ self .log .info ("Sharing this node's bundle with peer" )
54+ identity_bundle = self .cache .read (self .identity .rid )
55+ self .event_queue .push (
56+ event = Event .from_bundle (
57+ event_type = EventType .NEW ,
58+ bundle = identity_bundle ),
59+ target = kobj .rid
60+ )
61+
62+ self .log .info ("Proposing new edge" )
63+ # defer handling of proposed edge
5964
60- ctx .kobj_queue .push (rid = edge_bundle .rid , event_type = EventType .FORGET )
61- ctx .kobj_queue .push (bundle = edge_bundle )
65+ edge_bundle = generate_edge_bundle (
66+ source = kobj .rid ,
67+ target = self .identity .rid ,
68+ edge_type = EdgeType .WEBHOOK ,
69+ rid_types = [KoiNetNode , KoiNetEdge ]
70+ )
71+
72+ self .kobj_queue .push (rid = edge_bundle .rid , event_type = EventType .FORGET )
73+ self .kobj_queue .push (bundle = edge_bundle )
6274
6375class CoordinatorNode (FullNode ):
6476 config_schema = CoordinatorConfig
65- knowledge_handlers = FullNode . knowledge_handlers + [ handshake_handler ]
77+ handshake_handler = HandshakeHandler
6678
6779if __name__ == "__main__" :
6880 CoordinatorNode ().run ()
0 commit comments