Minimal usage examples
1. Consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
for msg in consumer:
    print(msg)

The 1st argument is the topic name.
group_id: the consumer group this instance joins; optional.
bootstrap_servers: the Kafka server address(es).
2. Producer

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
result = future.get(timeout=10)
print(result)

producer.send sends a message.
The 1st argument is the topic name; required.
key: the message key; must be bytes; optional (but at least one of key and value must be given); defaults to None.
value: the message value; must be bytes; optional (but at least one of key and value must be given); defaults to None.
partition: the partition to send to; since Kafka is configured with a single partition by default, this is 0.
future.get waits for this one message to be delivered, or for the timeout to expire. send is asynchronous, so without future.get (or producer.flush) the process may exit before the message actually goes out — which is why, in testing, the call appeared to be required (a time.sleep can substitute, but get/flush is more reliable).
3. Parsing sent and received messages

A message received on the consumer side looks like:
ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)

topic
partition
offset: the offset of this message within the partition
timestamp: the timestamp
timestamp_type: the timestamp type
key: the key, as bytes
value: the value, as bytes
checksum: the message checksum
serialized_key_size: size of the serialized key
serialized_value_size: size of the serialized value; note that when value=None the size is -1
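kafka-python's ConsumerRecord is a namedtuple, so the fields listed above can be read as plain attributes. The sketch below uses a stand-in namedtuple with the same fields (in real code, msg comes from iterating the consumer) to show attribute access and converting the millisecond timestamp:

```python
from collections import namedtuple
from datetime import datetime, timezone

# Stand-in with the same fields as kafka-python's ConsumerRecord;
# in real code `msg` is yielded by iterating the consumer.
ConsumerRecord = namedtuple('ConsumerRecord', [
    'topic', 'partition', 'offset', 'timestamp', 'timestamp_type',
    'key', 'value', 'checksum', 'serialized_key_size', 'serialized_value_size'])

msg = ConsumerRecord('my_topic', 0, 4, 1529569531392, 0,
                     b'my_key', b'my_value', None, 6, 8)

# Fields are plain attributes.
print(msg.topic, msg.offset)  # my_topic 4

# timestamp is in milliseconds since the epoch; convert to a datetime:
dt = datetime.fromtimestamp(msg.timestamp / 1000, tz=timezone.utc)
print(dt.isoformat())
```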
KafkaConsumer
Manually assigning partitions

from kafka import KafkaConsumer
from kafka import TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.assign([TopicPartition(topic='my_topic', partition=0)])
for msg in consumer:
    print(msg)
Timeout handling

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)

If consumer_timeout_ms is not specified, the loop blocks forever waiting for messages; if it is specified, iteration stops once the timeout elapses with no new message, instead of waiting further.
consumer_timeout_ms: the timeout, in milliseconds.
Subscribing to multiple topics

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)

Messages from several topics can be received at the same time.
You can also subscribe to a family of topics with a regular expression:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)

Decoding JSON data
Encoding (producer): value_serializer
Decoding (consumer): value_deserializer
1. First, the producer sending JSON data

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
future = producer.send('my_topic', value={'value_1': 'value_2'}, partition=0)
future.get(timeout=10)

2. What the consumer receives without decoding:
ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)

As you can see, value is the raw JSON bytes; one more decoding step is needed.
3. Automatic decoding on the consumer

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)

Received result:
ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)

In the received record, value has been decoded automatically into a Python dict.
Not only the value but also the key can carry JSON: set key_serializer on the producer and key_deserializer on the consumer.
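The serializer/deserializer pair above is just a pair of inverse functions over bytes, so it can be checked locally without a broker (a sketch; no Kafka involved):

```python
import json

# The same lambdas passed as value_serializer / value_deserializer above.
serialize = lambda m: json.dumps(m).encode('ascii')
deserialize = lambda m: json.loads(m.decode('ascii'))

payload = {'value_1': 'value_2'}
wire = serialize(payload)   # b'{"value_1": "value_2"}' -- the bytes the broker stores
print(len(wire))            # 22, matching serialized_value_size in the record above
print(deserialize(wire))    # round-trips back to the original dict
```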
KafkaProducer
Sending str keys and values

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], key_serializer=str.encode, value_serializer=str.encode)
future = producer.send('my_topic', key='key_3', value='value_3', partition=0)
future.get(timeout=10)

With key_serializer and value_serializer set to str.encode, the producer accepts str, but the consumer still receives bytes.
If the consumer should receive str instead, a decoding step is needed: key_deserializer=bytes.decode (and likewise for the value):

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], key_deserializer=bytes.decode, value_deserializer=bytes.decode)
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)
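As with JSON, the str.encode / bytes.decode pair round-trips and can be verified locally (a sketch; no broker needed):

```python
# The serializer used on the producer side and the deserializer on the consumer side.
wire_key = str.encode('key_3')        # b'key_3' -- the bytes on the wire
decoded_key = bytes.decode(wire_key)  # 'key_3' -- what the consumer sees with key_deserializer=bytes.decode
print(wire_key, decoded_key)
```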
Sending compressed messages: compression_type='gzip'

If messages are large, they can be sent compressed. Valid values are 'gzip', 'snappy', 'lz4', or None (snappy and lz4 require the corresponding Python packages to be installed).

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic', key=b'key_3', value=b'value_3', partition=0)
future.get(timeout=10)
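Compression is applied by the client library to message batches before they go on the wire. As a rough local illustration of the size benefit on a large, repetitive payload (stdlib gzip here, not the Kafka wire format):

```python
import gzip

# A large, repetitive payload compresses well.
raw = b'value_3 ' * 1000
compressed = gzip.compress(raw)
print(len(raw), len(compressed))           # the compressed form is far smaller
assert gzip.decompress(compressed) == raw  # and the compression is lossless
```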
Sending msgpack messages

msgpack (short for MessagePack) is an efficient binary serialization library, more compact than JSON.

from kafka import KafkaProducer
import msgpack

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

On the consumer side, the matching setting is value_deserializer=msgpack.loads.
Reference
https://zhuanlan.zhihu.com/p/38330574