English | 中文
This is a Apache Pulsar client library implemented in php Reference PulsarApi.proto
PHP 7.x (Not supported PHP8)
Protobuf Extension
Because Apache Pulsar uses the proto2 syntax
Since the Google protobuf for PHP extension does not support the proto2 syntax
So install the allegro/php-protobuf extension instead rather than protocolbuffers/protobuf
Unfortunately, since the allegro/php-protobuf extension library does not support php8, it cannot support php8 either
ZLib Extension(If you want to use zlib compression)
composer require ikilobyte/pulsar-client-phpallegro/php-protobuf
cd /usr/local/src git clone https://github.com/allegro/php-protobuf cd php-protobuf phpize ./configure --with-php-config=$(which php-config) make && make install # add to php.ini [protobuf] extension=protobuf.so<?phpusePulsar\Authentication\Jwt; usePulsar\Compression\Compression; usePulsar\Producer; usePulsar\ProducerOptions; usePulsar\MessageOptions; require_once__DIR__ . '/vendor/autoload.php'; $options = newProducerOptions(); // If permission authentication is available// Only JWT authentication is currently supported$options->setAuthentication(newJwt('token')); $options->setConnectTimeout(3); $options->setTopic('persistent://public/default/demo'); $options->setCompression(Compression::ZLIB); $producer = newProducer('pulsar://localhost:6650', $options); $producer->connect(); for ($i = 0; $i < 10; $i++){$messageID = $producer->send(sprintf('hello %d',$i)); echo'messageID ' . $messageID . "\n"} // Sending messages asynchronously//for ($i = 0; $i < 10; $i++){// $producer->sendAsync(sprintf('hello-async %d',$i),function(string $messageID){// echo 'messageID ' . $messageID . "\n";// });//}////// Add this line when sending asynchronously//$producer->wait();// Sending delayed messagesfor ($i = 0; $i < 10; $i++){$producer->send(sprintf('hello-delay %d',$i),[ MessageOptions::DELAY_SECONDS => $i * 5, // Seconds ])} // close$producer->close();<?phpusePulsar\Authentication\Jwt; usePulsar\Consumer; usePulsar\ConsumerOptions; usePulsar\SubscriptionType; require_once__DIR__ . '/vendor/autoload.php'; $options = newConsumerOptions(); // If permission authentication is available// Only JWT authentication is currently supported$options->setAuthentication(newJwt('token')); $options->setConnectTimeout(3); $options->setTopic('persistent://public/default/demo'); $options->setSubscription('logic'); $options->setSubscriptionType(SubscriptionType::Shared); // Configure how many seconds Nack's messages are redelivered, the default is 1 minute$options->setNackRedeliveryDelay(20); $consumer = newConsumer('pulsar://localhost:6650', $options); $consumer->connect(); while (true){$message = $consumer->receive(); echosprintf('Got message 【%s】messageID[%s] topic[%s] publishTime[%s]', $message->getPayload(), $message->getMessageId(), $message->getTopic(), $message->getPublishTime() ) . "\n"; // ... // Remember to confirm that the message is complete after processing$consumer->ack($message); // When processing fails, you can also execute the Nack// The message will be re-delivered after the specified time// $consumer->nack($message); } $consumer->close();- ProducerOptions
- setTopic()
- setAuthentication()
- setConnectTimeout()
- setProducerName()
- setCompression()
- ConsumerOptions
- setTopic()
- setAuthentication()
- setConnectTimeout()
- setConsumerName()
- setSubscription()
- setSubscriptionType()
- setNackRedeliveryDelay()
- setReceiveQueueSize()
- MessageOptions
- DELAY_SECONDS
MIT LICENSE