-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathtest_kinesis.py
More file actions
251 lines (208 loc) · 9.82 KB
/
test_kinesis.py
File metadata and controls
251 lines (208 loc) · 9.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# Note/disclosure: This file has been (partially or fully) generated by an AI agent.
import boto3
import pytest
from botocore.exceptions import ClientError
from localstack.aws.connect import connect_to
from localstack.utils.strings import short_uid
from localstack.utils.sync import retry
from aws_proxy.shared.models import ProxyConfig
def test_kinesis_requests(start_aws_proxy, cleanups):
    """Verify selective proxying of Kinesis requests to AWS.

    Starts the proxy with a resource filter that forwards only requests for
    streams matching ``test-stream-aws-*``, then asserts that:
    - describe/list calls issued against LocalStack are transparently forwarded,
    - records written on either side (AWS or LocalStack) are visible on the other,
    - requests for non-matching stream names are NOT forwarded to AWS.
    """
    stream_name_aws = f"test-stream-aws-{short_uid()}"
    stream_name_local = f"test-stream-local-{short_uid()}"
    # start proxy - only forwarding requests for stream name matching `test-stream-aws-*`
    config = ProxyConfig(services={"kinesis": {"resources": f".*:{stream_name_aws}"}})
    start_aws_proxy(config)
    # create clients (LocalStack-targeting client, and a real AWS client)
    region_name = "us-east-1"
    kinesis_client = connect_to(region_name=region_name).kinesis
    kinesis_client_aws = boto3.client("kinesis", region_name=region_name)
    # create stream in AWS
    kinesis_client_aws.create_stream(StreamName=stream_name_aws, ShardCount=1)
    cleanups.append(
        lambda: kinesis_client_aws.delete_stream(
            StreamName=stream_name_aws, EnforceConsumerDeletion=True
        )
    )
    # wait for stream to become active
    def _wait_for_stream_active():
        response = kinesis_client_aws.describe_stream(StreamName=stream_name_aws)
        if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
            raise AssertionError("Stream not active yet")
    retry(_wait_for_stream_active, retries=30, sleep=2)
    # assert that local call for this stream is proxied
    stream_local = kinesis_client.describe_stream(StreamName=stream_name_aws)
    stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name_aws)
    assert (
        stream_local["StreamDescription"]["StreamName"]
        == stream_aws["StreamDescription"]["StreamName"]
    )
    assert (
        stream_local["StreamDescription"]["StreamARN"]
        == stream_aws["StreamDescription"]["StreamARN"]
    )
    # negative test: request a non-matching, non-existent stream through the
    # LocalStack client first - it must fail locally (not be forwarded) ...
    with pytest.raises(ClientError) as ctx:
        kinesis_client.describe_stream(StreamName=stream_name_local)
    assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"
    # ... and must not have been created in (or forwarded to) AWS as a side effect
    with pytest.raises(ClientError) as ctx:
        kinesis_client_aws.describe_stream(StreamName=stream_name_local)
    assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"
    # put record to AWS stream, get it back locally
    kinesis_client_aws.put_record(
        StreamName=stream_name_aws, Data=b"test data 1", PartitionKey="partition-1"
    )
    # get shard iterator via the proxied client
    shards = kinesis_client.describe_stream(StreamName=stream_name_aws)[
        "StreamDescription"
    ]["Shards"]
    shard_id = shards[0]["ShardId"]
    shard_iterator_response = kinesis_client.get_shard_iterator(
        StreamName=stream_name_aws,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )
    shard_iterator = shard_iterator_response["ShardIterator"]
    # get records through the proxy - should see the record written in AWS
    records_response = kinesis_client.get_records(ShardIterator=shard_iterator)
    records = records_response["Records"]
    assert len(records) == 1
    assert records[0]["Data"] == b"test data 1"
    assert records[0]["PartitionKey"] == "partition-1"
    # put record locally, get it back with AWS client
    kinesis_client.put_record(
        StreamName=stream_name_aws, Data=b"test data 2", PartitionKey="partition-2"
    )
    # get shard iterator from AWS
    shard_iterator_response_aws = kinesis_client_aws.get_shard_iterator(
        StreamName=stream_name_aws,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )
    shard_iterator_aws = shard_iterator_response_aws["ShardIterator"]
    # get all records from AWS - both writes should have landed in the AWS stream
    records_response_aws = kinesis_client_aws.get_records(
        ShardIterator=shard_iterator_aws
    )
    records_aws = records_response_aws["Records"]
    assert len(records_aws) == 2  # both records should be present
    assert records_aws[0]["Data"] == b"test data 1"
    assert records_aws[1]["Data"] == b"test data 2"
    # test list_streams - should include proxied stream on both sides
    streams_local = kinesis_client.list_streams()["StreamNames"]
    assert stream_name_aws in streams_local
    streams_aws = kinesis_client_aws.list_streams()["StreamNames"]
    assert stream_name_aws in streams_aws
def test_kinesis_readonly_operations(start_aws_proxy, cleanups):
    """Verify read-only proxy mode for Kinesis.

    With ``read_only: True``, read operations (describe/list/get) against a
    matching AWS stream are forwarded, while write operations through the
    proxied client are not - the write fails locally because the stream only
    exists in AWS.
    """
    stream_name = f"test-readonly-stream-{short_uid()}"
    # start proxy - forwarding requests for Kinesis in read-only mode
    start_aws_proxy(
        ProxyConfig(
            services={
                "kinesis": {"resources": [f".*:{stream_name}"], "read_only": True}
            }
        )
    )
    # create clients (LocalStack-targeting client, and a real AWS client)
    client_local = connect_to().kinesis
    client_aws = boto3.client("kinesis")
    # create stream in AWS (this should succeed as it's direct AWS client)
    client_aws.create_stream(StreamName=stream_name, ShardCount=1)
    cleanups.append(
        lambda: client_aws.delete_stream(
            StreamName=stream_name, EnforceConsumerDeletion=True
        )
    )
    # wait for stream to become active
    def _assert_active():
        status = client_aws.describe_stream(StreamName=stream_name)[
            "StreamDescription"
        ]["StreamStatus"]
        if status != "ACTIVE":
            raise AssertionError("Stream not active yet")
    retry(_assert_active, retries=30, sleep=2)
    # describe_stream is a read - it must be proxied, with consistent results
    desc_local = client_local.describe_stream(StreamName=stream_name)[
        "StreamDescription"
    ]
    desc_aws = client_aws.describe_stream(StreamName=stream_name)[
        "StreamDescription"
    ]
    assert desc_local["StreamName"] == desc_aws["StreamName"]
    assert desc_local["StreamARN"] == desc_aws["StreamARN"]
    # list_streams is a read - it must be proxied as well
    assert stream_name in client_local.list_streams()["StreamNames"]
    assert stream_name in client_aws.list_streams()["StreamNames"]
    # put a record into the AWS stream using the direct AWS client
    client_aws.put_record(
        StreamName=stream_name, Data=b"test data aws", PartitionKey="partition-1"
    )
    # fetch a shard iterator through the proxy and read the record back
    first_shard = desc_local["Shards"] if False else client_local.describe_stream(
        StreamName=stream_name
    )["StreamDescription"]["Shards"][0]
    iterator = client_local.get_shard_iterator(
        StreamName=stream_name,
        ShardId=first_shard["ShardId"],
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    # reads should work in read-only mode
    records = client_local.get_records(ShardIterator=iterator)["Records"]
    assert len(records) == 1
    assert records[0]["Data"] == b"test data aws"
    # a write through the proxied client must fail: the proxy does not forward
    # it, and LocalStack itself has no such stream
    with pytest.raises(ClientError) as excinfo:
        client_local.put_record(
            StreamName=stream_name, Data=b"should not reach AWS", PartitionKey="p1"
        )
    assert excinfo.value.response["Error"]["Code"] == "ResourceNotFoundException"
def test_kinesis_resource_name_matching(start_aws_proxy, cleanups):
    """Verify regex-based resource-name matching in the proxy config.

    Configures the proxy to forward only streams whose ARN matches
    ``.*:proxy-.*`` and asserts that a matching stream is proxied end-to-end,
    while a non-matching name is not created in (or forwarded to) AWS.
    """
    matching_stream = f"proxy-stream-{short_uid()}"
    non_matching_stream = f"local-stream-{short_uid()}"
    # start proxy - only forwarding requests for streams starting with "proxy-"
    start_aws_proxy(ProxyConfig(services={"kinesis": {"resources": ".*:proxy-.*"}}))
    # create clients (LocalStack-targeting client, and a real AWS client)
    client_local = connect_to().kinesis
    client_aws = boto3.client("kinesis")
    # create stream in AWS that matches the pattern
    client_aws.create_stream(StreamName=matching_stream, ShardCount=1)
    cleanups.append(
        lambda: client_aws.delete_stream(
            StreamName=matching_stream, EnforceConsumerDeletion=True
        )
    )
    # wait for AWS stream to become active
    def _assert_aws_stream_active():
        status = client_aws.describe_stream(StreamName=matching_stream)[
            "StreamDescription"
        ]["StreamStatus"]
        if status != "ACTIVE":
            raise AssertionError("AWS stream not active yet")
    retry(_assert_aws_stream_active, retries=30, sleep=2)
    # the matching stream must be proxied - ARNs on both sides agree
    arn_local = client_local.describe_stream(StreamName=matching_stream)[
        "StreamDescription"
    ]["StreamARN"]
    arn_aws = client_aws.describe_stream(StreamName=matching_stream)[
        "StreamDescription"
    ]["StreamARN"]
    assert arn_local == arn_aws
    # a name that does not match the pattern and was never created must not
    # exist in AWS
    with pytest.raises(ClientError) as ctx:
        client_aws.describe_stream(StreamName=non_matching_stream)
    assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"
    # write through the proxy ...
    client_local.put_record(
        StreamName=matching_stream, Data=b"test data", PartitionKey="partition-1"
    )
    # ... then read directly from AWS: the record must have been forwarded
    first_shard = client_aws.describe_stream(StreamName=matching_stream)[
        "StreamDescription"
    ]["Shards"][0]
    iterator = client_aws.get_shard_iterator(
        StreamName=matching_stream,
        ShardId=first_shard["ShardId"],
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    records = client_aws.get_records(ShardIterator=iterator)["Records"]
    assert len(records) == 1
    assert records[0]["Data"] == b"test data"