Skip to content
Open
58 changes: 58 additions & 0 deletions micropython/ftespnow/examples/client_side_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import ftespnow

# Initialize ESP-NOW client
esp = ftespnow.CLIENT()

# Connect to ESP-NOW server
esp.connect("a4f00f772d15") # Change to actual server mac address

# Send a message
message = "Hello"
sent = esp.send_message(message)
if sent: # Check if server received the data
print("Message received by server")
else:
print("Message not received by server")

# Receive a message
received_data = esp.receive_message()
if received_data is None: # Check if any data was received
print("No message was received (timed out)")
else:
print(f"Here is the received data: {received_data}")

# Send a .txt file
txt_sent = esp.send_txt("filepath/filename.txt")
if txt_sent: # Check if server received the data
print("File received by server")
else:
print("File not received by server")

# Send a .json file
json_sent = esp.send_json("filepath/filename.json")
if json_sent: # Check if server received the data
print("File received by server")
else:
print("File not received by server")

# Write received data to .txt file
txt_received = esp.receive_to_txt(
"filepath/filename.txt", mode="w"
) # Set mode to 'w' so file is truncated before writing
if txt_received:
print("File received successfully")
else:
print("No file received. Destination file was not created/modified")

# Write received data to .json file
json_received = esp.receive_to_json(
"filepath/filename.json", mode="w"
) # Set mode to 'w' so file is truncated before writing
if json_received:
print("File received successfully")
else:
print("No file received. Destination file was not created/modified")

# Write received data to a python dictionary
data_dict = esp.receive_to_dict()
print(data_dict)
56 changes: 56 additions & 0 deletions micropython/ftespnow/examples/server_side_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import ftespnow

# Initialize ESP-NOW client
esp = ftespnow.SERVER()

# Send a message
message = "Hello"
peer = "a4f00f772d15" # Mac address of the client that you want to send data to
sent = esp.send_message(peer, message)
if sent: # Check if client received the data
print("Message received by clientclient")
else:
print("Message not received by client")

# Receive a message
received_data = esp.receive_message()
if received_data is None: # Check if any data was received
print("No message was received (timed out)")
else:
print(f"Here is the received data: {received_data}")

# Send a .txt file
txt_sent = esp.send_txt(peer, "filepath/filename.txt")
if txt_sent: # Check if client received the data
print("File received by client")
else:
print("File not received by client")

# Send a .json file
json_sent = esp.send_json(peer, "filepath/filename.json")
if json_sent: # Check if client received the data
print("File received by client")
else:
print("File not received by client")

# Write received data to .txt file
txt_received = esp.receive_to_txt(
"filepath/filename.txt", mode="w"
) # Set mode to 'w' so file is truncated before writing
if txt_received:
print("File received successfully")
else:
print("No file received. Destination file was not created/modified")

# Write received data to .json file
json_received = esp.receive_to_json(
"filepath/filename.json", mode="w"
) # Set mode to 'w' so file is truncated before writing
if json_received: # Check if any data was received
print("File received successfully")
else:
print("No file received. Destination file was not created/modified")

# Write received data to a python dictionary
data_dict = esp.receive_to_dict() # Will return {} if no data was received
print(data_dict)
212 changes: 212 additions & 0 deletions micropython/ftespnow/ftespnow-client/ftespnow/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import espnow
import json


class CLIENT:
def __init__(self, *, timeout: int = 5) -> None:
self.esp = espnow.ESPNow()
self.timeout = timeout

def configure(self, *, timeout: int = 5) -> None:
self.timeout: int = timeout

def connect(self, peer: str) -> None:
self.peer: str = peer
self.esp.active(True)
self.esp.add_peer(self.peer)

def send_message(self, data: str) -> bool:
"""
Send a string

Args:

data (str): Data to be sent

Returns:

bool: Confirmation flag (`True` if data was received, `False` otherwise)
"""

ack: bool = self.esp.send(self.peer, data)
return ack

def receive_message(self, recv_timeout: int = 5) -> list | None:
"""
Receive a string

Args:

recv_timeout (int, optional): Reception timeout. Defaults to 5.

Returns:

list | None: `[<sender's mac address (str)>, <message (str)>]` | `None` if no message is received
"""

received = self.esp.recv(recv_timeout)
if received[0] is None:
return
return received

def send_txt(self, filename: str) -> bool:
"""
Parse and send a `.txt` file as a `string`

Args:

filename (str): Filepath of the desired file to be sent with file name and extension

Returns:

sent (bool): Confirmation flag (`True` if data was received, `False` otherwise)
"""

with open(filename, "r") as f:
data: str = str(f.readlines())
sent: bool = self.send_message(data)
return sent

def send_json(self, filename: str, *, indent: int = 4) -> bool:
"""
Parse and send a `.json` file as a `string`

Args:

filename (str): Filepath of the desired file to be sent with file name and extension

indent (int, optional): Desired indent of the resulting parsed `string` (for formatting purposes). Defaults to 4.

Returns:

sent (bool): Confirmation flag (`True` if data was received, `False` otherwise)
"""

with open(filename, "r") as f:
unparsed = json.load(f)
parsed: str = json.dumps(unparsed, indent=indent)
sent: bool = self.send_message(parsed)
return sent

def receive_to_txt(self, target_file: str, mode: str = "a") -> bool:
"""
Write received `string` into a `.txt` file.

**Will not write or create file if no data is received**

Args:

target_file (str): Filepath of the destination file for the received data with file name and extension.

mode (str, optional): Editing mode

- `r` - Read only

- `w` - Write only (truncates file before writing)

- `x` - Create a new file and open it for writing (raises `FileExistsError` if file already exists)

- `a` - Append to the end of the file (default)

- `b` - Binary mode

- `t` - Text mode

- `+` - Update (read and write)

Read `open()`_ for more information

Returns:

received (bool): Confirmation flag (`True` if data was received, `False` otherwise)

.. _open(): https://docs.python.org/3/library/functions.html#open
"""

if ".txt" not in target_file:
raise SyntaxError("File format must be .txt")
try:
received: bool = False
data: list | None = self.receive_message()
if data is None:
return received
data_list: list[str] = str(data[-1]).split("\n")
if data_list[-1] == "":
data_list = data_list[:-1]
with open(target_file, mode) as f:
f.writelines(data_list)
return not received
except SyntaxError:
raise

def receive_to_json(self, target_file: str, mode: str = "a") -> bool:
"""
Write received `string` into a `.json` file.

**Will not write or create file if no data is received**

Args:

target_file (str): Filepath of the destination file for the received data with file name and extension.

mode (str, optional): Editing mode

- `r` - Read only

- `w` - Write only (truncates file before writing)

- `x` - Create a new file and open it for writing (raises `FileExistsError` if file already exists)

- `a` - Append to the end of the file (default)

- `b` - Binary mode

- `t` - Text mode

- `+` - Update (read and write)

Read `open()`_ for more information

Returns:

received (bool): Confirmation flag (`True` if data was received, `False` otherwise)

.. _open(): https://docs.python.org/3/library/functions.html#open
"""

if ".json" not in target_file:
raise SyntaxError("File format must be .json")
try:
received: bool = False
data: list | None = self.receive_message()
if data is None:
return received
mac: str = str(data[0])
message = json.loads(str(data[-1]))
unparsed: dict = {"mac": mac, "message": message}
with open(target_file, mode) as f:
json.dump(unparsed, f)
return not received
except SyntaxError:
raise

def receive_to_dict(self) -> dict:
"""
Unparses received `string` into a `dict` object

Args:

None:

Returns:

unparsed (dict): `dictionary` object containing unparsed equivalent of the received `.json`
"""

data: list | None = self.receive_message()
if data is None:
return {}
mac: str = str(data[0])
message = json.loads(str(data[-1]))
unparsed: dict = {"mac": mac, "message": message}
return unparsed
7 changes: 7 additions & 0 deletions micropython/ftespnow/ftespnow-client/manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
metadata(
description="Client-side commands",
version="0.1.0",
)

require("ftespnow")
package("ftespnow")
Loading
Loading