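Pulsar is a messaging system whose strengths include multi-tenancy, high performance, and low latency.
It has clients for Java, Python, Go, and C++.
So how do you create a producer and a consumer in Python?
Code first: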
import json

import pulsar


def produce_method():
    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('my-topic')
    try:
        for i in range(10):
            print(f"Hello-{i}")
            dict_message = {
                "userId": "00001",
                "productName": "pulsar-test"
            }
            # Pulsar payloads are bytes, so serialize the dict to UTF-8 JSON.
            mes = json.dumps(dict_message).encode('utf-8')
            producer.send(mes)
    finally:
        client.close()


def consumer_method():
    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('my-topic', 'my-subscription')
    try:
        while True:
            msg = consumer.receive()
            try:
                print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
                consumer.acknowledge(msg)
            except Exception:
                # Processing failed: ask the broker to redeliver this message.
                print(f"msg is {msg}")
                consumer.negative_acknowledge(msg)
    finally:
        # Reached when the loop is interrupted (e.g. Ctrl+C).
        client.close()
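The producer above sends a dict as UTF-8 JSON bytes, while the consumer only prints the raw bytes from msg.data(). A minimal sketch of a matching pair of helpers follows; encode_payload and decode_payload are names made up for this example and are not part of the pulsar package.

```python
import json


def encode_payload(payload: dict) -> bytes:
    """Serialize a dict to UTF-8 JSON bytes, as the producer above does."""
    return json.dumps(payload).encode("utf-8")


def decode_payload(data: bytes) -> dict:
    """Reverse of encode_payload; `data` is what msg.data() returns."""
    return json.loads(data.decode("utf-8"))


if __name__ == "__main__":
    raw = encode_payload({"userId": "00001", "productName": "pulsar-test"})
    # In the consumer this would be decode_payload(msg.data()).
    print(decode_payload(raw)["productName"])
```

Decoding inside the consumer's try block means a malformed payload raises an exception and takes the negative_acknowledge path instead of being acknowledged.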
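This uses the pulsar package. You can install it locally (for example with pip install pulsar-client; instructions are easy to find online). Because my Pulsar runs in Docker, I copied the pulsar package out of the container into my local site-packages directory to keep the versions identical, and import it directly in PyCharm.
Add a main method to the code and you can drop it into the Docker container to test the producer and consumer. It will not run locally unless a Pulsar instance is set up there.
If token authentication is required, define the client as follows: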
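One possible shape for that main method is sketched below. It picks the role from the command line; the run helper and the placeholder handlers are assumptions for this example, and in a real script the handlers would be produce_method and consumer_method from the listing above.

```python
import argparse
import sys


def run(argv, handlers):
    """Parse the role from argv and call the matching handler."""
    parser = argparse.ArgumentParser(description="Pulsar producer/consumer smoke test")
    parser.add_argument("role", choices=sorted(handlers))
    args = parser.parse_args(argv)
    return handlers[args.role]()


if __name__ == "__main__":
    # Placeholders only: swap in produce_method and consumer_method.
    handlers = {
        "produce": lambda: print("would call produce_method()"),
        "consume": lambda: print("would call consumer_method()"),
    }
    # Default to the producer when no role is given.
    run(sys.argv[1:] or ["produce"], handlers)
```

Saved under a hypothetical name such as pulsar_demo.py, it could be started inside the container once as `python pulsar_demo.py consume` and once as `python pulsar_demo.py produce`.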
from pulsar import Client, AuthenticationToken

client = Client('pulsar://localhost:6650/',
                authentication=AuthenticationToken('eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY'))
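Hard-coding the token is fine for a quick test, but it is easy to leak that way. A minimal sketch of reading it from an environment variable instead is shown here; the variable name PULSAR_TOKEN and the helpers load_token and make_client are assumptions for this example, while pulsar.Client and pulsar.AuthenticationToken are the same calls used above.

```python
import os


def load_token(env=os.environ, var="PULSAR_TOKEN"):
    """Return the token stored in `var`, or raise if it is missing or empty."""
    token = env.get(var, "").strip()
    if not token:
        raise RuntimeError(f"{var} is not set")
    return token


def make_client(service_url="pulsar://localhost:6650", env=os.environ):
    """Build an authenticated client from the token in the environment."""
    # Imported lazily so load_token works even where pulsar is not installed.
    import pulsar
    return pulsar.Client(
        service_url,
        authentication=pulsar.AuthenticationToken(load_token(env)),
    )
```

With this in place, produce_method and consumer_method could call make_client() instead of building the client inline.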
Also make sure broker.conf is configured as follows:
# Enable authentication and authorization
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
# Authentication settings for the broker itself. Used when the broker connects to other brokers, either in same or other clusters
brokerClientTlsEnabled=true
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters={"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.9OHgE9ZUDeBTZs7nSMEFIuGNEX18FLR3qvy8mqxSxXw"}
# Or, alternatively, read token from file
# brokerClientAuthenticationParameters={"file":"///path/to/proxy-token.txt"}
brokerClientTrustCertsFilePath=/path/my-ca/certs/ca.cert.pem
# If this flag is set then the broker authenticates the original Auth data
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=true
# If using secret key (Note: key files must be DER-encoded)
tokenSecretKey=file:///path/to/secret.key
# The key can also be passed inline:
# tokenSecretKey=data:;base64,FLFyW0oLJ2Fi22KKCm21J18mbAdztfSHN/lAT5ucEKU=
# If using public/private (Note: key files must be DER-encoded)
# tokenPublicKey=file:///path/to/public.key
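To make the relationship between tokenSecretKey and the token strings more concrete: these tokens are JWTs, and the eyJhbGciOiJIUzI1NiJ9 header decodes to {"alg":"HS256"}, that is, an HMAC-SHA256 signature over the first two segments. The sketch below uses only the standard library to read the claims of a token and to sign and verify one with a secret key. All function names are made up for this example, the key in the usage is a dummy, and real tokens would normally be issued with Pulsar's own tooling rather than with code like this.

```python
import base64
import hashlib
import hmac
import json


def b64url_decode(segment: str) -> bytes:
    # JWT segments drop the base64 padding, so restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def decode_claims(token: str) -> dict:
    """Read the payload WITHOUT verifying the signature (inspection only)."""
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


def sign_hs256(claims: dict, key: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = b64url_encode(json.dumps({"alg": "HS256"}, separators=(",", ":")).encode())
    payload = b64url_encode(json.dumps(claims, separators=(",", ":")).encode())
    signing_input = f"{header}.{payload}".encode("ascii")
    signature = hmac.new(key, signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{b64url_encode(signature)}"


def verify_hs256(token: str, key: bytes) -> bool:
    """Recompute the signature with `key` and compare in constant time."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return hmac.compare_digest(expected, b64url_decode(signature))


if __name__ == "__main__":
    dummy_key = b"0" * 32  # hypothetical key, NOT the one from broker.conf
    token = sign_hs256({"sub": "test-user"}, dummy_key)
    print(decode_claims(token), verify_hs256(token, dummy_key))
```

Running decode_claims on the token in brokerClientAuthenticationParameters yields a sub claim of test-user, which is the identity the broker presents when it connects to other brokers.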
Configure proxy.conf as follows:
# For clients connecting to the proxy
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
tokenSecretKey=file:///path/to/secret.key
# For the proxy to connect to brokers
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters={"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.9OHgE9ZUDeBTZs7nSMEFIuGNEX18FLR3qvy8mqxSxXw"}
# Or, alternatively, read token from file
# brokerClientAuthenticationParameters={"file":"///path/to/proxy-token.txt"}
# Whether client authorization credentials are forwarded to the broker for re-authorization.
# Authentication must be enabled via authenticationEnabled=true for this to take effect.
forwardAuthorizationCredentials=true
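Only with these settings is token authentication actually switched on. If it is left off, a consumer receives the produced messages whether or not it passes a token.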
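Since a disabled switch fails silently, it can help to check the config file itself before testing. The following is a rough sketch that parses key=value lines and reports whether the token settings shown above are present; parse_conf and token_auth_enabled are hypothetical helpers, and the check only looks at the file contents, not at what a running broker has actually loaded.

```python
def parse_conf(text: str) -> dict:
    """Parse key=value lines, skipping blank lines and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


def token_auth_enabled(settings: dict) -> bool:
    """True if authentication is on and the token provider is listed."""
    providers = settings.get("authenticationProviders", "")
    return (
        settings.get("authenticationEnabled", "false").lower() == "true"
        and "AuthenticationProviderToken" in providers
    )


if __name__ == "__main__":
    sample = """
    # For clients connecting to the proxy
    authenticationEnabled=true
    authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
    """
    print(token_auth_enabled(parse_conf(sample)))
```

To check a real file, pass its contents to parse_conf, for example the text read from broker.conf or proxy.conf inside the container.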