VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • python中kafka生产者和消费者实现

安装kafka-python:

C:\anaconda3\Scripts>pip install kafka-python

import datetime
import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

'''
使用kafka-python的生产模块
'''
class Kafka_producer():
    def __init__(self, bootstrapServers, kafkaTopic):
        self.bootstrapServers = bootstrapServers
        self.kafkaTopic = kafkaTopic
        self.producer = KafkaProducer(bootstrap_servers=self.bootstrapServers)

    def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            future = producer.send(self.kafkaTopic, parmas_message.encode('utf-8'))
            producer.flush()
            recordMetadata = future.get(timeout=10)
            print(recordMetadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
        except KafkaError as e:
            print(e)

'''
使用Kafka-python的消费模块
'''
class Kafka_consumer():
    def __init__(self, bootstrapServers, kafkaTopic, groupId):
        self.kafkaTopic = kafkaTopic
        self.bootstrapServers = bootstrapServers
        self.groupId = groupId
        self.consumer = KafkaConsumer(self.kafkaTopic, group_id=self.groupId, bootstrap_servers=self.bootstrapServers)

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except BaseException as e:
            print(e)

if __name__ == '__main__':
    bootstrapServers = ['ip1:port1', 'ip2:port2', 'ip3:port3']
    topicStr = '主题'

    print('-' * 20)
    print('生产者')
    print('-' * 20)

    producer = Kafka_producer(bootstrapServers, topicStr)
    for id in range(5):
        params = '{tst}:{null}---' + str(id)
        producer.sendjsondata(params)

    print('-' * 20)
    print('消费者')
    print('-' * 20)

    groupId = 'group名称'
    consumer = Kafka_consumer(bootstrapServers, topicStr, groupId)
    message = consumer.consume_data()
    for i in message:
        print(i.value)

相关教程