网站首页 > 开源技术 正文
# Kafka_深入探秘者(8):kafka 高级应用--2
段子手168
## 四、kafka springboot+kafka 事务 001
### 1、在 kafka_spring_learn 工程中,修改 application.properties 配置文件,添加事务。
```java
# kafka_spring_learn\src\main\resources\application.properties
logging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092
spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092
# 事务的支持
spring.kafka.producer.transaction-id-prefix=kafka_tx.
```
### 2、在 kafka_spring_learn 工程中,修改 KafkaLearnApplication.java 启动类,添加事务。
```java
/**
* kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java
*
* 2024-6-24 创建 KafkaLearnApplication.java 启动类文件
*/
package djh.it.kafkalearn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
@Transactional
public class KafkaLearnApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);
public static void main(String[] args) {
SpringApplication.run(KafkaLearnApplication.class, args);
}
@RequestMapping("/index")
public String index(){
return "hello,kafka spring comming";
}
@Autowired
private KafkaTemplate template;
private static final String topic = "heima";
// 消息生产者
@GetMapping("/send/{input}")
public String sendToKafka(@PathVariable String input){
this.template.send(topic, input);
//事务的支持
template.executeInTransaction(t ->{
t.send(topic, input);
if("error".equals(input)){
throw new RuntimeException("input is error");
}
t.send(topic, input+" anthor");
return true;
});
return "Send success! " + input;
}
//消息的接收
@KafkaListener(id ="", topics = topic, groupId = "group.demo")
public void listener(String input){
LOGGER.info("message input value:{}", input);
}
}
```
### 3、启动 KafkaLearnApplication.java 启动类,进行测试。
1)浏览器地址栏输入: localhost:8080/send/kafka
输出: Send success! kafka
2)浏览器地址栏输入: localhost:8080/send/error
会抛出异常。
## 五、kafka springboot+kafka 事务 002
### 1、在 kafka_spring_learn 工程中,修改 application.properties 配置文件,添加事务。
```java
# kafka_spring_learn\src\main\resources\application.properties
logging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092
spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092
# 事务的支持
spring.kafka.producer.transaction-id-prefix=kafka_tx.
```
### 2、在 kafka_spring_learn 工程中,修改 KafkaLearnApplication.java 启动类,添加事务。
```java
/**
* kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java
*
* 2024-6-24 创建 KafkaLearnApplication.java 启动类文件
*/
package djh.it.kafkalearn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
@Transactional
public class KafkaLearnApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);
public static void main(String[] args) {
SpringApplication.run(KafkaLearnApplication.class, args);
}
@RequestMapping("/index")
public String index(){
return "hello,kafka spring comming";
}
@Autowired
private KafkaTemplate template;
private static final String topic = "heima";
// 消息生产者
@GetMapping("/send/{input}")
public String sendToKafka(@PathVariable String input){
this.template.send(topic, input);
//事务的支持
template.executeInTransaction(t ->{
t.send(topic, input);
if("error".equals(input)){
throw new RuntimeException("input is error");
}
t.send(topic, input+" anthor");
return true;
});
return "Send success! " + input;
}
// 消息生产者2: 演示事务
@GetMapping("/send2/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public String sendToKafkaTransaction(@PathVariable String input){
//事务的支持
template.send(topic, input);
if("error".equals(input)){
throw new RuntimeException("input is error2!");
}
template.send(topic, input + " anthor2");
return "Send success2! " + input;
}
//消息的接收
@KafkaListener(id ="", topics = topic, groupId = "group.demo")
public void listener(String input){
LOGGER.info("message input value:{}", input);
}
}
```
### 3、启动 KafkaLearnApplication.java 启动类,进行测试。
1)浏览器地址栏输入: localhost:8080/send2/kafka
输出: Send success2! kafka
2)浏览器地址栏输入: localhost:8080/send2/error
会抛出异常。服务器上重新打开一个客户消费,也收不到消息。
### 4、消息中间件选型对比:
#### 4.1 资料文档
Kafka: 中。有 kafka 作者自己写的书,网上资料也有一些,
rabbitmq: 多。有一些不错的书,网上资料多。
zeromq: 少。没有专门写 zeromg 的书,网上的资料多是一些代码的实现和简单介绍。
rocketmq: 少。没有专门写rocketmq的书,网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。
activemq: 多。没有专门写activemq的书,网上资料多
#### 4.2 开发语言
Kafka: Scala
rabbitmg: Erlang
zeromg:c
rocketmg: java
activemg: java
#### 4.3 支持的协议
Kafka: 自己定义的一套.【基于TCP)
rabbitmg: AMQPI
zeromq: TCP、UDP
rocketmq: 自己定义的一套.
activemq: OpenWire、STOMP、REST、XMPP、AMQP
#### 4.4 消息存储
Kafka: 内存、磁盘、数据库。支持大量堆积。
rocketmq: 磁盘。支持大量堆积。
rabbitmq: 内存、磁盘、支持少量规程。
activemq: 内存、磁盘、数据库。支持少量堆积。
zeromq: 消息发送端的内存或者磁盘中。不支持持久化。
#### 4.5 消息事务
Kafka: 支持
rabbitmq: 支持。客户端将信道设置为事务模式只有当消息被 rabbitMq 接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降
zeromq: 不支持
rocketmq: 支持
activemq: 支持。
#### 4.6 负载均衡
Kafka: 支持负载均衡。
rabbitmq: 对负载均衡支持不好。
zeromq: 去中心化,不支持负载均衡。本身只是一个多线程网络库。
rocketmq: 支持负载均衡。
activemq: 支持负载均衡。可以基于 zookeeper 实现负载均衡。
#### 4.7 集群方式:
Kafka: 天然的 Leader-Slave 无状态集群。
rabbitmq: 支持简单集群,复制模式,对高级集群模式支持不好。
zeromq: 去中心化,不支持集群。
rocketmq: 常用,多对 Master-Slave 模式。开源版本需手动切换 Slave 模式变成 Master 模式。
activemq: 。支持简单集群模式,比如 主-备,对高级集群模式支持不好。
#### 4.8 管理界面
Kafka: 一般
rabbitmq: 好
zeromq: 无
rocketmq: 无
activemq: 一般
#### 4.9 可用性
Kafka: 非常高(分布式)
rabbitmg: 高(主从)。
zeromg: 高。
rocketmq: 非常高(分布式)
activemq: 高(主从)
#### 4.10 消息重复
Kafka:支持at least once、at most once
rabbitmg:支持at least once、at most once
zeromq:只有重传机制,但是没有持久化,消息丢了重传也没有用。既不是atleastonce、也不是at most
once、更不是exactly only once
rocketmq:支持atleast once
activemq:支持atleast once
#### 4.11 吞吐量 TPS
Kafka: 极大
Kafka 按批次发送消,息和消费消,息。发送端将多个小消,息合并,批量发向 Broker,消费端每次取出一个批次的消,息批量外理。
## 六、kafka spark+kafka
### 1、流式处理 Spark
- Spark 最初诞生于美国加州大学伯克利分校(UCBerkeley)的AMP实验室,是一个可应用于大规模数据处理的快速、通用引擎。
- 2013年,Spark 加入 Apache 孵化器项目后,开始获得迅猛的发展,如今已成为 Apache 软件基金会最重要的三大分布式计算系统开源项目之一(即 Hadoop、Spark、Storm)。
- Spark 最初的设计目标是使数据分析更快--不仅运行速度快,也要能快速、容易地编写程序。
- 为了使程序运行更快,Spark 提供了内存计算,减少了迭代计算时的I0开销;
- 而为了使编写程序更为容易,Spark 使用简练、优雅的 Scala 语言编写,基于 Scala 提供了交互式的编程体验。
- 虽然,Hadoop 已成为大数据的事实标准,但其 MapReduce 分布式计算模型仍存在诸多缺陷,
- 而 Spark 不仅具备 Hadoop MapReduce 所具有的优点,且解决了 Hadoop MapReduce 的缺陷。
- Spark 正以其结构一体化、功能多元化的优势逐渐成为当今大数据领域最热门的大数据计算平台。
### 2、Spark 安装与应用
Spark 官网下载:
http://spark.apache.org/downloads.html
https://spark.apache.org/downloads.html
https://archive.apache.org/dist/spark/
### 3、下载完成安装包,解压即安装
```java
# 创建目录并切换目录:
mkdir /usr/local/spark/
cd /usr/local/spark/
# 上传安装包到服务器
sftp> put
# 解压即安装
tar -zxvf apark-2.4.4-bin-hadoop2.7
# 配置 JDK
vim sbin/spark-config.sh
# 你的 jdk 安装路径(java 默认安装路径:/usr/lib/jvm/java-8-openjdk-amd64/ )
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
# 切换到 spark 的安装目录
cd /usr/local/spark/apark-2.4.4-bin-hadoop2.7/
# 启动 spark
sbin/start-all.sh starting
# 验证
jps -l2819 kafka.Kafka
```
### 4、浏览器地址栏输入: http://localhost:8080
http://127.0.0.1:8080
http://172.18.30.110:8080
### 5、Spark 和 Kafka 整合:
#### 5.1 在 kafka_spring_learn 工程中,创建 Spark 和 Kafka 整合 类 SparkStreamingFromKafka.java
```java
/**
* kafka_spring_learn\src\main\java\djh\it\kafkalearn\spark\SparkStreamingFromKafka.java
*
* 2024-6-26 创建 Spark 和 Kafka 整合 类 SparkStreamingFromKafka.java
*/
package djh.it.kafkalearn.spark;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class SparkStreamingFromKafka {
@SneakyThrows
public static void main( String[] args ) {
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(("SparkStreamingFromKafka"));
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "172.18.30.110:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "sparkStreaming");
Collection<String> topics = Arrays.asList("heima");
JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> javaPairDStream = javaInputDStream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call( ConsumerRecord<String, String> consumerRecord ) throws Exception {
return new Tuple2<>(consumerRecord.key(), consumerRecord.value());
}
});
javaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
@Override
public void call( JavaPairRDD<String, String> javaPairRDD ) throws Exception {
javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
@Override
public void call( Tuple2<String, String> tuple2 ) throws Exception {
System.out.println(tuple2._2);
}
});
}
});
streamingContext.start();
streamingContext.awaitTermination();
}
}
```
#### 5.2、在服务器端打开一个生产消息发送服务,进行消息发送测试。
```java
# 切换到 kafka 安装目录下:
cd /usr/local/kafka/kafka_2.12-2.8.0
# 打开生产消息服务:
bin/kafka-console-producer.sh --broker-list 172.18.30.110:9092 --topic heima
# 开始发送消息:
>hello nihao
>to^H^H
>producer broker kafka
>
# 在 idea 启动类控制台这边就会收到 消息,这就是 Spark 流式处理。
```
猜你喜欢
- 2024-09-30 设计模式学习笔记:原型模式以及深浅拷贝的区别
- 2024-09-30 金三银四涨薪计划Java不懂这些核心技能,还想去大厂?
- 2024-09-30 我赌你不懂系列:序列化是什么(序列化的概念)
- 2024-09-30 使用MVP+RxJava+Retrofit框架(mvp retrofit rxjava)
- 2024-09-30 Java开发必须要掌握的20个核心技术,你掌握了多少?
- 2024-09-30 Android 上的序列化和反序列化(序列化和反序列化使用场景)
- 2024-09-30 114、mvc各部分;反射一般场景(反射setaccessible)
- 2024-09-27 # Kafka_深入探秘者(8):kafka 高级应用--1
- 2024-09-27 为什么强烈禁止开发人员使用isSuccess作为变量名
- 2024-09-27 重新认识一个强大的 Gson,从一个线上 BUG 说起
你 发表评论:
欢迎- 03-19基于layui+springcloud的企业级微服务框架
- 03-19开箱即用的前端开发模板,扩展Layui原生UI样式,集成第三方组件
- 03-19SpringMVC +Spring +Mybatis + Layui通用后台管理系统OneManageV2.1
- 03-19SpringBoot+LayUI后台管理系统开发脚手架
- 03-19layui下拉菜单form.render局部刷新方法亲测有效
- 03-19Layui 遇到的坑(记录贴)(layui chm)
- 03-19基于ASP.NET MVC + Layui的通用后台开发框架
- 03-19LayUi自定义模块的定义与使用(layui自定义表格)
- 最近发表
-
- 基于layui+springcloud的企业级微服务框架
- 开箱即用的前端开发模板,扩展Layui原生UI样式,集成第三方组件
- SpringMVC +Spring +Mybatis + Layui通用后台管理系统OneManageV2.1
- SpringBoot+LayUI后台管理系统开发脚手架
- layui下拉菜单form.render局部刷新方法亲测有效
- Layui 遇到的坑(记录贴)(layui chm)
- 基于ASP.NET MVC + Layui的通用后台开发框架
- LayUi自定义模块的定义与使用(layui自定义表格)
- Layui 2.9.11正式发布(layui2.6)
- Layui 2.9.13正式发布(layui2.6)
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)