-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
62 lines (52 loc) · 1.94 KB
/
main.py
File metadata and controls
62 lines (52 loc) · 1.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""Synapse Layer — Python Basic Example
Store a memory and recall it with semantic search.
"""
import os
import requests
from dotenv import load_dotenv
# Pull settings from a local .env file (via python-dotenv) before the
# environment lookups below run.
load_dotenv()

# Credential and endpoint; both can be overridden through the environment.
TOKEN = os.getenv("SYNAPSE_TOKEN", "")
BASE_URL = os.getenv("SYNAPSE_BASE_URL", "https://forge.synapselayer.org")

# Refuse to run without a token, or with "sk_connect_xxx" (presumably the
# unedited placeholder from a sample .env — confirm against the template).
if not TOKEN or TOKEN == "sk_connect_xxx":
    print("ERROR: Set SYNAPSE_TOKEN in .env")
    print("Get yours at: https://forge.synapselayer.org/dashboard/connect")
    # `exit()` is a convenience injected by the `site` module for interactive
    # sessions and is missing under `python -S` or in frozen builds.
    # Raising SystemExit is always available and yields the same exit status.
    raise SystemExit(1)

# Headers attached to every Forge API request made by this script.
HEADERS = {"Content-Type": "application/json", "Authorization": f"Bearer {TOKEN}"}
# Agent name included in the "store" request payload.
AGENT = "python-example"
def store_memory(timeout=10.0):
    """Store a memory via the Forge API.

    POSTs a "store" action to ``{BASE_URL}/api/forge`` carrying one fixed
    sample memory attributed to ``AGENT``, and prints progress to stdout.

    Args:
        timeout: Seconds to wait for the server, forwarded to
            ``requests.post``. Requests applies no timeout by default, so
            without this a stalled connection would hang the script forever.

    Raises:
        requests.exceptions.HTTPError: The API answered with an error status
            (raised by ``raise_for_status``).
        requests.exceptions.Timeout: The server did not respond in time.
    """
    print("[store] Saving memory...")
    payload = {
        "action": "store",
        "agent": AGENT,
        "content": "The user prefers dark mode and communicates in Portuguese.",
        "intent": "user_preference",
    }
    resp = requests.post(
        f"{BASE_URL}/api/forge",
        headers=HEADERS,
        json=payload,
        timeout=timeout,
    )
    resp.raise_for_status()
    print("[store] Done.\n")
def recall_memory(timeout=10.0):
    """Recall memories with a semantic query (cross-agent).

    POSTs a "recall" action to ``{BASE_URL}/api/forge`` asking for up to five
    matches (``topK``) and prints each returned memory's ``content`` and
    ``trustQuotient``. The payload carries no ``agent`` field; presumably
    that is what makes the recall cross-agent — confirm against the API docs.

    Args:
        timeout: Seconds to wait for the server, forwarded to
            ``requests.post``. Requests applies no timeout by default, so
            without this a stalled connection would hang the script forever.

    Raises:
        requests.exceptions.HTTPError: The API answered with an error status
            (raised by ``raise_for_status``).
        requests.exceptions.Timeout: The server did not respond in time.
    """
    print("[recall] Searching for user preferences...")
    payload = {
        "action": "recall",
        "query": "What are the user preferences?",
        "topK": 5,
    }
    resp = requests.post(
        f"{BASE_URL}/api/forge",
        headers=HEADERS,
        json=payload,
        timeout=timeout,
    )
    resp.raise_for_status()

    # `or []` also covers a response where "memories" is present but null,
    # which a plain .get(..., []) default would pass through as None.
    memories = resp.json().get("memories") or []
    if not memories:
        print("[recall] No memories found yet.")
        return

    for i, mem in enumerate(memories):
        print(f" [{i}] content: {mem.get('content', 'N/A')}")
        print(f" tq_score: {mem.get('trustQuotient', 'N/A')}")
    print(f"\n[recall] {len(memories)} memories found.")
if __name__ == "__main__":
    # Script entry point: write one memory, then query it back.
    try:
        store_memory()
        recall_memory()
    except requests.exceptions.HTTPError as e:
        # Raised by raise_for_status(); show the status code and the first
        # 200 characters of the response body.
        print(f"API Error: {e.response.status_code} — {e.response.text[:200]}")
        # Exit non-zero so a calling shell or CI job can see the run failed;
        # previously the script reported success (status 0) after an error.
        raise SystemExit(1)
    except Exception as e:
        # Top-level boundary: report anything else (connection failure,
        # timeout, malformed JSON) and likewise signal failure.
        print(f"Error: {e}")
        raise SystemExit(1)