消息中间件Kafka - PHP操作使用Kafka

PHP使用Kafka

我们需要安装libkafkardkafka

安装libkafka

  1. 下载

    去GitHub上克隆下来

    git clone https://github.com/edenhill/librdkafka.git

  2. 安装

cd librdkafka/

`./configure && make && make install`

安装成功界面 没有报错就是安装成功

![14c1ce5d9719d16db1f2de5e9eed9553.png](https://img-blog.csdnimg.cn/20190320100758703.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQxMzIwMjgx,size_16,color_FFFFFF,t_70)

安装rdkafka

  1. 下载

    git clone https://github.com/arnaud-lb/php-rdkafka

    cd php-rdkafka/

  2. 为php安装扩展

在php-rdkafka这个目录下

`phpize`

然后会生成源代码安装的脚本

把php-config的位置改成自己php-config的位置

` ./configure --with-php-config=/usr/local/php/bin/php-config`

编译安装

`make && make install`

成功后会出现一个文件夹

![70c28ed68f37346a84abe96a2fe03aae.png](https://img-blog.csdnimg.cn/20190320100828577.png)

这个位置就是保存的我们刚刚安装的扩展

进入该目录

`cd /usr/local/php/lib/php/extensions/no-debug-non-zts-20170718/`

会发现出现个rdkafka.so文件

![2bb11d41b5e08b42a3fad4fc181eb791.png](https://img-blog.csdnimg.cn/20190320100957658.png)

修改php.ini文件加入  这里的路径就是写自己rdkafka.so文件的路径

`extension=/usr/local/php/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so `
重启php

php-m

出现rdkafka就是安装成功

![fe40f168197be21fb00db6d06428df24.png](https://img-blog.csdnimg.cn/20190320101008302.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQxMzIwMjgx,size_16,color_FFFFFF,t_70)

php操作kafka

运行前先开启我们的zookeeper和kafka 上篇文章有如何开启

  1. 运行producer
    kafka默认端口9092 vim producer.php
     <?php
        $rk = new RdKafka\Producer();
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers("ip:9092");       
        $topic = $rk->newTopic("test");
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "要发送的消息");
               
               
    
  2. 运行consumer
    vim consumer.php
     <?php
        $rk = new RdKafka\Consumer();
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers("ip");
        $topic = $rk->newTopic("test");
        $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
        while(true){
            sleep(1);
            $msg = $topic->consume(0, 1000);
            if ($msg) {
                echo $msg->payload, "\n";
            }          
        }    
        
        
    
    开启两个窗口一个运行consumer 一个运行producer php consumer.php
    php producer.php会发现我们已经简单的会使用kafka了。

消息中间件Kafka - 介绍及安装

Kafka介绍

优势

  • 高吞吐量:非常普通的硬件Kafka也可以支持每秒数百万的消息
  • 支持通过Kafka服务器和消费机集群来区分消息
  • 支持Hadoop并行数据加载

关键概念

  • Broker:Kafka集群中的一台或多台服务器统称为broker。
  • Topic:Kafka处理的消息源(feeds of messages)的不同分类。
  • Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
  • Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
  • Consumers:消息和数据的消费者,订阅topics并处理其发布的消息的过程叫做consumers。

9443d8a422fad65faca5bcfe843dc5f1.png

安装

  1. 下载
    先安装jdk 然后jdk的安装方式在elasticsearch的安装文章中有,这里就不写了
    kafka官网 wget https://www-us.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz 解压 tar -xzvf kafka_2.11-2.1.1.tgz
  1. 修改配置文件

    cd kafka_2.11-2.1.1/config

    zookeeper.properties 是zookeeper的配置文件,默认端口号2181,可不做修改

    server.properties 是kafka配置文件,将 zookeeper.connect 这行 改为自己的zookeeper地址和端口号

    修改完成之后 返回kafka主目录

    cd ..

  2. 运行zookeeper和kafka

    运行zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties

    不要关闭此窗口 再开一个新窗口 重新进入kafka目录

    运行kafka

    bin/kafka-server-start.sh config/server.properties

  3. 运行producer和consumer

    跟上步操作一样 不要关闭窗口 重新开 重新进入kafka目录

    创建一个topic为test

    把ip和port改为自己zookeeper的

    bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test

    运行producer

    bin/kafka-console-producer.sh --broker-list ip:port --topic test

    跟上步操作一样 不要关闭窗口 重新开 重新进入kafka目录

    运行consumer

    bin/kafka-console-consumer.sh --bootstrap-server ip:port --topic test --from-beginning

    然后在producer发送信息 会发现 consumer的窗口会出现你发送的消息

    d4b0ffa28a974f98c733bb3e054b83ac.png
    8d4e50024a47893502c701ecea979d27.png

消息队列 - 应用场景

消息队列

相关概念

消息队列中间件时分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

异步处理
  • 场景说明:用户注册成功后,发送注册邮件,再发送注册短信。
  • 串行方式:将注册信息写入数据库成功后,向用户发送邮件,再发送注册短信,将结果返回客户端。
  • 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信,以上三个任务完成后,返回给客户端。
  • 消息队列:将注册信息写入数据库成功后,注册信息写入消息队列,发送邮件和短信的消费者异步读取消息队列,写入消息队列即将返回给客户端。
    19ce1bd87b7cc498e92733342f4d9f01.png
应用解耦
  • 场景说明:用户下单后,订单系统需要通知库存系统。
  • 传统方式:订单系统调用库存系统的接口。
  • 消息队列->
  • 订单系统*:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
  • 库存系统*:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
    5828d130146f855b2730f9e404537d5f.png
流量削锋
  • 场景说明:秒杀活动,一般会因为流量过大,导致流量暴增。
  • 传统方式:服务端突然接受来自前端的大量订单请求
  • 消息队列:在应用前端加入消息队列->
  1. 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
  2. 秒杀业务根据消息队列中的请求信息,再做后续处理
    bf97f667722518fa1bb367b4ce3959fc.png
日志处理
  • 解决大量日志传输的问题
  • 日志采集客户端,负责日志数据采集,写入消息队列
  • 消息队列,负责日志数据的接收,存储和转发
  • 日志处理应用:订阅并消费消息队列中的日志数据
    abb22f7464ce76ffb6ff86a2da88efb4.png
消息通讯
  • 点对点消息队列,或者聊天室
    440b22e6237a9a48f1599c6dea10fee8.png
  • 客户端A和客户端B使用同一队列,进行消息通讯
  • 客户端A,客户端B,客服端N订阅同一主题,进行消息发布和接收
    7abde092c2b5ab7cecc9b23b4dd3bc2d.png

主要产品

目前在生成环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等