博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
php+kafka+zookeeper+logstash
阅读量:5864 次
发布时间:2019-06-19

本文共 2126 字,大约阅读时间需要 7 分钟。

本文主要实现的目标是php连接kafka并且成功发送消息给kafka。为了验证这个连接和发送,另外配置了logstash监听kafka相对应的消息,然后转发到redis,原来我不知道对kafka比较陌生,不知道怎么看里面的消息内容(我知道安装包里有个consumer和producer的脚本) ^ _ ^

消息发送路径:php->kafka->logstash->redis

1.安装kafka

下载地址:

下载解压后进入根目录,

bin/zookeeper-server-start.sh config/zookeeper.properties &  开启zookeeperbin/kafka-server-start.sh config/server.properties & 开启kafka

另开一个终端然后

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka

这样就创建了一个topic为kafka的消息通道

如果这个步骤成功的话,可以通过另开终端发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka

执行之后就可以输入消息发送了。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka --from-beginning

来接受上面终端发送的消息

2.安装php zookeeper扩展,并且用php发送消息

安装扩展之前需要安装zookeeper的c client,扩展有依赖,步骤如下

1.通过apt-get来安装(楼主用的是ubuntu)

2.通过源码安装,地址:

下载下来

cd zookeeper./configuremake && sudo make install

按照如上步骤,/usr/local/bin目录下就会多一个cli_mt

php的zookeeper的源码可以去pecl.php.net下载,然后老步骤

phpize./configure --with-libzookeeper=/usr/local/bin/cli_mt (如果你安装扩展的php不是默认的php,则需要带上--with-php-config参数)make && sudo make install

最后别忘了添加extension=zookeeper.so到php.ini

3.配置logstash

下载地址:

修改配置文件,由于楼主的logstash版本已经是5.2的了,所以又是一阵谷歌,才发现很多网上的配置都是1.2版本的,已经不兼容了。

input{    kafka{            bootstrap_servers=>"localhost:9092"                topics=>["kafka"]        }}output{    redis{            host=>"127.0.0.1"                port=>6379                key=>"kafka"                data_type=>"list"                password=>"123456"        }}

4.php发送消息

网上找了一圈,终于找到一个可以用的

也可以用

composer require "nmred/kafka-php"

php代码如下:

require "./vendor/autoload.php";$produce = \Kafka\Produce::getInstance('10.37.129.2:2181', 3000);$produce->setRequireAck(-1);$topicName = "kafka";$partitions = $produce->getAvailablePartitions($topicName);$partCount = count($partitions);var_dump('$partCount:'.$partCount);$count = 0;$message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s')));//发送消息到不同的partition$partitionId = $count%$partCount;$produce->setMessages($topicName, $partitionId, array($message));$result = $produce->send();var_dump($result);

参考文章:

最后附一张截图

5131116-f950050278fff347.jpeg?

转载地址:http://zeynx.baihongyu.com/

你可能感兴趣的文章
va_list、va_start、va_arg、va_end的原理与使用
查看>>
Convertion of grey code and binary 格雷码和二进制数之间的转换
查看>>
如何培养说话的逻辑性
查看>>
js之form表单的获取
查看>>
在 Linux 中永久修改 USB 设备权限
查看>>
H5+app前端后台ajax交互总结
查看>>
重写 Ext.form.field 扩展功能
查看>>
SQL 查询数据库中所有表信息
查看>>
js中的getAttribute方法使用示例
查看>>
Linux下的搜索查找命令的详解(locate)
查看>>
[AngularJS] Isolate State Mutations in Angular Components
查看>>
回调函数的使用
查看>>
Struts2 中Parameters是如何获取值的
查看>>
C# Httpclient编程
查看>>
[TypeScript] Distinguishing between types of Strings in TypeScript
查看>>
【挖财工作笔记】idea使用指南
查看>>
grafana 邮件报警
查看>>
Vijos P1116 一元三次方程求解【多解,暴力,二分】
查看>>
stl--vector 操作实现
查看>>
AOP技术分析
查看>>