编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

# Kafka_深入探秘者(8):kafka 高级应用--2

wxchong 2024-09-27 05:42:39 开源技术 12 ℃ 0 评论

#寻找热爱表达的你#

# Kafka_深入探秘者(8):kafka 高级应用--2

段子手168

## 四、kafka springboot+kafka 事务 001

### 1、在 kafka_spring_learn 工程中,修改 application.properties 配置文件,添加事务。

```java

# kafka_spring_learn\src\main\resources\application.properties

logging.level.root=INFO

# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

# IP 地址换成你自己的虚拟机 IP 地址

#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092

#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092

spring.kafka.producer.bootstrap-servers=172.18.30.110:9092

spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092

# 事务的支持

spring.kafka.producer.transaction-id-prefix=kafka_tx.

```

### 2、在 kafka_spring_learn 工程中,修改 KafkaLearnApplication.java 启动类,添加事务。

```java

/**

* kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java

*

* 2024-6-24 创建 KafkaLearnApplication.java 启动类文件

*/

package djh.it.kafkalearn;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.transaction.annotation.Transactional;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

@SpringBootApplication

@RequestMapping

//避免Gson版本冲突快捷配置

@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})

@Transactional

public class KafkaLearnApplication {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);

public static void main(String[] args) {

SpringApplication.run(KafkaLearnApplication.class, args);

}

@RequestMapping("/index")

public String index(){

return "hello,kafka spring comming";

}

@Autowired

private KafkaTemplate template;

private static final String topic = "heima";

// 消息生产者

@GetMapping("/send/{input}")

public String sendToKafka(@PathVariable String input){

this.template.send(topic, input);

//事务的支持

template.executeInTransaction(t ->{

t.send(topic, input);

if("error".equals(input)){

throw new RuntimeException("input is error");

}

t.send(topic, input+" anthor");

return true;

});

return "Send success! " + input;

}

//消息的接收

@KafkaListener(id ="", topics = topic, groupId = "group.demo")

public void listener(String input){

LOGGER.info("message input value:{}", input);

}

}

```

### 3、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/send/kafka

输出: Send success! kafka

2)浏览器地址栏输入: localhost:8080/send/error

会抛出异常。



## 五、kafka springboot+kafka 事务 002

### 1、在 kafka_spring_learn 工程中,修改 application.properties 配置文件,添加事务。

```java

# kafka_spring_learn\src\main\resources\application.properties

logging.level.root=INFO

# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

# IP 地址换成你自己的虚拟机 IP 地址

#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092

#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092

spring.kafka.producer.bootstrap-servers=172.18.30.110:9092

spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092

# 事务的支持

spring.kafka.producer.transaction-id-prefix=kafka_tx.

```

### 2、在 kafka_spring_learn 工程中,修改 KafkaLearnApplication.java 启动类,添加事务。

```java

/**

* kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java

*

* 2024-6-24 创建 KafkaLearnApplication.java 启动类文件

*/

package djh.it.kafkalearn;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.transaction.annotation.Transactional;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

@SpringBootApplication

@RequestMapping

//避免Gson版本冲突快捷配置

@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})

@Transactional

public class KafkaLearnApplication {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);

public static void main(String[] args) {

SpringApplication.run(KafkaLearnApplication.class, args);

}

@RequestMapping("/index")

public String index(){

return "hello,kafka spring comming";

}

@Autowired

private KafkaTemplate template;

private static final String topic = "heima";

// 消息生产者

@GetMapping("/send/{input}")

public String sendToKafka(@PathVariable String input){

this.template.send(topic, input);

//事务的支持

template.executeInTransaction(t ->{

t.send(topic, input);

if("error".equals(input)){

throw new RuntimeException("input is error");

}

t.send(topic, input+" anthor");

return true;

});

return "Send success! " + input;

}

// 消息生产者2: 演示事务

@GetMapping("/send2/{input}")

@Transactional(rollbackFor = RuntimeException.class)

public String sendToKafkaTransaction(@PathVariable String input){

//事务的支持

template.send(topic, input);

if("error".equals(input)){

throw new RuntimeException("input is error2!");

}

template.send(topic, input + " anthor2");

return "Send success2! " + input;

}

//消息的接收

@KafkaListener(id ="", topics = topic, groupId = "group.demo")

public void listener(String input){

LOGGER.info("message input value:{}", input);

}

}

```

### 3、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/send2/kafka

输出: Send success2! kafka

2)浏览器地址栏输入: localhost:8080/send2/error

会抛出异常。服务器上重新打开一个客户消费,也收不到消息。





### 4、消息中间件选型对比:

#### 4.1 资料文档

Kafka: 中。有 kafka 作者自己写的书,网上资料也有一些,

rabbitmq: 多。有一些不错的书,网上资料多。

zeromq: 少。没有专门写 zeromg 的书,网上的资料多是一些代码的实现和简单介绍。

rocketmq: 少。没有专门写rocketmq的书,网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。

activemq: 多。没有专门写activemq的书,网上资料多

#### 4.2 开发语言

Kafka: Scala

rabbitmg: Erlang

zeromg:c

rocketmg: java

activemg: java

#### 4.3 支持的协议

Kafka: 自己定义的一套.【基于TCP)

rabbitmg: AMQPI

zeromq: TCP、UDP

rocketmq: 自己定义的一套.

activemq: OpenWire、STOMP、REST、XMPP、AMQP

#### 4.4 消息存储

Kafka: 内存、磁盘、数据库。支持大量堆积。

rocketmq: 磁盘。支持大量堆积。

rabbitmq: 内存、磁盘、支持少量规程。

activemq: 内存、磁盘、数据库。支持少量堆积。

zeromq: 消息发送端的内存或者磁盘中。不支持持久化。

#### 4.5 消息事务

Kafka: 支持

rabbitmq: 支持。客户端将信道设置为事务模式只有当消息被 rabbitMq 接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降

zeromq: 不支持

rocketmq: 支持

activemq: 支持。

#### 4.6 负载均衡

Kafka: 支持负载均衡。

rabbitmq: 对负载均衡支持不好。

zeromq: 去中心化,不支持负载均衡。本身只是一个多线程网络库。

rocketmq: 支持负载均衡。

activemq: 支持负载均衡。可以基于 zookeeper 实现负载均衡。

#### 4.7 集群方式:

Kafka: 天然的 Leader-Slave 无状态集群。

rabbitmq: 支持简单集群,复制模式,对高级集群模式支持不好。

zeromq: 去中心化,不支持集群。

rocketmq: 常用,多对 Master-Slave 模式。开源版本需手动切换 Slave 模式变成 Master 模式。

activemq: 。支持简单集群模式,比如 主-备,对高级集群模式支持不好。

#### 4.8 管理界面

Kafka: 一般

rabbitmq: 好

zeromq: 无

rocketmq: 无

activemq: 一般

#### 4.9 可用性

Kafka: 非常高(分布式)

rabbitmg: 高(主从)。

zeromg: 高。

rocketmq: 非常高(分布式)

activemq: 高(主从)

#### 4.10 消息重复

Kafka:支持at least once、at most once

rabbitmg:支持at least once、at most once

zeromq:只有重传机制,但是没有持久化,消息丢了重传也没有用。既不是atleastonce、也不是at most

once、更不是exactly only once

rocketmq:支持atleast once

activemq:支持atleast once

#### 4.11 吞吐量 TPS

Kafka: 极大

Kafka 按批次发送消,息和消费消,息。发送端将多个小消,息合并,批量发向 Broker,消费端每次取出一个批次的消,息批量外理。

## 六、kafka spark+kafka

### 1、流式处理 Spark

- Spark 最初诞生于美国加州大学伯克利分校(UCBerkeley)的AMP实验室,是一个可应用于大规模数据处理的快速、通用引擎。

- 2013年,Spark 加入 Apache 孵化器项目后,开始获得迅猛的发展,如今已成为 Apache 软件基金会最重要的三大分布式计算系统开源项目之一(即 Hadoop、Spark、Storm)。

- Spark 最初的设计目标是使数据分析更快--不仅运行速度快,也要能快速、容易地编写程序。

- 为了使程序运行更快,Spark 提供了内存计算,减少了迭代计算时的I0开销;

- 而为了使编写程序更为容易,Spark 使用简练、优雅的 Scala 语言编写,基于 Scala 提供了交互式的编程体验。

- 虽然,Hadoop 已成为大数据的事实标准,但其 MapReduce 分布式计算模型仍存在诸多缺陷,

- 而 Spark 不仅具备 Hadoop MapReduce 所具有的优点,且解决了 Hadoop MapReduce 的缺陷。

- Spark 正以其结构一体化、功能多元化的优势逐渐成为当今大数据领域最热门的大数据计算平台。




### 2、Spark 安装与应用

Spark 官网下载:

http://spark.apache.org/downloads.html

https://spark.apache.org/downloads.html

https://archive.apache.org/dist/spark/

### 3、下载完成安装包,解压即安装

```java

# 创建目录并切换目录:

mkdir /usr/local/spark/

cd /usr/local/spark/

# 上传安装包到服务器

sftp> put

# 解压即安装

tar -zxvf apark-2.4.4-bin-hadoop2.7

# 配置 JDK

vim sbin/spark-config.sh

# 你的 jdk 安装路径(java 默认安装路径:/usr/lib/jvm/java-8-openjdk-amd64/ )

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# 切换到 spark 的安装目录

cd /usr/local/spark/apark-2.4.4-bin-hadoop2.7/

# 启动 spark

sbin/start-all.sh starting

# 验证

jps -l2819 kafka.Kafka

```

### 4、浏览器地址栏输入: http://localhost:8080

http://127.0.0.1:8080

http://172.18.30.110:8080


### 5、Spark 和 Kafka 整合:

#### 5.1 在 kafka_spring_learn 工程中,创建 Spark 和 Kafka 整合 类 SparkStreamingFromKafka.java

```java

/**

* kafka_spring_learn\src\main\java\djh\it\kafkalearn\spark\SparkStreamingFromKafka.java

*

* 2024-6-26 创建 Spark 和 Kafka 整合 类 SparkStreamingFromKafka.java

*/

package djh.it.kafkalearn.spark;

import lombok.SneakyThrows;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import org.apache.spark.streaming.Durations;

import org.apache.spark.streaming.api.java.JavaInputDStream;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.spark.streaming.kafka010.ConsumerStrategies;

import org.apache.spark.streaming.kafka010.KafkaUtils;

import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

import java.util.Arrays;

import java.util.Collection;

import java.util.HashMap;

import java.util.Map;

public class SparkStreamingFromKafka {

@SneakyThrows

public static void main( String[] args ) {

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(("SparkStreamingFromKafka"));

JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));

Map<String, Object> kafkaParams = new HashMap<>();

kafkaParams.put("bootstrap.servers", "172.18.30.110:9092");

kafkaParams.put("key.deserializer", StringDeserializer.class);

kafkaParams.put("value.deserializer", StringDeserializer.class);

kafkaParams.put("group.id", "sparkStreaming");

Collection<String> topics = Arrays.asList("heima");

JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(

streamingContext,

LocationStrategies.PreferConsistent(),

ConsumerStrategies.Subscribe(topics, kafkaParams));

JavaPairDStream<String, String> javaPairDStream = javaInputDStream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2<String, String> call( ConsumerRecord<String, String> consumerRecord ) throws Exception {

return new Tuple2<>(consumerRecord.key(), consumerRecord.value());

}

});

javaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {

@Override

public void call( JavaPairRDD<String, String> javaPairRDD ) throws Exception {

javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {

@Override

public void call( Tuple2<String, String> tuple2 ) throws Exception {

System.out.println(tuple2._2);

}

});

}

});

streamingContext.start();

streamingContext.awaitTermination();

}

}

```

#### 5.2、在服务器端打开一个生产消息发送服务,进行消息发送测试。

```java

# 切换到 kafka 安装目录下:

cd /usr/local/kafka/kafka_2.12-2.8.0

# 打开生产消息服务:

bin/kafka-console-producer.sh --broker-list 172.18.30.110:9092 --topic heima

# 开始发送消息:

>hello nihao

>to^H^H

>producer broker kafka

>

# 在 idea 启动类控制台这边就会收到 消息,这就是 Spark 流式处理。

```


Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表