Kafka REST Proxy

一、业务场景需求

某公司A,需要把游戏日志传给另外一家公司B。架构大致是,游戏服务器把日志传到kafka消息队列,kafka在到es进行数据分析。由于服务器是公司B自己提供,公司A就需要提供一个外网接口让对方把数据写到云ukafka。 由于ukafka只有内网IP,zookeeper中只能注册一个地址,通过nginx代理或者ssh端口转发都不能做到外网正常使用,除非节点本身有外网IP。这个时候就需要用到Kafka REST Proxy。

二、Kafka REST Proxy 介绍

Kafka REST代理为Kafka集群提供RESTful接口。它可以轻松生成和使用消息,查看集群状态,以及在不使用本机Kafka协议或客户端的情况下执行管理操作。用例示例包括从任何语言构建的任何前端应用程序向Kafka报告数据,将消息提取到尚不支持Kafka的流处理框架,以及脚本管理操作。

kafka rest proxy 是rest api接口,通过这个代理把数据转发到kafka的。
主要有两部分组成
- Schema Registry提供元数据的存储和解析。
- Producer的序列化和Consumer的反序列化都会去Schema Registry读取对应的Schema
通过Kafka REST Proxy API接口做代理,把外网传输数据转发到内网kafka中。

相关资料1 https://github.com/confluentinc/kafka-rest
相关资料2 https://docs.confluent.io/current/kafka-rest/docs/index.html

三、安装

1、安装jdk1.8 以上

下载JDK

1
2
cd /usr/local/src/kafka
wget -c http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.rpm?AuthParam=1493105955_1f866324e85307d5f9b495e276577b05

安装JDK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 检查jdk 是否安装
rpm -qa | grep jdk
# 安装jdk
cd /usr/local/src/kafka
rpm -ivh jdk-8u131-linux-x64.rpm
# 设置环境
cat >> /etc/profile<<"EOF"
export JAVA_HOME=/usr/java/jdk1.8.0_131
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
EOF
# 加载环境变量
. /etc/profile
2、添加相关源

vim /etc/yum.repos.d/confluent.repo

1
2
3
4
5
6
7
8
9
10
11
12
13
[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.1/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.1
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1
3、安装
1
yum install confluent-kafka-rest confluent-schema-registry -y
4、修改confluent-schema-registry 配置

vim /etc/schema-registry/schema-registry.properties

1
2
3
4
listeners=http://0.0.0.0:8080 ## 提供给kafka-rest 连接的端口
kafkastore.connection.url=10.10.41.3x:2181,10.10.187.3x:2181,10.10.87.5x:2181 ## 实际kafka的zookeeper
kafkastore.topic=_schemas
debug=false

启动

1
schema-registry-start /etc/schema-registry/schema-registry.properties
5、修改confluent-kafka-rest配置

vim /etc/kafka-rest/kafka-rest.properties

1
2
3
4
5
#id=kafka-rest-test-server
#port=9092 ## 自定义端口 默认 8082
schema.registry.url=http://localhost:8080 ## 上面schema 端口
zookeeper.connect=10.10.41.3x:2181,10.10.187.3x:2181,10.10.87.5x:2181
## 实际kafka的zookeeper

启动

1
kafka-rest-start /etc/kafka-rest/kafka-rest.properties

测试

发送消息

1
2
3
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json"  --data '{"records":[{"value":{"foo":"bar2"}}]}' "http://x.x.x.x:8082/topics/opstest" 
# 正常返回结果
{"offsets":[{"partition":0,"offset":5,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

接收消息

1
2
3
4
./kafka-console-consumer.sh --zookeeper 10.10.x.x:2181 --from-beginning --topic opstest
# 结果
{"foo":"bar"}
{"foo":"bar2"}