-
Notifications
You must be signed in to change notification settings - Fork 215
Expand file tree
/
Copy pathpipedream_tool.py
More file actions
216 lines (175 loc) · 7.34 KB
/
pipedream_tool.py
File metadata and controls
216 lines (175 loc) · 7.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from typing import Optional, Dict, Any
from crewai_tools import BaseTool
from pydantic import Field
import os
import requests
from json import JSONDecodeError
from agentstack.exceptions import ToolError
# TODO: Future Enhancements
# - Add support for workflow-specific operations (create/update/delete)
# - Implement workflow creation with component chaining
# - Add workflow update capabilities
# - Support workflow deletion and cleanup
#
# - Implement webhook management capabilities
# - Add webhook creation and configuration
# - Support webhook event filtering
# - Implement webhook deletion and updates
#
# - Add component version control integration
# - Support component versioning
# - Add version rollback capabilities
# - Implement version comparison
#
# - Support custom component deployment
# - Add custom component creation
# - Support component testing and validation
# - Implement component publishing
class PipedreamToolError(ToolError):
"""Exception raised for Pipedream-specific tool errors."""
pass
class PipedreamClient:
"""Client for interacting with Pipedream API"""
def __init__(self, api_key: str):
self.base_url = "https://api.pipedream.com/v1/connect"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def list_apps(self, query: str = None) -> dict:
"""List available Pipedream apps"""
params = {"q": query} if query else {}
return self._request("GET", "/apps", params=params)
def list_components(self, app: str) -> dict:
"""List available components for an app"""
return self._request("GET", f"/actions?app={app}")
def get_component_definition(self, key: str) -> dict:
"""Get component definition and props"""
return self._request("GET", f"/components/{key}")
def run_action(self, component_id: str, inputs: Dict[str, Any]) -> dict:
"""Execute a Pipedream component action"""
return self._request("POST", "/actions/run", json={
"id": component_id,
"configured_props": inputs
})
def deploy_source(self, component_id: str, webhook_url: str, config: Dict[str, Any]) -> dict:
"""Deploy a Pipedream component source"""
return self._request("POST", "/triggers/deploy", json={
"id": component_id,
"webhook_url": webhook_url,
"configured_props": config
})
def _request(self, method: str, path: str, **kwargs) -> dict:
"""Make request to Pipedream API"""
response = requests.request(method, f"{self.base_url}{path}",
headers=self.headers, **kwargs)
if not response.ok:
raise PipedreamToolError(f"API request failed: {response.text}")
try:
return response.json()
except JSONDecodeError:
raise PipedreamToolError("Invalid JSON response from Pipedream API")
class PipedreamListAppsTool(BaseTool):
name: str = "List Pipedream Apps"
description: str = "List available Pipedream apps with optional search query"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)
model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}
def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)
def _run(self, query: str = None) -> str:
"""List available Pipedream apps with optional search query"""
try:
return self.client.list_apps(query)["data"]
except Exception as e:
raise PipedreamToolError(f"Failed to list apps: {str(e)}")
class PipedreamListComponentsTool(BaseTool):
name: str = "List Pipedream Components"
description: str = "List available components for a Pipedream app"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)
model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}
def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)
def _run(self, app: str) -> str:
"""List available components for the specified app"""
try:
return self.client.list_components(app)["data"]
except Exception as e:
raise PipedreamToolError(f"Failed to list components: {str(e)}")
class PipedreamGetPropsTool(BaseTool):
name: str = "Get Pipedream Component Properties"
description: str = "Get component definition and configuration options"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)
model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}
def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)
def _run(self, key: str) -> str:
"""Get component definition and configuration options"""
try:
return self.client.get_component_definition(key)["data"]
except Exception as e:
raise PipedreamToolError(f"Failed to get component properties: {str(e)}")
class PipedreamActionTool(BaseTool):
name: str = "Execute Pipedream Action"
description: str = "Execute a Pipedream component action with specified inputs"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)
model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}
def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)
def _run(self, component_id: str, inputs: Dict[str, Any]) -> str:
"""
Execute a Pipedream component action.
Args:
component_id: The ID of the Pipedream component to execute
inputs: Dictionary of input parameters for the component
Returns:
str: JSON response from the component execution
Raises:
PipedreamToolError: If the API request fails or returns an error
"""
try:
return self.client.run_action(component_id, inputs)
except Exception as e:
raise PipedreamToolError(f"Failed to execute action: {str(e)}")
class PipedreamSourceTool(BaseTool):
name: str = "Deploy Pipedream Source"
description: str = "Deploy a Pipedream source component with webhook configuration"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)
model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}
def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)
def _run(self, component_id: str, webhook_url: str, config: Dict[str, Any]) -> str:
"""
Deploy a Pipedream component source.
Args:
component_id: The ID of the Pipedream component to deploy
webhook_url: The URL where events will be sent
config: Dictionary of configuration parameters for the component
Returns:
str: JSON response from the component deployment
Raises:
PipedreamToolError: If the API request fails or returns an error
"""
try:
return self.client.deploy_source(component_id, webhook_url, config)
except Exception as e:
raise PipedreamToolError(f"Failed to deploy source: {str(e)}")