forked from Dronolab/interop-mavproxy-module
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmavproxy_interop.py
More file actions
128 lines (112 loc) · 4.23 KB
/
mavproxy_interop.py
File metadata and controls
128 lines (112 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python
import socket
import json
from MAVProxy.modules.lib import mp_module
class InteropModule(mp_module.MPModule):
def __init__(self, mpstate):
super(InteropModule, self).__init__(mpstate, "interop", "interoperability module")
# The drone/autopilot firmware ip address like 127.0.0.1
self.ip = "127.0.0.1"
# The port you want to use in order to send your json via UDP
self.port = 5005
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def mavlink_packet(self, m):
mtype = m.get_type()
# https://pixhawk.ethz.ch/mavlink/#ATTITUDE
if mtype == "ATTITUDE":
response = {
"packet_id": 30,
"time_boot_ms": m.time_boot_ms,
"roll": m.roll,
"pitch": m.pitch,
"yaw": m.yaw,
"rollspeed": m.rollspeed,
"pitchspeed": m.pitchspeed,
"yawspeed": m.yawspeed
}
self.sock.sendto(json.dumps(response), (self.ip, self.port))
elif mtype == "ALTITUDE":
response = {
"packet_id": 141,
"time_usec": m.time_usec,
"altitude_monotonic": m.altitude_monotonic,
"altitude_amsl": m.altitude_amsl,
"altitude_local": m.altitude_local,
"altitude_relative": m.altitude_relative,
"altitude_terrain": m.altitude_terrain,
"bottom_clearance": m.bottom_clearance
}
self.sock.sendto(json.dumps(response), (self.ip, self.port))
elif mtype == "HIGHRES_IMU":
response = {
"packet_id": 105,
"time_usec": m.time_usec,
"xacc": m.xacc,
"yacc": m.yacc,
"zacc": m.zacc,
"xgyro": m.xgyro,
"ygyro": m.ygyro,
"zgyro": m.zgyro,
"xmag": m.xmag,
"ymag": m.ymag,
"zmag": m.zmag,
"abs_pressure": m.abs_pressure,
"diff_pressure": m.diff_pressure,
"pressure_alt": m.pressure_alt,
"temperature": m.temperature,
"fields_updated": m.fields_updated
}
self.sock.sendto(json.dumps(response), (self.ip, self.port))
elif mtype == "GPS_RAW_INT":
response = {
"packet_id": 24,
"time_usec": m.time_usec,
"fix_type": m.fix_type,
"lat": m.lat,
"lon": m.lon,
"alt": m.alt,
"eph": m.eph,
"epv": m.epv,
"vel": m.vel,
"cog": m.cog,
"satellites_visible": m.satellites_visible
}
self.sock.sendto(json.dumps(response), (self.ip, self.port))
elif mtype == "GLOBAL_POSITION_INT":
response = {
"packet_id": 33,
"time_boot_ms": m.time_boot_ms,
"lat": m.lat,
"lon": m.lon,
"alt": m.alt,
"relative_alt": m.relative_alt,
"vx": m.vx,
"vy": m.vy,
"vz": m.vz,
"hdg": m.hdg
}
self.sock.sendto(json.dumps(response), (self.ip, self.port))
elif mtype == "VFR_HUD":
response = {
"packet_id": 74,
"groundspeed": m.groundspeed,
"heading": m.heading,
"throttle": m.throttle,
"alt": m.alt,
"climb": m.climb
}
self.sock.sendto(json.dumps(response), (self.ip, self.port))
elif mtype == "LOCAL_POSITION_NED":
response = {
"packet_id": 32,
"time_boot_ms": m.time_boot_ms,
"x": m.x,
"y": m.y,
"z": m.z,
"vx": m.vx,
"vy": m.vy,
"vz": m.vz
}
self.sock.sendto(json.dumps(response), (self.ip, self.port))
def init(mpstate):
return InteropModule(mpstate)