Kinesis
Amazon Kinesis Data Streams for real-time data streaming and processing.
Configuration
| Property | Value |
|---|---|
| Protocol | AwsJson1_1 |
| Signing Name | kinesis |
| Target Prefix | Kinesis_20131202 |
| Persistence | No |
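The table values map directly onto the HTTP headers of a raw request. The following is a minimal sketch of that mapping, not AWSim's own client code: `buildKinesisRequest` and `placeholderAuthorization` are hypothetical helper names, and the endpoint, region, and placeholder credential simply mirror the curl examples on this page. Whether AWSim validates the signature is not stated here.

```typescript
// Values taken from the Configuration table.
const TARGET_PREFIX = "Kinesis_20131202";
const SIGNING_NAME = "kinesis";
const CONTENT_TYPE = "application/x-amz-json-1.1"; // AwsJson1_1 protocol

interface KinesisRequest {
  url: string;
  method: "POST";
  headers: { [name: string]: string };
  body: string;
}

// Hypothetical helper: reproduces the placeholder Authorization header
// used by the curl examples (the signature is not a real SigV4 signature).
function placeholderAuthorization(date: string, region = "us-east-1"): string {
  return (
    `AWS4-HMAC-SHA256 Credential=test/${date}/${region}/${SIGNING_NAME}/aws4_request, ` +
    "SignedHeaders=host, Signature=fake"
  );
}

// Hypothetical helper: assembles the parts of a raw JSON-protocol call.
function buildKinesisRequest(
  operation: string,
  payload: object,
  date: string,
  endpoint = "http://localhost:4566" // assumed local endpoint, as in the curl examples
): KinesisRequest {
  return {
    url: endpoint,
    method: "POST",
    headers: {
      "Content-Type": CONTENT_TYPE,
      // The target is always "<Target Prefix>.<Operation>".
      "X-Amz-Target": `${TARGET_PREFIX}.${operation}`,
      Authorization: placeholderAuthorization(date),
    },
    body: JSON.stringify(payload),
  };
}

const listStreamsRequest = buildKinesisRequest("ListStreams", {}, "20260421");
console.log(listStreamsRequest.headers["X-Amz-Target"]);
```

The returned object can be passed to any HTTP client, for example `fetch(request.url, request)`.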
Quick Start
Create a stream, put a record, get a shard iterator, and read the record back:
# Create a stream with 2 shards
curl -s http://localhost:4566 \
-H "Content-Type: application/x-amz-json-1.1" \
-H "X-Amz-Target: Kinesis_20131202.CreateStream" \
-H "Authorization: AWS4-HMAC-SHA256 Credential=test/20260421/us-east-1/kinesis/aws4_request, SignedHeaders=host, Signature=fake" \
-d '{"StreamName":"my-stream","ShardCount":2}'
# Put a record (data must be base64-encoded)
curl -s http://localhost:4566 \
-H "Content-Type: application/x-amz-json-1.1" \
-H "X-Amz-Target: Kinesis_20131202.PutRecord" \
-H "Authorization: AWS4-HMAC-SHA256 Credential=test/20260421/us-east-1/kinesis/aws4_request, SignedHeaders=host, Signature=fake" \
-d '{"StreamName":"my-stream","PartitionKey":"user-123","Data":"SGVsbG8gV29ybGQ="}'
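# Get a shard iterator (TRIM_HORIZON = start from the oldest record in the shard).
# With 2 shards, the record may have landed on shardId-000000000001 instead;
# use the ShardId returned by PutRecord if this read comes back empty.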
ITERATOR=$(curl -s http://localhost:4566 \
-H "Content-Type: application/x-amz-json-1.1" \
-H "X-Amz-Target: Kinesis_20131202.GetShardIterator" \
-H "Authorization: AWS4-HMAC-SHA256 Credential=test/20260421/us-east-1/kinesis/aws4_request, SignedHeaders=host, Signature=fake" \
-d '{"StreamName":"my-stream","ShardId":"shardId-000000000000","ShardIteratorType":"TRIM_HORIZON"}' \
| jq -r '.ShardIterator')
# Read records
curl -s http://localhost:4566 \
-H "Content-Type: application/x-amz-json-1.1" \
-H "X-Amz-Target: Kinesis_20131202.GetRecords" \
-H "Authorization: AWS4-HMAC-SHA256 Credential=test/20260421/us-east-1/kinesis/aws4_request, SignedHeaders=host, Signature=fake" \
-d "{\"ShardIterator\":\"$ITERATOR\",\"Limit\":100}"
Operations
Streams
- `CreateStream`: create a new data stream
  - Input: `StreamName` (required), `ShardCount` (required, number of shards, 1 or more)
  - Returns: empty response; the stream becomes `ACTIVE` immediately
- `DeleteStream`: delete a stream and all its data
  - Input: `StreamName`, optional `EnforceConsumerDeletion`
- `DescribeStream`: get the full stream description, including shard details
  - Input: `StreamName`, optional `Limit`, `ExclusiveStartShardId`
  - Returns: `StreamDescription` with `StreamName`, `StreamARN`, `StreamStatus` (`ACTIVE`), `Shards` (list with `ShardId`, `SequenceNumberRange`, `HashKeyRange`)
- `DescribeStreamSummary`: get a lightweight stream summary
  - Input: `StreamName`
  - Returns: `StreamDescriptionSummary` (no shard details)
- `ListStreams`: list all streams in the account/region
  - Returns: `StreamNames` list
- `ListShards`: list shards in a stream with pagination
  - Input: `StreamName`, optional `NextToken`, `MaxResults`
  - Returns: `Shards` list, `NextToken`
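Paginating `ListShards` means following `NextToken` until it is absent. A minimal sketch follows, with the API call injected as a function so the loop works with either the SDK or raw HTTP; `listAllShards` and the local types are hypothetical names, not part of AWSim or the AWS SDK. On real Kinesis a request may not carry both `StreamName` and `NextToken`, so the sketch sends only one of them; whether AWSim enforces the same rule is an assumption.

```typescript
// Minimal local shapes; only the fields the loop needs.
interface Shard {
  ShardId: string;
}
interface ListShardsPage {
  Shards: Shard[];
  NextToken?: string;
}
type ListShardsFn = (input: {
  StreamName?: string;
  NextToken?: string;
  MaxResults?: number;
}) => Promise<ListShardsPage>;

// Hypothetical helper: collects every shard by following NextToken.
async function listAllShards(
  listShards: ListShardsFn,
  streamName: string,
  pageSize = 100
): Promise<Shard[]> {
  const all: Shard[] = [];
  let token: string | undefined;
  do {
    // First call identifies the stream; later calls send only the token.
    const page = await listShards(
      token
        ? { NextToken: token, MaxResults: pageSize }
        : { StreamName: streamName, MaxResults: pageSize }
    );
    all.push(...page.Shards);
    token = page.NextToken;
  } while (token);
  return all;
}
```

With the AWS SDK, `listShards` could be something like `(input) => client.send(new ListShardsCommand(input))`, adapted to the SDK's optional fields.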
Records
- `PutRecord`: write a single data record to a stream
  - Input: `StreamName`, `PartitionKey` (determines the shard via MD5 hash), `Data` (base64-encoded bytes)
  - Returns: `ShardId`, `SequenceNumber` (monotonically increasing within a shard)
- `PutRecords`: write multiple records in a single batch call (up to 500)
  - Input: `StreamName`, `Records` (list of `{Data, PartitionKey}`)
  - Returns: `Records` (list with `ShardId` and `SequenceNumber`, or `ErrorCode`), `FailedRecordCount`
- `GetShardIterator`: get a position marker for reading records from a shard
  - Input: `StreamName`, `ShardId` (e.g., `shardId-000000000000`), `ShardIteratorType` (`TRIM_HORIZON` | `LATEST` | `AT_SEQUENCE_NUMBER` | `AFTER_SEQUENCE_NUMBER` | `AT_TIMESTAMP`)
  - Returns: `ShardIterator` (opaque string, valid for 5 minutes in real AWS)
- `GetRecords`: read records from a shard starting at a shard iterator
  - Input: `ShardIterator`, optional `Limit` (max records per call)
  - Returns: `Records` (list with `SequenceNumber`, `ApproximateArrivalTimestamp`, `Data` (base64), `PartitionKey`), `NextShardIterator`, `MillisBehindLatest`
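To predict which shard a `PartitionKey` lands on, the MD5 routing can be sketched as below. This is an illustration under a stated assumption, not AWSim's actual code: it assumes the shards created by `CreateStream` divide the 128-bit hash key space evenly and in order, as real Kinesis does for a freshly created stream. `hashKeyFor`, `shardIndexFor`, and `shardIdFor` are hypothetical names.

```typescript
import { createHash } from "node:crypto";

// Largest hash key: 2^128 - 1.
const MAX_HASH_KEY = BigInt("340282366920938463463374607431768211455");

// MD5 digest of the partition key, read as an unsigned 128-bit integer.
function hashKeyFor(partitionKey: string): bigint {
  const hex = createHash("md5").update(partitionKey, "utf8").digest("hex");
  return BigInt("0x" + hex);
}

// Assumption: `shardCount` shards split the hash key space evenly, in order.
function shardIndexFor(partitionKey: string, shardCount: number): number {
  if (!Number.isInteger(shardCount) || shardCount < 1) {
    throw new RangeError("shardCount must be a positive integer");
  }
  const rangeSize = (MAX_HASH_KEY + BigInt(1)) / BigInt(shardCount);
  const index = hashKeyFor(partitionKey) / rangeSize;
  // When 2^128 is not divisible by shardCount, the top few keys
  // belong to the last shard.
  const last = BigInt(shardCount - 1);
  return Number(index > last ? last : index);
}

// Shard IDs are "shardId-" plus a zero-padded 12-digit index.
function shardIdFor(index: number): string {
  return "shardId-" + String(index).padStart(12, "0");
}

console.log(shardIdFor(shardIndexFor("user-123", 2)));
```

After a `SplitShard` or `MergeShards` call the ranges are no longer even, so the `HashKeyRange` values from `DescribeStream` are the reliable source.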
Shard Management
- `MergeShards`: merge two adjacent shards into one
  - Input: `StreamName`, `ShardToMerge`, `AdjacentShardToMerge`
- `SplitShard`: split a shard into two shards
  - Input: `StreamName`, `ShardToSplit`, `NewStartingHashKey` (hash key at which to split; real Kinesis expects a decimal value in the 128-bit MD5 hash key range, not a hex string)
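A common choice for `NewStartingHashKey` is the midpoint of the parent shard's `HashKeyRange`, which splits the shard evenly. The sketch below assumes the range bounds are decimal strings, as real Kinesis reports them in `DescribeStream`; the format AWSim uses may differ. `midpointHashKey` is a hypothetical helper name.

```typescript
interface HashKeyRange {
  StartingHashKey: string; // decimal string (assumed, as on real Kinesis)
  EndingHashKey: string; // decimal string, inclusive upper bound
}

// Returns the first hash key of the second child shard, so the children
// cover [start, mid - 1] and [mid, end].
function midpointHashKey(range: HashKeyRange): string {
  const start = BigInt(range.StartingHashKey);
  const end = BigInt(range.EndingHashKey);
  if (end <= start) {
    throw new RangeError("range holds a single key and cannot be split");
  }
  const mid = start + (end - start + BigInt(1)) / BigInt(2);
  return mid.toString();
}

// Splitting a shard that covers the entire hash key space:
console.log(
  midpointHashKey({
    StartingHashKey: "0",
    EndingHashKey: "340282366920938463463374607431768211455",
  })
); // prints "170141183460469231731687303715884105728" (2^127)
```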
Tags
- `AddTagsToStream` / `RemoveTagsFromStream` / `ListTagsForStream`: manage tags on streams
Retention
- `IncreaseStreamRetentionPeriod`: extend the data retention period (up to 365 days)
  - Input: `StreamName`, `RetentionPeriodHours`
- `DecreaseStreamRetentionPeriod`: reduce the data retention period (minimum 24 hours)
  - Input: `StreamName`, `RetentionPeriodHours`
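Because increasing and decreasing retention are separate operations, a caller has to pick one based on the current value. A minimal sketch of that decision, using the limits listed above (24 hours minimum, 365 days maximum), could look like this. `retentionOperation` is a hypothetical helper, and whether AWSim itself rejects out-of-range values is not stated here.

```typescript
const MIN_RETENTION_HOURS = 24;
const MAX_RETENTION_HOURS = 365 * 24; // 8760 hours

type RetentionOperation =
  | "IncreaseStreamRetentionPeriod"
  | "DecreaseStreamRetentionPeriod"
  | null; // null means no call is needed

// Hypothetical helper: validates the target and picks the operation.
function retentionOperation(
  currentHours: number,
  desiredHours: number
): RetentionOperation {
  if (
    !Number.isInteger(desiredHours) ||
    desiredHours < MIN_RETENTION_HOURS ||
    desiredHours > MAX_RETENTION_HOURS
  ) {
    throw new RangeError(
      `RetentionPeriodHours must be an integer from ${MIN_RETENTION_HOURS} to ${MAX_RETENTION_HOURS}`
    );
  }
  if (desiredHours > currentHours) return "IncreaseStreamRetentionPeriod";
  if (desiredHours < currentHours) return "DecreaseStreamRetentionPeriod";
  return null;
}

console.log(retentionOperation(24, 168)); // prints "IncreaseStreamRetentionPeriod"
```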
Curl Examples
# 1. Put multiple records in a batch
curl -s http://localhost:4566 \
-H "Content-Type: application/x-amz-json-1.1" \
-H "X-Amz-Target: Kinesis_20131202.PutRecords" \
-H "Authorization: AWS4-HMAC-SHA256 Credential=test/20260421/us-east-1/kinesis/aws4_request, SignedHeaders=host, Signature=fake" \
-d '{"StreamName":"my-stream","Records":[{"Data":"eyJldmVudCI6ImNsaWNrIn0=","PartitionKey":"user-1"},{"Data":"eyJldmVudCI6InZpZXcifQ==","PartitionKey":"user-2"}]}'
# 2. List all streams
curl -s http://localhost:4566 \
-H "Content-Type: application/x-amz-json-1.1" \
-H "X-Amz-Target: Kinesis_20131202.ListStreams" \
-H "Authorization: AWS4-HMAC-SHA256 Credential=test/20260421/us-east-1/kinesis/aws4_request, SignedHeaders=host, Signature=fake" \
-d '{}'
# 3. Increase retention to 7 days
curl -s http://localhost:4566 \
-H "Content-Type: application/x-amz-json-1.1" \
-H "X-Amz-Target: Kinesis_20131202.IncreaseStreamRetentionPeriod" \
-H "Authorization: AWS4-HMAC-SHA256 Credential=test/20260421/us-east-1/kinesis/aws4_request, SignedHeaders=host, Signature=fake" \
-d '{"StreamName":"my-stream","RetentionPeriodHours":168}'
SDK Example
import {
KinesisClient,
CreateStreamCommand,
PutRecordsCommand,
GetShardIteratorCommand,
GetRecordsCommand,
} from '@aws-sdk/client-kinesis';
const kinesis = new KinesisClient({
region: 'us-east-1',
endpoint: 'http://localhost:4566',
credentials: { accessKeyId: 'test', secretAccessKey: 'test' },
});
// Create stream
await kinesis.send(new CreateStreamCommand({
StreamName: 'events-stream',
ShardCount: 2,
}));
// Put records
const { Records, FailedRecordCount } = await kinesis.send(new PutRecordsCommand({
StreamName: 'events-stream',
Records: [
{
Data: Buffer.from(JSON.stringify({ type: 'click', userId: 'u-123' })),
PartitionKey: 'u-123',
},
{
Data: Buffer.from(JSON.stringify({ type: 'purchase', userId: 'u-456', amount: 29.99 })),
PartitionKey: 'u-456',
},
],
}));
console.log('Failed:', FailedRecordCount);
console.log('Placed on shard:', Records?.[0]?.ShardId);
// Read records back from shard 0
const { ShardIterator } = await kinesis.send(new GetShardIteratorCommand({
StreamName: 'events-stream',
ShardId: 'shardId-000000000000',
ShardIteratorType: 'TRIM_HORIZON',
}));
const { Records: readRecords } = await kinesis.send(new GetRecordsCommand({
ShardIterator: ShardIterator!,
Limit: 100,
}));
readRecords?.forEach(record => {
const data = JSON.parse(Buffer.from(record.Data!).toString());
console.log(`${record.SequenceNumber}: ${JSON.stringify(data)}`);
});
Behavior Notes
- Records are stored in memory per shard. The default retention period is 24 hours, but AWSim does not automatically evict expired records.
- Partition keys are hashed with MD5 to route records to shards. This matches real Kinesis behavior, so the same partition key is consistently assigned to the same shard.
- Shard IDs are zero-padded 12-digit integers: `shardId-000000000000`, `shardId-000000000001`, etc.
- `GetShardIterator` with `TRIM_HORIZON` always returns an iterator pointing to the first record in the shard.
- `NextShardIterator` in the `GetRecords` response can be used for subsequent polling.
- State is in-memory only and lost on restart.
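Polling with `NextShardIterator` can be sketched as a loop that feeds each response's iterator into the next `GetRecords` call. The version below is an illustration only: the API call is injected as a function, `drainShard` and the local types are hypothetical names, and the stop condition (an empty page with `MillisBehindLatest` of 0) is an assumption about when a reader has caught up.

```typescript
// Minimal local shapes; only the fields the loop needs.
interface KinesisRecord {
  SequenceNumber: string;
  PartitionKey: string;
  Data: string; // base64 when using the raw JSON protocol
}
interface GetRecordsPage {
  Records: KinesisRecord[];
  NextShardIterator?: string;
  MillisBehindLatest?: number;
}
type GetRecordsFn = (input: {
  ShardIterator: string;
  Limit?: number;
}) => Promise<GetRecordsPage>;

// Hypothetical helper: reads until the shard is caught up, the iterator
// runs out, or maxCalls is reached.
async function drainShard(
  getRecords: GetRecordsFn,
  firstIterator: string,
  maxCalls = 10
): Promise<KinesisRecord[]> {
  const collected: KinesisRecord[] = [];
  let iterator: string | undefined = firstIterator;
  for (let call = 0; call < maxCalls && iterator; call++) {
    const page = await getRecords({ ShardIterator: iterator, Limit: 100 });
    collected.push(...page.Records);
    // Always continue from the iterator returned by the previous call.
    iterator = page.NextShardIterator;
    // An empty page alone does not mean "done"; also check the lag.
    const lag = page.MillisBehindLatest === undefined ? 0 : page.MillisBehindLatest;
    if (page.Records.length === 0 && lag === 0) break;
  }
  return collected;
}
```

A long-running consumer would sleep between calls instead of stopping, and would request a fresh iterator if the current one expires.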