|
2 | 2 | End-to-end tests for LiveKit RTC library. |
3 | 3 |
|
4 | 4 | These tests verify core functionality of the LiveKit RTC library including: |
5 | | -- Publishing and subscribing to audio tracks |
| 5 | +- Publishing and subscribing to audio & data tracks |
6 | 6 | - Audio stream consumption and energy verification |
7 | 7 | - Room lifecycle events (connect, disconnect, track publish/unpublish) |
8 | 8 | - Connection state transitions |
@@ -434,3 +434,82 @@ def on_state_changed(state: rtc.ConnectionState): |
434 | 434 | finally: |
435 | 435 | if room.isconnected(): |
436 | 436 | await room.disconnect() |
| 437 | + |
| 438 | + |
| 439 | +@pytest.mark.asyncio |
| 440 | +@skip_if_no_credentials() |
| 441 | +async def test_data_track(): |
| 442 | + """Test that a published data track delivers frames with correct payloads and timestamps.""" |
| 443 | + FRAME_COUNT = 5 |
| 444 | + PAYLOAD_SIZE = 64 |
| 445 | + |
| 446 | + TRACK_NAME = "test-track" |
| 447 | + PUBLISHER_IDENTITY = "dt-publisher" |
| 448 | + SUBSCRIBER_IDENTITY = "dt-subscriber" |
| 449 | + |
| 450 | + room_name = unique_room_name("test-data-track") |
| 451 | + url = os.getenv("LIVEKIT_URL") |
| 452 | + |
| 453 | + publisher_room = rtc.Room() |
| 454 | + subscriber_room = rtc.Room() |
| 455 | + |
| 456 | + publisher_token = create_token(PUBLISHER_IDENTITY, room_name) |
| 457 | + subscriber_token = create_token(SUBSCRIBER_IDENTITY, room_name) |
| 458 | + |
| 459 | + remote_track_event = asyncio.Event() |
| 460 | + remote_track = None |
| 461 | + |
| 462 | + @subscriber_room.on("remote_data_track_published") |
| 463 | + def on_remote_data_track_published(track: rtc.RemoteDataTrack): |
| 464 | + nonlocal remote_track |
| 465 | + remote_track = track |
| 466 | + remote_track_event.set() |
| 467 | + |
| 468 | + try: |
| 469 | + await subscriber_room.connect(url, subscriber_token) |
| 470 | + await publisher_room.connect(url, publisher_token) |
| 471 | + |
| 472 | + local_track = await publisher_room.local_participant.publish_data_track(TRACK_NAME) |
| 473 | + assert local_track.info.sid is not None |
| 474 | + assert local_track.info.name == TRACK_NAME |
| 475 | + assert local_track.is_published() |
| 476 | + |
| 477 | + await asyncio.wait_for(remote_track_event.wait(), timeout=10.0) |
| 478 | + assert remote_track is not None |
| 479 | + assert remote_track.info.name == TRACK_NAME |
| 480 | + assert remote_track.publisher_identity == PUBLISHER_IDENTITY |
| 481 | + assert remote_track.is_published() |
| 482 | + |
| 483 | + subscription = await remote_track.subscribe() |
| 484 | + |
| 485 | + async def push_frames(): |
| 486 | + for i in range(FRAME_COUNT): |
| 487 | + frame = rtc.DataTrackFrame( |
| 488 | + payload=bytes([i] * PAYLOAD_SIZE) |
| 489 | + ).with_user_timestamp_now() |
| 490 | + local_track.try_push(frame) |
| 491 | + await asyncio.sleep(0.1) |
| 492 | + local_track.unpublish() |
| 493 | + |
| 494 | + async def publish_and_receive(): |
| 495 | + push_task = asyncio.create_task(push_frames()) |
| 496 | + recv_count = 0 |
| 497 | + async for frame in subscription: |
| 498 | + first_byte = frame.payload[0] |
| 499 | + assert all(b == first_byte for b in frame.payload), "Payload bytes are not uniform" |
| 500 | + assert len(frame.payload) == PAYLOAD_SIZE |
| 501 | + assert frame.user_timestamp is not None |
| 502 | + latency = frame.duration_since_timestamp() |
| 503 | + assert latency is not None and latency < 5.0, ( |
| 504 | + f"Timestamp latency too high or missing: {latency}" |
| 505 | + ) |
| 506 | + recv_count += 1 |
| 507 | + await push_task |
| 508 | + return recv_count |
| 509 | + |
| 510 | + recv_count = await asyncio.wait_for(publish_and_receive(), timeout=10.0) |
| 511 | + assert recv_count > 0, "No frames were received" |
| 512 | + |
| 513 | + finally: |
| 514 | + await publisher_room.disconnect() |
| 515 | + await subscriber_room.disconnect() |
0 commit comments