English | 中文
这是一个用php实现的Apache Pulsar客户端库,基于PulsarApi.proto
PHP >= 7.0 (Not supported PHP8)
Protobuf Extension
因为Apache Pulsar使用proto2语法
由于Google protobuf的PHP扩展并不支持proto2语法
所以是安装allegro/php-protobuf扩展,而不是安装 protocolbuffers/protobuf
很遗憾的是,由于allegro/php-protobuf扩展库不支持php8,所以它也不能支持php8
ZLib Extension(如果你想使用zlib压缩)
composer require ikilobyte/pulsar-client-php
allegro/php-protobuf
cd /usr/local/src
git clone https://github.com/allegro/php-protobuf
cd php-protobuf
phpize
./configure --with-php-config=$(which php-config)
make && make install
# add to php.ini
[protobuf]
extension=protobuf.so
<?php
use Pulsar\Authentication\Jwt;
use Pulsar\Compression\Compression;
use Pulsar\Producer;
use Pulsar\ProducerOptions;
require_once __DIR__ . '/vendor/autoload.php';
$options = new ProducerOptions();
// If permission authentication is available
// Only JWT authentication is currently supported
$options->setAuthentication(new Jwt('token'));
$options->setConnectTimeout(3);
$options->setTopic('persistent://public/default/demo');
$options->setCompression(Compression::ZLIB);
$producer = new Producer('pulsar://localhost:6650', $options);
$producer->connect();
for ($i = 0; $i < 10; $i++) {
$messageID = $producer->send(sprintf('hello %d',$i));
echo 'messageID ' . $messageID . "\n";
}
// Sending messages asynchronously
//for ($i = 0; $i < 10; $i++) {
// $producer->sendAsync(sprintf('hello-async %d',$i),function(string $messageID){
// echo 'messageID ' . $messageID . "\n";
// });
//}
//
Add this line when sending asynchronously
//$producer->wait();
// Sending delayed messages
for ($i = 0; $i < 10; $i++) {
$producer->send(sprintf('hello-delay %d',$i),[
\Pulsar\MessageOptions::DELAY_SECONDS => $i * 5, // Seconds
]);
}
// close
$producer->close();
<?php
use Pulsar\Authentication\Jwt;
use Pulsar\Consumer;
use Pulsar\ConsumerOptions;
use Pulsar\SubscriptionType;
require_once __DIR__ . '/vendor/autoload.php';
$options = new ConsumerOptions();
// If permission authentication is available
// Only JWT authentication is currently supported
$options->setAuthentication(new Jwt('token'));
$options->setConnectTimeout(3);
$options->setTopic('persistent://public/default/demo');
$options->setSubscription('logic');
$options->setSubscriptionType(SubscriptionType::Shared);
// Configure how many seconds Nack's messages are redelivered, the default is 1 minute
$options->setNackRedeliveryDelay(20);
$consumer = new Consumer('pulsar://localhost:6650', $options);
$consumer->connect();
while (true) {
$message = $consumer->receive();
echo sprintf('Got message 【%s】messageID[%s] topic[%s] publishTime[%s]',
$message->getPayload(),
$message->getMessageId(),
$message->getTopic(),
$message->getPublishTime(),
) . "\n";
// ...
// Remember to confirm that the message is complete after processing
$consumer->ack($message);
// When processing fails, you can also execute the Nack
// The message will be re-delivered after the specified time
// $consumer->nack($message);
}
$consumer->close();
MIT LICENSE