tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

PHP SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-05 15:20:01

操作场景

本文介绍使用 PHP 客户端连接 CKafka 弹性 Topic 并收发消息的操作步骤。

前提条件

操作步骤

步骤1:准备环境

1. rdkafka 官方页面 查找最新的 rdkafka php 扩展包版本。
说明:
不同版本的包对 PHP 版本要求不同,这里仅以 4.1.2 为示例。
2. 安装 rdkafka 扩展。
wget --no-check-certificate https://pecl.php.net/get/rdkafka-4.1.2.tgz
pear install rdkafka-4.1.2.tgz
# 安装成功会提示 "install ok" 和 "You should add "extension=rdkafka.so" to php.ini"
# 如果安装失败,若提示could not extract the package.xml file from "rdkafka-4.1.2.tgz", 请手动解压后,把packge.xml文件复制进rdkafka目录中再执行pear install package.xml进行安装。
# 其他错误请根据提示解决
# 安装成功后在 php.ini 添加 extension=rdkafka.so
# 执行 php --ini 后,Loaded Configuration File: 显示的就是 php.ini 所在位置
echo 'extension=rdkafka.so' >> /etc/php.ini

步骤2:创建 Topic 和订阅关系

1. 在控制台的弹性 Topic 列表页面创建一个 Topic。


2. 单击 Topic 的 “ID” 进入基本信息页面,获取用户名、密码和地址信息。


3. 订阅关系页签,新建一个订阅关系(消费组)。
img



步骤3:生产消息

1. 编写生产消息程序 Producer.php。
<?php

$setting = require __DIR__ . '/CKafkaSetting.php';

$conf = new RdKafka\\Conf();
// 设置入口服务,请通过控制台获取对应的服务地址。
$conf->set('bootstrap.servers', $setting['bootstrap_servers']);
// ---------- 启用 SASL 验证时需要设置 ----------
// SASL 验证机制类型默认选用 PLAIN
$conf->set('sasl.mechanism', 'PLAIN');
// 设置用户名:**用户管理**中配置的用户名
$conf->set('sasl.username', $setting['sasl_username']);
// 设置密码:**用户管理**中配置的密码
$conf->set('sasl.password', $setting['sasl_password']);
// 在本地配置 ACL 策略。
$conf->set('security.protocol', 'SASL_PLAINTEXT');
// ---------- 启用 SASL 验证时需要设置 ----------
// Kafka producer 的 ack 有 3 种机制,分别说明如下:
// -1 或 all:Broker 在 leader 收到数据并同步给所有 ISR 中的 follower 后,才应答给 Producer 继续发送下一条(批)消息。
// 这种配置提供了最高的数据可靠性,只要有一个已同步的副本存活就不会有消息丢失。注意:这种配置不能确保所有的副本读写入该数据才返回,
// 可以配合 Topic 级别参数 min.insync.replicas 使用。
// 0:生产者不等待来自 broker 同步完成的确认,继续发送下一条(批)消息。这种配置生产性能最高,但数据可靠性最低
//(当服务器故障时可能会有数据丢失,如果 leader 已死但是 producer 不知情,则 broker 收不到消息)
// 1: 生产者在 leader 已成功收到的数据并得到确认后再发送下一条(批)消息。这种配置是在生产吞吐和数据可靠性之间的权衡
//(如果leader已死但是尚未复制,则消息可能丢失)
// 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置
$conf->set('acks', '1');
// 请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失
$conf->set('retries', '0');
// 发送请求失败时到下一次重试请求之间的时间
$conf->set('retry.backoff.ms', 100);
// producer 网络请求的超时时间。
$conf->set('socket.timeout.ms', 6000);
$conf->set('reconnect.backoff.max.ms', 3000);

// 注册发送消息的回调
$conf->setDrMsgCb(function ($kafka, $message) {
echo '**Producer**发送消息:message=' . var_export($message, true) . "\\n";
});
// 注册发送消息错误的回调
$conf->setErrorCb(function ($kafka, $err, $reason) {
echo "**Producer**发送消息错误:err=$err reason=$reason \\n";
});

$producer = new RdKafka\\Producer($conf);
// Debug 时请设置为 LOG_DEBUG
//$producer->setLogLevel(LOG_DEBUG);
$topicConf = new RdKafka\\TopicConf();
$topic = $producer->newTopic($setting['topic_name'], $topicConf);
// 生产消息并发送
for ($i = 0; $i < 5; $i++) {
// RD_KAFKA_PARTITION_UA 让 kafka 自由选择分区
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}

while ($producer->getOutQLen() > 0) {
$producer->poll(50);
}

echo "**Producer**消息发送成功\\n";

2. 运行 Producer.php 发送消息。
php Producer.php
3. 查看运行结果。
>**Producer**发送消息:message=RdKafka\\Message::__set_state(array(
> 'err' => 0,
> 'topic_name' => 'topic_name',
> 'timestamp' => 1618800895159,
> 'partition' => 0,
> 'payload' => 'Message 0',
> 'len' => 9,
> 'key' => NULL,
> 'offset' => 0,
> 'headers' => NULL,
>))
>**Producer**发送消息:message=RdKafka\\Message::__set_state(array(
> 'err' => 0,
> 'topic_name' => 'topic_name',
> 'timestamp' => 1618800895159,
> 'partition' => 0,
> 'payload' => 'Message 1',
> 'len' => 9,
> 'key' => NULL,
> 'offset' => 1,
> 'headers' => NULL,
>))

...

>**Producer**消息发送成功

步骤4:消费消息

1. 编写消息订阅消费程序 Consumer.php。
<?php

$setting = require __DIR__ . '/CKafkaSetting.php';

$conf = new RdKafka\\Conf();
$conf->set('group.id', $setting['group_id']);
// 设置入口服务,请通过控制台获取对应的服务地址。
$conf->set('bootstrap.servers', $setting['bootstrap_servers']);
// ---------- 启用 SASL 验证时需要设置 ----------
// SASL 验证机制类型默认选用 PLAIN
$conf->set('sasl.mechanism', 'PLAIN');
// 设置用户名:*用户管理**中配置的用户名
$conf->set('sasl.username', $setting['sasl_username']);
// 设置密码:**用户管理**中配置的密码
$conf->set('sasl.password', $setting['sasl_password']);
// 在本地配置 ACL 策略。
$conf->set('security.protocol', 'SASL_PLAINTEXT');
// ---------- 启用 SASL 验证时需要设置 ----------
// 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,
// 认为该消费者故障失败,Broker 发起重新 Rebalance 过程。
$conf->set('session.timeout.ms', 10000);
// 客户端请求超时时间,如果超过这个时间没有收到应答,则请求超时失败
$conf->set('request.timeout.ms', 305000);
// 设置客户端内部重试间隔。
$conf->set('reconnect.backoff.max.ms', 3000);

$topicConf = new RdKafka\\TopicConf();
#$topicConf->set('auto.commit.interval.ms', 100);
// offset重置策略,请根据业务场景酌情设置。设置不当可能导致数据消费缺失。
$topicConf->set('auto.offset.reset', 'earliest');
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\\KafkaConsumer($conf);
// Debug 时请设置为 LOG_DEBUG
//$consumer->setLogLevel(LOG_DEBUG);
$consumer->subscribe([$setting['topic_name']]);

$isConsuming = true;
while ($isConsuming) {
$message = $consumer->consume(10 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo "**消费者**接收到消息:" . var_export($message, true) . "\\n";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "**消费者**等待信息消息中\\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "**消费者**等待超时\\n";
$isConsuming = false;
break;
default:
throw new \\Exception($message->errstr(), $message->err);
break;
}
}
2. 运行 Consumer.php 消费消息。
php Consumer.php
3. 查看运行结果。
>**消费者**接收到消息:RdKafka\\Message::__set_state(array(
> 'err' => 0,
> 'topic_name' => 'topic_name',
> 'timestamp' => 1618800895159,
> 'partition' => 0,
> 'payload' => 'Message 0',
> 'len' => 9,
> 'key' => NULL,
> 'offset' => 0,
> 'headers' => NULL,
>))
>**消费者**接收到消息:RdKafka\\Message::__set_state(array(
> 'err' => 0,
> 'topic_name' => 'topic_name',
> 'timestamp' => 1618800895159,
> 'partition' => 0,
> 'payload' => 'Message 1',
> 'len' => 9,
> 'key' => NULL,
> 'offset' => 1,
> 'headers' => NULL,
>))


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈