Ref. www.arundhaj.com/blog/getting-started-kinesis-python.html
Amazon Kinesis is a fully managed streaming service hosted on AWS. It is used to collect and process large streams of data in real time. Combined with Kinesis Analytics, Kinesis Firehose, AWS Lambda, Amazon S3, and Amazon EMR, it lets you build robust distributed applications that power real-time monitoring dashboards, run massive-scale batch analytics, and more.
First, create a Kinesis stream using the following aws-cli command:
> aws kinesis create-stream --stream-name python-stream --shard-count 1
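If you prefer to stay in Python, the same step can be done with boto3. This is a sketch, not part of the original post: the create_stream helper name is hypothetical, and it relies on the boto3 Kinesis client methods create_stream and describe_stream plus the 'stream_exists' waiter, which polls until the stream becomes ACTIVE. The client is passed in as a parameter so the function can be exercised without touching AWS.

```python
def create_stream(client, stream_name, shard_count=1):
    """Hypothetical helper: create a Kinesis stream and wait until it is ACTIVE.

    `client` is expected to be a boto3 Kinesis client, e.g.
    boto3.client('kinesis', region_name='us-east-1').
    """
    # Equivalent to: aws kinesis create-stream --stream-name ... --shard-count ...
    client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    # A new stream starts in CREATING; the 'stream_exists' waiter polls
    # describe_stream until its status is ACTIVE (or gives up and raises).
    client.get_waiter('stream_exists').wait(StreamName=stream_name)
    description = client.describe_stream(StreamName=stream_name)
    return description['StreamDescription']['StreamStatus']
```

For example, create_stream(boto3.client('kinesis', region_name='us-east-1'), 'python-stream') should return 'ACTIVE' once the stream is ready. If the stream already exists, the create_stream call raises a ResourceInUseException.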
The following code, say kinesis_producer.py, puts a record to the stream every 5 seconds until you stop it:
import boto3
import json
from datetime import datetime, timezone
import calendar
import random
import time

my_stream_name = 'python-stream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')


def put_to_stream(thing_id, property_value, property_timestamp):
    payload = {
        'prop': str(property_value),
        'timestamp': str(property_timestamp),
        'thing_id': thing_id
    }

    print(payload)

    # Records that share a PartitionKey are written to the same shard
    put_response = kinesis_client.put_record(
        StreamName=my_stream_name,
        Data=json.dumps(payload),
        PartitionKey=thing_id)
    return put_response


while True:
    property_value = random.randint(40, 120)
    # Current UTC time as Unix epoch seconds
    property_timestamp = calendar.timegm(datetime.now(timezone.utc).timetuple())
    thing_id = 'aa-bb'

    put_to_stream(thing_id, property_value, property_timestamp)

    # wait for 5 seconds
    time.sleep(5)
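The producer uses thing_id as the PartitionKey, so every record for 'aa-bb' goes to the same shard. AWS documents that Kinesis MD5-hashes the partition key into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The sketch below reproduces that mapping locally. The function names are hypothetical, and hashing the key's UTF-8 bytes is an assumption. The shards argument mirrors the Shards list returned by describe_stream, whose HashKeyRange bounds are decimal strings.

```python
import hashlib


def partition_key_to_hash_key(partition_key):
    """Hypothetical helper: map a partition key to a 128-bit hash key.

    Assumes the key is MD5-hashed over its UTF-8 bytes and the 16-byte
    digest is read as a big-endian unsigned integer.
    """
    digest = hashlib.md5(partition_key.encode('utf-8')).digest()
    return int.from_bytes(digest, byteorder='big')


def find_shard(partition_key, shards):
    """Hypothetical helper: return the ShardId whose range contains the key.

    `shards` has the shape of
    describe_stream(...)['StreamDescription']['Shards'].
    """
    hash_key = partition_key_to_hash_key(partition_key)
    for shard in shards:
        key_range = shard['HashKeyRange']
        low = int(key_range['StartingHashKey'])
        high = int(key_range['EndingHashKey'])
        if low <= hash_key <= high:
            return shard['ShardId']
    # No shard covers this hash key (should not happen for a full key space)
    return None
```

With the single-shard stream created earlier, that one shard covers the whole range 0 to 2**128 - 1, so find_shard always returns its ID; the mapping only starts to matter once the stream has more than one shard.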
Start consuming with kinesis_consumer.py, shown below. It reads records as the producer puts them to the stream; because it uses a LATEST shard iterator, it only sees records put after it starts.
import boto3
import json
import time

my_stream_name = 'python-stream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# The stream was created with a single shard, so read the first one
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

# LATEST: start reading just after the most recent record in the shard
shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                   ShardId=my_shard_id,
                                                   ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']

# Stop when there is no NextShardIterator (the shard has been closed)
while my_shard_iterator:
    record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
                                                 Limit=2)
    for record in record_response['Records']:
        # Data comes back as bytes holding the producer's JSON payload
        print(json.loads(record['Data']))
    my_shard_iterator = record_response.get('NextShardIterator')
    # wait for 5 seconds
    time.sleep(5)
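The consumer loop can also be packaged as a generator, which makes it reusable and testable without AWS. This is a sketch, not the original author's code: read_shard and its limit, poll_interval and max_polls parameters are hypothetical, and the defaults (100 records per call, 1 second between polls) are arbitrary choices. It uses only the get_shard_iterator and get_records calls from the consumer above and decodes each record's Data as JSON, matching what the producer writes.

```python
import json
import time


def read_shard(client, stream_name, shard_id, iterator_type='LATEST',
               limit=100, poll_interval=1.0, max_polls=None):
    """Hypothetical helper: yield decoded JSON payloads from one shard.

    `client` is expected to be a boto3 Kinesis client. `max_polls` stops
    after that many get_records calls; None means poll until the shard
    is closed.
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type)['ShardIterator']
    polls = 0
    # A missing NextShardIterator means a closed shard has been fully read
    while iterator and (max_polls is None or polls < max_polls):
        response = client.get_records(ShardIterator=iterator, Limit=limit)
        for record in response['Records']:
            # Data is bytes; json.loads accepts bytes on Python 3.6+
            yield json.loads(record['Data'])
        iterator = response.get('NextShardIterator')
        polls += 1
        if iterator:
            time.sleep(poll_interval)
```

For example, look up shard_id with describe_stream as in the consumer script, then run for payload in read_shard(kinesis_client, 'python-stream', shard_id): print(payload). Passing the client in, rather than creating it inside the function, is what lets the loop be driven by a stub in tests.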
This assumes you have your AWS credentials configured appropriately.
Hope this helps!