Here is a Bash script you can use to retrieve test messages from a Kinesis stream. The script can be modified to start reading from the oldest message (`TRIM_HORIZON`), from the latest message (`LATEST`), or from a point one hour in the past (`AT_TIMESTAMP`). The default is `LATEST`.
#!/bin/bash
#
# Continuously read records from every shard of a Kinesis stream, base64-decode
# each record's Data field and pretty-print it with jq.
#
# Usage:    <script> <stream_name>
# Requires: aws (AWS CLI), jq, base64
# Exit:     1 on missing dependency, bad usage, or when no shard can be read.
#           Otherwise runs until interrupted (Ctrl-C) or until every shard
#           has been closed and fully consumed.

if ! command -v jq >/dev/null 2>&1; then
  echo >&2
  echo "This script requires the jq JSON processor. Please install for your OS from https://stedolan.github.io/jq/download/" >&2
  echo >&2
  exit 1
fi

if [ $# -ne 1 ]; then
  echo >&2
  echo "usage: $0 <stream_name>" >&2
  echo >&2
  exit 1
fi

# Set the stream name
STREAM_NAME=$1

# Choose the iterator type. TYPE is an array so that an iterator type that
# needs extra arguments (AT_TIMESTAMP) can be passed without relying on
# word-splitting of an unquoted string.
#
# TRIM_HORIZON starts at the beginning of the Kinesis stream.
# This can take a while if you have a lot of records.
# To use TRIM_HORIZON, uncomment the following line:
# TYPE=(TRIM_HORIZON)

# AT_TIMESTAMP allows you to go back to a point in time. This is set for going
# back one hour. To use AT_TIMESTAMP, uncomment the following two lines:
# TIMESTAMP=$(($(date +%s) - 3600)).000
# TYPE=(AT_TIMESTAMP --timestamp "$TIMESTAMP")

# LATEST means start at the most current point in the stream and read forward
TYPE=(LATEST)

# Seconds to wait between polling passes. Polling in a tight loop gets the
# caller throttled by the service; a short pause avoids that.
POLL_INTERVAL=1

# Get a list of shards. --output json is forced because the result is parsed
# with jq, regardless of the output format configured in the user's profile.
SHARDS=$(aws kinesis list-shards --stream-name "$STREAM_NAME" --output json \
  | jq -r '.Shards[].ShardId')

if [ -z "$SHARDS" ]; then
  echo "error: no shards found for stream '$STREAM_NAME' (does it exist, and are credentials/region set?)" >&2
  exit 1
fi

# Get the starting iterator for every shard.
# --output text yields the raw iterator token instead of a JSON-quoted string.
SHARD_ITERATOR=()
for shard in $SHARDS; do
  if iterator=$(aws kinesis get-shard-iterator \
        --shard-id "$shard" \
        --shard-iterator-type "${TYPE[@]}" \
        --stream-name "$STREAM_NAME" \
        --query 'ShardIterator' \
        --output text) \
      && [ -n "$iterator" ] && [ "$iterator" != "None" ]; then
    SHARD_ITERATOR+=("$iterator")
  else
    echo "warning: could not get an iterator for shard $shard; skipping it" >&2
  fi
done

if [ "${#SHARD_ITERATOR[@]}" -eq 0 ]; then
  echo "error: could not obtain a shard iterator for any shard of '$STREAM_NAME'" >&2
  exit 1
fi

# Poll all shards and display their records until no readable shard is left.
while [ "${#SHARD_ITERATOR[@]}" -gt 0 ]; do
  # The index list is expanded once per pass, so removing an element inside
  # the loop body is safe.
  for i in "${!SHARD_ITERATOR[@]}"; do
    # On failure (throttling, network error, ...) keep the current iterator
    # and retry on the next pass instead of overwriting it with garbage.
    if ! DATA=$(aws kinesis get-records --limit 50 \
          --shard-iterator "${SHARD_ITERATOR[$i]}" --output json); then
      continue
    fi

    # Each record's Data field is base64; decode it and pretty-print as JSON.
    printf '%s' "$DATA" | jq -r '.Records[].Data?' | while IFS= read -r row; do
      [ -n "$row" ] || continue
      printf '%s' "$row" | base64 -d | jq .
    done

    # A null/missing NextShardIterator means the shard is closed and has been
    # read to its end: stop polling it.
    NEXT_ITERATOR=$(printf '%s' "$DATA" | jq -r '.NextShardIterator // empty')
    if [ -n "$NEXT_ITERATOR" ]; then
      SHARD_ITERATOR[$i]=$NEXT_ITERATOR
    else
      unset 'SHARD_ITERATOR[i]'
    fi
  done
  sleep "$POLL_INTERVAL"
done