编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

0500-使用Python2访问Kerberos环境下的Kafka

wxchong 2024-10-19 15:50:06 开源技术 63 ℃ 0 评论

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github:

https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1

文档编写目的

Kafka支持多种客户端语言(C/C++、Go、Java、JMS、.NET、Python)。Fayson在前面多篇文章介绍了Java访问Kerberos和非Kerberos环境下的Kafka,参考《

如何使用Java连接Kerberos的Kafka

》。本篇文章Fayson主要介绍使用Python2访问Kerberos环境下的Kafka。在学习本篇文章内容前你还需要知道《

如何通过Cloudera Manager为Kafka启用Kerberos及使用

》。

  • 内容概述:

1.环境准备

2.Python2示例代码

3.访问验证

4.总结

  • 测试环境:

1.操作系统:Redhat7.4

2.CM和CDH版本为5.15.0

3.CDK2.2.0(0.10.2)

4.Python 2.7.15

2

环境准备

在使用Python访问Kafka前,还需要为Python环境安装相关的Kafka包,这里Fayson使用官网推荐使用的confluent-kafka-python依赖包。该依赖包的GitHub地址为:https://github.com/confluentinc/confluent-kafka-python,关于confluent-kafka-python的详细说明可以参考GitHub。

如下为各个语言对Kafka功能的支持情况

https://docs.confluent.io/current/clients/index.html#feature-support

接下来准备Python访问Kafka的运行环境。

1.安装librdkafka依赖包,该依赖包为操作系统的依赖包

[root@cdh4 ~]# yum install -y librdkafka-devel python-devel

注意:安装的librdkafka依赖包的版本需要>=0.11.5,librdkafka是C语言实现的Apache Kafka高性能客户端,为生产和使用Kafka提供高效可靠的客户端。

2.由于我们访问的是Kerberos环境下的Kafka,所以需要使用源码模式安装confluent-kafka

[root@cdh4 anaconda2]# /opt/cloudera/anaconda2/bin/pip install --no-binary :all: confluent-kafka
[root@cdh4 anaconda2]# /opt/cloudera/anaconda2/bin/pip show confluent-kafka

3

Python2示例代码

1.如下为Python2访问Kerberos环境下Kafka示例代码

[root@cdh4 python_code]# vim kafka_test.py 
from confluent_kafka import Producer
import sys
conf = {'bootstrap.servers': 'cdh2.fayson.com:9092,cdh3.fayson.com:9092,cdh4.fayson.com:9092',
 'security.protocol':'sasl_plaintext',
 'sasl.kerberos.principal':'fayson@FAYSON.COM',
 'sasl.kerberos.keytab':'/data/disk1/python_code/fayson.keytab',
 'group.id':'testgroup'}
# Create Producer instance
p = Producer(**conf)
print(p)
def acked(err, msg):
 """Delivery report callback called (from flush()) on successful or failed delivery of the message."""
 if err is not None:
 print("failed to deliver message: {}".format(err.str()))
 else:
 print("produced to: {} [{}] @ {}".format(msg.topic(), msg.partition(), msg.offset()))
p.produce('test', value='python test value', callback=acked)
p.flush()

2.关于Kafka支持的属性配置可以参考如下地址

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

4

访问验证

本文提供的示例代码为向Kerberos环境Kafka的test Topic中发送消息,在命令行使用Kafka提供的kafka-console-consumer命令消费Python示例生产的消息。

1.准备客户端消费配置文件

jaas.conf内容如下:

[root@cdh05 consumer]# more jaas.conf 
KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=true
 keyTab="/data/disk1/python_code/fayson.keytab"
 principal="fayson@FAYSON.COM";
};

client.properties内容如下:

[root@cdh05 consumer]# more client.properties 
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
group.id=testgroup

2.在命令行运行如下脚本启动客户端消费

export KAFKA_OPTS="-Djava.security.auth.login.config=/data/disk1/python_code/consumer/jaas.conf"
kafka-console-consumer --topic test --from-beginning --bootstrap-server cdh2.fayson.com:9092,cdh3.fayson.com:9092,cdh4.fayson.com:9092 --consumer.config /data/disk1/python_code/consumer/client.properties

3.在命令行运行python2的示例代码向test Topic发送“python test value”消息

[root@cdh4 python_code]# /opt/cloudera/anaconda2/bin/python kafka_test.py

4.查看Kafka消费程序接收到两条消息

5

总结

1.confluent-kafka-python依赖包需要Python的环境>= 2.7 or Python 3.x。

2.如果使用confluent-kafka-python访问Kerberos环境下的Kafka,需要安装librdkafka及其依赖包,然后使用PyPi命令通过源码的方式安装。

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表