diff --git a/aws-proxy/tests/proxy/test_kinesis.py b/aws-proxy/tests/proxy/test_kinesis.py
new file mode 100644
index 0000000..acc46fe
--- /dev/null
+++ b/aws-proxy/tests/proxy/test_kinesis.py
@@ -0,0 +1,251 @@
+# Note/disclosure: This file has been (partially or fully) generated by an AI agent.
+import boto3
+import pytest
+from botocore.exceptions import ClientError
+from localstack.aws.connect import connect_to
+from localstack.utils.strings import short_uid
+from localstack.utils.sync import retry
+
+from aws_proxy.shared.models import ProxyConfig
+
+
+def test_kinesis_requests(start_aws_proxy, cleanups):
+    stream_name_aws = f"test-stream-aws-{short_uid()}"
+    stream_name_local = f"test-stream-local-{short_uid()}"
+
+    # start proxy - only forwarding requests for stream name matching `test-stream-aws-*`
+    config = ProxyConfig(services={"kinesis": {"resources": f".*:{stream_name_aws}"}})
+    start_aws_proxy(config)
+
+    # create clients
+    region_name = "us-east-1"
+    kinesis_client = connect_to(region_name=region_name).kinesis
+    kinesis_client_aws = boto3.client("kinesis", region_name=region_name)
+
+    # create stream in AWS
+    kinesis_client_aws.create_stream(StreamName=stream_name_aws, ShardCount=1)
+    cleanups.append(
+        lambda: kinesis_client_aws.delete_stream(
+            StreamName=stream_name_aws, EnforceConsumerDeletion=True
+        )
+    )
+
+    # wait for stream to become active
+    def _wait_for_stream_active():
+        response = kinesis_client_aws.describe_stream(StreamName=stream_name_aws)
+        if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
+            raise AssertionError("Stream not active yet")
+
+    retry(_wait_for_stream_active, retries=30, sleep=2)
+
+    # assert that local call for this stream is proxied
+    stream_local = kinesis_client.describe_stream(StreamName=stream_name_aws)
+    stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name_aws)
+    assert (
+        stream_local["StreamDescription"]["StreamName"]
+        == stream_aws["StreamDescription"]["StreamName"]
+    )
+    assert (
+        stream_local["StreamDescription"]["StreamARN"]
+        == stream_aws["StreamDescription"]["StreamARN"]
+    )
+
+    # verify that the local-only stream name (not matched by the proxy config)
+    # does not exist in AWS (negative test)
+    with pytest.raises(ClientError) as ctx:
+        kinesis_client_aws.describe_stream(StreamName=stream_name_local)
+    assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"
+
+    # put record to AWS stream, get it back locally
+    kinesis_client_aws.put_record(
+        StreamName=stream_name_aws, Data=b"test data 1", PartitionKey="partition-1"
+    )
+
+    # get shard iterator
+    shards = kinesis_client.describe_stream(StreamName=stream_name_aws)[
+        "StreamDescription"
+    ]["Shards"]
+    shard_id = shards[0]["ShardId"]
+    shard_iterator_response = kinesis_client.get_shard_iterator(
+        StreamName=stream_name_aws,
+        ShardId=shard_id,
+        ShardIteratorType="TRIM_HORIZON",
+    )
+    shard_iterator = shard_iterator_response["ShardIterator"]
+
+    # get records
+    records_response = kinesis_client.get_records(ShardIterator=shard_iterator)
+    records = records_response["Records"]
+    assert len(records) == 1
+    assert records[0]["Data"] == b"test data 1"
+    assert records[0]["PartitionKey"] == "partition-1"
+
+    # put record locally, get it back with AWS client
+    kinesis_client.put_record(
+        StreamName=stream_name_aws, Data=b"test data 2", PartitionKey="partition-2"
+    )
+
+    # get shard iterator from AWS
+    shard_iterator_response_aws = kinesis_client_aws.get_shard_iterator(
+        StreamName=stream_name_aws,
+        ShardId=shard_id,
+        ShardIteratorType="TRIM_HORIZON",
+    )
+    shard_iterator_aws = shard_iterator_response_aws["ShardIterator"]
+
+    # get all records from AWS
+    records_response_aws = kinesis_client_aws.get_records(
+        ShardIterator=shard_iterator_aws
+    )
+    records_aws = records_response_aws["Records"]
+    assert len(records_aws) == 2  # both records should be present
+    assert records_aws[0]["Data"] == b"test data 1"
+    assert records_aws[1]["Data"] == b"test data 2"
+
+    # test list_streams - should include proxied stream
+    streams_local = kinesis_client.list_streams()["StreamNames"]
+    assert stream_name_aws in streams_local
+
+    streams_aws = kinesis_client_aws.list_streams()["StreamNames"]
+    assert stream_name_aws in streams_aws
+
+
+def test_kinesis_readonly_operations(start_aws_proxy, cleanups):
+    stream_name = f"test-readonly-stream-{short_uid()}"
+
+    # start proxy - forwarding requests for Kinesis in read-only mode
+    config = ProxyConfig(
+        services={"kinesis": {"resources": [f".*:{stream_name}"], "read_only": True}}
+    )
+    start_aws_proxy(config)
+
+    # create clients
+    kinesis_client = connect_to().kinesis
+    kinesis_client_aws = boto3.client("kinesis")
+
+    # create stream in AWS (this should succeed as it's direct AWS client)
+    kinesis_client_aws.create_stream(StreamName=stream_name, ShardCount=1)
+    cleanups.append(
+        lambda: kinesis_client_aws.delete_stream(
+            StreamName=stream_name, EnforceConsumerDeletion=True
+        )
+    )
+
+    # wait for stream to become active
+    def _wait_for_stream_active():
+        response = kinesis_client_aws.describe_stream(StreamName=stream_name)
+        if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
+            raise AssertionError("Stream not active yet")
+
+    retry(_wait_for_stream_active, retries=30, sleep=2)
+
+    # assert that local call for describe_stream is proxied and results are consistent
+    stream_local = kinesis_client.describe_stream(StreamName=stream_name)
+    stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name)
+    assert (
+        stream_local["StreamDescription"]["StreamName"]
+        == stream_aws["StreamDescription"]["StreamName"]
+    )
+    assert (
+        stream_local["StreamDescription"]["StreamARN"]
+        == stream_aws["StreamDescription"]["StreamARN"]
+    )
+
+    # assert that local call for list_streams is proxied
+    streams_local = kinesis_client.list_streams()["StreamNames"]
+    streams_aws = kinesis_client_aws.list_streams()["StreamNames"]
+    assert stream_name in streams_local
+    assert stream_name in streams_aws
+
+    # Put record to AWS stream using direct AWS client
+    kinesis_client_aws.put_record(
+        StreamName=stream_name, Data=b"test data aws", PartitionKey="partition-1"
+    )
+
+    # Get shard iterator and verify data can be read through proxy
+    shards = kinesis_client.describe_stream(StreamName=stream_name)[
+        "StreamDescription"
+    ]["Shards"]
+    shard_id = shards[0]["ShardId"]
+    shard_iterator_response = kinesis_client.get_shard_iterator(
+        StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
+    )
+    shard_iterator = shard_iterator_response["ShardIterator"]
+
+    # Get records - this should work in read-only mode
+    records_response = kinesis_client.get_records(ShardIterator=shard_iterator)
+    records = records_response["Records"]
+    assert len(records) == 1
+    assert records[0]["Data"] == b"test data aws"
+
+    # Attempt to put record using the proxied client in read-only mode
+    # This should fail because LocalStack doesn't have the stream (it's in AWS)
+    with pytest.raises(ClientError) as excinfo:
+        kinesis_client.put_record(
+            StreamName=stream_name, Data=b"should not reach AWS", PartitionKey="p1"
+        )
+    assert excinfo.value.response["Error"]["Code"] == "ResourceNotFoundException"
+
+
+def test_kinesis_resource_name_matching(start_aws_proxy, cleanups):
+    stream_name_match = f"proxy-stream-{short_uid()}"
+    stream_name_nomatch = f"local-stream-{short_uid()}"
+
+    # start proxy - only forwarding requests for streams starting with "proxy-"
+    config = ProxyConfig(services={"kinesis": {"resources": ".*:proxy-.*"}})
+    start_aws_proxy(config)
+
+    # create clients
+    kinesis_client = connect_to().kinesis
+    kinesis_client_aws = boto3.client("kinesis")
+
+    # create stream in AWS that matches the pattern
+    kinesis_client_aws.create_stream(StreamName=stream_name_match, ShardCount=1)
+    cleanups.append(
+        lambda: kinesis_client_aws.delete_stream(
+            StreamName=stream_name_match, EnforceConsumerDeletion=True
+        )
+    )
+
+    # wait for AWS stream to become active
+    def _wait_for_aws_stream_active():
+        response = kinesis_client_aws.describe_stream(StreamName=stream_name_match)
+        if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
+            raise AssertionError("AWS stream not active yet")
+
+    retry(_wait_for_aws_stream_active, retries=30, sleep=2)
+
+    # assert that the matching stream is proxied
+    stream_local = kinesis_client.describe_stream(StreamName=stream_name_match)
+    stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name_match)
+    assert (
+        stream_local["StreamDescription"]["StreamARN"]
+        == stream_aws["StreamDescription"]["StreamARN"]
+    )
+
+    # verify that a stream name that doesn't match the pattern and doesn't exist
+    # is not found in AWS
+    with pytest.raises(ClientError) as ctx:
+        kinesis_client_aws.describe_stream(StreamName=stream_name_nomatch)
+    assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"
+
+    # Put and get records through the proxied stream
+    kinesis_client.put_record(
+        StreamName=stream_name_match, Data=b"test data", PartitionKey="partition-1"
+    )
+
+    # Get shard iterator
+    shards = kinesis_client_aws.describe_stream(StreamName=stream_name_match)[
+        "StreamDescription"
+    ]["Shards"]
+    shard_id = shards[0]["ShardId"]
+    shard_iterator_response = kinesis_client_aws.get_shard_iterator(
+        StreamName=stream_name_match, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
+    )
+    shard_iterator = shard_iterator_response["ShardIterator"]
+
+    # Get records from AWS - should see the record we put through LocalStack
+    records_response = kinesis_client_aws.get_records(ShardIterator=shard_iterator)
+    records = records_response["Records"]
+    assert len(records) == 1
+    assert records[0]["Data"] == b"test data"