网站首页 > 开源技术 正文
温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
Fayson的github:
https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1
文档编写目的
Kafka支持多种客户端语言(C/C++、Go、Java、JMS、.NET、Python)。Fayson在前面多篇文章介绍了Java访问Kerberos和非Kerberos环境下的Kafka,参考《
如何使用Java连接Kerberos的Kafka
》。本篇文章Fayson主要介绍使用Python2访问Kerberos环境下的Kafka。在学习本篇文章内容前你还需要知道《
如何通过Cloudera Manager为Kafka启用Kerberos及使用
》。
- 内容概述:
1.环境准备
2.Python2示例代码
3.访问验证
4.总结
- 测试环境:
1.操作系统:Redhat7.4
2.CM和CDH版本为5.15.0
3.CDK2.2.0(0.10.2)
4.Python 2.7.15
2
环境准备
在使用Python访问Kafka前,还需要为Python环境安装相关的Kafka包,这里Fayson使用官网推荐使用的confluent-kafka-python依赖包。该依赖包的GitHub地址为:https://github.com/confluentinc/confluent-kafka-python,关于confluent-kafka-python的详细说明可以参考GitHub。
如下为各个语言对Kafka功能的支持情况
https://docs.confluent.io/current/clients/index.html#feature-support
接下来准备Python访问Kafka的运行环境。
1.安装librdkafka依赖包,该依赖包为操作系统的依赖包
[root@cdh4 ~]# yum install -y librdkafka-devel python-devel
注意:安装的librdkafka依赖包的版本需要>=0.11.5,librdkafka是C语言实现的Apache Kafka高性能客户端,为生产和使用Kafka提供高效可靠的客户端。
2.由于我们访问的是Kerberos环境下的Kafka,所以需要使用源码模式安装confluent-kafka
[root@cdh4 anaconda2]# /opt/cloudera/anaconda2/bin/pip install --no-binary :all: confluent-kafka [root@cdh4 anaconda2]# /opt/cloudera/anaconda2/bin/pip show confluent-kafka
3
Python2示例代码
1.如下为Python2访问Kerberos环境下Kafka示例代码
[root@cdh4 python_code]# vim kafka_test.py from confluent_kafka import Producer import sys conf = {'bootstrap.servers': 'cdh2.fayson.com:9092,cdh3.fayson.com:9092,cdh4.fayson.com:9092', 'security.protocol':'sasl_plaintext', 'sasl.kerberos.principal':'fayson@FAYSON.COM', 'sasl.kerberos.keytab':'/data/disk1/python_code/fayson.keytab', 'group.id':'testgroup'} # Create Producer instance p = Producer(**conf) print(p) def acked(err, msg): """Delivery report callback called (from flush()) on successful or failed delivery of the message.""" if err is not None: print("failed to deliver message: {}".format(err.str())) else: print("produced to: {} [{}] @ {}".format(msg.topic(), msg.partition(), msg.offset())) p.produce('test', value='python test value', callback=acked) p.flush()
2.关于Kafka支持的属性配置可以参考如下地址
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
4
访问验证
本文提供的示例代码为向Kerberos环境Kafka的test Topic中发送消息,在命令行使用Kafka提供的kafka-console-consumer命令消费Python示例生产的消息。
1.准备客户端消费配置文件
jaas.conf内容如下:
[root@cdh05 consumer]# more jaas.conf KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/data/disk1/python_code/fayson.keytab" principal="fayson@FAYSON.COM"; };
client.properties内容如下:
[root@cdh05 consumer]# more client.properties security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka group.id=testgroup
2.在命令行运行如下脚本启动客户端消费
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/disk1/python_code/consumer/jaas.conf" kafka-console-consumer --topic test --from-beginning --bootstrap-server cdh2.fayson.com:9092,cdh3.fayson.com:9092,cdh4.fayson.com:9092 --consumer.config /data/disk1/python_code/consumer/client.properties
3.在命令行运行python2的示例代码向test Topic发送“python test value”消息
[root@cdh4 python_code]# /opt/cloudera/anaconda2/bin/python kafka_test.py
4.查看Kafka消费程序接收到两条消息
5
总结
1.confluent-kafka-python依赖包需要Python的环境>= 2.7 or Python 3.x。
2.如果使用confluent-kafka-python访问Kerberos环境下的Kafka,需要安装librdkafka及其依赖包,然后使用PyPi命令通过源码的方式安装。
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。
温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操
猜你喜欢
- 2024-10-19 Seata 中Resource Manager (RM) 本地事务管理
- 2024-10-19 kill-9导致Kakfa重启失败,说多了都是泪
- 2024-10-19 快速掌握Kafka系列《三》配置项总结
- 2024-10-19 Flink 参数配置和常见参数调优(flink配置详解)
- 2024-10-19 基于 Flink 实现的商品实时推荐系统(附源码)
- 2024-10-19 Kafka+Spark Streaming管理offset的两种方法
- 2024-10-19 Kafka大厂高频面试题:在保证高性能、高吞吐的同时保证高可用性
- 2024-10-19 Kafka+Spark Streaming管理offset的几种方法
- 2024-10-19 清华同方大数据岗位面试题(清华同方数据库官网)
- 2024-10-19 # Kafka_深入探秘者(10):kafka 监控--2
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)