Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions aws-proxy/tests/proxy/test_kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# Note/disclosure: This file has been (partially or fully) generated by an AI agent.
import boto3
import pytest
from botocore.exceptions import ClientError
from localstack.aws.connect import connect_to
from localstack.utils.strings import short_uid
from localstack.utils.sync import retry

from aws_proxy.shared.models import ProxyConfig


def test_kinesis_requests(start_aws_proxy, cleanups):
stream_name_aws = f"test-stream-aws-{short_uid()}"
stream_name_local = f"test-stream-local-{short_uid()}"

# start proxy - only forwarding requests for stream name matching `test-stream-aws-*`
config = ProxyConfig(services={"kinesis": {"resources": f".*:{stream_name_aws}"}})
start_aws_proxy(config)

# create clients
region_name = "us-east-1"
kinesis_client = connect_to(region_name=region_name).kinesis
kinesis_client_aws = boto3.client("kinesis", region_name=region_name)

# create stream in AWS
kinesis_client_aws.create_stream(StreamName=stream_name_aws, ShardCount=1)
cleanups.append(
lambda: kinesis_client_aws.delete_stream(
StreamName=stream_name_aws, EnforceConsumerDeletion=True
)
)

# wait for stream to become active
def _wait_for_stream_active():
response = kinesis_client_aws.describe_stream(StreamName=stream_name_aws)
if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
raise AssertionError("Stream not active yet")

retry(_wait_for_stream_active, retries=30, sleep=2)

# assert that local call for this stream is proxied
stream_local = kinesis_client.describe_stream(StreamName=stream_name_aws)
stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name_aws)
assert (
stream_local["StreamDescription"]["StreamName"]
== stream_aws["StreamDescription"]["StreamName"]
)
assert (
stream_local["StreamDescription"]["StreamARN"]
== stream_aws["StreamDescription"]["StreamARN"]
)

# verify that requesting a non-existent stream with LocalStack client
# does not create it in AWS (negative test)
with pytest.raises(ClientError) as ctx:
kinesis_client_aws.describe_stream(StreamName=stream_name_local)
assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"

# put record to AWS stream, get it back locally
kinesis_client_aws.put_record(
StreamName=stream_name_aws, Data=b"test data 1", PartitionKey="partition-1"
)

# get shard iterator
shards = kinesis_client.describe_stream(StreamName=stream_name_aws)[
"StreamDescription"
]["Shards"]
shard_id = shards[0]["ShardId"]
shard_iterator_response = kinesis_client.get_shard_iterator(
StreamName=stream_name_aws,
ShardId=shard_id,
ShardIteratorType="TRIM_HORIZON",
)
shard_iterator = shard_iterator_response["ShardIterator"]

# get records
records_response = kinesis_client.get_records(ShardIterator=shard_iterator)
records = records_response["Records"]
assert len(records) == 1
assert records[0]["Data"] == b"test data 1"
assert records[0]["PartitionKey"] == "partition-1"

# put record locally, get it back with AWS client
kinesis_client.put_record(
StreamName=stream_name_aws, Data=b"test data 2", PartitionKey="partition-2"
)

# get shard iterator from AWS
shard_iterator_response_aws = kinesis_client_aws.get_shard_iterator(
StreamName=stream_name_aws,
ShardId=shard_id,
ShardIteratorType="TRIM_HORIZON",
)
shard_iterator_aws = shard_iterator_response_aws["ShardIterator"]

# get all records from AWS
records_response_aws = kinesis_client_aws.get_records(
ShardIterator=shard_iterator_aws
)
records_aws = records_response_aws["Records"]
assert len(records_aws) == 2 # both records should be present
assert records_aws[0]["Data"] == b"test data 1"
assert records_aws[1]["Data"] == b"test data 2"

# test list_streams - should include proxied stream
streams_local = kinesis_client.list_streams()["StreamNames"]
assert stream_name_aws in streams_local

streams_aws = kinesis_client_aws.list_streams()["StreamNames"]
assert stream_name_aws in streams_aws


def test_kinesis_readonly_operations(start_aws_proxy, cleanups):
stream_name = f"test-readonly-stream-{short_uid()}"

# start proxy - forwarding requests for Kinesis in read-only mode
config = ProxyConfig(
services={"kinesis": {"resources": [f".*:{stream_name}"], "read_only": True}}
)
start_aws_proxy(config)

# create clients
kinesis_client = connect_to().kinesis
kinesis_client_aws = boto3.client("kinesis")

# create stream in AWS (this should succeed as it's direct AWS client)
kinesis_client_aws.create_stream(StreamName=stream_name, ShardCount=1)
cleanups.append(
lambda: kinesis_client_aws.delete_stream(
StreamName=stream_name, EnforceConsumerDeletion=True
)
)

# wait for stream to become active
def _wait_for_stream_active():
response = kinesis_client_aws.describe_stream(StreamName=stream_name)
if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
raise AssertionError("Stream not active yet")

retry(_wait_for_stream_active, retries=30, sleep=2)

# assert that local call for describe_stream is proxied and results are consistent
stream_local = kinesis_client.describe_stream(StreamName=stream_name)
stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name)
assert (
stream_local["StreamDescription"]["StreamName"]
== stream_aws["StreamDescription"]["StreamName"]
)
assert (
stream_local["StreamDescription"]["StreamARN"]
== stream_aws["StreamDescription"]["StreamARN"]
)

# assert that local call for list_streams is proxied
streams_local = kinesis_client.list_streams()["StreamNames"]
streams_aws = kinesis_client_aws.list_streams()["StreamNames"]
assert stream_name in streams_local
assert stream_name in streams_aws

# Put record to AWS stream using direct AWS client
kinesis_client_aws.put_record(
StreamName=stream_name, Data=b"test data aws", PartitionKey="partition-1"
)

# Get shard iterator and verify data can be read through proxy
shards = kinesis_client.describe_stream(StreamName=stream_name)[
"StreamDescription"
]["Shards"]
shard_id = shards[0]["ShardId"]
shard_iterator_response = kinesis_client.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = shard_iterator_response["ShardIterator"]

# Get records - this should work in read-only mode
records_response = kinesis_client.get_records(ShardIterator=shard_iterator)
records = records_response["Records"]
assert len(records) == 1
assert records[0]["Data"] == b"test data aws"

# Attempt to put record using the proxied client in read-only mode
# This should fail because LocalStack doesn't have the stream (it's in AWS)
with pytest.raises(ClientError) as excinfo:
kinesis_client.put_record(
StreamName=stream_name, Data=b"should not reach AWS", PartitionKey="p1"
)
assert excinfo.value.response["Error"]["Code"] == "ResourceNotFoundException"


def test_kinesis_resource_name_matching(start_aws_proxy, cleanups):
stream_name_match = f"proxy-stream-{short_uid()}"
stream_name_nomatch = f"local-stream-{short_uid()}"

# start proxy - only forwarding requests for streams starting with "proxy-"
config = ProxyConfig(services={"kinesis": {"resources": ".*:proxy-.*"}})
start_aws_proxy(config)

# create clients
kinesis_client = connect_to().kinesis
kinesis_client_aws = boto3.client("kinesis")

# create stream in AWS that matches the pattern
kinesis_client_aws.create_stream(StreamName=stream_name_match, ShardCount=1)
cleanups.append(
lambda: kinesis_client_aws.delete_stream(
StreamName=stream_name_match, EnforceConsumerDeletion=True
)
)

# wait for AWS stream to become active
def _wait_for_aws_stream_active():
response = kinesis_client_aws.describe_stream(StreamName=stream_name_match)
if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
raise AssertionError("AWS stream not active yet")

retry(_wait_for_aws_stream_active, retries=30, sleep=2)

# assert that the matching stream is proxied
stream_local = kinesis_client.describe_stream(StreamName=stream_name_match)
stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name_match)
assert (
stream_local["StreamDescription"]["StreamARN"]
== stream_aws["StreamDescription"]["StreamARN"]
)

# verify that a stream name that doesn't match the pattern and doesn't exist
# is not found in AWS
with pytest.raises(ClientError) as ctx:
kinesis_client_aws.describe_stream(StreamName=stream_name_nomatch)
assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"

# Put and get records through the proxied stream
kinesis_client.put_record(
StreamName=stream_name_match, Data=b"test data", PartitionKey="partition-1"
)

# Get shard iterator
shards = kinesis_client_aws.describe_stream(StreamName=stream_name_match)[
"StreamDescription"
]["Shards"]
shard_id = shards[0]["ShardId"]
shard_iterator_response = kinesis_client_aws.get_shard_iterator(
StreamName=stream_name_match, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = shard_iterator_response["ShardIterator"]

# Get records from AWS - should see the record we put through LocalStack
records_response = kinesis_client_aws.get_records(ShardIterator=shard_iterator)
records = records_response["Records"]
assert len(records) == 1
assert records[0]["Data"] == b"test data"