Using Kafka from Python, Explained (Complete Edition): kafka-python Basic Usage

戚俊美
2023-12-01

Simplest usage examples

1. Consumer side

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])

for msg in consumer:
    print(msg)

The first argument is the topic name.

group_id: the name of the consumer group this instance belongs to; optional.

bootstrap_servers: the Kafka server(s) to connect to.

2. Producer side

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)

result = future.get(timeout=10)

print(result)

producer.send sends a message.

The first argument is the topic name; it must be specified.

key: the key; must be a byte string; optional (but at least one of key and value must be given); default None.

value: the value; must be a byte string; optional (but at least one of key and value must be given); default None.

partition: the partition to send to; since Kafka is configured with a single partition by default, it is 0 here.

future.get waits until this single message has been sent or the timeout expires. In testing, this call turned out to be necessary, otherwise the message never actually goes out (the program exits before the buffered record is flushed); time.sleep, or producer.flush(), can be used instead (see the sketch below).
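A minimal sketch, assuming the same localhost:9092 broker as above, of inspecting the RecordMetadata returned by future.get and of using producer.flush() instead of waiting on every future:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# send() returns a future; get() blocks until the broker acknowledges the record
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)

# alternative: queue several sends, then flush once so nothing is lost when the program exits
for i in range(3):
    producer.send('my_topic', value=b'value_%d' % i, partition=0)
producer.flush()  # blocks until all buffered records have been delivered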

3. Parsing sent and received messages

On the consumer side a received message looks like this (each field can be read as an attribute, as sketched after the list below):

ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)

topic

partition

offset: the offset of this message

timestamp: timestamp

timestamp_type: timestamp type

key: the key, as bytes

value: the value, as bytes

checksum: checksum of the message

serialized_key_size: size of the serialized key

serialized_value_size: size of the serialized value; note that when value=None the size is -1
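A minimal sketch (assuming the consumer from the first example) of reading individual ConsumerRecord fields:

for msg in consumer:
    # every msg is a ConsumerRecord; its fields are plain attributes
    print(msg.topic, msg.partition, msg.offset)
    print(msg.key, msg.value)  # raw bytes unless a deserializer is configured
    print(msg.timestamp, msg.timestamp_type)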

KafkaConsumer

Manually assigning partitions

from kafka import KafkaConsumer

from kafka import TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])

consumer.assign([TopicPartition(topic='my_topic', partition=0)])

for msg in consumer:
    print(msg)
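As a small hedged addition to the example above, the current assignment can be checked with consumer.assignment():

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.assign([TopicPartition(topic='my_topic', partition=0)])
print(consumer.assignment())  # e.g. {TopicPartition(topic='my_topic', partition=0)}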

Timeout handling

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'], consumer_timeout_ms=1000)

for msg in consumer:
    print(msg)

If consumer_timeout_ms is not specified, the consumer keeps looping and waiting for messages indefinitely; if it is specified, iteration stops once the timeout is reached instead of waiting further.

consumer_timeout_ms: the timeout in milliseconds
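A minimal sketch of that behaviour, assuming the same setup as above: once no message arrives within consumer_timeout_ms, the for loop simply ends and execution continues after it:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2',
                         bootstrap_servers=['localhost:9092'],
                         consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)
print('no message within 1000 ms, iteration finished')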

Subscribing to multiple topics

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])

consumer.subscribe(topics=['my_topic', 'topic_1'])

for msg in consumer:
    print(msg)

Messages from multiple topics can then be received at the same time.

You can also subscribe to a whole class of topics with a regular expression:

from kafka import KafkaConsumer

import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))

consumer.subscribe(pattern='^my.*')

for msg in consumer:
    print(msg)

Decoding JSON data

Encoding (producer): value_serializer

Decoding (consumer): value_deserializer

1. First, the JSON data sent by the producer

from kafka import KafkaProducer

import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))

future = producer.send('my_topic', value={'value_1': 'value_2'}, partition=0)

future.get(timeout=10)

2. The consumer receives the data without decoding it:

ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)

You can see that value is the raw JSON byte string; it can then be decoded in an extra step (sketched below).
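For example, a minimal sketch of doing that extra decoding step by hand, without configuring a deserializer:

import json

for msg in consumer:
    # msg.value is the raw JSON byte string, e.g. b'{"value_1": "value_2"}'
    data = json.loads(msg.value.decode('ascii'))
    print(data['value_1'])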

3. Letting the consumer decode automatically

from kafka import KafkaConsumer

import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))

consumer.subscribe(topics=['my_topic', 'topic_1'])

for msg in consumer:
    print(msg)

The received result:

ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)

You can see that in the received record value has already been decoded automatically, into a Python dict.

Not only the value but also the key can be handled as JSON; just specify key_deserializer on the consumer (and key_serializer on the producer), as sketched below.
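A minimal sketch, assuming JSON-serializable keys, with key_serializer on the producer and key_deserializer on the consumer:

from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer=lambda k: json.dumps(k).encode('ascii'),
                         value_serializer=lambda v: json.dumps(v).encode('ascii'))
producer.send('my_topic', key={'id': 1}, value={'value_1': 'value_2'}, partition=0)
producer.flush()

# guard the key deserializer so records without a key (key=None) do not raise
consumer = KafkaConsumer('my_topic', group_id='group2',
                         bootstrap_servers=['localhost:9092'],
                         key_deserializer=lambda k: json.loads(k.decode('ascii')) if k is not None else None,
                         value_deserializer=lambda v: json.loads(v.decode('ascii')))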

KafkaProducer

Sending string keys and values

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], key_serializer=str.encode, value_serializer=str.encode)

future = producer.send('my_topic', key='key_3', value='value_3', partition=0)

future.get(timeout=10)

key_serializer and value_serializer are set to str.encode, but the consumer still receives byte strings.

If you want the consumer to receive real strings, a decoding step is needed, e.g. key_deserializer=bytes.decode:

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], key_deserializer=bytes.decode, value_deserializer=bytes.decode)

consumer.subscribe(pattern='^my.*')

for msg in consumer:
    print(msg)

Sending compressed messages: compression_type='gzip'

If messages are large, they can also be sent compressed. Valid values are 'gzip', 'snappy', 'lz4', or None (snappy and lz4 generally require the corresponding Python packages to be installed):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')

future = producer.send('my_topic', key=b'key_3', value=b'value_3', partition=0)

future.get(timeout=10)

Sending msgpack messages

msgpack is short for MessagePack, an efficient binary serialization library that is more compact than JSON:

from kafka import KafkaProducer
import msgpack  # requires the msgpack package

producer = KafkaProducer(value_serializer=msgpack.dumps)

producer.send('msgpack-topic', {'key': 'value'})
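And a hedged companion sketch for the consumer side (again assuming the msgpack package is installed), using msgpack.loads as the value_deserializer:

from kafka import KafkaConsumer
import msgpack

consumer = KafkaConsumer('msgpack-topic', group_id='group2',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=msgpack.loads)
for msg in consumer:
    print(msg.value)  # decoded dict; key types depend on the msgpack version/options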

Reference

https://zhuanlan.zhihu.com/p/38330574
